Coverage for moptipy / examples / jssp / evaluation.py: 31%
281 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-24 08:49 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-24 08:49 +0000
1"""Evaluate the results of the example experiment."""
2import argparse
3import os.path as pp
4from math import log2
5from re import Match, Pattern
6from re import compile as _compile
7from statistics import median
8from typing import Callable, Final, Iterable, cast
10from pycommons.io.console import logger
11from pycommons.io.path import Path, directory_path
12from pycommons.strings.string_conv import num_to_str
13from pycommons.types import type_error
15from moptipy.evaluation.axis_ranger import AxisRanger
16from moptipy.evaluation.base import TIME_UNIT_FES, TIME_UNIT_MILLIS
17from moptipy.evaluation.end_results import EndResult
18from moptipy.evaluation.end_results import from_csv as end_results_from_csv
19from moptipy.evaluation.end_results import from_logs as end_results_from_logs
20from moptipy.evaluation.end_results import to_csv as end_results_to_csv
21from moptipy.evaluation.end_statistics import EndStatistics
22from moptipy.evaluation.end_statistics import (
23 from_end_results as es_from_end_results,
24)
25from moptipy.evaluation.end_statistics import to_csv as es_to_csv
26from moptipy.evaluation.tabulate_end_results import (
27 DEFAULT_ALGORITHM_INSTANCE_STATISTICS,
28 DEFAULT_ALGORITHM_SUMMARY_STATISTICS,
29 command_column_namer,
30 tabulate_end_results,
31)
32from moptipy.evaluation.tabulate_result_tests import tabulate_result_tests
33from moptipy.examples.jssp.experiment import ALGORITHMS, INSTANCES
34from moptipy.examples.jssp.instance import Instance
35from moptipy.examples.jssp.plots import (
36 plot_end_makespans,
37 plot_end_makespans_over_param,
38 plot_progresses,
39 plot_stat_gantt_charts,
40)
41from moptipy.spaces.permutations import Permutations
42from moptipy.utils.help import moptipy_argparser
43from moptipy.utils.lang import EN
44from moptipy.utils.logger import sanitize_name
45from moptipy.utils.strings import (
46 beautify_float_str,
47 name_str_to_num,
48)
#: The Greek small letter mu (code point U+03BC)
LETTER_M: Final[str] = chr(0x03BC)
#: The Greek small letter lambda (code point U+03BB)
LETTER_L: Final[str] = chr(0x03BB)
#: the pre-defined instance sort keys: each instance name in `INSTANCES`
#: is mapped to its position in that sequence
__INST_SORT_KEYS: Final[dict[str, int]] = {
    __name: __index for __index, __name in enumerate(INSTANCES)
}
#: the name of the mu+1 EA
NAME_EA_MU_PLUS_1: Final[str] = LETTER_M + "+1_ea"
#: the name of the mu+mu EA
NAME_EA_MU_PLUS_MU: Final[str] = LETTER_M + "+" + LETTER_M + "_ea"
#: the name of the mu+ld(mu) EA (uses the subscript-two character U+2082)
NAME_EA_MU_PLUS_LOG_MU: Final[str] = (
    LETTER_M + "+log\u2082" + LETTER_M + "_ea")
#: the name of the mu+sqrt(mu) EA (uses the square-root character U+221A)
NAME_EA_MU_PLUS_SQRT_MU: Final[str] = (
    LETTER_M + "+\u221A" + LETTER_M + "_ea")
def __make_all_names() -> tuple[str, ...]:
    """
    Collect the names of all algorithm setups of the experiment.

    Every factory in `ALGORITHMS` is invoked once with the `demo` instance
    and the matching permutation-with-repetitions space; the string
    representation of the created object is used as the algorithm name.

    :returns: the tuple with all algorithm names, in the order of
        `ALGORITHMS`
    """
    demo = Instance.from_resource("demo")
    search_space = Permutations.with_repetitions(demo.jobs, demo.machines)
    collected: list[str] = []
    for factory in ALGORITHMS:
        collected.append(str(factory(demo, search_space)))
    return tuple(collected)


