Source code for moptipy.evaluation.end_results

"""
Record for EndResult as well as parsing, serialization, and loading.

When doing experiments with `moptipy`, you apply algorithm setups to problem
instances. For each `setup x instance` combination, you may conduct a series
of repetitions (so-called runs) with different random seeds. Each single run
of an algorithm setup on a problem instance can produce a separate log file.
From each log file, we can load a :class:`EndResult` instance, which
represents, well, the end result of the run, i.e., information such as the
best solution quality reached, when it was reached, and the termination
criterion. These end result records then can be the basis for, e.g., computing
summary statistics via :mod:`~moptipy.evaluation.end_statistics` or for
plotting the end result distribution via
:mod:`~moptipy.evaluation.plot_end_results`.
"""
import argparse
from dataclasses import dataclass
from math import inf, isfinite
from typing import Any, Callable, Final, Iterable, cast

from pycommons.io.console import logger
from pycommons.io.csv import (
    CSV_SEPARATOR,
    SCOPE_SEPARATOR,
    csv_column,
    csv_column_or_none,
    csv_read,
    csv_scope,
    csv_str_or_none,
    csv_val_or_none,
    csv_write,
)
from pycommons.io.path import Path, file_path, line_writer
from pycommons.strings.string_conv import (
    int_or_none_to_str,
    num_or_none_to_str,
    num_to_str,
    str_to_num,
)
from pycommons.types import (
    check_int_range,
    check_to_int_range,
    type_error,
)

from moptipy.api.logging import (
    FILE_SUFFIX,
    KEY_ALGORITHM,
    KEY_BEST_F,
    KEY_GOAL_F,
    KEY_INSTANCE,
    KEY_LAST_IMPROVEMENT_FE,
    KEY_LAST_IMPROVEMENT_TIME_MILLIS,
    KEY_MAX_FES,
    KEY_MAX_TIME_MILLIS,
    KEY_RAND_SEED,
    KEY_TOTAL_FES,
    KEY_TOTAL_TIME_MILLIS,
    PROGRESS_CURRENT_F,
    PROGRESS_FES,
    PROGRESS_TIME_MILLIS,
    SECTION_PROGRESS,
)
from moptipy.evaluation._utils import (
    _check_max_time_millis,
)
from moptipy.evaluation.base import (
    DESC_ALGORITHM,
    DESC_ENCODING,
    DESC_INSTANCE,
    DESC_OBJECTIVE_FUNCTION,
    F_NAME_NORMALIZED,
    F_NAME_RAW,
    F_NAME_SCALED,
    KEY_ENCODING,
    KEY_OBJECTIVE_FUNCTION,
    PerRunData,
    _csv_motipy_footer,
)
from moptipy.evaluation.log_parser import SetupAndStateParser
from moptipy.utils.help import moptipy_argparser
from moptipy.utils.math import try_float_div, try_int, try_int_div
from moptipy.utils.strings import (
    sanitize_names,
)

#: a description of the random seed
DESC_RAND_SEED: Final[str] = (
    "the value of the seed of the random number generator used in the run. "
    f"Random seeds are in 0..{int((1 << (8 * 8)) - 1)} and the random "
    f"number generators are those from numpy.")
#: the description of best-F
DESC_BEST_F: Final[str] = (
    "the best (smallest) objective value ever encountered during the run "
    "(regardless of whether the algorithm later forgot it again or not).")
#: the description of the last improvement FE
DESC_LAST_IMPROVEMENT_FE: Final[str] = (
    "the objective function evaluation (FE) when the last improving move took"
    " place. 1 FE corresponds to the construction and evaluation "
    "of one solution. The first FE has index 1. With 'last "
    "improving move' we mean the last time when a solution was "
    "discovered that was better than all previous solutions. This "
    "time / FE index is the one when the solution with objective "
    f"value {KEY_BEST_F} was discovered.")
#: the description of the last improvement time milliseconds
DESC_LAST_IMPROVEMENT_TIME_MILLIS: Final[str] = (
    "the clock time in milliseconds after the begin of the run when "
    "the last improving search move took place.")
#: the description of the total FEs
DESC_TOTAL_FES: Final[str] = (
    "the total number of objective function evaluations (FEs) that were "
    "performed during the run.")
#: the description of the total consumed time in milliseconds
DESC_TOTAL_TIME_MILLIS: Final[str] = (
    "the clock time in milliseconds that has passed between the begin of the "
    "run and the end of the run.")
#: the description of the goal objective value
DESC_GOAL_F: Final[str] = (
    "the goal objective value. A run will stop as soon as a solution was "
    "discovered which has an objective value less than or equal to "
    f"{KEY_GOAL_F}. In other words, as soon as {KEY_BEST_F} reaches or dips "
    f"under {KEY_GOAL_F}, the algorithm will stop. If {KEY_GOAL_F} is not "
    "reached, the run will continue until other budget limits are exhausted. "
    "If a lower bound for the objective function is known, this is often used"
    " as a goal objective value. If no goal objective value is specified, "
    "this field is empty.")
#: a description of the budget as the maximum objective function evaluation
DESC_MAX_FES: Final[str] = (
    "the maximum number of permissible FEs per run. As soon as this limit is "
    f"reached, the run will stop. In other words, {KEY_TOTAL_FES} will never "
    f"be more than {KEY_MAX_FES}. A run may stop earlier if some other "
    "termination criterion is reached, but never later.")
#: a description of the budget in terms of maximum runtime
DESC_MAX_TIME_MILLIS: Final[str] = (
    "the maximum number of milliseconds of clock time that a run is permitted"
    " to use as computational budget before being terminated. This limit is "
    "more of a soft limit, as we cannot physically stop a run at arbitrary "
    "points without causing mayhem. Thus, it may be that some runs consume "
    "slightly more runtime than this limit. But the rule is that the "
    "algorithm gets told to stop (via should_terminate() becoming True) as "
    f"soon as this time has elapsed. But generally, {KEY_TOTAL_TIME_MILLIS}<="
    f"{KEY_MAX_TIME_MILLIS} approximately holds.")


@dataclass(frozen=True, init=False, order=False, eq=False)
class EndResult(PerRunData):
    """
    An immutable end result record of one run of one algorithm on one problem.

    This record provides the information of the outcome of one application
    of one algorithm to one problem instance in an immutable way.
    """

    #: The best objective value encountered.
    best_f: int | float

    #: The index of the function evaluation when best_f was reached.
    last_improvement_fe: int

    #: The time when best_f was reached.
    last_improvement_time_millis: int

    #: The total number of performed FEs.
    total_fes: int

    #: The total time consumed by the run.
    total_time_millis: int

    #: The goal objective value, if provided.
    goal_f: int | float | None

    #: The (optional) maximum permitted FEs.
    max_fes: int | None

    #: The (optional) maximum runtime.
    max_time_millis: int | None

    def __init__(self, algorithm: str, instance: str, objective: str,
                 encoding: str | None, rand_seed: int, best_f: int | float,
                 last_improvement_fe: int,
                 last_improvement_time_millis: int,
                 total_fes: int, total_time_millis: int,
                 goal_f: int | float | None, max_fes: int | None,
                 max_time_millis: int | None):
        """
        Create a consistent instance of :class:`EndResult`.

        :param algorithm: the algorithm name
        :param instance: the instance name
        :param objective: the name of the objective function
        :param encoding: the name of the encoding that was used, if any, or
            `None` if no encoding was used
        :param rand_seed: the random seed
        :param best_f: the best reached objective value
        :param last_improvement_fe: the FE when best_f was reached
        :param last_improvement_time_millis: the time when best_f was reached
        :param total_fes: the total FEs
        :param total_time_millis: the total runtime
        :param goal_f: the goal objective value, if provided
        :param max_fes: the optional maximum FEs
        :param max_time_millis: the optional maximum runtime
        :raises TypeError: if any parameter has a wrong type
        :raises ValueError: if the parameter values are inconsistent
        """
        super().__init__(algorithm, instance, objective, encoding, rand_seed)
        object.__setattr__(self, "best_f", try_int(best_f))
        object.__setattr__(
            self, "last_improvement_fe", check_int_range(
                last_improvement_fe, "last_improvement_fe", 1,
                1_000_000_000_000_000))
        object.__setattr__(
            self, "last_improvement_time_millis", check_int_range(
                last_improvement_time_millis,
                "last_improvement_time_millis", 0, 100_000_000_000))
        object.__setattr__(
            self, "total_fes", check_int_range(
                total_fes, "total_fes", last_improvement_fe,
                1_000_000_000_000_000))
        object.__setattr__(
            self, "total_time_millis", check_int_range(
                total_time_millis, "total_time_millis",
                last_improvement_time_millis, 100_000_000_000))

        if goal_f is not None:
            goal_f = None if goal_f <= -inf else try_int(goal_f)
        object.__setattr__(self, "goal_f", goal_f)

        if max_fes is not None:
            check_int_range(max_fes, "max_fes", total_fes,
                            1_000_000_000_000_000_000)
        object.__setattr__(self, "max_fes", max_fes)

        if max_time_millis is not None:
            check_int_range(
                max_time_millis, "max_time_millis", 1, 100_000_000_000)
            _check_max_time_millis(max_time_millis, total_fes,
                                   total_time_millis)
        object.__setattr__(self, "max_time_millis", max_time_millis)

    def _tuple(self) -> tuple[Any, ...]:
        """
        Get the tuple representation of this object used in comparisons.

        :return: the comparison-relevant data of this object in a tuple
        """
        return (self.__class__.__name__,
                "" if self.algorithm is None else self.algorithm,
                "" if self.instance is None else self.instance,
                "" if self.objective is None else self.objective,
                "" if self.encoding is None else self.encoding, 1,
                self.rand_seed, "", "",
                inf if self.goal_f is None else self.goal_f,
                inf if self.max_fes is None else self.max_fes,
                inf if self.max_time_millis is None
                else self.max_time_millis,
                self.best_f, self.last_improvement_fe,
                self.last_improvement_time_millis,
                self.total_fes, self.total_time_millis)

    def success(self) -> bool:
        """
        Check if a run is successful.

        This method returns `True` if and only if `goal_f` is defined and
        `best_f <= goal_f` (and `False` otherwise).

        :return: `True` if and only if `best_f<=goal_f`
        """
        return False if self.goal_f is None else self.best_f <= self.goal_f

    def path_to_file(self, base_dir: str) -> Path:
        """
        Get the path that would correspond to the log file of this end result.

        Obtain a path that would correspond to the log file of this end
        result, resolved from a base directory `base_dir`.

        :param base_dir: the base directory
        :returns: the path to a file corresponding to the end result record
        """
        return Path(base_dir).resolve_inside(
            self.algorithm).resolve_inside(self.instance).resolve_inside(
            sanitize_names([self.algorithm, self.instance,
                            hex(self.rand_seed)]) + FILE_SUFFIX)

    def get_best_f(self) -> int | float:
        """
        Get the best objective value reached.

        :returns: the best objective value reached
        """
        if not isinstance(self, EndResult):
            raise type_error(self, "self", EndResult)
        return self.best_f

    def get_last_improvement_fe(self) -> int:
        """
        Get the index of the function evaluation when `best_f` was reached.

        :returns: the index of the function evaluation when `best_f`
            was reached
        """
        if not isinstance(self, EndResult):
            raise type_error(self, "self", EndResult)
        return self.last_improvement_fe

    def get_last_improvement_time_millis(self) -> int:
        """
        Get the milliseconds when `best_f` was reached.

        :returns: the milliseconds when `best_f` was reached
        """
        if not isinstance(self, EndResult):
            raise type_error(self, "self", EndResult)
        return self.last_improvement_time_millis

    def get_total_fes(self) -> int:
        """
        Get the total number of performed FEs.

        :returns: the total number of performed FEs
        """
        if not isinstance(self, EndResult):
            raise type_error(self, "self", EndResult)
        return self.total_fes

    def get_total_time_millis(self) -> int:
        """
        Get the total time consumed by the run.

        :returns: the total time consumed by the run
        """
        if not isinstance(self, EndResult):
            raise type_error(self, "self", EndResult)
        return self.total_time_millis

    def get_goal_f(self) -> int | float | None:
        """
        Get the goal objective value, if any.

        :returns: the goal objective value, if any
        """
        if not isinstance(self, EndResult):
            raise type_error(self, "self", EndResult)
        return self.goal_f

    def get_max_fes(self) -> int | None:
        """
        Get the maximum number of permissible FEs, if any.

        :returns: the maximum number of permissible FEs, if any
        """
        if not isinstance(self, EndResult):
            raise type_error(self, "self", EndResult)
        return self.max_fes

    def get_max_time_millis(self) -> int | None:
        """
        Get the maximum permitted runtime in milliseconds, if any.

        :returns: the maximum permitted runtime in milliseconds, if any
        """
        if not isinstance(self, EndResult):
            raise type_error(self, "self", EndResult)
        return self.max_time_millis

    def get_normalized_best_f(self) -> int | float | None:
        """
        Get the normalized f.

        :returns: the normalized f
        """
        g: Final[int | float | None] = EndResult.get_goal_f(self)
        if (g is None) or (g <= 0):
            return None
        return try_float_div(self.best_f - g, g)

    def get_scaled_best_f(self) -> int | float | None:
        """
        Get the scaled f.

        :returns: the scaled f
        """
        g: Final[int | float | None] = EndResult.get_goal_f(self)
        if (g is None) or (g <= 0):
            return None
        return try_float_div(self.best_f, g)

    def get_fes_per_time_milli(self) -> int | float:
        """
        Get the number of FEs performed per millisecond of runtime.

        :returns: the number of FEs per millisecond of runtime
        """
        return try_int_div(EndResult.get_total_fes(self), max(
            1, EndResult.get_total_time_millis(self)))
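

# A minimal illustrative sketch (hypothetical values, not part of the module
# API): it builds one EndResult record by hand and queries a few of the
# accessors defined above.
def _example_end_result_usage() -> None:
    """Show how an EndResult record could be created and queried."""
    er: EndResult = EndResult(
        algorithm="rls", instance="inst01", objective="makespan",
        encoding=None, rand_seed=0x1234, best_f=105,
        last_improvement_fe=1500, last_improvement_time_millis=2200,
        total_fes=2000, total_time_millis=3000, goal_f=100,
        max_fes=10_000, max_time_millis=None)
    print(er.success())  # False, because best_f=105 > goal_f=100
    print(er.get_scaled_best_f())  # 1.05 = best_f / goal_f
    print(er.get_normalized_best_f())  # 0.05 = (best_f - goal_f) / goal_f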


#: A set of getters for accessing variables of the end result
__PROPERTIES: Final[Callable[[str], Callable[[
    EndResult], int | float | None]]] = {
    KEY_LAST_IMPROVEMENT_FE: EndResult.get_last_improvement_fe,
    "last improvement FE": EndResult.get_last_improvement_fe,
    KEY_LAST_IMPROVEMENT_TIME_MILLIS:
        EndResult.get_last_improvement_time_millis,
    "last improvement ms": EndResult.get_last_improvement_time_millis,
    KEY_TOTAL_FES: EndResult.get_total_fes,
    "fes": EndResult.get_total_fes,
    KEY_TOTAL_TIME_MILLIS: EndResult.get_total_time_millis,
    "ms": EndResult.get_total_time_millis,
    KEY_GOAL_F: EndResult.get_goal_f,
    F_NAME_RAW: EndResult.get_best_f,
    KEY_BEST_F: EndResult.get_best_f,
    "f": EndResult.get_best_f,
    F_NAME_SCALED: EndResult.get_scaled_best_f,
    "bestFscaled": EndResult.get_scaled_best_f,
    F_NAME_NORMALIZED: EndResult.get_normalized_best_f,
    "bestFnormalized": EndResult.get_normalized_best_f,
    KEY_MAX_FES: EndResult.get_max_fes,
    "budgetFEs": EndResult.get_max_fes,
    KEY_MAX_TIME_MILLIS: EndResult.get_max_time_millis,
    "budgetMS": EndResult.get_max_time_millis,
    "fesPerTimeMilli": EndResult.get_fes_per_time_milli,
}.get


def getter(dimension: str) -> Callable[[EndResult], int | float | None]:
    """
    Produce a function that obtains the given dimension from EndResults.

    The following dimensions are supported:

    1. `lastImprovementFE`: :attr:`~EndResult.last_improvement_fe`
    2. `lastImprovementTimeMillis`:
       :attr:`~EndResult.last_improvement_time_millis`
    3. `totalFEs`: :attr:`~EndResult.total_fes`
    4. `totalTimeMillis`: :attr:`~EndResult.total_time_millis`
    5. `goalF`: :attr:`~EndResult.goal_f`
    6. `plainF`, `bestF`: :attr:`~EndResult.best_f`
    7. `scaledF`: :attr:`~EndResult.best_f`/:attr:`~EndResult.goal_f`
    8. `normalizedF`: (:attr:`~EndResult.best_f`-:attr:`~EndResult.goal_f`)/
       :attr:`~EndResult.goal_f`
    9. `maxFEs`: :attr:`~EndResult.max_fes`
    10. `maxTimeMillis`: :attr:`~EndResult.max_time_millis`
    11. `fesPerTimeMilli`: :attr:`~EndResult.total_fes`
        /:attr:`~EndResult.total_time_millis`

    :param dimension: the dimension
    :returns: a callable that returns the value corresponding to the
        dimension from its input value, which must be an :class:`EndResult`
    """
    result: Callable[[EndResult], int | float] | None = __PROPERTIES(
        str.strip(dimension))
    if result is None:
        raise ValueError(f"Unknown EndResult dimension {dimension!r}.")
    return result
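

# A small usage sketch for `getter` (hypothetical helper, not part of the
# module API): it extracts one dimension from a list of EndResult records,
# e.g., to feed it into plots or statistics.
def _example_getter_usage(
        results: list[EndResult]) -> list[int | float | None]:
    """Extract the total consumed FEs from a list of end results."""
    get_fes: Callable[[EndResult], int | float | None] = getter("totalFEs")
    return [get_fes(er) for er in results]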


def from_logs(
        path: str, consumer: Callable[[EndResult], Any],
        max_fes: int | None | Callable[
            [str, str], int | None] = None,
        max_time_millis: int | None | Callable[
            [str, str], int | None] = None,
        goal_f: int | float | None | Callable[
            [str, str], int | float | None] = None) -> None:
    """
    Parse a given path and pass all end results found to the consumer.

    If `path` identifies a file with suffix `.txt`, then this file is
    parsed. The appropriate :class:`EndResult` is created and passed to the
    `consumer`. If `path` identifies a directory, then this directory is
    parsed recursively and, for each log file found, one record is passed
    to the `consumer`. As `consumer`, you could pass any `callable` that
    accepts instances of :class:`EndResult`, e.g., the `append` method of
    a :class:`list`.

    Via the parameters `max_fes`, `max_time_millis`, and `goal_f`, you can
    set virtual limits for the objective function evaluations, the maximum
    runtime, and the objective value. The :class:`EndResult` records will
    then not represent the actual final state of the runs but be
    synthesized from the logged progress information. This, of course,
    requires such information to be present. It will also raise a
    `ValueError` if the goals are invalid, e.g., if a runtime limit is
    specified that is before the first logged point.

    There is one caveat when specifying `max_time_millis`: Let's say that
    the log files only log improvements. Then you might have a log point
    for 7000 FEs, 1000ms, and f=100. The next log point could be 8000 FEs,
    1200ms, and f=90. Now if the time limit you specified is 1100ms, we
    know that the end result is f=100 (because f=90 was reached too late)
    and that the total runtime is 1100ms, as this is the limit you
    specified and it was also reached. But we do not know the number of
    consumed FEs. We know you consumed at least 7000 FEs, but you did not
    consume 8000 FEs. It would be wrong to claim that 7000 FEs were
    consumed, since it could have been more. We therefore set a virtual
    end point at 7999 FEs. In terms of performance metrics such as the
    :mod:`~moptipy.evaluation.ert`, this would be the most conservative
    choice in that it does not over-estimate the speed of the algorithm.
    It can, however, lead to very big deviations from the actual values.
    For example, if your algorithm quickly converged to a local optimum
    and there simply is no log point that exceeds the virtual time limit
    but the original run had a huge FE-based budget while your virtual
    time limit was small, this could lead to an estimate of millions of
    FEs taking place within seconds...

    :param path: the path to parse
    :param consumer: the consumer
    :param max_fes: the maximum FEs, a callable to compute the maximum
        FEs from the algorithm and instance name, or `None` if unspecified
    :param max_time_millis: the maximum runtime in milliseconds, a callable
        to compute the maximum runtime from the algorithm and instance
        name, or `None` if unspecified
    :param goal_f: the goal objective value, a callable to compute the
        goal objective value from the algorithm and instance name, or
        `None` if unspecified
    """
    need_goals: bool = False
    if max_fes is not None:
        if not callable(max_fes):
            max_fes = check_int_range(
                max_fes, "max_fes", 1, 1_000_000_000_000_000)
        need_goals = True
    if max_time_millis is not None:
        if not callable(max_time_millis):
            max_time_millis = check_int_range(
                max_time_millis, "max_time_millis", 1, 1_000_000_000_000)
        need_goals = True
    if goal_f is not None:
        if callable(goal_f):
            need_goals = True
        else:
            if not isinstance(goal_f, int | float):
                raise type_error(goal_f, "goal_f", (int, float, None))
            if isfinite(goal_f):
                need_goals = True
            elif goal_f <= -inf:
                goal_f = None
            else:
                raise ValueError(f"goal_f={goal_f} is not permissible.")
    if need_goals:
        __InnerProgressLogParser(
            max_fes, max_time_millis, goal_f, consumer).parse(path)
    else:
        __InnerLogParser(consumer).parse(path)
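

# A usage sketch for `from_logs` (hypothetical paths and limits, not part of
# the module API): the first call collects the actual end-of-run states, the
# second one synthesizes virtual end results as if every run had been stopped
# after 10000 FEs.
def _example_from_logs_with_virtual_limit(results_dir: str) -> None:
    """Collect actual and virtual end results from a results folder."""
    actual: list[EndResult] = []
    from_logs(results_dir, actual.append)

    capped: list[EndResult] = []
    from_logs(results_dir, capped.append,
              max_fes=lambda algo, inst: 10_000)  # same budget for all runs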


def to_csv(results: Iterable[EndResult], file: str) -> Path:
    """
    Write a sequence of end results to a file in CSV format.

    :param results: the end results
    :param file: the path
    :return: the path of the file that was written
    """
    path: Final[Path] = Path(file)
    logger(f"Writing end results to CSV file {path!r}.")
    path.ensure_parent_dir_exists()
    with path.open_for_write() as wt:
        csv_write(data=sorted(results), consumer=line_writer(wt),
                  setup=CsvWriter().setup,
                  get_column_titles=CsvWriter.get_column_titles,
                  get_row=CsvWriter.get_row,
                  get_header_comments=CsvWriter.get_header_comments,
                  get_footer_comments=CsvWriter.get_footer_comments)
    logger(f"Done writing end results to CSV file {path!r}.")
    return path


def from_csv(file: str, consumer: Callable[[EndResult], Any],
             filterer: Callable[[EndResult], bool]
             = lambda x: True) -> None:
    """
    Parse a given CSV file to get :class:`EndResult` records.

    :param file: the path to parse
    :param consumer: the collector, can be the `append` method of a
        :class:`list`
    :param filterer: an optional filter function
    """
    if not callable(consumer):
        raise type_error(consumer, "consumer", call=True)
    path: Final[Path] = file_path(file)
    logger(f"Now reading CSV file {path!r}.")

    def __cons(r: EndResult, __c=consumer, __f=filterer) -> None:
        """Consume a record."""
        if __f(r):
            __c(r)

    with path.open_for_read() as rd:
        csv_read(rows=rd,
                 setup=CsvReader,
                 parse_row=CsvReader.parse_row,
                 consumer=__cons)
    logger(f"Done reading CSV file {path!r}.")
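

# A round-trip sketch tying the pieces together (hypothetical paths, not part
# of the module API): parse all log files under a results folder, store them
# as a CSV file, and later read only the successful runs back from that file.
def _example_csv_round_trip(results_dir: str, csv_file: str) -> None:
    """Write end results to CSV and read the successful ones back."""
    results: list[EndResult] = []
    from_logs(results_dir, results.append)
    to_csv(results, csv_file)

    successful: list[EndResult] = []
    from_csv(csv_file, successful.append,
             filterer=EndResult.success)  # keep only runs that hit goal_f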


class CsvWriter:
    """A class for CSV writing of :class:`EndResult`."""

    def __init__(self, scope: str | None = None) -> None:
        """
        Initialize the csv writer.

        :param scope: the prefix to be pre-pended to all columns
        """
        #: an optional scope
        self.scope: Final[str | None] = (
            str.strip(scope)) if scope is not None else None
        #: has this writer been set up?
        self.__setup: bool = False
        #: do we need the encoding?
        self.__needs_encoding: bool = False
        #: do we need the max FEs?
        self.__needs_max_fes: bool = False
        #: do we need the max millis?
        self.__needs_max_ms: bool = False
        #: do we need the goal F?
        self.__needs_goal_f: bool = False

    def setup(self, data: Iterable[EndResult]) -> "CsvWriter":
        """
        Set up this csv writer based on existing data.

        :param data: the data to setup with
        :returns: this writer
        """
        if self.__setup:
            raise ValueError(
                "EndResults CsvWriter has already been set up.")
        self.__setup = True

        no_encoding: bool = True
        no_max_fes: bool = True
        no_max_ms: bool = True
        no_goal_f: bool = True
        check: int = 4
        for er in data:
            if no_encoding and (er.encoding is not None):
                no_encoding = False
                self.__needs_encoding = True
                check -= 1
                if check <= 0:
                    return self
            if no_max_fes and (er.max_fes is not None):
                self.__needs_max_fes = True
                no_max_fes = False
                check -= 1
                if check <= 0:
                    return self
            if no_max_ms and (er.max_time_millis is not None):
                self.__needs_max_ms = True
                no_max_ms = False
                check -= 1
                if check <= 0:
                    return self
            if no_goal_f and (er.goal_f is not None) and (
                    isfinite(er.goal_f)):
                self.__needs_goal_f = True
                no_goal_f = False
                check -= 1
                if check <= 0:
                    return self
        return self

    def get_column_titles(self, dest: Callable[[str], None]) -> None:
        """
        Get the column titles.

        :param dest: the destination string consumer
        """
        p: Final[str] = self.scope
        dest(csv_scope(p, KEY_ALGORITHM))
        dest(csv_scope(p, KEY_INSTANCE))
        dest(csv_scope(p, KEY_OBJECTIVE_FUNCTION))
        if self.__needs_encoding:
            dest(csv_scope(p, KEY_ENCODING))
        dest(csv_scope(p, KEY_RAND_SEED))
        dest(csv_scope(p, KEY_BEST_F))
        dest(csv_scope(p, KEY_LAST_IMPROVEMENT_FE))
        dest(csv_scope(p, KEY_LAST_IMPROVEMENT_TIME_MILLIS))
        dest(csv_scope(p, KEY_TOTAL_FES))
        dest(csv_scope(p, KEY_TOTAL_TIME_MILLIS))
        if self.__needs_goal_f:
            dest(csv_scope(p, KEY_GOAL_F))
        if self.__needs_max_fes:
            dest(csv_scope(p, KEY_MAX_FES))
        if self.__needs_max_ms:
            dest(csv_scope(p, KEY_MAX_TIME_MILLIS))

    def get_row(self, data: EndResult,
                dest: Callable[[str], None]) -> None:
        """
        Render a single end result record to a CSV row.

        :param data: the end result record
        :param dest: the string consumer
        """
        dest(data.algorithm)
        dest(data.instance)
        dest(data.objective)
        if self.__needs_encoding:
            dest(data.encoding if data.encoding else "")
        dest(hex(data.rand_seed))
        dest(num_to_str(data.best_f))
        dest(str(data.last_improvement_fe))
        dest(str(data.last_improvement_time_millis))
        dest(str(data.total_fes))
        dest(str(data.total_time_millis))
        if self.__needs_goal_f:
            dest(num_or_none_to_str(data.goal_f))
        if self.__needs_max_fes:
            dest(int_or_none_to_str(data.max_fes))
        if self.__needs_max_ms:
            dest(int_or_none_to_str(data.max_time_millis))

    def get_header_comments(self, dest: Callable[[str], None]) -> None:
        """
        Get any possible header comments.

        :param dest: the destination
        """
        dest("Experiment End Results")
        dest("See the description at the bottom of the file.")
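

# A small illustrative sketch (hypothetical scope "er", not part of the
# module API): when a column scope prefix is given, the writer emits
# prefixed column titles. Normally, `CsvWriter` is driven by `csv_write`
# as shown in `to_csv` above.
def _example_scoped_column_titles(
        results: Iterable[EndResult]) -> list[str]:
    """Collect the scoped column titles for the given end results."""
    titles: list[str] = []
    writer: CsvWriter = CsvWriter(scope="er").setup(results)
    writer.get_column_titles(titles.append)  # e.g., "er.algorithm", ...
    return titles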


class CsvReader:
    """A csv parser for end results."""

    def __init__(self, columns: dict[str, int]) -> None:
        """
        Create a CSV parser for :class:`EndResult`.

        :param columns: the columns
        """
        super().__init__()
        if not isinstance(columns, dict):
            raise type_error(columns, "columns", dict)
        #: the index of the algorithm column
        self.__idx_algorithm: Final[int] = csv_column(columns, KEY_ALGORITHM)
        #: the index of the instance column
        self.__idx_instance: Final[int] = csv_column(columns, KEY_INSTANCE)
        #: the index of the objective function column
        self.__idx_objective: Final[int] = csv_column(
            columns, KEY_OBJECTIVE_FUNCTION)
        #: the index of the encoding column, if any
        self.__idx_encoding = csv_column_or_none(columns, KEY_ENCODING)
        #: the index of the random seed column
        self.__idx_seed: Final[int] = csv_column(columns, KEY_RAND_SEED)
        #: the column with the last improvement FE
        self.__idx_li_fe: Final[int] = csv_column(
            columns, KEY_LAST_IMPROVEMENT_FE)
        #: the column with the last improvement time milliseconds
        self.__idx_li_ms: Final[int] = csv_column(
            columns, KEY_LAST_IMPROVEMENT_TIME_MILLIS)
        #: the column with the best obtained objective value
        self.__idx_best_f: Final[int] = csv_column(columns, KEY_BEST_F)
        #: the column with the total consumed FEs
        self.__idx_tt_fe: Final[int] = csv_column(columns, KEY_TOTAL_FES)
        #: the column with the total time in milliseconds
        self.__idx_tt_ms: Final[int] = csv_column(
            columns, KEY_TOTAL_TIME_MILLIS)
        #: the column with the goal objective value, if any
        self.__idx_goal_f: Final[int | None] = csv_column_or_none(
            columns, KEY_GOAL_F)
        #: the column with the maximum FEs, if any such budget constraint
        #: was defined
        self.__idx_max_fes: Final[int | None] = csv_column_or_none(
            columns, KEY_MAX_FES)
        #: the column with the maximum runtime in milliseconds, if any such
        #: budget constraint was defined
        self.__idx_max_ms: Final[int | None] = csv_column_or_none(
            columns, KEY_MAX_TIME_MILLIS)

    def parse_row(self, data: list[str]) -> EndResult:
        """
        Parse a row of data.

        :param data: the data row
        :return: the end result record
        """
        return EndResult(
            data[self.__idx_algorithm],  # algorithm
            data[self.__idx_instance],  # instance
            data[self.__idx_objective],  # objective
            csv_str_or_none(data, self.__idx_encoding),  # encoding
            int((data[self.__idx_seed])[2:], 16),  # rand seed
            str_to_num(data[self.__idx_best_f]),  # best_f
            int(data[self.__idx_li_fe]),  # last_improvement_fe
            int(data[self.__idx_li_ms]),  # last_improvement_time_millis
            int(data[self.__idx_tt_fe]),  # total_fes
            int(data[self.__idx_tt_ms]),  # total_time_millis
            csv_val_or_none(data, self.__idx_goal_f, str_to_num),
            csv_val_or_none(data, self.__idx_max_fes, int),  # max_fes
            csv_val_or_none(data, self.__idx_max_ms, int))  # max_time_ms


class __InnerLogParser(SetupAndStateParser):
    """The internal log parser class."""

    def __init__(self, consumer: Callable[[EndResult], Any]):
        """
        Create the internal log parser.

        :param consumer: the consumer accepting the parsed data
        """
        super().__init__()
        if not callable(consumer):
            raise type_error(consumer, "consumer", call=True)
        self.__consumer: Final[Callable[[EndResult], Any]] = consumer

    def process(self) -> None:
        self.__consumer(EndResult(self.algorithm, self.instance,
                                  self.objective, self.encoding,
                                  self.rand_seed, self.best_f,
                                  self.last_improvement_fe,
                                  self.last_improvement_time_millis,
                                  self.total_fes, self.total_time_millis,
                                  self.goal_f, self.max_fes,
                                  self.max_time_millis))


def _join_goals(vlimit, vgoal, select):  # noqa
    if vlimit is None:
        return vgoal
    if vgoal is None:
        return vlimit
    return select(vlimit, vgoal)


class __InnerProgressLogParser(SetupAndStateParser):
    """The internal log parser class for virtual end results."""

    def __init__(
            self,
            max_fes: int | None | Callable[[str, str], int | None],
            max_time_millis: int | None | Callable[[str, str], int | None],
            goal_f: int | float | None | Callable[
                [str, str], int | float | None],
            consumer: Callable[[EndResult], Any]):
        """
        Create the internal log parser.

        :param consumer: the consumer
        :param max_fes: the maximum FEs, or `None` if unspecified
        :param max_time_millis: the maximum runtime in milliseconds, or
            `None` if unspecified
        :param goal_f: the goal objective value, or `None` if unspecified
        """
        super().__init__()
        if not callable(consumer):
            raise type_error(consumer, "consumer", call=True)
        self.__consumer: Final[Callable[[EndResult], Any]] = consumer

        self.__src_limit_ms: Final[
            int | None | Callable[[str, str], int | None]] = max_time_millis
        self.__src_limit_fes: Final[
            int | None | Callable[[str, str], int | None]] = max_fes
        self.__src_limit_f: Final[
            int | float | None | Callable[
                [str, str], int | float | None]] = goal_f

        self.__limit_ms: int | float = inf
        self.__limit_ms_n: int | None = None
        self.__limit_fes: int | float = inf
        self.__limit_fes_n: int | None = None
        self.__limit_f: int | float = -inf
        self.__limit_f_n: int | float | None = None

        self.__stop_fes: int | None = None
        self.__stop_ms: int | None = None
        self.__stop_f: int | float | None = None
        self.__stop_li_fe: int | None = None
        self.__stop_li_ms: int | None = None
        self.__hit_goal: bool = False
        self.__state: int = 0

    def end_file(self) -> bool:
        if self.__state != 2:
            raise ValueError(
                "Illegal state, log file must have a "
                f"{SECTION_PROGRESS!r} section.")
        self.__state = 0
        return super().end_file()

    def process(self) -> None:
        hit_goal = self.__hit_goal
        stop_fes: int = self.__stop_fes
        stop_ms: int = self.__stop_ms
        if not hit_goal:
            stop_ms = max(stop_ms, cast(int, min(
                self.total_time_millis, self.__limit_ms)))
            ul_fes = self.total_fes
            if stop_ms < self.total_time_millis:
                ul_fes = ul_fes - 1
            stop_fes = max(stop_fes, cast(int, min(
                ul_fes, self.__limit_fes)))

        self.__consumer(EndResult(
            algorithm=self.algorithm,
            instance=self.instance,
            objective=self.objective,
            encoding=self.encoding,
            rand_seed=self.rand_seed,
            best_f=self.__stop_f,
            last_improvement_fe=self.__stop_li_fe,
            last_improvement_time_millis=self.__stop_li_ms,
            total_fes=stop_fes,
            total_time_millis=stop_ms,
            goal_f=_join_goals(self.__limit_f_n, self.goal_f, max),
            max_fes=_join_goals(self.__limit_fes_n, self.max_fes, min),
            max_time_millis=_join_goals(
                self.__limit_ms_n, self.max_time_millis, min)))

        self.__stop_fes = None
        self.__stop_ms = None
        self.__stop_f = None
        self.__stop_li_fe = None
        self.__stop_li_ms = None
        self.__limit_fes_n = None
        self.__limit_fes = inf
        self.__limit_ms_n = None
        self.__limit_ms = inf
        self.__limit_f_n = None
        self.__limit_f = -inf
        self.__hit_goal = False

    def start_file(self, path: Path) -> bool:
        if super().start_file(path):
            if (self.algorithm is None) or (self.instance is None):
                raise ValueError(
                    f"Invalid state: algorithm={self.algorithm!r}, "
                    f"instance={self.instance!r}.")

            fes = self.__src_limit_fes(self.algorithm, self.instance) \
                if callable(self.__src_limit_fes) else self.__src_limit_fes
            self.__limit_fes_n = None if fes is None else \
                check_int_range(fes, "limit_fes", 1, 1_000_000_000_000_000)
            self.__limit_fes = inf if self.__limit_fes_n is None \
                else self.__limit_fes_n

            time = self.__src_limit_ms(self.algorithm, self.instance) \
                if callable(self.__src_limit_ms) else self.__src_limit_ms
            self.__limit_ms_n = None if time is None else \
                check_int_range(time, "limit_ms", 1, 1_000_000_000_000)
            self.__limit_ms = inf if self.__limit_ms_n is None \
                else self.__limit_ms_n

            self.__limit_f_n = self.__src_limit_f(
                self.algorithm, self.instance) \
                if callable(self.__src_limit_f) else self.__src_limit_f
            if self.__limit_f_n is not None:
                if not isinstance(self.__limit_f_n, int | float):
                    raise type_error(self.__limit_f_n, "limit_f", (
                        int, float))
                if not isfinite(self.__limit_f_n):
                    if self.__limit_f_n <= -inf:
                        self.__limit_f_n = None
                    else:
                        raise ValueError(
                            f"invalid limit f={self.__limit_f_n} for "
                            f"{self.algorithm} on {self.instance}")
            self.__limit_f = -inf if self.__limit_f_n is None \
                else self.__limit_f_n
            return True
        return False

    def start_section(self, title: str) -> bool:
        if title == SECTION_PROGRESS:
            if self.__state != 0:
                raise ValueError(f"Already did section {title}.")
            self.__state = 1
            return True
        return super().start_section(title)

    def needs_more_lines(self) -> bool:
        return (self.__state < 2) or super().needs_more_lines()

    def lines(self, lines: list[str]) -> bool:
        if not isinstance(lines, list):
            raise type_error(lines, "lines", list)
        if self.__state != 1:
            return super().lines(lines)
        self.__state = 2

        n_rows = len(lines)
        if n_rows < 2:
            raise ValueError("lines must contain at least two elements, "
                             f"but contains {n_rows}.")

        columns = [c.strip() for c in lines[0].split(CSV_SEPARATOR)]
        fe_col: Final[int] = columns.index(PROGRESS_FES)
        ms_col: Final[int] = columns.index(PROGRESS_TIME_MILLIS)
        f_col: Final[int] = columns.index(PROGRESS_CURRENT_F)

        current_fes: int = -1
        current_ms: int = -1
        current_f: int | float = inf
        current_li_fe: int | None = None
        current_li_ms: int | None = None
        stop_fes: int | None = None
        stop_ms: int | None = None
        stop_f: int | float | None = None
        stop_li_fe: int | None = None
        stop_li_ms: int | None = None
        limit_fes: Final[int | float] = self.__limit_fes
        limit_ms: Final[int | float] = self.__limit_ms
        limit_f: Final[int | float] = self.__limit_f

        for line in lines[1:]:
            values = line.split(CSV_SEPARATOR)
            current_fes = check_to_int_range(
                values[fe_col], "fes", current_fes, 1_000_000_000_000_000)
            current_ms = check_to_int_range(
                values[ms_col], "ms", current_ms, 1_000_000_000_000_00)
            f: int | float = str_to_num(values[f_col])

            if (current_fes <= limit_fes) and (current_ms <= limit_ms):
                if f < current_f:  # can only update best within budget
                    current_f = f
                    current_li_fe = current_fes
                    current_li_ms = current_ms
                stop_ms = current_ms
                stop_fes = current_fes
                stop_f = current_f
                stop_li_fe = current_li_fe
                stop_li_ms = current_li_ms

            if (current_fes >= limit_fes) or (current_ms >= limit_ms) or \
                    (current_f <= limit_f):
                self.__hit_goal = True
                break  # we can stop parsing the stuff

        if (stop_fes is None) or (stop_ms is None) or (stop_f is None) \
                or (current_fes <= 0) or (not isfinite(current_f)):
            raise ValueError(
                "Illegal state, no fitting data point found: stop_fes="
                f"{stop_fes}, stop_ms={stop_ms}, stop_f={stop_f}, "
                f"current_fes={current_fes}, current_ms={current_ms}, "
                f"current_f={current_f}.")

        if current_fes >= limit_fes:
            stop_fes = max(stop_fes, min(
                cast(int, limit_fes), current_fes))
        elif current_ms > limit_ms:
            stop_fes = max(stop_fes, current_fes - 1)
        else:
            stop_fes = max(stop_fes, current_fes)

        if current_ms >= limit_ms:
            stop_ms = max(stop_ms, min(cast(int, limit_ms), current_ms))
        else:
            stop_ms = max(stop_ms, current_ms)

        self.__stop_fes = stop_fes
        self.__stop_ms = stop_ms
        self.__stop_f = stop_f
        self.__stop_li_fe = stop_li_fe
        self.__stop_li_ms = stop_li_ms
        return self.needs_more_lines()


# Run log files to end results if executed as script
if __name__ == "__main__":
    parser: Final[argparse.ArgumentParser] = moptipy_argparser(
        __file__,
        "Convert log files obtained with moptipy to the end results CSV "
        "format that can be post-processed or exported to other tools.",
        "This program recursively parses a folder hierarchy created by"
        " the moptipy experiment execution facility. This folder "
        "structure follows the scheme of algorithm/instance/log_file "
        "and has one log file per run. As result of the parsing, one "
        "CSV file (where columns are separated by ';') is created with"
        " one row per log file. This row contains the end-of-run state"
        " loaded from the log file. Whereas the log files may store "
        "the complete progress of one run of one algorithm on one "
        "problem instance as well as the algorithm configuration "
        "parameters, instance features, system settings, and the final"
        " results, the end results CSV file will only represent the "
        "final result quality, when it was obtained, how long the runs"
        " took, etc. This information is much denser and smaller and "
        "suitable for importing into other tools such as Excel or for "
        "postprocessing.")
    parser.add_argument(
        "source", nargs="?", default="./results",
        help="the location of the experimental results, i.e., the root "
             "folder under which to search for log files", type=Path)
    parser.add_argument(
        "dest", help="the path to the end results CSV file to be created",
        type=Path, nargs="?", default="./evaluation/end_results.txt")
    parser.add_argument(
        "--maxFEs", help="the maximum permitted FEs",
        type=int, nargs="?", default=None)
    parser.add_argument(
        "--maxTime", help="the maximum permitted time in milliseconds",
        type=int, nargs="?", default=None)
    parser.add_argument(
        "--goalF", help="the goal objective value",
        type=str_to_num, nargs="?", default=None)
    args: Final[argparse.Namespace] = parser.parse_args()

    end_results: Final[list[EndResult]] = []
    from_logs(args.source, end_results.append,
              args.maxFEs, args.maxTime, args.goalF)
    to_csv(end_results, args.dest)