"""Evaluate the results of the example experiment."""
import argparse
import os.path as pp
from math import log2
from re import Match, Pattern
from re import compile as _compile
from statistics import median
from typing import Callable, Final, Iterable, cast
from pycommons.io.console import logger
from pycommons.io.path import Path, directory_path
from pycommons.strings.string_conv import num_to_str
from pycommons.types import type_error
from moptipy.evaluation.axis_ranger import AxisRanger
from moptipy.evaluation.base import TIME_UNIT_FES, TIME_UNIT_MILLIS
from moptipy.evaluation.end_results import EndResult
from moptipy.evaluation.end_results import from_csv as end_results_from_csv
from moptipy.evaluation.end_results import from_logs as end_results_from_logs
from moptipy.evaluation.end_results import to_csv as end_results_to_csv
from moptipy.evaluation.end_statistics import EndStatistics
from moptipy.evaluation.end_statistics import (
from_end_results as es_from_end_results,
)
from moptipy.evaluation.end_statistics import to_csv as es_to_csv
from moptipy.evaluation.tabulate_end_results import (
DEFAULT_ALGORITHM_INSTANCE_STATISTICS,
DEFAULT_ALGORITHM_SUMMARY_STATISTICS,
command_column_namer,
tabulate_end_results,
)
from moptipy.evaluation.tabulate_result_tests import tabulate_result_tests
from moptipy.examples.jssp.experiment import ALGORITHMS, INSTANCES
from moptipy.examples.jssp.instance import Instance
from moptipy.examples.jssp.plots import (
plot_end_makespans,
plot_end_makespans_over_param,
plot_progresses,
plot_stat_gantt_charts,
)
from moptipy.spaces.permutations import Permutations
from moptipy.utils.help import moptipy_argparser
from moptipy.utils.lang import EN
from moptipy.utils.logger import sanitize_name
from moptipy.utils.strings import (
beautify_float_str,
name_str_to_num,
)
#: The letter mu
LETTER_M: Final[str] = "\u03BC"
#: The letter lambda
LETTER_L: Final[str] = "\u03BB"
#: the pre-defined instance sort keys
__INST_SORT_KEYS: Final[dict[str, int]] = {
__n: __i for __i, __n in enumerate(INSTANCES)
}
#: the name of the mu+1 EA
NAME_EA_MU_PLUS_1: Final[str] = f"{LETTER_M}+1_ea"
#: the name of the mu+mu EA
NAME_EA_MU_PLUS_MU: Final[str] = f"{LETTER_M}+{LETTER_M}_ea"
#: the name of the mu+ld(mu) EA
NAME_EA_MU_PLUS_LOG_MU: Final[str] = f"{LETTER_M}+log\u2082{LETTER_M}_ea"
#: the name of the mu+sqrt(mu) EA
NAME_EA_MU_PLUS_SQRT_MU: Final[str] = f"{LETTER_M}+\u221A{LETTER_M}_ea"
def __make_all_names() -> tuple[str, ...]:
"""
Create an immutable list of all algorithm names.
:returns: the tuple with all algorithm names
"""
inst = Instance.from_resource("demo")
space = Permutations.with_repetitions(inst.jobs, inst.machines)
return tuple([str(a(inst, space)) for a in ALGORITHMS])
#: the list of all algorithm names from the experiment
ALL_NAMES: Final[tuple[str, ...]] = __make_all_names()
del __make_all_names
[docs]
def ea_family(name: str) -> str:
"""
Name the evolutionary algorithm setup.
:param name: the full name
:returns: the short name of the family
"""
if name.startswith("ea_"):
ss: Final[list[str]] = name.split("_")
ll: Final[int] = len(ss)
if ll == 4:
lambda_ = int(ss[2])
if lambda_ == 1:
return NAME_EA_MU_PLUS_1
mu = int(ss[1])
if lambda_ == int(log2(mu)):
return NAME_EA_MU_PLUS_LOG_MU
if lambda_ == round(mu ** 0.5):
return NAME_EA_MU_PLUS_SQRT_MU
ratio = lambda_ // mu
if ratio * mu == lambda_:
if ratio == 1:
return NAME_EA_MU_PLUS_MU
return f"{LETTER_M}+{ratio}{LETTER_M}_ea"
elif (ll == 6) and (ss[-2] == "gap") and (ss[-1] == "swap2"):
return f"{ss[1]}+{ss[2]}_ea_br"
raise ValueError(f"Invalid EA name {name!r}.")
[docs]
def sa_family(name: str) -> str:
"""
Name the simulated annealing setup.
:param name: the full name
:returns: the short name of the family
"""
if name.startswith("sa_") and name.endswith("_swap2"):
return (f"sa_T\u2080_"
f"{beautify_float_str(name_str_to_num(name.split('_')[2]))}")
raise ValueError(f"Invalid SA name {name!r}.")
[docs]
def ma_family(name: str) -> str:
"""
Compute the Memetic Algorithm family without the ls steps.
:param name: the algorithm name
:returns: the short name of the family
"""
ss: Final[list[str]] = name.split("_")
s = "rls" if name.startswith("marls") else "sa"
return f"{ss[1]}+{ss[2]}_ma{s}"
def __make_algo_names() -> tuple[dict[str, int], dict[str, str]]:
"""
Create the algorithm sort keys and name decode.
:returns: the algorithm sort keys and name decode
"""
names: list[str] = list(ALL_NAMES)
names_new = list(ALL_NAMES)
def __eacr(mtch: Match) -> str:
"""Transform the EA name with crossover."""
br = name_str_to_num(mtch.group(3))
if not 0.0 < br < 1.0:
raise ValueError(f"{mtch} has invalid br={br}.")
a = log2(br)
b = int(a)
c = log2(1.0 - br)
d = int(c)
q = f"(1-2^{d})" if abs(c - d) < abs(a - b) else f"2^{b}"
q2 = num_to_str(br)
if len(q2) <= (len(q) + 1):
q = q2
return f"{mtch.group(1)}+{mtch.group(2)}_ea_{q}"
def __sa(mtch: Match) -> str:
"""Transform the SA name."""
if mtch.group(0) == "sa_exp16_1em6_swap2":
return "sa"
return f"sa_{mtch.group(1)}_{mtch.group(2)}e-{mtch.group(3)}"
def __ma(mtch: Match, suffix: str) -> str:
"""Transform the MA name."""
if mtch.group(0) == "ma_2_2_1048576_gap_sa_exp16_5d1em6_swap2":
return "2_masa_20"
if mtch.group(0) == "marls_8_8_16_gap_swap2":
return "8_marls_4"
ls = name_str_to_num(mtch.group(3))
if not 0 < ls < 1_000_000_000:
raise ValueError(f"{mtch} has invalid ls={ls}.")
e2 = int(log2(ls))
if ls != (2 ** e2):
raise ValueError(
f"{mtch} has invalid ls={ls} since {e2}=log2({ls}).")
e2s = f"2^{e2}"
lss = str(ls)
if len(lss) <= len(e2s):
e2s = lss
return f"{mtch.group(1)}+{mtch.group(2)}_ma{suffix}_{e2s}"
# fix some names using regular expressions
namer: dict[str, str] = {}
used_names: set[str] = set(names)
for pattern, repl in [("hc_swap2", "hc"), ("hc_swapn", "hcn"),
("rls_swap2", "rls"), ("rls_swapn", "rlsn")]:
re: Pattern = _compile(pattern)
found = False
for s in names:
m: Match = re.match(s)
if m is not None:
ns = re.sub(cast(str | Callable[[Match], str], repl), s)
if (ns is None) or (len(ns) <= 0):
raise ValueError(f"{s!r} -> {ns!r}?")
if ns == s:
continue
found = True
os = namer.get(s)
if os is not None:
if os == ns:
continue
raise ValueError(f"{s!r} -> {ns!r}, {os!r}?")
if ns in used_names:
raise ValueError(f"Already got {ns!r}.")
namer[s] = ns
names_new.insert(names_new.index(s), ns)
if not found:
raise ValueError(f"did not find {pattern!r}.")
return {__n: __i for __i, __n in enumerate(names_new)}, namer
#: the pre-defined algorithm sort keys and name map
__ALGO_SORT_KEYS, __ALGO_NAME_MAP = __make_algo_names()
[docs]
def instance_sort_key(name: str) -> int:
"""
Get the instance sort key for a given instance name.
:param name: the instance name
:returns: the sort key
"""
if not isinstance(name, str):
raise type_error(name, "name", str)
if not name:
raise ValueError("name must not be empty.")
if name in __INST_SORT_KEYS:
return __INST_SORT_KEYS[name]
return 1000
[docs]
def algorithm_sort_key(name: str) -> int:
"""
Get the algorithm sort key for a given algorithm name.
:param name: the algorithm name
:returns: the sort key
"""
if not isinstance(name, str):
raise type_error(name, "name", str)
if not name:
raise ValueError("name must not be empty.")
if name in __ALGO_SORT_KEYS:
return __ALGO_SORT_KEYS[name]
return 1000
[docs]
def algorithm_namer(name: str) -> str:
"""
Rename algorithm setups.
:param name: the algorithm's original name
:returns: the new name of the algorithm
"""
if not isinstance(name, str):
raise type_error(name, "name", str)
if not name:
raise ValueError("name must not be empty.")
if name in __ALGO_NAME_MAP:
return __ALGO_NAME_MAP[name]
return name
[docs]
def compute_end_results(results_dir: str,
dest_dir: str) -> Path:
"""
Get the end results, compute them if necessary.
:param results_dir: the results directory
:param dest_dir: the destination directory
:returns: the path to the end results file.
"""
dest: Final[Path] = directory_path(dest_dir)
results_file: Final[Path] = dest.resolve_inside("end_results.txt")
if results_file.is_file():
return results_file
results: Final[list[EndResult]] = []
source: Final[Path] = directory_path(results_dir)
logger(f"loading end results from path {source!r}.")
end_results_from_logs(source, results.append)
if not results:
raise ValueError(f"Could not find any logs in {source!r}.")
results.sort()
logger(f"found {len(results)} log files in path {source!r}, storing "
f"them in file {results_file!r}.")
rf: Path = end_results_to_csv(results, results_file)
if rf != results_file:
raise ValueError(
f"results file should be {results_file!r}, but is {rf!r}.")
results_file.enforce_file()
logger(f"finished writing file {results_file!r}.")
return results_file
[docs]
def get_end_results(
file: str,
insts: None | set[str] | Callable[[str], bool] = None,
algos: None | set[str] | Callable[[str], bool] = None) \
-> list[EndResult]:
"""
Get a specific set of end results.
:param file: the end results file
:param insts: only these instances will be included if this parameter is
provided
:param algos: only these algorithms will be included if this parameter is
provided
"""
def __filter(er: EndResult,
ins=None if insts is None else
insts.__contains__ if isinstance(insts, set)
else insts,
alg=None if algos is None else
algos.__contains__ if isinstance(algos, set)
else algos) -> bool:
if (ins is not None) and (not ins(er.instance)):
return False
return not ((alg is not None) and (not alg(er.algorithm)))
col: Final[list[EndResult]] = list(end_results_from_csv(
file=file, filterer=__filter))
if len(col) <= 0:
raise ValueError(
f"no end results for instances {insts} and algorithms {algos}.")
return col
[docs]
def compute_end_statistics(end_results_file: str,
dest_dir: str) -> Path:
"""
Get the end result statistics, compute them if necessary.
:param end_results_file: the end results file
:param dest_dir: the destination directory
:returns: the path to the end result statistics file.
"""
dest: Final[Path] = directory_path(dest_dir)
stats_file: Final[Path] = dest.resolve_inside("end_statistics.txt")
if stats_file.is_file():
return stats_file
results: Final[list[EndResult]] = get_end_results(end_results_file)
if len(results) <= 0:
raise ValueError("end results cannot be empty")
stats: Final[list[EndStatistics]] = []
es_from_end_results(results, stats.append)
if len(stats) <= 0:
raise ValueError("end result statistics cannot be empty")
stats.sort()
sf: Path = es_to_csv(stats, stats_file)
if sf != stats_file:
raise ValueError(f"stats file should be {stats_file!r} but is {sf!r}")
stats_file.enforce_file()
logger(f"finished writing file {stats_file!r}.")
return stats_file
[docs]
def table(end_results: Path, algos: list[str], dest: Path,
swap_stats: Iterable[tuple[str, str]] | None = None) -> None:
"""
Tabulate the end results.
:param end_results: the path to the end results
:param algos: the algorithms
:param dest: the directory
:param swap_stats: the statistics to swap out
"""
EN.set_current()
n: Final[str] = algorithm_namer(algos[0])
algo_inst_stat: list | tuple = DEFAULT_ALGORITHM_INSTANCE_STATISTICS
algo_sum_stat: list | tuple = DEFAULT_ALGORITHM_SUMMARY_STATISTICS
if swap_stats is not None:
algo_inst_stat = list(algo_inst_stat)
algo_sum_stat = list(algo_sum_stat)
for old, swap in swap_stats:
found: bool = False
for lst in [algo_inst_stat, algo_sum_stat]:
for idx, elem in enumerate(lst):
if elem == old:
found = True
lst[idx] = swap
break
if not found:
raise ValueError(f"did not find {old!r} in {str(lst)!r}.")
tabulate_end_results(
end_results=get_end_results(end_results, algos=set(algos)),
file_name=sanitize_name(f"end_results_{n}"), dir_name=dest,
instance_sort_key=instance_sort_key,
algorithm_sort_key=algorithm_sort_key,
col_namer=command_column_namer,
algorithm_namer=algorithm_namer,
algorithm_instance_statistics=algo_inst_stat,
algorithm_summary_statistics=algo_sum_stat,
use_lang=False)
[docs]
def tests(end_results: Path, algos: list[str], dest: Path) -> None:
"""
Tabulate the end result tests.
:param end_results: the path to the end results
:param algos: the algorithms
:param dest: the directory
"""
EN.set_current()
n: Final[str] = algorithm_namer(algos[0])
tabulate_result_tests(
end_results=get_end_results(end_results, algos=set(algos)),
file_name=sanitize_name(f"tests_{n}"), dir_name=dest,
instance_sort_key=instance_sort_key,
algorithm_sort_key=algorithm_sort_key,
algorithm_namer=algorithm_namer,
use_lang=False)
[docs]
def makespans(end_results: Path, algos: list[str], dest: Path,
x_label_location: float = 1.0) -> None:
"""
Plot the end makespans.
:param end_results: the path to the end results
:param algos: the algorithms
:param dest: the directory
:param x_label_location: the location of the label of the x-axis
"""
n: Final[str] = algorithm_namer(algos[0])
plot_end_makespans(
end_results=get_end_results(end_results, algos=set(algos)),
name_base=sanitize_name(f"makespan_scaled_{n}"), dest_dir=dest,
instance_sort_key=instance_sort_key,
algorithm_sort_key=algorithm_sort_key,
algorithm_namer=algorithm_namer,
x_label_location=x_label_location)
[docs]
def gantt(end_results: Path, algo: str, dest: Path, source: Path,
best: bool = False,
insts: Iterable[str] | None = None) -> None:
"""
Plot the median Gantt charts.
:param end_results: the path to the end results
:param algo: the algorithm
:param dest: the directory
:param source: the source directory
:param best: should we plot the best instance only (or, otherwise, the
median)
:param insts: the instances to use
"""
n: Final[str] = algorithm_namer(algo)
plot_stat_gantt_charts(
get_end_results(end_results, algos={algo},
insts=None if insts is None else set(insts)),
name_base=sanitize_name(f"best_gantt_{n}" if best else f"gantt_{n}"),
dest_dir=dest,
results_dir=source,
instance_sort_key=instance_sort_key,
statistic=cast(
Callable[[Iterable[int | float]], int | float],
min if best else median))
[docs]
def progress(algos: list[str], dest: Path, source: Path,
log: bool = True, millis: bool = True) -> None:
"""
Plot the median Gantt charts.
:param algos: the algorithms
:param dest: the directory
:param source: the source directory
:param log: is the time logarithmically scaled?
:param millis: is the time measured in milliseconds?
"""
n: str = f"progress_{algorithm_namer(algos[0])}_"
if log:
n = n + "log_"
if millis:
unit = TIME_UNIT_MILLIS
n = n + "T"
else:
unit = TIME_UNIT_FES
n = n + "FEs"
plot_progresses(results_dir=source,
algorithms=algos,
name_base=n,
dest_dir=dest,
log_time=log,
time_unit=unit,
algorithm_namer=algorithm_namer,
instance_sort_key=instance_sort_key,
algorithm_sort_key=algorithm_sort_key)
[docs]
def makespans_over_param(
end_results: Path,
selector: Callable[[str], bool],
x_getter: Callable[[EndStatistics], int | float],
name_base: str,
dest_dir: str,
x_axis: AxisRanger | Callable[[], AxisRanger]
= AxisRanger,
x_label: str | None = None,
algo_getter: Callable[[str], str] | None = None,
title: str | None = None,
legend_pos: str = "right",
title_x: float = 0.5,
y_label_location: float = 1.0) -> list[Path]:
"""
Plot the performance over a parameter.
:param end_results: the end results path
:param selector: the selector for algorithms
:param name_base: the basic name
:param dest_dir: the destination directory
:param x_getter: the function computing the x-value for each statistics
object
:param x_axis: the axis ranger
:param x_label: the x-axis label
:param algo_getter: the optional algorithm name getter (use `name_base` if
unspecified)
:param title: the optional title (use `name_base` if unspecified)
:param legend_pos: the legend position, set to "right"
:param title_x: the title x
:param y_label_location: the y label location
:returns: the list of generated files
"""
def _algo_name_getter(es: EndStatistics,
n=name_base, g=algo_getter) -> str:
return n if g is None else g(es.algorithm)
return plot_end_makespans_over_param(
end_results=get_end_results(end_results, algos=selector),
x_getter=x_getter, name_base=sanitize_name(f"{name_base}_results"),
dest_dir=dest_dir, title=name_base if title is None else title,
algorithm_getter=_algo_name_getter, # type: ignore
instance_sort_key=instance_sort_key,
algorithm_sort_key=algorithm_sort_key,
x_axis=x_axis, x_label=x_label,
plot_single_instances=algo_getter is None,
legend_pos=legend_pos, title_x=title_x,
y_label_location=y_label_location)
[docs]
def evaluate_experiment(results_dir: str = pp.join(".", "results"),
dest_dir: str | None = None) -> None:
"""
Evaluate the experiment.
:param results_dir: the results directory
:param dest_dir: the destination directory
"""
source: Final[Path] = directory_path(results_dir)
dest: Final[Path] = Path(dest_dir if dest_dir else
pp.join(source, "..", "evaluation"))
dest.ensure_dir_exists()
logger(f"Beginning evaluation from {source!r} to {dest!r}.")
end_results: Final[Path] = compute_end_results(source, dest)
if not end_results:
raise ValueError("End results path is empty??")
end_stats: Final[Path] = compute_end_statistics(end_results, dest)
if not end_stats:
raise ValueError("End stats path is empty??")
logger("Now evaluating the single random sampling algorithm `1rs`.")
table(end_results, ["1rs"], dest)
makespans(end_results, ["1rs"], dest)
gantt(end_results, "1rs", dest, source)
logger("Now evaluating the multi-random sampling algorithm `rs`.")
table(end_results, ["rs", "1rs"], dest)
makespans(end_results, ["rs", "1rs"], dest)
gantt(end_results, "rs", dest, source)
progress(["rs"], dest, source)
progress(["rs"], dest, source, log=False)
logger("Now evaluating the hill climbing algorithm `hc`.")
table(end_results, ["hc_swap2", "rs"], dest)
makespans(end_results, ["hc_swap2", "rs"], dest)
gantt(end_results, "hc_swap2", dest, source)
progress(["hc_swap2", "rs"], dest, source)
progress(["hc_swap2", "rs"], dest, source, millis=False)
logger(f"Finished evaluation from {source!r} to {dest!r}.")
# Evaluate experiment if run as script
if __name__ == "__main__":
parser: Final[argparse.ArgumentParser] = moptipy_argparser(
__file__, "Evaluate the results of the JSSP example experiment",
"This experiment evaluates all the results of the JSSP example"
" experiment and creates the figures and tables of the "
"'Optimization Algorithms' book (see "
"http://thomasweise.github.io/oa).")
parser.add_argument(
"source", nargs="?", default="./results", type=Path,
help="the directory with the results of the JSSP experiment")
parser.add_argument(
"dest", type=Path, nargs="?", default="./evaluation/",
help="the directory to write the evaluation results to")
args: Final[argparse.Namespace] = parser.parse_args()
evaluate_experiment(results_dir=args.source, dest_dir=args.dest)