#: the list of all algorithm names from the experiment
ALL_NAMES: Final[tuple[str, ...]] = __make_all_names()
# the factory function is only needed once, so it is removed again
del __make_all_names
def ea_family(name: str) -> str:
    """
    Derive the short family name of an evolutionary algorithm setup.

    Names with four `_`-separated components are classified by the relation
    between their second component (mu) and third component (lambda).
    Names with six components that end in `gap_swap2` are mapped to
    `<mu>+<lambda>_ea_br`.

    :param name: the full name
    :returns: the short name of the family
    :raises ValueError: if `name` is not a recognized EA name
    """
    if not name.startswith("ea_"):
        raise ValueError(f"Invalid EA name {name!r}.")

    parts: Final[list[str]] = name.split("_")
    count: Final[int] = len(parts)
    if count == 6:
        if (parts[-2] == "gap") and (parts[-1] == "swap2"):
            return f"{parts[1]}+{parts[2]}_ea_br"
    elif count == 4:
        offspring = int(parts[2])
        if offspring == 1:
            return NAME_EA_MU_PLUS_1
        # mu is only parsed if lambda is not 1
        parents = int(parts[1])
        if offspring == int(log2(parents)):
            return NAME_EA_MU_PLUS_LOG_MU
        if offspring == round(parents ** 0.5):
            return NAME_EA_MU_PLUS_SQRT_MU
        # lambda must be an integer multiple of mu to form a family
        factor, rest = divmod(offspring, parents)
        if rest == 0:
            if factor == 1:
                return NAME_EA_MU_PLUS_MU
            return f"{LETTER_M}+{factor}{LETTER_M}_ea"
    raise ValueError(f"Invalid EA name {name!r}.")
def sa_family(name: str) -> str:
    """
    Name the simulated annealing setup.

    The name must start with `sa_` and end with `_swap2`. Its third
    `_`-separated component is converted with `name_str_to_num`, formatted
    with `beautify_float_str`, and appended to the prefix `sa_T\u2080_`.

    :param name: the full name
    :returns: the short name of the family
    :raises ValueError: if `name` is not a valid SA name, including names
        such as `sa_swap2` that have no third component
    """
    if name.startswith("sa_") and name.endswith("_swap2"):
        parts = name.split("_")
        # Without this check, a name like "sa_swap2" would fail with an
        # IndexError instead of the ValueError used for invalid names.
        if len(parts) > 2:
            return (f"sa_T\u2080_"
                    f"{beautify_float_str(name_str_to_num(parts[2]))}")
    raise ValueError(f"Invalid SA name {name!r}.")
def ma_family(name: str) -> str:
    """
    Derive the Memetic Algorithm family name, omitting the ls steps.

    The result is `<a>+<b>_ma<kind>`, where `<a>` and `<b>` are the second
    and third `_`-separated components of `name` and `<kind>` is `rls` if
    `name` starts with `marls` and `sa` otherwise.

    :param name: the algorithm name
    :returns: the short name of the family
    """
    parts: Final[list[str]] = name.split("_")
    kind = "sa"
    if name.startswith("marls"):
        kind = "rls"
    first, second = parts[1], parts[2]
    return f"{first}+{second}_ma{kind}"
