Source code for moptipy.api.experiment

"""
The experiment execution API.

Via the function :func:`run_experiment`, you can execute a complex experiment
where multiple optimization algorithms are applied to multiple problem
instances, where log files with the results and progress information about the
runs are collected, and where multiprocessing is used to parallelize the
experiment execution.
Experiments are replicable, as random seeds are automatically generated based
on problem instance names in a replicable fashion.

The log files are structured according to the documentation in
https://thomasweise.github.io/moptipy/#file-names-and-folder-structure
and their contents follow the specification given in
https://thomasweise.github.io/moptipy/#log-file-sections.
"""
import copy
import gc
import multiprocessing as mp
import os.path
import platform
from contextlib import AbstractContextManager, nullcontext
from enum import IntEnum
from math import ceil
from typing import Any, Callable, Final, Iterable, Sequence, cast

import psutil  # type: ignore
from numpy.random import Generator, default_rng
from pycommons.ds.cache import str_is_new
from pycommons.io.console import logger
from pycommons.io.path import Path
from pycommons.types import check_int_range, type_error

from moptipy.api.execution import Execution
from moptipy.api.logging import FILE_SUFFIX
from moptipy.api.process import Process
from moptipy.utils.nputils import rand_seeds_from_str
from moptipy.utils.strings import sanitize_name, sanitize_names
from moptipy.utils.sys_info import get_sys_info, update_sys_info_cpu_affinity


def __run_experiment(base_dir: Path,
                     experiments: list[list[Callable]],
                     n_runs: list[int],
                     perform_warmup: bool,
                     warmup_fes: int,
                     perform_pre_warmup: bool,
                     pre_warmup_fes: int,
                     file_lock: AbstractContextManager,
                     stdio_lock: AbstractContextManager,
                     cache: Callable[[str], bool],
                     thread_id: str,
                     pre_warmup_barrier,
                     on_completion: Callable[[
                         Any, Path, Process], None]) -> None:
    """
    Execute a single thread of experiments.

    :param base_dir: the base directory
    :param experiments: the stream of experiment setups
    :param perform_warmup: should we perform a warm-up per instance?
    :param warmup_fes: the number of the FEs for the warm-up runs
    :param perform_pre_warmup: should we do one warmup run for each
        instance before we begin with the actual experiments?
    :param pre_warmup_fes: the FEs for the pre-warmup runs
    :param file_lock: the lock for file operations
    :param stdio_lock: the lock for log output
    :param cache: the cache
    :param thread_id: the thread id
    :param pre_warmup_barrier: a barrier to wait at after the pre-warmup
    :param on_completion: a function to be called for every completed run,
        receiving the instance, the path to the log file (before it is
        created) and the :class:`~moptipy.api.process.Process` of the run
        as parameters
    """
    random: Final[Generator] = default_rng()

    for warmup in ([True, False] if perform_pre_warmup else [False]):
        wss: str
        if warmup:
            wss = "pre-warmup"
        else:
            wss = "warmup"
            if perform_pre_warmup:
                gc.collect()  # do full garbage collection after pre-warmups
                gc.collect()  # one more, to be double-safe
                gc.freeze()  # whatever survived now, keep it permanently
                if pre_warmup_barrier:
                    logger(
                        "reached pre-warmup barrier.", thread_id, stdio_lock)
                    pre_warmup_barrier.wait()  # wait for all threads

        for runs in ([1] if warmup else n_runs):  # for each number of runs
            random.shuffle(cast(Sequence, experiments))  # shuffle experiments

            for setup in experiments:  # for each setup
                instance = setup[0]()  # load instance
                if instance is None:
                    raise TypeError("None is not an instance.")
                inst_name = sanitize_name(str(instance))

                exp = setup[1](instance)  # setup algorithm for instance
                if not isinstance(exp, Execution):
                    raise type_error(exp, "result of setup callable",
                                     Execution)
                # noinspection PyProtectedMember
                algo_name = sanitize_name(str(exp._algorithm))

                cd = Path(os.path.join(base_dir, algo_name, inst_name))
                cd.ensure_dir_exists()

                # generate sequence of seeds
                seeds: list[int] = [0] if warmup else \
                    rand_seeds_from_str(string=inst_name, n_seeds=runs)
                random.shuffle(seeds)
                needs_warmup = warmup or perform_warmup
                for seed in seeds:  # for every run

                    filename = sanitize_names(
                        [algo_name, inst_name, hex(seed)])
                    if warmup:
                        log_file = filename
                    else:
                        log_file = Path(
                            os.path.join(cd, filename + FILE_SUFFIX))

                        skip = True
                        with file_lock:
                            if cache(log_file):
                                skip = log_file.ensure_file_exists()
                        if skip:
                            continue  # run already done

                    exp.set_rand_seed(seed)

                    if needs_warmup:  # perform warmup run
                        needs_warmup = False
                        cpy: Execution = copy.copy(exp)
                        cpy.set_max_fes(
                            pre_warmup_fes if warmup else warmup_fes, True)
                        cpy.set_max_time_millis(3600000, True)
                        cpy.set_log_file(None)
                        cpy.set_log_improvements(False)
                        cpy.set_log_all_fes(False)
                        logger(
                            f"{wss} for {filename!r}.", thread_id, stdio_lock)
                        with cpy.execute():
                            pass
                        del cpy

                    if warmup:
                        continue

                    exp.set_log_file(log_file)
                    logger(filename, thread_id, stdio_lock)
                    with exp.execute() as process:  # run the experiment
                        on_completion(instance, cast(Path, log_file), process)


