Coverage for moptipy / examples / jssp / evaluation.py: 31%
281 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 03:05 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 03:05 +0000
1"""Evaluate the results of the example experiment."""
2import argparse
3import os.path as pp
4from math import log2
5from re import Match, Pattern
6from re import compile as _compile
7from statistics import median
8from typing import Callable, Final, Iterable, cast
10from pycommons.io.console import logger
11from pycommons.io.path import Path, directory_path
12from pycommons.strings.string_conv import num_to_str
13from pycommons.types import type_error
15from moptipy.evaluation.axis_ranger import AxisRanger
16from moptipy.evaluation.base import TIME_UNIT_FES, TIME_UNIT_MILLIS
17from moptipy.evaluation.end_results import EndResult
18from moptipy.evaluation.end_results import from_csv as end_results_from_csv
19from moptipy.evaluation.end_results import from_logs as end_results_from_logs
20from moptipy.evaluation.end_results import to_csv as end_results_to_csv
21from moptipy.evaluation.end_statistics import EndStatistics
22from moptipy.evaluation.end_statistics import (
23 from_end_results as es_from_end_results,
24)
25from moptipy.evaluation.end_statistics import to_csv as es_to_csv
26from moptipy.evaluation.tabulate_end_results import (
27 DEFAULT_ALGORITHM_INSTANCE_STATISTICS,
28 DEFAULT_ALGORITHM_SUMMARY_STATISTICS,
29 command_column_namer,
30 tabulate_end_results,
31)
32from moptipy.evaluation.tabulate_result_tests import tabulate_result_tests
33from moptipy.examples.jssp.experiment import ALGORITHMS, INSTANCES
34from moptipy.examples.jssp.instance import Instance
35from moptipy.examples.jssp.plots import (
36 plot_end_makespans,
37 plot_end_makespans_over_param,
38 plot_progresses,
39 plot_stat_gantt_charts,
40)
41from moptipy.spaces.permutations import Permutations
42from moptipy.utils.help import moptipy_argparser
43from moptipy.utils.lang import EN
44from moptipy.utils.logger import sanitize_name
45from moptipy.utils.strings import (
46 beautify_float_str,
47 name_str_to_num,
48)
50#: The letter mu
51LETTER_M: Final[str] = "\u03BC"
52#: The letter lambda
53LETTER_L: Final[str] = "\u03BB"
55#: the pre-defined instance sort keys
56__INST_SORT_KEYS: Final[dict[str, int]] = {
57 __n: __i for __i, __n in enumerate(INSTANCES) # noqa: RUF052
58}
60#: the name of the mu+1 EA
61NAME_EA_MU_PLUS_1: Final[str] = f"{LETTER_M}+1_ea"
62#: the name of the mu+mu EA
63NAME_EA_MU_PLUS_MU: Final[str] = f"{LETTER_M}+{LETTER_M}_ea"
64#: the name of the mu+ld(mu) EA
65NAME_EA_MU_PLUS_LOG_MU: Final[str] = f"{LETTER_M}+log\u2082{LETTER_M}_ea"
66#: the name of the mu+sqrt(mu) EA
67NAME_EA_MU_PLUS_SQRT_MU: Final[str] = f"{LETTER_M}+\u221A{LETTER_M}_ea"
70def __make_all_names() -> tuple[str, ...]:
71 """
72 Create an immutable list of all algorithm names.
74 :returns: the tuple with all algorithm names
75 """
76 inst = Instance.from_resource("demo")
77 space = Permutations.with_repetitions(inst.jobs, inst.machines)
78 return tuple(str(a(inst, space)) for a in ALGORITHMS)
81#: the list of all algorithm names from the experiment
82ALL_NAMES: Final[tuple[str, ...]] = __make_all_names()
83del __make_all_names
86def ea_family(name: str) -> str:
87 """
88 Name the evolutionary algorithm setup.
90 :param name: the full name
91 :returns: the short name of the family
92 """
93 if name.startswith("ea_"):
94 ss: Final[list[str]] = name.split("_")
95 ll: Final[int] = len(ss)
96 if ll == 4:
97 lambda_ = int(ss[2])
98 if lambda_ == 1:
99 return NAME_EA_MU_PLUS_1
100 mu = int(ss[1])
101 if lambda_ == int(log2(mu)):
102 return NAME_EA_MU_PLUS_LOG_MU
103 if lambda_ == round(mu ** 0.5):
104 return NAME_EA_MU_PLUS_SQRT_MU
105 ratio = lambda_ // mu
106 if ratio * mu == lambda_:
107 if ratio == 1:
108 return NAME_EA_MU_PLUS_MU
109 return f"{LETTER_M}+{ratio}{LETTER_M}_ea"
110 elif (ll == 6) and (ss[-2] == "gap") and (ss[-1] == "swap2"):
111 return f"{ss[1]}+{ss[2]}_ea_br"
112 raise ValueError(f"Invalid EA name {name!r}.")
115def sa_family(name: str) -> str:
116 """
117 Name the simulated annealing setup.
119 :param name: the full name
120 :returns: the short name of the family
121 """
122 if name.startswith("sa_") and name.endswith("_swap2"):
123 return (f"sa_T\u2080_"
124 f"{beautify_float_str(name_str_to_num(name.split('_')[2]))}")
125 raise ValueError(f"Invalid SA name {name!r}.")
128def ma_family(name: str) -> str:
129 """
130 Compute the Memetic Algorithm family without the ls steps.
132 :param name: the algorithm name
133 :returns: the short name of the family
134 """
135 ss: Final[list[str]] = name.split("_")
136 s = "rls" if name.startswith("marls") else "sa"
137 return f"{ss[1]}+{ss[2]}_ma{s}"
140def __make_algo_names() -> tuple[dict[str, int], dict[str, str]]:
141 """
142 Create the algorithm sort keys and name decode.
144 :returns: the algorithm sort keys and name decode
145 """
146 names: list[str] = list(ALL_NAMES)
147 names_new = list(ALL_NAMES)
149 def __eacr(mtch: Match) -> str:
150 """Transform the EA name with crossover."""
151 br = name_str_to_num(mtch.group(3))
152 if not 0.0 < br < 1.0:
153 raise ValueError(f"{mtch} has invalid br={br}.")
154 a = log2(br)
155 b = int(a)
156 c = log2(1.0 - br)
157 d = int(c)
158 q = f"(1-2^{d})" if abs(c - d) < abs(a - b) else f"2^{b}"
159 q2 = num_to_str(br)
160 if len(q2) <= (len(q) + 1):
161 q = q2
162 return f"{mtch.group(1)}+{mtch.group(2)}_ea_{q}"
164 def __sa(mtch: Match) -> str:
165 """Transform the SA name."""
166 if mtch.group(0) == "sa_exp16_1em6_swap2":
167 return "sa"
168 return f"sa_{mtch.group(1)}_{mtch.group(2)}e-{mtch.group(3)}"
170 def __ma(mtch: Match, suffix: str) -> str:
171 """Transform the MA name."""
172 if mtch.group(0) == "ma_2_2_1048576_gap_sa_exp16_5d1em6_swap2":
173 return "2_masa_20"
174 if mtch.group(0) == "marls_8_8_16_gap_swap2":
175 return "8_marls_4"
176 ls = name_str_to_num(mtch.group(3))
177 if not 0 < ls < 1_000_000_000:
178 raise ValueError(f"{mtch} has invalid ls={ls}.")
179 e2 = int(log2(ls))
180 if ls != (2 ** e2):
181 raise ValueError(
182 f"{mtch} has invalid ls={ls} since {e2}=log2({ls}).")
183 e2s = f"2^{e2}"
184 lss = str(ls)
185 if len(lss) <= len(e2s):
186 e2s = lss
187 return f"{mtch.group(1)}+{mtch.group(2)}_ma{suffix}_{e2s}"
189 # fix some names using regular expressions
190 namer: dict[str, str] = {}
191 used_names: set[str] = set(names)
192 for pattern, repl in [("hc_swap2", "hc"), ("hc_swapn", "hcn"),
193 ("rls_swap2", "rls"), ("rls_swapn", "rlsn")]:
194 re: Pattern = _compile(pattern)
195 found = False
196 for s in names:
197 m: Match = re.match(s)
198 if m is not None:
199 ns = re.sub(cast("str | Callable[[Match], str]", repl), s)
200 if (ns is None) or (len(ns) <= 0):
201 raise ValueError(f"{s!r} -> {ns!r}?")
202 if ns == s:
203 continue
204 found = True
205 os = namer.get(s)
206 if os is not None:
207 if os == ns:
208 continue
209 raise ValueError(f"{s!r} -> {ns!r}, {os!r}?")
210 if ns in used_names:
211 raise ValueError(f"Already got {ns!r}.")
212 namer[s] = ns
213 names_new.insert(names_new.index(s), ns)
214 if not found:
215 raise ValueError(f"did not find {pattern!r}.")
217 return {__n: __i for __i, __n in enumerate( # noqa: RUF052
218 names_new)}, namer # noqa: RUF052
221#: the pre-defined algorithm sort keys and name map
222__ALGO_SORT_KEYS, __ALGO_NAME_MAP = __make_algo_names()
225def instance_sort_key(name: str) -> int:
226 """
227 Get the instance sort key for a given instance name.
229 :param name: the instance name
230 :returns: the sort key
231 """
232 if not isinstance(name, str):
233 raise type_error(name, "name", str)
234 if not name:
235 raise ValueError("name must not be empty.")
236 if name in __INST_SORT_KEYS:
237 return __INST_SORT_KEYS[name]
238 return 1000
241def algorithm_sort_key(name: str) -> int:
242 """
243 Get the algorithm sort key for a given algorithm name.
245 :param name: the algorithm name
246 :returns: the sort key
247 """
248 if not isinstance(name, str):
249 raise type_error(name, "name", str)
250 if not name:
251 raise ValueError("name must not be empty.")
252 if name in __ALGO_SORT_KEYS:
253 return __ALGO_SORT_KEYS[name]
254 return 1000
257def algorithm_namer(name: str) -> str:
258 """
259 Rename algorithm setups.
261 :param name: the algorithm's original name
262 :returns: the new name of the algorithm
263 """
264 if not isinstance(name, str):
265 raise type_error(name, "name", str)
266 if not name:
267 raise ValueError("name must not be empty.")
268 if name in __ALGO_NAME_MAP:
269 return __ALGO_NAME_MAP[name]
270 return name
273def compute_end_results(results_dir: str,
274 dest_dir: str) -> Path:
275 """
276 Get the end results, compute them if necessary.
278 :param results_dir: the results directory
279 :param dest_dir: the destination directory
280 :returns: the path to the end results file.
281 """
282 dest: Final[Path] = directory_path(dest_dir)
283 results_file: Final[Path] = dest.resolve_inside("end_results.txt")
284 if results_file.is_file():
285 return results_file
287 source: Final[Path] = directory_path(results_dir)
288 logger(f"loading end results from path {source!r}.")
289 results: Final[list[EndResult]] = list(end_results_from_logs(source))
290 if not results:
291 raise ValueError(f"Could not find any logs in {source!r}.")
292 results.sort()
293 logger(f"found {len(results)} log files in path {source!r}, storing "
294 f"them in file {results_file!r}.")
295 rf: Path = end_results_to_csv(results, results_file)
296 if rf != results_file:
297 raise ValueError(
298 f"results file should be {results_file!r}, but is {rf!r}.")
299 results_file.enforce_file()
300 logger(f"finished writing file {results_file!r}.")
301 return results_file
304def get_end_results(
305 file: str,
306 insts: set[str] | Callable[[str], bool] | None = None,
307 algos: set[str] | Callable[[str], bool] | None = None) \
308 -> list[EndResult]:
309 """
310 Get a specific set of end results.
312 :param file: the end results file
313 :param insts: only these instances will be included if this parameter is
314 provided
315 :param algos: only these algorithms will be included if this parameter is
316 provided
317 """
319 def __filter(er: EndResult,
320 ins=None if insts is None else
321 insts.__contains__ if isinstance(insts, set)
322 else insts,
323 alg=None if algos is None else
324 algos.__contains__ if isinstance(algos, set)
325 else algos) -> bool:
326 if (ins is not None) and (not ins(er.instance)):
327 return False
328 return not ((alg is not None) and (not alg(er.algorithm)))
330 col: Final[list[EndResult]] = list(end_results_from_csv(
331 file=file, filterer=__filter))
332 if len(col) <= 0:
333 raise ValueError(
334 f"no end results for instances {insts} and algorithms {algos}.")
335 return col
338def compute_end_statistics(end_results_file: str,
339 dest_dir: str) -> Path:
340 """
341 Get the end result statistics, compute them if necessary.
343 :param end_results_file: the end results file
344 :param dest_dir: the destination directory
345 :returns: the path to the end result statistics file.
346 """
347 dest: Final[Path] = directory_path(dest_dir)
348 stats_file: Final[Path] = dest.resolve_inside("end_statistics.txt")
349 if stats_file.is_file():
350 return stats_file
352 results: Final[list[EndResult]] = get_end_results(end_results_file)
353 if len(results) <= 0:
354 raise ValueError("end results cannot be empty")
355 stats: Final[list[EndStatistics]] = list(es_from_end_results(results))
356 if len(stats) <= 0:
357 raise ValueError("end result statistics cannot be empty")
358 stats.sort()
360 sf: Path = es_to_csv(stats, stats_file)
361 if sf != stats_file:
362 raise ValueError(f"stats file should be {stats_file!r} but is {sf!r}")
363 stats_file.enforce_file()
364 logger(f"finished writing file {stats_file!r}.")
365 return stats_file
368def table(end_results: Path, algos: list[str], dest: Path,
369 swap_stats: Iterable[tuple[str, str]] | None = None) -> None:
370 """
371 Tabulate the end results.
373 :param end_results: the path to the end results
374 :param algos: the algorithms
375 :param dest: the directory
376 :param swap_stats: the statistics to swap out
377 """
378 EN.set_current()
379 n: Final[str] = algorithm_namer(algos[0])
380 algo_inst_stat: list | tuple = DEFAULT_ALGORITHM_INSTANCE_STATISTICS
381 algo_sum_stat: list | tuple = DEFAULT_ALGORITHM_SUMMARY_STATISTICS
382 if swap_stats is not None:
383 algo_inst_stat = list(algo_inst_stat)
384 algo_sum_stat = list(algo_sum_stat)
385 for old, swap in swap_stats:
386 found: bool = False
387 for lst in [algo_inst_stat, algo_sum_stat]:
388 for idx, elem in enumerate(lst):
389 if elem == old:
390 found = True
391 lst[idx] = swap
392 break
393 if not found:
394 raise ValueError(f"did not find {old!r} in {str(lst)!r}.")
396 tabulate_end_results(
397 end_results=get_end_results(end_results, algos=set(algos)),
398 file_name=sanitize_name(f"end_results_{n}"), dir_name=dest,
399 instance_sort_key=instance_sort_key,
400 algorithm_sort_key=algorithm_sort_key,
401 col_namer=command_column_namer,
402 algorithm_namer=algorithm_namer,
403 algorithm_instance_statistics=algo_inst_stat,
404 algorithm_summary_statistics=algo_sum_stat,
405 use_lang=False)
408def tests(end_results: Path, algos: list[str], dest: Path) -> None:
409 """
410 Tabulate the end result tests.
412 :param end_results: the path to the end results
413 :param algos: the algorithms
414 :param dest: the directory
415 """
416 EN.set_current()
417 n: Final[str] = algorithm_namer(algos[0])
418 tabulate_result_tests(
419 end_results=get_end_results(end_results, algos=set(algos)),
420 file_name=sanitize_name(f"tests_{n}"), dir_name=dest,
421 instance_sort_key=instance_sort_key,
422 algorithm_sort_key=algorithm_sort_key,
423 algorithm_namer=algorithm_namer,
424 use_lang=False)
427def makespans(end_results: Path, algos: list[str], dest: Path,
428 x_label_location: float = 1.0) -> None:
429 """
430 Plot the end makespans.
432 :param end_results: the path to the end results
433 :param algos: the algorithms
434 :param dest: the directory
435 :param x_label_location: the location of the label of the x-axis
436 """
437 n: Final[str] = algorithm_namer(algos[0])
438 plot_end_makespans(
439 end_results=get_end_results(end_results, algos=set(algos)),
440 name_base=sanitize_name(f"makespan_scaled_{n}"), dest_dir=dest,
441 instance_sort_key=instance_sort_key,
442 algorithm_sort_key=algorithm_sort_key,
443 algorithm_namer=algorithm_namer,
444 x_label_location=x_label_location)
447def gantt(end_results: Path, algo: str, dest: Path, source: Path,
448 best: bool = False,
449 insts: Iterable[str] | None = None) -> None:
450 """
451 Plot the median Gantt charts.
453 :param end_results: the path to the end results
454 :param algo: the algorithm
455 :param dest: the directory
456 :param source: the source directory
457 :param best: should we plot the best instance only (or, otherwise, the
458 median)
459 :param insts: the instances to use
460 """
461 n: Final[str] = algorithm_namer(algo)
462 plot_stat_gantt_charts(
463 get_end_results(end_results, algos={algo},
464 insts=None if insts is None else set(insts)),
465 name_base=sanitize_name(f"best_gantt_{n}" if best else f"gantt_{n}"),
466 dest_dir=dest,
467 results_dir=source,
468 instance_sort_key=instance_sort_key,
469 statistic=cast(
470 "Callable[[Iterable[int | float]], int | float]",
471 min if best else median))
474def progress(algos: list[str], dest: Path, source: Path,
475 log: bool = True, millis: bool = True) -> None:
476 """
477 Plot the median Gantt charts.
479 :param algos: the algorithms
480 :param dest: the directory
481 :param source: the source directory
482 :param log: is the time logarithmically scaled?
483 :param millis: is the time measured in milliseconds?
484 """
485 n: str = f"progress_{algorithm_namer(algos[0])}_"
486 if log:
487 n += "log_"
488 if millis:
489 unit = TIME_UNIT_MILLIS
490 n += "T"
491 else:
492 unit = TIME_UNIT_FES
493 n += "FEs"
494 plot_progresses(results_dir=source,
495 algorithms=algos,
496 name_base=n,
497 dest_dir=dest,
498 log_time=log,
499 time_unit=unit,
500 algorithm_namer=algorithm_namer,
501 instance_sort_key=instance_sort_key,
502 algorithm_sort_key=algorithm_sort_key)
505def makespans_over_param(
506 end_results: Path,
507 selector: Callable[[str], bool],
508 x_getter: Callable[[EndStatistics], int | float],
509 name_base: str,
510 dest_dir: str,
511 x_axis: AxisRanger | Callable[[], AxisRanger]
512 = AxisRanger,
513 x_label: str | None = None,
514 algo_getter: Callable[[str], str] | None = None,
515 title: str | None = None,
516 legend_pos: str = "right",
517 title_x: float = 0.5,
518 y_label_location: float = 1.0) -> list[Path]:
519 """
520 Plot the performance over a parameter.
522 :param end_results: the end results path
523 :param selector: the selector for algorithms
524 :param name_base: the basic name
525 :param dest_dir: the destination directory
526 :param x_getter: the function computing the x-value for each statistics
527 object
528 :param x_axis: the axis ranger
529 :param x_label: the x-axis label
530 :param algo_getter: the optional algorithm name getter (use `name_base` if
531 unspecified)
532 :param title: the optional title (use `name_base` if unspecified)
533 :param legend_pos: the legend position, set to "right"
534 :param title_x: the title x
535 :param y_label_location: the y label location
536 :returns: the list of generated files
537 """
539 def _algo_name_getter(es: EndStatistics,
540 n=name_base, g=algo_getter) -> str:
541 return n if g is None else g(es.algorithm)
543 return plot_end_makespans_over_param(
544 end_results=get_end_results(end_results, algos=selector),
545 x_getter=x_getter, name_base=sanitize_name(f"{name_base}_results"),
546 dest_dir=dest_dir, title=name_base if title is None else title,
547 algorithm_getter=_algo_name_getter, # type: ignore
548 instance_sort_key=instance_sort_key,
549 algorithm_sort_key=algorithm_sort_key,
550 x_axis=x_axis, x_label=x_label,
551 plot_single_instances=algo_getter is None,
552 legend_pos=legend_pos, title_x=title_x,
553 y_label_location=y_label_location)
556def evaluate_experiment(results_dir: str = pp.join(".", "results"),
557 dest_dir: str | None = None) -> None:
558 """
559 Evaluate the experiment.
561 :param results_dir: the results directory
562 :param dest_dir: the destination directory
563 """
564 source: Final[Path] = directory_path(results_dir)
565 dest: Final[Path] = Path(dest_dir or pp.join(source, "..", "evaluation"))
566 dest.ensure_dir_exists()
567 logger(f"Beginning evaluation from {source!r} to {dest!r}.")
569 end_results: Final[Path] = compute_end_results(source, dest)
570 if not end_results:
571 raise ValueError("End results path is empty??")
572 end_stats: Final[Path] = compute_end_statistics(end_results, dest)
573 if not end_stats:
574 raise ValueError("End stats path is empty??")
576 logger("Now evaluating the single random sampling algorithm `1rs`.")
577 table(end_results, ["1rs"], dest)
578 makespans(end_results, ["1rs"], dest)
579 gantt(end_results, "1rs", dest, source)
581 logger("Now evaluating the multi-random sampling algorithm `rs`.")
582 table(end_results, ["rs", "1rs"], dest)
583 makespans(end_results, ["rs", "1rs"], dest)
584 gantt(end_results, "rs", dest, source)
585 progress(["rs"], dest, source)
586 progress(["rs"], dest, source, log=False)
588 logger("Now evaluating the hill climbing algorithm `hc`.")
589 table(end_results, ["hc_swap2", "rs"], dest)
590 makespans(end_results, ["hc_swap2", "rs"], dest)
591 gantt(end_results, "hc_swap2", dest, source)
592 progress(["hc_swap2", "rs"], dest, source)
593 progress(["hc_swap2", "rs"], dest, source, millis=False)
595 logger(f"Finished evaluation from {source!r} to {dest!r}.")
598# Evaluate experiment if run as script
599if __name__ == "__main__":
600 parser: Final[argparse.ArgumentParser] = moptipy_argparser(
601 __file__, "Evaluate the results of the JSSP example experiment",
602 "This experiment evaluates all the results of the JSSP example"
603 " experiment and creates the figures and tables of the "
604 "'Optimization Algorithms' book (see "
605 "http://thomasweise.github.io/oa).")
606 parser.add_argument(
607 "source", nargs="?", default="./results", type=Path,
608 help="the directory with the results of the JSSP experiment")
609 parser.add_argument(
610 "dest", type=Path, nargs="?", default="./evaluation/",
611 help="the directory to write the evaluation results to")
612 args: Final[argparse.Namespace] = parser.parse_args()
613 evaluate_experiment(results_dir=args.source, dest_dir=args.dest)