def __make_algo_names() -> tuple[dict[str, int], dict[str, str]]:
    """
    Create the algorithm sort keys and the name decoding map.

    The original algorithm names are taken from `ALL_NAMES`. A fixed table
    of regular expression patterns and replacements is applied to them.
    Each name changed by a pattern receives a short name, which is inserted
    directly before the original name in the ordering.

    :returns: a tuple of (1) a dictionary assigning an index-based sort key
        to every original and every short name and (2) a dictionary mapping
        original names to their short names
    :raises ValueError: if a replacement yields an empty name, if one name
        would receive two different short names, if a short name is already
        in use, or if a pattern does not rename any algorithm
    """
    names: list[str] = list(ALL_NAMES)
    names_new = list(ALL_NAMES)

    # NOTE(review): the three transformers below are replacement callables
    # for regular expression substitution, but the pattern table in this
    # function does not reference them. They are kept unchanged.

    def __eacr(mtch: Match) -> str:
        """Transform the EA name with crossover."""
        br = name_str_to_num(mtch.group(3))
        if not 0.0 < br < 1.0:
            raise ValueError(f"{mtch} has invalid br={br}.")
        a = log2(br)
        b = int(a)
        c = log2(1.0 - br)
        d = int(c)
        # pick whichever power-of-two form is closer to an integer exponent
        q = f"(1-2^{d})" if abs(c - d) < abs(a - b) else f"2^{b}"
        q2 = num_to_str(br)
        # prefer the plain number if it is not notably longer
        if len(q2) <= (len(q) + 1):
            q = q2
        return f"{mtch.group(1)}+{mtch.group(2)}_ea_{q}"

    def __sa(mtch: Match) -> str:
        """Transform the SA name."""
        if mtch.group(0) == "sa_exp16_1em6_swap2":
            return "sa"
        return f"sa_{mtch.group(1)}_{mtch.group(2)}e-{mtch.group(3)}"

    def __ma(mtch: Match, suffix: str) -> str:
        """Transform the MA name."""
        if mtch.group(0) == "ma_2_2_1048576_gap_sa_exp16_5d1em6_swap2":
            return "2_masa_20"
        if mtch.group(0) == "marls_8_8_16_gap_swap2":
            return "8_marls_4"
        ls = name_str_to_num(mtch.group(3))
        if not 0 < ls < 1_000_000_000:
            raise ValueError(f"{mtch} has invalid ls={ls}.")
        e2 = int(log2(ls))
        if ls != (2 ** e2):
            raise ValueError(
                f"{mtch} has invalid ls={ls} since {e2}=log2({ls}).")
        e2s = f"2^{e2}"
        lss = str(ls)
        # use the plain number if it is not longer than the power notation
        if len(lss) <= len(e2s):
            e2s = lss
        return f"{mtch.group(1)}+{mtch.group(2)}_ma{suffix}_{e2s}"

    # fix some names using regular expressions
    namer: dict[str, str] = {}
    used_names: set[str] = set(names)
    for pattern, repl in [("hc_swap2", "hc"), ("hc_swapn", "hcn"),
                          ("rls_swap2", "rls"), ("rls_swapn", "rlsn")]:
        regex: Pattern = _compile(pattern)
        found = False
        for s in names:
            m: Match | None = regex.match(s)
            if m is not None:
                ns = regex.sub(
                    cast("str | Callable[[Match], str]", repl), s)
                if (ns is None) or (len(ns) <= 0):
                    raise ValueError(f"{s!r} -> {ns!r}?")
                if ns == s:
                    continue  # the pattern did not change this name
                found = True
                previous = namer.get(s)
                if previous is not None:
                    if previous == ns:
                        continue  # same renaming was already recorded
                    raise ValueError(f"{s!r} -> {ns!r}, {previous!r}?")
                if ns in used_names:
                    raise ValueError(f"Already got {ns!r}.")
                namer[s] = ns
                # Record the short name so that a later renaming of a
                # different algorithm to the same short name is detected.
                used_names.add(ns)
                # the short name is sorted directly before the original
                names_new.insert(names_new.index(s), ns)
        if not found:
            raise ValueError(f"did not find {pattern!r}.")

    return {__n: __i for __i, __n in enumerate(names_new)}, namer


#: the pre-defined algorithm sort keys and name map
__ALGO_SORT_KEYS, __ALGO_NAME_MAP = __make_algo_names()
def instance_sort_key(name: str) -> int:
    """
    Look up the sort key of an instance.

    :param name: the instance name
    :returns: the key stored in the pre-defined instance sort keys, or
        `1000` if the instance is not listed there
    :raises ValueError: if `name` is an empty string
    """
    if isinstance(name, str):
        if name:
            return __INST_SORT_KEYS.get(name, 1000)
        raise ValueError("name must not be empty.")
    # not a string: raise the error produced by `type_error`
    raise type_error(name, "name", str)