#: the number of logical CPU cores
_CPU_LOGICAL_CORES: Final[int] = psutil.cpu_count(logical=True)
#: the number of phyiscal CPU cores
_CPU_PHYSICAL_CORES: Final[int] = psutil.cpu_count(logical=False)
#: the logical cores per physical core
_CPU_LOGICAL_PER_PHYSICAL: Final[int] = \
    max(1, int(ceil(_CPU_LOGICAL_CORES / _CPU_PHYSICAL_CORES)))


[docs] class Parallelism(IntEnum): """ An enumeration of parallel thread counts. The `value` of each element of this enumeration equal the number of threads that would be used. Thus, the values are different on different systems. Currently, only Linux is supported for parallelism. For all other systems, you need to manually start the program as often as you want it to run in parallel. On other systems, all values of this enumeration are `1`. >>> Parallelism.SINGLE_THREAD.value 1 """ #: use only a single thread SINGLE_THREAD = 1 #: Use as many threads as accurate time measurement permits. This equals #: to using one logical core on each physical core and leaving one #: physical core unoccupied (but always using at least one thread, #: obviously). If you have four physical cores with two logical cores #: each, this would mean using three threads on Linux. #: On Windows, parallelism is not supported yet, so this would equal using #: only one core on Windows. ACCURATE_TIME_MEASUREMENTS = max(1, _CPU_PHYSICAL_CORES - 1) \ if "Linux" in platform.system() else 1 #: Use all but one logical core. This *will* mess up time measurements but #: should produce the maximum performance while not impeding the system of #: doing stuff like garbage collection or other bookkeeping and overhead #: tasks. This is the most reasonable option if you want to execute one #: experiment as quickly as possible. If you have four physical cores with #: two logical cores each, this would mean using seven threads on Linux. #: On Windows, parallelism is not supported yet, so this would equal using #: only one core on Windows. PERFORMANCE = max(1, _CPU_LOGICAL_CORES - 1) \ if "Linux" in platform.system() else 1 #: Use every single logical core available, which may mess up your system. #: We run the experiment as quickly as possible, but the system may not be #: usable while the experiment is running. Background tasks like garbage #: collection, moving the mouse course, accepting user input, or network #: communication may come to a halt. Seriously, why would you use this? #: If you have four physical cores with two logical cores each, this would #: mean using eight threads on Linux. #: On Windows, parallelism is not supported yet, so this would equal using #: only one core on Windows. RECKLESS = max(1, _CPU_LOGICAL_CORES) \ if "Linux" in platform.system() else 1
def __waiting_run_experiment( base_dir: Path, experiments: list[list[Callable]], n_runs: list[int], perform_warmup: bool, warmup_fes: int, perform_pre_warmup: bool, pre_warmup_fes: int, file_lock: AbstractContextManager, stdio_lock: AbstractContextManager, cache: Callable, thread_id: str, event, pre_warmup_barrier, on_completion: Callable[[Any, Path, Process], None]) -> None: """Wait until event is set, then run experiment.""" logger("waiting for start signal", thread_id, stdio_lock) if not event.wait(): raise ValueError("Wait terminated unexpectedly.") logger("got start signal, beginning experiment", thread_id, stdio_lock) update_sys_info_cpu_affinity() gc.collect() __run_experiment(base_dir, experiments, n_runs, perform_warmup, warmup_fes, perform_pre_warmup, pre_warmup_fes, file_lock, stdio_lock, cache, thread_id, pre_warmup_barrier, on_completion) def __no_complete(_: Any, __: Path, ___: Process) -> None: """Do nothing."""
[docs] def run_experiment( base_dir: str, instances: Iterable[Callable[[], Any]], setups: Iterable[Callable[[Any], Execution]], n_runs: int | Iterable[int] = 11, n_threads: int = Parallelism.ACCURATE_TIME_MEASUREMENTS, perform_warmup: bool = True, warmup_fes: int = 20, perform_pre_warmup: bool = True, pre_warmup_fes: int = 20, on_completion: Callable[[Any, Path, Process], None] = __no_complete) \ -> Path: """ Run an experiment and store the log files into the given folder. This function will automatically run an experiment, i.e., apply a set `setups` of algorithm setups to a set `instances` of problem instances for `n_runs` each. It will collect log files and store them into an appropriate folder structure under the path `base_dir`. It will automatically draw random seeds for all algorithm runs using :func:`moptipy.utils.nputils.rand_seeds_from_str` based on the names of the problem instances to solve. This yields replicable experiments, i.e., running the experiment program twice will yield exactly the same runs in exactly the same file structure (give and take clock-time dependent issues, which obviously cannot be controlled in a deterministic fashion). This function will use `n_threads` separate processes to parallelize the whole experiment (if you do not specify `n_threads`, it will be chosen automatically). Note for Windows users: The parallelization will not work under Windows. However, you can achieve *almost* the same effect and performance as for `n_threads=N` if you set `n_threads=1` and simply start the program `N` times separately (in separate terminals and in parallel). Of course, all `N` processes must have the same `base_dir` parameter. They will then automatically share the workload. :param base_dir: the base directory where to store the results :param instances: an iterable of callables, each of which should return an object representing a problem instance, whose `__str__` representation is a valid name :param setups: an iterable of callables, each receiving an instance (as returned by instances) as input and producing an :class:`moptipy.api.execution.Execution` as output :param n_runs: the number of runs per algorithm-instance combination :param n_threads: the number of parallel threads of execution to use. This parameter only works under Linux! It should be set to 1 under all other operating systems. Under Linux, by default, we will use the number of physical cores - 1 processes. The default value for `n_threads` is computed in \ :py:const:`~moptipy.api.experiment.Parallelism.ACCURATE_TIME_MEASUREMENTS`, which will be different for different machines(!). We will try to distribute the threads over different logical and physical cores to minimize their interactions. If n_threads is less or equal the number of physical cores, then multiple logical cores will be assigned to each process. If less threads than the number of physical cores are spawned, we will leave one physical core unoccupied. This core may be used by the operating system or other processes for their work, thus reducing interference of the os with our experiments. :param perform_warmup: should we perform a warm-up for each instance? If this parameter is `True`, then before the very first run of a thread on an instance, we will execute the algorithm for just a few function evaluations without logging and discard the results. The idea is that during this warm-up, things such as JIT compilation or complicated parsing can take place. While this cannot mitigate time measurement problems for JIT compilations taking place late in runs, it can at least somewhat solve the problem of delayed first FEs caused by compilation and parsing. :param warmup_fes: the number of the FEs for the warm-up runs :param perform_pre_warmup: should we do one warmup run for each instance before we begin with the actual experiments? This complements the warmups defined by `perform_warmup`. It could be that, for some reason, JIT or other activities may lead to stalls between multiple processes when code is encountered for the first time. This may or may not still cause strange timing issues even if `perform_warmup=True`. We therefore can do one complete round of warmups before starting the actual experiment. After that, we perform one garbage collection run and then freeze all objects surviving it to prevent them from future garbage collection runs. All processes that execute the experiment in parallel will complete their pre-warmup and only after all of them have completed it, the actual experiment will begin. I am not sure whether this makes sense or not, but it also would not hurt. :param pre_warmup_fes: the FEs for the pre-warmup runs :param on_completion: a function to be called for every completed run, receiving the instance, the path to the log file (before it is created) and the :class:`~moptipy.api.process.Process` of the run as parameters :returns: the canonicalized path to `base_dir` """ if not isinstance(instances, Iterable): raise type_error(instances, "instances", Iterable) if not isinstance(setups, Iterable): raise type_error(setups, "setups", Iterable) if not isinstance(perform_warmup, bool): raise type_error(perform_warmup, "perform_warmup", bool) if not isinstance(perform_pre_warmup, bool): raise type_error(perform_pre_warmup, "perform_pre_warmup", bool) check_int_range(warmup_fes, "warmup_fes", 1, 1_000_000) check_int_range(pre_warmup_fes, "pre_warmup_fes", 1, 1_000_000) check_int_range(n_threads, "n_threads", 1, 16384) instances = list(instances) if len(instances) <= 0: raise ValueError("Instance enumeration is empty.") for instance in instances: if not callable(instance): raise type_error(instance, "all instances", call=True) sysinfo_check: str = get_sys_info() if not isinstance(sysinfo_check, str): raise type_error(sysinfo_check, "system information", str) if len(sysinfo_check) <= 0: raise ValueError(f"invalid system info {sysinfo_check!r}!") setups = list(setups) if len(setups) <= 0: raise ValueError("Setup enumeration is empty.") for setup in setups: if not callable(setup): raise type_error(setup, "all setups", call=True) experiments: Final[list[list[Callable]]] = \ [[ii, ss] for ii in instances for ss in setups] del instances del setups if len(experiments) <= 0: raise ValueError("No experiments found?") n_runs = [n_runs] if isinstance(n_runs, int) else list(n_runs) last = 0 for run in n_runs: last = check_int_range(run, "n_runs", last + 1) cache: Final[Callable[[str], bool]] = str_is_new() use_dir: Final[Path] = Path(base_dir) use_dir.ensure_dir_exists() stdio_lock: AbstractContextManager if n_threads > 1: file_lock: AbstractContextManager = mp.Lock() stdio_lock = mp.Lock() logger(f"starting experiment with {n_threads} threads " f"on {_CPU_LOGICAL_CORES} logical cores, " f"{_CPU_PHYSICAL_CORES} physical cores (i.e.," f" {_CPU_LOGICAL_PER_PHYSICAL} logical cores per physical " "core).", "", stdio_lock) event: Final = mp.Event() pre_warmup_barrier: Final = mp.Barrier(n_threads) \ if perform_pre_warmup else None processes: Final[list[mp.Process]] = \ [mp.Process(target=__waiting_run_experiment, args=(use_dir, experiments.copy(), n_runs, perform_warmup, warmup_fes, perform_pre_warmup, pre_warmup_fes, file_lock, stdio_lock, cache, ":" + hex(i)[2:], event, pre_warmup_barrier, on_completion)) for i in range(n_threads)] for i, p in enumerate(processes): p.start() logger(f"started processes {hex(i)[2:]} in waiting state.", "", stdio_lock) # try to distribute the load evenly over all cores n_cpus: int = _CPU_PHYSICAL_CORES core_ofs: int = 0 if n_threads < n_cpus: n_cpus -= 1 core_ofs = _CPU_LOGICAL_PER_PHYSICAL n_cores: Final[int] = n_cpus * _CPU_LOGICAL_PER_PHYSICAL n_cores_per_thread: Final[int] = max(1, n_cores // n_threads) last_core: int = 0 for i, p in enumerate(processes): pid: int = int(p.pid) aff: list[int] = [] for _ in range(n_cores_per_thread): aff.append(int((last_core + core_ofs) % _CPU_LOGICAL_CORES)) last_core += 1 psutil.Process(pid).cpu_affinity(aff) logger(f"set affinity of processes {hex(i)[2:]} with " f"pid {pid} ({hex(pid)}) to {aff}.", "", stdio_lock) logger("now releasing lock and starting all processes.", "", stdio_lock) event.set() for i, p in enumerate(processes): p.join() logger(f"processes {hex(i)[2:]} has finished.", "", stdio_lock) else: logger(f"starting experiment with single thread " f"on {_CPU_LOGICAL_CORES} logical cores, " f"{_CPU_PHYSICAL_CORES} physical cores (i.e.," f" {_CPU_LOGICAL_PER_PHYSICAL} logical cores per physical " "core).") stdio_lock = nullcontext() __run_experiment(base_dir=use_dir, experiments=experiments, n_runs=n_runs, perform_warmup=perform_warmup, warmup_fes=warmup_fes, perform_pre_warmup=perform_pre_warmup, pre_warmup_fes=pre_warmup_fes, file_lock=nullcontext(), stdio_lock=stdio_lock, cache=cache, thread_id="", pre_warmup_barrier=None, on_completion=on_completion) logger("finished experiment.", "", stdio_lock) return use_dir