"""
Load the encounter frequencies or the set of different objective values.

This tool can load the different objective values that exist or are
encountered by optimization processes. This may be useful for statistical
evaluations or fitness landscape analyses.

This tool is based on code developed by Mr. Tianyu LIANG (梁天宇),
MSc student at the Institute of Applied Optimization (IAO,
应用优化研究所) of the School of Artificial Intelligence and
Big Data (人工智能与大数据学院) of Hefei University (合肥学院).
"""
import argparse
import os.path
from collections import Counter
from gc import collect
from math import isfinite
from typing import Any, Callable, Final, Iterable

from pycommons.io.console import logger
from pycommons.io.csv import CSV_SEPARATOR, SCOPE_SEPARATOR
from pycommons.io.path import Path
from pycommons.strings.string_conv import str_to_num
from pycommons.types import type_error

from moptipy.algorithms.so.fea1plus1 import H_LOG_SECTION
from moptipy.api.logging import (
    KEY_F_LOWER_BOUND,
    KEY_F_UPPER_BOUND,
    KEY_INSTANCE,
    PROGRESS_CURRENT_F,
    SCOPE_OBJECTIVE_FUNCTION,
    SECTION_PROGRESS,
)
from moptipy.evaluation.base import (
    MultiRunData,
    PerRunData,
)
from moptipy.evaluation.log_parser import SetupAndStateParser
from moptipy.utils.help import moptipy_argparser

#: the lower bound of the objective function
_FULL_KEY_LOWER_BOUND: Final[str] = \
    f"{SCOPE_OBJECTIVE_FUNCTION}{SCOPE_SEPARATOR}{KEY_F_LOWER_BOUND}"
#: the upper bound of the objective function
_FULL_KEY_UPPER_BOUND: Final[str] = \
    f"{SCOPE_OBJECTIVE_FUNCTION}{SCOPE_SEPARATOR}{KEY_F_UPPER_BOUND}"


def from_logs(
        path: str, consumer: Callable[[PerRunData, Counter[
            int | float]], Any],
        report_progress: bool = True,
        report_lower_bound: bool = False,
        report_upper_bound: bool = False,
        report_goal_f: bool = False,
        report_h: bool = True,
        per_instance_known: Callable[[
            str], Iterable[int | float]] = lambda _: ()) -> None:
    """
    Parse a path, pass all discovered objective values per run to a consumer.

    This function parses the log files in a directory recursively. For each
    log file, it produces a `Counter` filled with all encountered objective
    values and their "pseudo" encounter frequencies. "Pseudo" because the
    values returned depend very much on how the function is configured.

    First, if all other parameters are set to `False`, the function passes
    a `Counter` to the `consumer` where the best encountered objective
    value has frequency `1` and no other data is present.

    If `report_progress` is `True`, then each time any objective value is
    encountered in the `PROGRESS` section, its counter is incremented by
    `1` - if a `PROGRESS` section is present, that is. The best encountered
    objective value will have a count of at least one either way.

    If `report_goal_f`, `report_lower_bound`, or `report_upper_bound` are
    `True`, then it is ensured that the goal objective value of the
    optimization process, the lower bound of the objective function, or the
    upper bound of the objective function will have a corresponding count
    of at least `1` if they are present in the log files (in the `SETUP`
    section).

    If `report_h` is `True`, then a frequency fitness assignment `H`
    section is parsed, if present (see
    :mod:`~moptipy.algorithms.so.fea1plus1`). Such a section contains
    tuples of objective values and encounter frequencies. These encounter
    frequencies are added to the counter. This means that if you set *both*
    `report_progress` and `report_h` to `True`, you will get frequencies
    that are too high.

    Finally, the function `per_instance_known` may return a set of known
    objective values for a given instance (based on its parameter, the
    instance name). Each such objective value will have a frequency of at
    least `1`.

    Generally, if we want the actual encounter frequencies of objective
    values, we could log *all FEs* to the log files and set
    `report_progress` to `True` and everything else to `False`. Then we get
    correct encounter frequencies. Alternatively, if we have a purely
    FFA-based algorithm (see, again,
    :mod:`~moptipy.algorithms.so.fea1plus1`), then we can set `report_h` to
    `True` and everything else to `False` to get a similar result, but the
    encounter frequencies then depend on the selection scheme.
    Alternatively, if we only care about whether an objective value was
    encountered or not, we can simply set both to `True`. Finally, if we
    want to get all *possible* objective values, then we may also set
    `report_goal_f`, `report_lower_bound`, or `report_upper_bound` to
    `True` **if** we are sure that the corresponding objective values do
    actually exist (and are not just bounds that can never be reached).

    :param path: the path to parse
    :param consumer: the consumer receiving, for each log file, an instance
        of :class:`~moptipy.evaluation.base.PerRunData` identifying the run
        and a dictionary that maps each objective value to a lower bound of
        its existence count or encounter frequency. Warning: The dictionary
        will be cleared and re-used for all files.
    :param report_progress: should all values in the `PROGRESS` section be
        reported, if such a section exists?
    :param report_lower_bound: should the lower bound be reported, if any
        lower bound for the objective function is listed?
    :param report_upper_bound: should the upper bound be reported, if any
        upper bound for the objective function is listed?
    :param report_h: should all values in the `H` section be reported, if
        such a section exists?
    :param report_goal_f: should we report the goal objective value, if it
        is specified?
    :param per_instance_known: a function that returns a set of known
        objective values per instance
    """
    __InnerLogParser(consumer, report_progress, report_lower_bound,
                     report_upper_bound, report_h, report_goal_f,
                     per_instance_known).parse(path)
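
# Example (hypothetical data): if globally optimal objective values are
# known for some instances, `per_instance_known` can inject them so that
# each optimum is counted at least once:
#
#     known_optima = {"inst_a": (0,), "inst_b": (17, 23)}
#     from_logs("./results", my_consumer,
#               per_instance_known=lambda n: known_optima.get(n, ()))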


def aggregate_from_logs(
        path: str, consumer: Callable[[MultiRunData, Counter[
            int | float]], Any],
        per_instance: bool = True,
        per_algorithm_instance: bool = True,
        report_progress: bool = True,
        report_lower_bound: bool = False,
        report_upper_bound: bool = False,
        report_goal_f: bool = False,
        report_h: bool = True,
        per_instance_known: Callable[[
            str], Iterable[int | float]] = lambda _: ()) -> None:
    """
    Parse a path, aggregate all discovered objective values to a consumer.

    A version of :func:`from_logs` that aggregates results per instance
    and/or per algorithm-instance combination. The basic process of loading
    the data is described in :func:`from_logs`.

    :param path: the path to parse
    :param consumer: the consumer receiving the aggregated results
    :param per_instance: pass results to the consumer that are aggregated
        over all algorithms, setups, and runs for a given instance
    :param per_algorithm_instance: pass results to the consumer that are
        aggregated over all runs and setups for a given algorithm-instance
        combination
    :param report_progress: see :func:`from_logs`
    :param report_lower_bound: see :func:`from_logs`
    :param report_upper_bound: see :func:`from_logs`
    :param report_h: see :func:`from_logs`
    :param report_goal_f: see :func:`from_logs`
    :param per_instance_known: see :func:`from_logs`
    """
    if not callable(consumer):
        raise type_error(consumer, "consumer", call=True)
    if not isinstance(per_instance, bool):
        raise type_error(per_instance, "per_instance", bool)
    if not isinstance(per_algorithm_instance, bool):
        raise type_error(
            per_algorithm_instance, "per_algorithm_instance", bool)

    collection: Final[dict[tuple[str, int, str], Counter[int | float]]] = {}
    runs: Final[Counter[tuple[str, int, str]]] = Counter()

    def __consume(d: PerRunData, c: Counter[int | float]) -> None:
        nonlocal collection
        nonlocal runs
        nonlocal per_instance
        nonlocal per_algorithm_instance

        inst: Final[str] = d.instance
        ai: tuple[str, int, str]
        if per_instance:
            ai = (inst, 1, "")
            runs[ai] += 1
            if ai in collection:
                collection[ai] += c
            else:
                collection[ai] = Counter(c)
        if per_algorithm_instance:
            ai = (inst, 0, d.algorithm)
            runs[ai] += 1
            if ai in collection:
                collection[ai] += c
            else:
                collection[ai] = Counter(c)

    from_logs(path, __consume, report_progress, report_lower_bound,
              report_upper_bound, report_goal_f, report_h,
              per_instance_known)

    for key in sorted(collection.keys()):
        val = collection[key]
        del collection[key]
        consumer(MultiRunData(key[2] if key[2] else None, key[0], None,
                              None, runs[key]), val)
        del val
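
# Example (hypothetical): aggregate over all runs of each algorithm-instance
# combination and print the number of distinct objective values seen:
#
#     from moptipy.evaluation.base import MultiRunData
#
#     def agg_consumer(group: MultiRunData, freqs: Counter) -> None:
#         print(f"{group.algorithm} on {group.instance}: "
#               f"{len(freqs)} distinct objective values")
#
#     aggregate_from_logs("./results", agg_consumer,
#                         per_instance=False, per_algorithm_instance=True)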


def number_of_objective_values_to_csv(
        input_dir: str, output_file: str,
        per_instance: bool = True,
        per_algorithm_instance: bool = True,
        report_lower_bound: bool = False,
        report_upper_bound: bool = False,
        report_goal_f: bool = False,
        per_instance_known: Callable[[
            str], Iterable[int | float]] = lambda _: ()) -> None:
    """
    Print the number of unique objective values to a CSV file.

    A version of :func:`aggregate_from_logs` that collects the existing
    objective values and prints an overview to a file.

    :param input_dir: the path to parse
    :param output_file: the output file to generate
    :param per_instance: pass results to the consumer that are aggregated
        over all algorithms, setups, and runs for a given instance
    :param per_algorithm_instance: pass results to the consumer that are
        aggregated over all runs and setups for a given algorithm-instance
        combination
    :param report_lower_bound: see :func:`from_logs`
    :param report_upper_bound: see :func:`from_logs`
    :param report_goal_f: see :func:`from_logs`
    :param per_instance_known: see :func:`from_logs`
    """
    input_path: Final[Path] = Path(input_dir)
    output_path: Final[Path] = Path(output_file)
    logger(f"Collecting the number of objective values from {input_path!r} "
           f"into {output_path!r}.")
    logger(f"Lower bounds will{'' if report_lower_bound else ' not'} be "
           "treated as existing objective values.")
    logger(f"Upper bounds will{'' if report_upper_bound else ' not'} be "
           "treated as existing objective values.")
    logger("Goal objective values will"
           f"{'' if report_goal_f else ' not'} be treated as existing "
           "objective values.")
    Path(os.path.dirname(output_path)).ensure_dir_exists()

    data: Counter[tuple[str, str]] = Counter()
    instances: set[str] = set()
    algorithms: set[str] = set()

    def __collector(d: MultiRunData, c: Counter[int | float]) -> None:
        nonlocal data
        nonlocal instances
        nonlocal algorithms
        inst: str = ""
        if d.instance is not None:
            inst = d.instance
            instances.add(inst)
        algo: str = ""
        if d.algorithm is not None:
            algo = d.algorithm
            algorithms.add(algo)
        data[(inst, algo)] += len(c)

    aggregate_from_logs(
        input_path, __collector, per_instance, per_algorithm_instance,
        True, report_lower_bound, report_upper_bound, report_goal_f,
        True, per_instance_known)

    algos: Final[list[str]] = sorted(algorithms)
    logger(f"Now writing the information gathered for {len(algos)} "
           f"algorithms and {len(instances)} instances to {output_path!r}.")

    with output_path.open_for_write() as ow:
        wrt = ow.write
        # write the header row: instance, one column per algorithm, "all"
        wrt(KEY_INSTANCE)
        for a in algos:
            wrt(CSV_SEPARATOR)
            wrt(a)
        wrt(CSV_SEPARATOR)
        wrt("all")
        wrt("\n")
        # write one row per instance
        for instance in sorted(instances):
            wrt(instance)
            for a in algos:
                wrt(CSV_SEPARATOR)
                wrt(str(data[(instance, a)]))
            wrt(CSV_SEPARATOR)
            wrt(str(data[(instance, "")]))
            wrt("\n")

    logger(f"Finished writing {output_path!r}.")
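
# The generated file has one row per instance and one column per algorithm,
# plus a final "all" column aggregated over all algorithms for that
# instance. Assuming the CSV separator renders as ";" and `KEY_INSTANCE` as
# "instance", and with made-up counts, the output could look like:
#
#     instance;algo_1;algo_2;all
#     inst_a;12;9;17
#     inst_b;30;28;41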


class __InnerLogParser(SetupAndStateParser):
    """The internal log parser class for gathering objective values."""

    def __init__(self, consumer: Callable[[PerRunData,
                                           Counter[int | float]], Any],
                 report_progress: bool, report_lower_bound: bool,
                 report_upper_bound: bool, report_h: bool,
                 report_goal_f: bool,
                 per_instance_known: Callable[[
                     str], Iterable[int | float]]):
        """
        Create the internal log parser.

        :param consumer: the consumer
        :param report_progress: should all values in the `PROGRESS` section
            be reported, if such a section exists?
        :param report_lower_bound: should the lower bound be reported, if
            any lower bound for the objective function is listed?
        :param report_upper_bound: should the upper bound be reported, if
            any upper bound for the objective function is listed?
        :param report_h: should all values in the `H` section be reported,
            if such a section exists?
        :param report_goal_f: should we report the goal objective value, if
            it is specified?
        :param per_instance_known: a function that returns a set of known
            objective values per instance
        """
        super().__init__()
        if not callable(consumer):
            raise type_error(consumer, "consumer", call=True)
        if not isinstance(report_progress, bool):
            raise type_error(report_progress, "report_progress", bool)
        if not isinstance(report_lower_bound, bool):
            raise type_error(
                report_lower_bound, "report_lower_bound", bool)
        if not isinstance(report_upper_bound, bool):
            raise type_error(
                report_upper_bound, "report_upper_bound", bool)
        if not isinstance(report_h, bool):
            raise type_error(report_h, "report_h", bool)
        if not isinstance(report_goal_f, bool):
            raise type_error(report_goal_f, "report_goal_f", bool)
        if not callable(per_instance_known):
            raise type_error(
                per_instance_known, "per_instance_known", call=True)
        #: the consumer
        self.__consumer: Final[Callable[
            [PerRunData, Counter], Any]] = consumer
        #: report the progress
        self.__report_progress: Final[bool] = report_progress
        #: report the frequency history
        self.__report_h: Final[bool] = report_h
        #: report the lower bound, if any
        self.__report_lower_bound: Final[bool] = report_lower_bound
        #: report the upper bound, if any
        self.__report_upper_bound: Final[bool] = report_upper_bound
        #: report the goal objective value
        self.__report_goal_f: Final[bool] = report_goal_f
        #: the per-instance known objective values
        self.__per_instance_known: Final[Callable[
            [str], Iterable[int | float]]] = per_instance_known
        #: the internal counter
        self.__counter: Final[Counter[int | float]] = Counter()
        #: the internal state variable for the `PROGRESS` section
        self.__state_progress: int = 0
        #: the internal state variable for the `H` section
        self.__state_h: int = 0

    def start_file(self, path: Path) -> bool:
        """
        Begin parsing the file identified by `path`.

        :param path: the path identifying the file
        :returns: `True` if the file should be parsed, `False` otherwise
        """
        if not super().start_file(path):
            return False
        self.__state_progress = 0 if self.__report_progress else -1
        self.__state_h = 0 if self.__report_h else -1
        return True

    def parse_dir(self, path: str) -> bool:
        """Parse a directory, then invoke the garbage collector."""
        ret: Final[bool] = super().parse_dir(path)
        collect()
        return ret

    def process(self) -> None:
        """Flush the data gathered for the current run to the consumer."""
        counter: Final[Counter[int | float]] = self.__counter

        # report the goal objective value, if encountered
        if (self.__report_goal_f and (self.goal_f is not None)
                and isfinite(self.goal_f)):
            counter[self.goal_f] = max(counter[self.goal_f], 1)

        # add the known values
        for val in self.__per_instance_known(self.instance):
            if isfinite(val):
                counter[val] = max(counter[val], 1)

        # add the best objective value
        if (self.best_f is not None) and isfinite(self.best_f):
            counter[self.best_f] = max(counter[self.best_f], 1)

        self.__consumer(PerRunData(
            algorithm=self.algorithm,
            instance=self.instance,
            objective=self.objective,
            encoding=self.encoding,
            rand_seed=self.rand_seed), counter)
        counter.clear()

    def start_section(self, title: str) -> bool:
        """Decide whether the section `title` should be parsed."""
        if title == SECTION_PROGRESS:
            if self.__state_progress >= 2:
                raise ValueError(f"Already did section {title}.")
            if self.__state_progress < 0:
                self.__state_progress = 2
                return False
            self.__state_progress = 1
            return True
        if title == H_LOG_SECTION:
            if self.__state_h >= 2:
                raise ValueError(f"Already did section {title}.")
            if self.__state_h < 0:
                self.__state_h = 2
                return False
            self.__state_h = 1
            return True
        return super().start_section(title)

    def setup_section(self, data: dict[str, str]) -> None:
        """
        Parse the data from the `setup` section.

        :param data: the parsed data
        """
        super().setup_section(data)
        if self.__report_lower_bound and (_FULL_KEY_LOWER_BOUND in data):
            lb: Final[int | float] = str_to_num(
                data[_FULL_KEY_LOWER_BOUND])
            if isfinite(lb):
                self.__counter[lb] = max(self.__counter[lb], 1)
        if self.__report_upper_bound and (_FULL_KEY_UPPER_BOUND in data):
            ub: Final[int | float] = str_to_num(
                data[_FULL_KEY_UPPER_BOUND])
            if isfinite(ub):
                self.__counter[ub] = max(self.__counter[ub], 1)

    def needs_more_lines(self) -> bool:
        """Check whether more lines of the log file need to be parsed."""
        return ((self.__state_h == 0) or (self.__state_progress == 0)
                or super().needs_more_lines())

    def lines(self, lines: list[str]) -> bool:
        """Consume the lines of the current section."""
        if not isinstance(lines, list):
            raise type_error(lines, "lines", list)
        counter: Counter[int | float]
        if self.__state_progress == 1:
            self.__state_progress = 2
            # the first line is the CSV header: locate the objective column
            columns = [c.strip() for c in lines[0].split(CSV_SEPARATOR)]
            f_col: Final[int] = columns.index(PROGRESS_CURRENT_F)
            counter = self.__counter
            for line in lines[1:]:
                f: int | float = str_to_num(line.split(
                    CSV_SEPARATOR)[f_col])
                counter[f] += 1
        elif self.__state_h == 1:
            self.__state_h = 2
            # the H section holds alternating value/frequency pairs
            counter = self.__counter
            for line in lines:
                split = line.split(CSV_SEPARATOR)
                for i in range(0, len(split), 2):
                    counter[str_to_num(split[i])] += int(split[i + 1])
        else:
            return super().lines(lines)
        return self.needs_more_lines()


# Print a CSV file with the numbers of unique objective values if this
# module is executed as a script.
if __name__ == "__main__":
    parser: Final[argparse.ArgumentParser] = moptipy_argparser(
        __file__, "Collecting the Number of Existing Objective Values.",
        "Gather all the existing objective values and store them in a "
        "CSV-formatted file.")
    parser.add_argument(
        "source", nargs="?", default="./results",
        help="the location of the experimental results, i.e., the root "
             "folder under which to search for log files", type=Path)
    parser.add_argument(
        "dest", help="the path to the end results CSV file to be created",
        type=Path, nargs="?", default="./evaluation/objective_values.txt")
    parser.add_argument(
        "--lb", help="count the lower bound of the objective as an "
                     "objective value", action="store_true")
    parser.add_argument(
        "--ub", help="count the upper bound of the objective as an "
                     "objective value", action="store_true")
    parser.add_argument(
        "--goal", help="count the goal objective value as an existing "
                       "objective value", action="store_true")
    args: Final[argparse.Namespace] = parser.parse_args()
    number_of_objective_values_to_csv(
        args.source, args.dest, per_instance=True,
        per_algorithm_instance=True, report_lower_bound=args.lb,
        report_upper_bound=args.ub, report_goal_f=args.goal)
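
# Hypothetical command-line invocation (assuming the moptipy package is
# installed and "./results" contains log files): scan the results folder
# recursively and write the overview CSV, also counting goal objective
# values as existing objective values:
#
#     python3 -m moptipy.evaluation.frequency ./results \
#         ./evaluation/objective_values.txt --goal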