def algorithm_sort_key(name: str) -> int:
    """
    Look up the sort key of an algorithm.

    :param name: the algorithm name
    :returns: the key stored in the pre-defined algorithm sort keys, or
        `1000` if the algorithm is not listed there
    :raises ValueError: if `name` is an empty string
    """
    if isinstance(name, str):
        if name:
            return __ALGO_SORT_KEYS.get(name, 1000)
        raise ValueError("name must not be empty.")
    # not a string: raise the error produced by `type_error`
    raise type_error(name, "name", str)
def algorithm_namer(name: str) -> str:
    """
    Translate an algorithm setup name to its display name.

    :param name: the algorithm's original name
    :returns: the name stored in the pre-defined algorithm name map, or
        `name` itself if the map has no entry for it
    :raises ValueError: if `name` is an empty string
    """
    if isinstance(name, str):
        if name:
            return __ALGO_NAME_MAP.get(name, name)
        raise ValueError("name must not be empty.")
    # not a string: raise the error produced by `type_error`
    raise type_error(name, "name", str)
def compute_end_results(results_dir: str,
                        dest_dir: str) -> Path:
    """
    Provide the end results file, creating it from the logs if needed.

    If `end_results.txt` already exists inside `dest_dir`, it is returned
    as is. Otherwise, the end results are loaded from the log files in
    `results_dir`, sorted, and written to that file.

    :param results_dir: the results directory
    :param dest_dir: the destination directory
    :returns: the path to the end results file.
    :raises ValueError: if no end results are found in `results_dir` or if
        the writer reports a different file than the expected one
    """
    out_dir: Final[Path] = directory_path(dest_dir)
    target: Final[Path] = out_dir.resolve_inside("end_results.txt")
    if not target.is_file():
        log_dir: Final[Path] = directory_path(results_dir)
        logger(f"loading end results from path {log_dir!r}.")
        collected: Final[list[EndResult]] = list(
            end_results_from_logs(log_dir))
        if not collected:
            raise ValueError(f"Could not find any logs in {log_dir!r}.")
        collected.sort()
        logger(f"found {len(collected)} log files in path {log_dir!r}, "
               f"storing them in file {target!r}.")
        written: Path = end_results_to_csv(collected, target)
        if written != target:
            raise ValueError(
                f"results file should be {target!r}, but is {written!r}.")
        target.enforce_file()
        logger(f"finished writing file {target!r}.")
    return target
def get_end_results(
        file: str,
        insts: set[str] | Callable[[str], bool] | None = None,
        algos: set[str] | Callable[[str], bool] | None = None) \
        -> list[EndResult]:
    """
    Load a filtered selection of end results from a file.

    :param file: the end results file
    :param insts: if provided, only end results whose instance is in this
        set, or is accepted by this predicate, are included
    :param algos: if provided, only end results whose algorithm is in this
        set, or is accepted by this predicate, are included
    :returns: the list of selected end results
    :raises ValueError: if no end result passes the filters
    """
    # a set is used via its membership test; callables and None stay as is
    inst_ok = insts.__contains__ if isinstance(insts, set) else insts
    algo_ok = algos.__contains__ if isinstance(algos, set) else algos

    def _accept(record: EndResult) -> bool:
        """Check whether an end result passes both filters."""
        if (inst_ok is not None) and (not inst_ok(record.instance)):
            return False
        if (algo_ok is not None) and (not algo_ok(record.algorithm)):
            return False
        return True

    selected: Final[list[EndResult]] = list(end_results_from_csv(
        file=file, filterer=_accept))
    if not selected:
        raise ValueError(
            f"no end results for instances {insts} and algorithms {algos}.")
    return selected
def compute_end_statistics(end_results_file: str,
                           dest_dir: str) -> Path:
    """
    Provide the end result statistics file, creating it if needed.

    If `end_statistics.txt` already exists inside `dest_dir`, it is
    returned as is. Otherwise, the statistics are computed from all end
    results in `end_results_file`, sorted, and written to that file.

    :param end_results_file: the end results file
    :param dest_dir: the destination directory
    :returns: the path to the end result statistics file.
    :raises ValueError: if there are no end results or statistics, or if
        the writer reports a different file than the expected one
    """
    out_dir: Final[Path] = directory_path(dest_dir)
    target: Final[Path] = out_dir.resolve_inside("end_statistics.txt")
    if not target.is_file():
        records: Final[list[EndResult]] = get_end_results(end_results_file)
        if not records:
            raise ValueError("end results cannot be empty")
        summary: Final[list[EndStatistics]] = list(
            es_from_end_results(records))
        if not summary:
            raise ValueError("end result statistics cannot be empty")
        summary.sort()

        written: Path = es_to_csv(summary, target)
        if written != target:
            raise ValueError(
                f"stats file should be {target!r} but is {written!r}")
        target.enforce_file()
        logger(f"finished writing file {target!r}.")
    return target
def table(end_results: Path, algos: list[str], dest: Path,
          swap_stats: Iterable[tuple[str, str]] | None = None) -> None:
    """
    Create the end result table for a set of algorithms.

    The output file name is derived from the display name of the first
    algorithm in `algos`.

    :param end_results: the path to the end results
    :param algos: the algorithms
    :param dest: the directory
    :param swap_stats: optional pairs `(old, new)`: the first occurrence of
        `old` in the default per-instance and summary statistics is
        replaced by `new`
    :raises ValueError: if an `old` statistic occurs in neither list
    """
    EN.set_current()
    label: Final[str] = algorithm_namer(algos[0])
    inst_stats: list | tuple = DEFAULT_ALGORITHM_INSTANCE_STATISTICS
    sum_stats: list | tuple = DEFAULT_ALGORITHM_SUMMARY_STATISTICS
    if swap_stats is not None:
        # work on copies so that the defaults are never modified
        inst_stats = list(inst_stats)
        sum_stats = list(sum_stats)
        for old, swap in swap_stats:
            replaced: bool = False
            for stats in (inst_stats, sum_stats):
                pos = next((i for i, current in enumerate(stats)
                            if current == old), -1)
                if pos >= 0:
                    stats[pos] = swap
                    replaced = True
            if not replaced:
                # `stats` is the list inspected last here
                raise ValueError(
                    f"did not find {old!r} in {str(stats)!r}.")

    selected = get_end_results(end_results, algos=set(algos))
    out_name = sanitize_name(f"end_results_{label}")
    tabulate_end_results(
        end_results=selected,
        file_name=out_name,
        dir_name=dest,
        instance_sort_key=instance_sort_key,
        algorithm_sort_key=algorithm_sort_key,
        col_namer=command_column_namer,
        algorithm_namer=algorithm_namer,
        algorithm_instance_statistics=inst_stats,
        algorithm_summary_statistics=sum_stats,
        use_lang=False)
def tests(end_results: Path, algos: list[str], dest: Path) -> None:
    """
    Create the table with the end result tests for a set of algorithms.

    The output file name is derived from the display name of the first
    algorithm in `algos`.

    :param end_results: the path to the end results
    :param algos: the algorithms
    :param dest: the directory
    """
    EN.set_current()
    label: Final[str] = algorithm_namer(algos[0])
    selected = get_end_results(end_results, algos=set(algos))
    out_name = sanitize_name(f"tests_{label}")
    tabulate_result_tests(
        end_results=selected,
        file_name=out_name,
        dir_name=dest,
        instance_sort_key=instance_sort_key,
        algorithm_sort_key=algorithm_sort_key,
        algorithm_namer=algorithm_namer,
        use_lang=False)
def makespans(end_results: Path, algos: list[str], dest: Path,
              x_label_location: float = 1.0) -> None:
    """
    Plot the end makespans of a set of algorithms.

    The base name of the output is derived from the display name of the
    first algorithm in `algos`.

    :param end_results: the path to the end results
    :param algos: the algorithms
    :param dest: the directory
    :param x_label_location: the location of the label of the x-axis
    """
    label: Final[str] = algorithm_namer(algos[0])
    selected = get_end_results(end_results, algos=set(algos))
    base = sanitize_name(f"makespan_scaled_{label}")
    plot_end_makespans(
        end_results=selected,
        name_base=base,
        dest_dir=dest,
        instance_sort_key=instance_sort_key,
        algorithm_sort_key=algorithm_sort_key,
        algorithm_namer=algorithm_namer,
        x_label_location=x_label_location)
def gantt(end_results: Path, algo: str, dest: Path, source: Path,
          best: bool = False,
          insts: Iterable[str] | None = None) -> None:
    """
    Plot Gantt charts for one algorithm.

    The statistic handed to the plotting routine is `min` if `best` is
    `True` and `median` otherwise.

    :param end_results: the path to the end results
    :param algo: the algorithm
    :param dest: the directory
    :param source: the source directory
    :param best: should we plot the best instance only (or, otherwise, the
        median)
    :param insts: the instances to use, or `None` to not filter by instance
    """
    label: Final[str] = algorithm_namer(algo)
    wanted = None if insts is None else set(insts)
    selected = get_end_results(end_results, algos={algo}, insts=wanted)
    prefix = "best_gantt_" if best else "gantt_"
    chosen = min if best else median
    plot_stat_gantt_charts(
        selected,
        name_base=sanitize_name(f"{prefix}{label}"),
        dest_dir=dest,
        results_dir=source,
        instance_sort_key=instance_sort_key,
        statistic=cast(
            "Callable[[Iterable[int | float]], int | float]", chosen))
def progress(algos: list[str], dest: Path, source: Path,
             log: bool = True, millis: bool = True) -> None:
    """
    Plot the progress of a set of algorithms over time.

    The base name of the output is `progress_`, the display name of the
    first algorithm in `algos`, `_`, then `log_` if `log` is set, and
    finally `T` for milliseconds or `FEs` otherwise.

    :param algos: the algorithms
    :param dest: the directory
    :param source: the source directory
    :param log: is the time logarithmically scaled?
    :param millis: is the time measured in milliseconds?
    """
    label = algorithm_namer(algos[0])
    if millis:
        unit, suffix = TIME_UNIT_MILLIS, "T"
    else:
        unit, suffix = TIME_UNIT_FES, "FEs"
    scale = "log_" if log else ""
    base: str = f"progress_{label}_{scale}{suffix}"
    plot_progresses(
        results_dir=source,
        algorithms=algos,
        name_base=base,
        dest_dir=dest,
        log_time=log,
        time_unit=unit,
        algorithm_namer=algorithm_namer,
        instance_sort_key=instance_sort_key,
        algorithm_sort_key=algorithm_sort_key)
def makespans_over_param(
        end_results: Path,
        selector: Callable[[str], bool],
        x_getter: Callable[[EndStatistics], int | float],
        name_base: str,
        dest_dir: str,
        x_axis: AxisRanger | Callable[[], AxisRanger]
        = AxisRanger,
        x_label: str | None = None,
        algo_getter: Callable[[str], str] | None = None,
        title: str | None = None,
        legend_pos: str = "right",
        title_x: float = 0.5,
        y_label_location: float = 1.0) -> list[Path]:
    """
    Plot the end makespans over the values of an algorithm parameter.

    :param end_results: the end results path
    :param selector: the selector for algorithms
    :param x_getter: the function computing the x-value for each statistics
        object
    :param name_base: the basic name
    :param dest_dir: the destination directory
    :param x_axis: the axis ranger
    :param x_label: the x-axis label
    :param algo_getter: the optional algorithm name getter (use `name_base`
        if unspecified); single instances are only plotted if it is `None`
    :param title: the optional title (use `name_base` if unspecified)
    :param legend_pos: the legend position, set to "right"
    :param title_x: the title x
    :param y_label_location: the y label location
    :returns: the list of generated files
    """

    def _label_of(stats: EndStatistics) -> str:
        """Get the name under which a statistics record is grouped."""
        if algo_getter is None:
            return name_base
        return algo_getter(stats.algorithm)

    selected = get_end_results(end_results, algos=selector)
    file_base = sanitize_name(f"{name_base}_results")
    heading = name_base if title is None else title
    return plot_end_makespans_over_param(
        end_results=selected,
        x_getter=x_getter,
        name_base=file_base,
        dest_dir=dest_dir,
        title=heading,
        algorithm_getter=_label_of,  # type: ignore
        instance_sort_key=instance_sort_key,
        algorithm_sort_key=algorithm_sort_key,
        x_axis=x_axis,
        x_label=x_label,
        plot_single_instances=algo_getter is None,
        legend_pos=legend_pos,
        title_x=title_x,
        y_label_location=y_label_location)
def evaluate_experiment(results_dir: str = pp.join(".", "results"),
                        dest_dir: str | None = None) -> None:
    """
    Run the evaluation of the experiment results.

    The end results and end statistics files are created first (or reused
    if present). Then tables and plots are generated for the algorithms
    `1rs`, `rs`, and `hc_swap2`.

    :param results_dir: the results directory
    :param dest_dir: the destination directory; if not given, the directory
        `evaluation` next to the results directory is used
    :raises ValueError: if the end results or end statistics path is empty
    """
    in_dir: Final[Path] = directory_path(results_dir)
    out_spec = dest_dir if dest_dir else pp.join(in_dir, "..", "evaluation")
    out_dir: Final[Path] = Path(out_spec)
    out_dir.ensure_dir_exists()
    logger(f"Beginning evaluation from {in_dir!r} to {out_dir!r}.")

    results_file: Final[Path] = compute_end_results(in_dir, out_dir)
    if not results_file:
        raise ValueError("End results path is empty??")
    stats_file: Final[Path] = compute_end_statistics(results_file, out_dir)
    if not stats_file:
        raise ValueError("End stats path is empty??")

    # single random sampling
    logger("Now evaluating the single random sampling algorithm `1rs`.")
    table(results_file, ["1rs"], out_dir)
    makespans(results_file, ["1rs"], out_dir)
    gantt(results_file, "1rs", out_dir, in_dir)

    # repeated random sampling, compared with the single-sample variant
    logger("Now evaluating the multi-random sampling algorithm `rs`.")
    table(results_file, ["rs", "1rs"], out_dir)
    makespans(results_file, ["rs", "1rs"], out_dir)
    gantt(results_file, "rs", out_dir, in_dir)
    progress(["rs"], out_dir, in_dir)
    progress(["rs"], out_dir, in_dir, log=False)

    # hill climbing, compared with random sampling
    logger("Now evaluating the hill climbing algorithm `hc`.")
    table(results_file, ["hc_swap2", "rs"], out_dir)
    makespans(results_file, ["hc_swap2", "rs"], out_dir)
    gantt(results_file, "hc_swap2", out_dir, in_dir)
    progress(["hc_swap2", "rs"], out_dir, in_dir)
    progress(["hc_swap2", "rs"], out_dir, in_dir, millis=False)

    logger(f"Finished evaluation from {in_dir!r} to {out_dir!r}.")
# Evaluate the experiment if this module is executed as a script.
# Both positional arguments are optional and fall back to their defaults.
if __name__ == "__main__":
    arg_parser: Final[argparse.ArgumentParser] = moptipy_argparser(
        __file__, "Evaluate the results of the JSSP example experiment",
        "This experiment evaluates all the results of the JSSP example"
        " experiment and creates the figures and tables of the "
        "'Optimization Algorithms' book (see "
        "http://thomasweise.github.io/oa).")
    # the directory holding the experiment results
    arg_parser.add_argument(
        "source", type=Path, nargs="?", default="./results",
        help="the directory with the results of the JSSP experiment")
    # the directory receiving the evaluation output
    arg_parser.add_argument(
        "dest", type=Path, nargs="?", default="./evaluation/",
        help="the directory to write the evaluation results to")
    parsed: Final[argparse.Namespace] = arg_parser.parse_args()
    evaluate_experiment(results_dir=parsed.source, dest_dir=parsed.dest)