
"""Code for selecting interesting instances for smaller-scale experiments."""
import multiprocessing as mp
from math import ceil, factorial, inf, log2, sqrt
from os import sched_getaffinity
from typing import Callable, Final

import numpy as np  # type: ignore
from numpy.random import Generator, RandomState  # type: ignore
from pycommons.io.console import logger
from pycommons.types import check_int_range, type_error
from sklearn.cluster import SpectralClustering  # type: ignore

from moptipy.algorithms.so.rls import RLS
from moptipy.api.execution import Execution
from moptipy.examples.jssp.gantt_space import GanttSpace
from moptipy.examples.jssp.instance import Instance
from moptipy.examples.jssp.makespan import Makespan
from moptipy.examples.jssp.ob_encoding import OperationBasedEncoding
from moptipy.operators.permutations.op0_shuffle import Op0Shuffle
from moptipy.operators.permutations.op1_swap2 import Op1Swap2
from moptipy.spaces.permutations import Permutations
from moptipy.utils.nputils import (
    DEFAULT_FLOAT,
    DEFAULT_INT,
    rand_generator,
    rand_seeds_from_str,
)


def __can_solve_instance(inst: Instance, seed: int,
                         queue: mp.Queue) -> None:
    """
    Execute one run of an RLS for 2.1 minutes on the instance, return the makespan.

    :param inst: the jssp instance to check
    :param seed: the seed of this run
    :param queue: the queue for the return values
    """
    x_space: Final[Permutations] = Permutations.with_repetitions(
        inst.jobs, inst.machines)
    y_space: Final[GanttSpace] = GanttSpace(inst)
    g: Final[OperationBasedEncoding] = OperationBasedEncoding(inst)
    o0: Final[Op0Shuffle] = Op0Shuffle(x_space)
    o1: Final[Op1Swap2] = Op1Swap2()
    f: Final[Makespan] = Makespan(inst)
    a: Final[RLS] = RLS(o0, o1)
    goal: Final[int] = inst.makespan_lower_bound

    ex: Final[Execution] = Execution()
    ex.set_search_space(x_space)
    ex.set_solution_space(y_space)
    ex.set_encoding(g)
    ex.set_algorithm(a)
    ex.set_objective(f)
    ex.set_max_time_millis(126_000)
    ex.set_goal_f(goal)
    ex.set_rand_seed(seed)
    with ex.execute() as process:
        queue.put(process.get_best_f())


def __is_instance_too_easy(inst: Instance) -> bool:
    """
    Check if an instance is too easy to be of any interest.

    We perform 16 runs of a randomized local search (RLS) on an instance,
    granting 2.1 minutes of runtime to each of them. An instance is
    considered easy if one of the following conditions is fulfilled:

    - it is solved often enough (lower bound reached often)
    - there are not enough different results
    - the mean result is too close to the lower bound
    - the worst result is not far enough from the lower bound
    - the worst result is too close to the best result

    If any of these conditions hold, we consider the instance as too easy to
    be of any interest for our educational experiment.

    :param inst: the jssp instance to check
    :returns: `True` if the instance is too easy to be interesting
    """
    n_runs: Final[int] = 16
    seeds: Final[list[int]] = rand_seeds_from_str(string=inst.name,
                                                  n_seeds=n_runs)
    queue: Final[mp.Queue] = mp.Queue()

    while len(seeds) > 0:
        processes = [mp.Process(target=__can_solve_instance,
                                args=(inst, seeds.pop(), queue))
                     for _ in range(min(len(seeds), __DEFAULT_N_THREADS))]
        for p in processes:
            p.start()
        for p in processes:
            p.join()

    goal: Final[int] = inst.makespan_lower_bound
    num_solved: int = 0
    diff: set[int] = set()
    min_makespan: int = 1 << 62
    sum_makespans: int = 0
    max_makespan: int = 0

    for _ in range(n_runs):
        makespan: int = queue.get()
        diff.add(makespan)
        if makespan <= min_makespan:
            min_makespan = makespan
            if makespan <= goal:
                num_solved += 1
        sum_makespans += makespan
        max_makespan = max(max_makespan, makespan)

    mean_makespan: Final[float] = sum_makespans / n_runs
    mean_threshold: Final[float] = min(goal + 32.0,
                                       max(1.01 * goal, goal + 3))
    max_threshold: Final[float] = min(goal + 64.0,
                                      max(1.02 * goal, goal + 5))
    min_threshold: Final[float] = min(min_makespan + 48.0,
                                      max(1.015 * min_makespan,
                                          min_makespan + 4))
    n_diff: Final[int] = len(diff)
    diff_threshold: Final[int] = max(3, int(0.5 + ceil(log2(n_runs))))
    solved_threshold: Final[int] = int(ceil(n_runs / 3))

    result: Final[bool] = (num_solved >= solved_threshold) \
        or (mean_makespan < mean_threshold) \
        or (max_makespan < max_threshold) \
        or (min_threshold > max_makespan) \
        or (n_diff <= diff_threshold)
    logger(f"Instance {inst.name} is {'easy' if result else 'hard'}, "
           f"since {n_runs} runs of the (1+1)-EA had {n_diff} different "
           f"results (threshold {diff_threshold}), a best result of "
           f"{min_makespan}, a mean result of {mean_makespan} (threshold: "
           f"{mean_threshold}), a worst result of {max_makespan} "
           f"(thresholds: {max_threshold}, {min_threshold}), and "
           f"reached the makespan lower bound ({goal}) {num_solved} "
           f"times (threshold: {solved_threshold}).")
    return result
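

# Worked example of the thresholds above (illustrative): for a makespan
# lower bound of goal = 1000 and n_runs = 16, we get
#   mean_threshold   = min(1032.0, max(1010.0, 1003)) = 1010.0
#   max_threshold    = min(1064.0, max(1020.0, 1005)) = 1020.0
#   min_threshold    = min(1048.0, max(1015.0, 1004)) = 1015.0
#                      (when the best of the 16 runs is also 1000)
#   diff_threshold   = max(3, ceil(log2(16)))         = 4
#   solved_threshold = ceil(16 / 3)                   = 6
# So the instance counts as easy if, e.g., at least 6 runs reach 1000, the
# mean result is below 1010, all 16 results lie below 1020 (or below 1015
# when the best run also hits 1000), or the 16 runs produce at most 4
# distinct makespans.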


#: The default number of threads to be used
__DEFAULT_N_THREADS: Final[int] = max(1, min(len(sched_getaffinity(0)), 128))
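# Note: len(sched_getaffinity(0)) counts only the CPUs that the current
# process is actually allowed to run on (it respects CPU affinity masks),
# whereas os.cpu_count() would report all CPUs of the machine.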


def compute_instances_that_are_too_easy() -> tuple[str, ...]:
    """
    Compute the set of instances that are too easy to be of any interest.

    For our educational experiments, we need instances that are not always
    solved to optimality by simple algorithms. Otherwise, for example, the
    box plots of the end results will collapse to single lines. Then, it
    will also be impossible to really say which algorithm performs best on
    the instance. See the documentation of `__is_instance_too_easy` for
    the conditions that lead to the rejection of an instance.

    :returns: the tuple of instance names that should not be included in
        the experiment
    """
    logger("We now test whether the instances are too easy.")
    easy: Final[list[str]] = []
    for inst_name in Instance.list_resources():
        if __is_instance_too_easy(Instance.from_resource(inst_name)):
            easy.append(inst_name)
    easy.sort()
    logger(f"Finished the test, found {len(easy)} easy instances: {easy}.")
    return tuple(easy)
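

# Usage sketch (illustrative only): the tuple returned by the function
# above can be used to filter the instance resources, e.g.:
#
#   easy = compute_instances_that_are_too_easy()
#   hard = [nn for nn in Instance.list_resources() if nn not in easy]
#
# Since this performs 16 runs of 2.1 minutes for every instance resource,
# it is expensive; a pre-computed result is kept in `__TOO_EASY` below.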


#: This is the set of instances that are too easy to be interesting and
#: shall therefore be skipped from our experiments.
#: For documentation about what constitutes 'too easy', see
#: :func:`compute_instances_that_are_too_easy`.
__TOO_EASY: Final[set[str]] = \
    {"demo", "dmu21", "dmu22", "dmu24", "dmu25", "dmu31", "dmu32", "dmu33",
     "dmu34", "dmu35", "ft06", "ft20", "la01", "la02", "la03", "la04",
     "la05", "la06", "la07", "la08", "la09", "la10", "la11", "la12",
     "la13", "la14", "la15", "la17", "la18", "la23", "la26", "la28",
     "la30", "la31", "la32", "la33", "la34", "la35", "swv16", "swv17",
     "swv18", "swv19", "swv20", "ta51", "ta52", "ta53", "ta54", "ta55",
     "ta56", "ta57", "ta58", "ta59", "ta60", "ta61", "ta63", "ta68",
     "ta69", "ta71", "ta72", "ta73", "ta74", "ta75", "ta76", "ta77",
     "ta78", "ta79", "ta80"}


def __get_instances() -> list[Instance]:
    """
    Get the instances.

    :return: a list of instances
    """
    return [Instance.from_resource(name) for name in
            Instance.list_resources() if name not in __TOO_EASY]


def __optimize_clusters(cluster_groups: tuple[tuple[int, ...], ...],
                        n_groups: int,
                        extreme_groups: tuple[
                            tuple[tuple[int, int], ...], ...],
                        random: Generator) -> tuple[int, ...]:
    """
    Try to find optimal cluster-to-group assignments.

    We perform hill climbing with restarts.

    :param cluster_groups: the groups per cluster
    :param n_groups: the number of groups
    :param extreme_groups: the extreme groups
    :param random: the random number generator
    :return: the selected groups
    """
    n: Final[int] = len(cluster_groups)
    current: Final[np.ndarray] = np.zeros(n, DEFAULT_INT)
    current_f: tuple[int, int, int, int, float]
    best: Final[np.ndarray] = np.zeros(n, DEFAULT_INT)
    best_f: tuple[int, int, int, int, float]
    total_best: Final[np.ndarray] = np.zeros(n, DEFAULT_INT)
    total_best_f: tuple[int, int, int, int, float] = (-1, -1, -1, -1, -1)
    run_last_improved: int = 1
    run_current: int = 0
    run_max_none_improved: Final[int] = 4
    step_max_none_improved: Final[int] = int((2 + (n_groups * n)) ** 2)
    done: Final[np.ndarray] = np.zeros(n_groups, DEFAULT_INT)  # noqa
    extremes: Final[set[int]] = set()  # noqa
    logger(f"Beginning to optimize the assignment of {len(cluster_groups)} "
           f"clusters to {n_groups} groups. The minimum groups are "
           f"{extreme_groups[0]}, the maximum groups are {extreme_groups[1]}."
           f" We permit {run_max_none_improved} runs without improvement "
           f"before termination and {step_max_none_improved} FEs without "
           "improvement before stopping a run.")

    def __f(sol: np.ndarray) -> tuple[int, int, int, int, float]:
        """
        Compute the quality: the internal objective function.

        We maximize five things: First, we maximize the number of group
        assignments that allow us to include the extreme instances in the
        result. These are the smallest and largest-scale instances.
        Second, we maximize the number of instance sets covered. Third,
        we maximize the minimum number of usages of the instance sets.
        Fourth, we minimize the maximum number of usages of the instance
        sets with the goal to balance the usages evenly over different
        sets. Fifth, we minimize the standard deviation of the instance
        set usages, again with the goal to balance the usages evenly over
        different sets.

        :param sol: the solution
        :return: the objective values
        """
        nonlocal done  # the number of uses per set
        nonlocal extreme_groups  # the tuple-tuple with the extreme groups
        nonlocal extremes  # the extreme groups already picked

        # first, we count how often each group has been used
        done.fill(0)
        for group in sol:
            done[group] += 1

        # second, we check which cluster-to-group assignment permits
        # using an extreme instance
        extremes.clear()
        for groups in extreme_groups:
            for group in groups:
                if (sol[group[0]] == group[1]) \
                        and (group[0] not in extremes):
                    extremes.add(group[0])
                    break
        return len(extremes), int(np.sum(done > 0)), done.min(), \
            -done.max(), -np.std(done)

    # The outer loop: the restarts of the hill climber.
    # We continue to execute runs until a given number of successive runs
    # did not improve the overall best solutions. Then we stop.
    while (run_current - run_last_improved) <= run_max_none_improved:
        run_current += 1

        # generate an initial best solution
        for i in range(n):
            cg = cluster_groups[i]
            best[i] = cg[random.integers(len(cg))]
        best_f = __f(best)  # compute quality
        use_f = best_f
        if best_f > total_best_f:  # accept solution if it is better
            total_best_f = best_f
            run_last_improved = run_current  # new global best
            np.copyto(total_best, best)
            logger(f"New global best {tuple(best)} with quality "
                   f"{best_f} found in run {run_current}.")
        else:
            logger(f"Run {run_current} starts with quality {best_f}.")

        # local search step
        # We continue the local search until a given number of successive
        # steps did not yield any improvement. Then we terminate the run.
        step_current: int = 0
        step_last_improved: int = 1
        while (step_current - step_last_improved) <= step_max_none_improved:
            step_current += 1
            np.copyto(current, best)

            while True:  # perform at least one move
                i = random.integers(n)
                cg = cluster_groups[i]
                lcg = len(cg)
                if lcg <= 1:
                    continue
                while True:  # perform the search move
                    new_v = cg[random.integers(lcg)]
                    if new_v != best[i]:
                        current[i] = new_v
                        break
                # 50/50 chance to perform more moves
                if random.integers(2) <= 0:
                    break

            current_f = __f(current)  # evaluate quality
            # accept if better or with small probability
            if (current_f >= use_f) or (random.integers(50) < 1):
                use_f = current_f
                np.copyto(best, current)
                if current_f > best_f:
                    best_f = current_f
                    step_last_improved = step_current
                    if current_f > total_best_f:  # new optimum
                        total_best_f = current_f
                        run_last_improved = run_current
                        np.copyto(total_best, current)
                        logger(f"New global best {tuple(best)} with quality"
                               f" {total_best_f} found in run {run_current} "
                               f"after {step_current} FEs.")
        logger(f"Run {run_current} finished with quality {best_f}.")

    result = tuple(total_best)
    logger(f"Finished after {run_current} runs with solution {result} of "
           f"quality {total_best_f}.")
    return result


def __optimize_scales(scale_choices: list[list[tuple[int, int]]],
                      random: Generator) -> list[int]:
    """
    Pick a diverse scale choice.

    :param scale_choices: the scale choices we have per group
    :param random: the random number generator
    :returns: one chosen scale per group
    """
    n: Final[int] = len(scale_choices)
    x_total_best: list[int] = [0] * n
    opt_idx: list[tuple[int, int]] = []  # the relevant indexes
    opt_sum: int = 0
    for idx, choices in enumerate(scale_choices):
        if len(choices) > 1:
            opt_sum += len(choices)
            opt_idx.append((idx, len(choices)))
    if not opt_idx:
        return x_total_best

    def __f(xx: list[int]) -> tuple[float, float, int]:
        """
        Compute the quality of a suggestion, bigger is better.

        :param xx: the candidate solution
        :returns: a tuple with minimum distance, distance sum, and
            difference count
        """
        dist_min: float = inf
        dist_sum: float = 0
        diff_cnt: int = 0
        nonlocal scale_choices
        nonlocal n
        for i in range(n):
            a = scale_choices[i][xx[i]]
            for j in range(i):
                b = scale_choices[j][xx[j]]
                if a[0] == b[0]:
                    d0 = 0
                else:
                    diff_cnt += 1
                    d0 = a[0] - b[0]
                if a[1] == b[1]:
                    d1 = 0
                else:
                    diff_cnt += 1
                    d1 = a[1] - b[1]
                d = sqrt((d0 * d0) + (d1 * d1))
                dist_min = min(dist_min, d)
                dist_sum += d
        return dist_min, dist_sum, diff_cnt

    f_total_best: tuple[float, float, int] = __f(x_total_best)
    logger(f"The following index-choices tuples exist: {opt_idx} and "
           f"the initial choice is {x_total_best} at quality "
           f"{f_total_best}.")

    x_cur_best: list[int] = [0] * n
    x_cur: list[int] = [0] * n
    for _ in range(int(opt_sum ** 2.35)):
        for ii, sc in enumerate(scale_choices):
            x_cur_best[ii] = random.integers(len(sc))
        f_cur_best: tuple[float, float, int] = __f(x_cur_best)
        if f_cur_best > f_total_best:
            f_total_best = f_cur_best
            x_total_best.clear()
            x_total_best.extend(x_cur_best)
            logger(f"Improved to {x_total_best} at quality {f_total_best}.")
        for _ in range(int(opt_sum ** 2.35)):
            idx, choicen = opt_idx[random.integers(len(opt_idx))]
            old = x_cur[idx]
            while old == x_cur[idx]:
                x_cur[idx] = random.integers(choicen)
            f_cur = __f(x_cur)
            if f_cur >= f_cur_best:
                f_cur_best = f_cur
                x_cur_best.clear()
                x_cur_best.extend(x_cur)
                if f_cur_best > f_total_best:
                    f_total_best = f_cur_best
                    x_total_best.clear()
                    x_total_best.extend(x_cur_best)
                    logger(f"Improved to {x_total_best} "
                           f"at quality {f_total_best}.")
    return x_total_best


def __scale(jobs: int, machines: int) -> int:
    """
    Compute the scale of a JSSP instance.

    :param jobs: the jobs
    :param machines: the machines
    :returns: the scale
    """
    return factorial(jobs) ** machines
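

# Worked example (illustrative): for a 6-job, 6-machine instance such as
# `ft06`, __scale(6, 6) = factorial(6) ** 6 = 720 ** 6
# = 139_314_069_504_000_000, i.e., about 1.4 * 10 ** 17, since each of the
# 6 machines can process its 6 jobs in any of 720 orders. The feature
# computed in propose_instances below is log2(log2(scale)), which
# compresses this enormous range into a small number (about 5.8 for ft06).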


def propose_instances(n: int,
                      get_instances: Callable = __get_instances) \
        -> tuple[str, ...]:
    """
    Propose a set of instances to be used for our experiment.

    This function is used to obtain the instances chosen for the JSSP
    experiment. You can also use it for other experiments, by using your
    own instance source and/or for selecting more or less JSSP instances.
    Basically, it will accept a function `get_instances`, which must
    produce a sequence of Instance objects. Each instance has a name
    (e.g., `dmu14`) and a group, where the group is the name without any
    numerical suffix (e.g., `dmu`). This function will then select `n`
    instances from the instance set with the goal to maximize the
    diversity of the instances, to include instances from as many groups
    as possible, and to include one instance of the smallest and one of
    the largest scale. The diversity is measured in terms of the numbers
    of jobs and machines, the instance scale, the minimum and maximum
    operation length, the standard deviation of the mean operation
    lengths over the jobs, the makespan bounds, and so on.

    First, features are computed for each instance. Second, the instances
    are clustered into `n` clusters. Third, we try to pick groups for
    each cluster such that a) the minimum and maximum-scale instances can
    be included and b) instances from as many groups as possible are
    picked. Fourth, we randomly pick one instance for each cluster from
    the selected group (while trying to pick the minimum and
    maximum-scale instances). Finally, the chosen instance names are
    returned as a tuple, sorted by instance scale.

    :param n: the number of instances to be proposed
    :param get_instances: a function returning an iterable of instances
    :return: a tuple with the instance names
    """
    check_int_range(n, "n", 1, 1000)
    if not callable(get_instances):
        raise type_error(get_instances, "get_instances", call=True)
    instances = list(get_instances())
    n_instances: Final[int] = len(instances)
    if n_instances <= 0:
        raise ValueError("No instance returned by get_instances.")
    for i in instances:
        if not isinstance(i, Instance):
            raise type_error(
                i, "value in list returned by get_instances", Instance)
    if n_instances <= n:
        if n_instances == n:  # nothing to do here
            return tuple(sorted([inst.get_name() for inst in instances]))
        raise ValueError(f"{n} instances requested, but only "
                         f"{n_instances} available.")
    logger(f"We will pick {n} instances out of {n_instances}.")

    random: Final[Generator] = rand_generator(0)  # the random generator
    random.shuffle(instances)
    inst_names: Final[tuple[str, ...]] = tuple([  # get the instance names
        inst.get_name() for inst in instances])
    inst_sizes: Final[list[tuple[int, int]]] = \
        [(inst.jobs, inst.machines) for inst in instances]

    # the group to which the instances belong
    rm = str.maketrans("", "", "0123456789")
    inst_groups: Final[tuple[str, ...]] = tuple([  # get the instance groups
        inst.translate(rm) for inst in inst_names])

    # create bi-directional mapping between group names and integer IDs
    group_ids: Final[dict[str, int]] = {}
    id_groups: Final[dict[int, str]] = {}
    for group in inst_groups:
        if group in group_ids:
            continue
        gid = len(group_ids)
        group_ids[group] = gid
        id_groups[gid] = group
    n_groups: Final[int] = len(group_ids)
    logger(f"The {n_instances} instances belong to the {n_groups} "
           f"groups {sorted(group_ids.keys())}.")

    # compute the feature matrix
    base_features: Final[int] = 9
    features = np.zeros((len(instances), base_features + n_groups),
                        DEFAULT_FLOAT)
    min_scale_val = inf
    min_scale_inst: set[int] = set()
    max_scale_val = -inf
    max_scale_inst: set[int] = set()
    for i, inst in enumerate(instances):
        features[i, 0] = inst.jobs
        features[i, 1] = inst.machines
        features[i, 2] = inst.jobs / inst.machines
        scale = __scale(inst.jobs, inst.machines)
        if scale <= min_scale_val:
            if scale < min_scale_val:
                min_scale_val = scale
                min_scale_inst.clear()
            min_scale_inst.add(i)
        if scale >= max_scale_val:
            if scale > max_scale_val:
                max_scale_inst.clear()
                max_scale_val = scale
            max_scale_inst.add(i)
        features[i, 3] = log2(log2(scale))
        m = inst.matrix[:, 1::2]
        features[i, 4] = np.std(np.mean(m, axis=0))
        features[i, 5] = np.min(m)
        features[i, 6] = np.max(m)
        features[i, 7] = inst.makespan_lower_bound
        features[i, 8] = inst.makespan_upper_bound \
            / inst.makespan_lower_bound
        features[i, base_features + group_ids[inst_groups[i]]] = 1
    del instances
    logger(f"We computed {base_features + n_groups} features for each "
           f"instance, namely {base_features} base features and {n_groups}"
           f" group features. The instances with the smallest scale "
           f"{min_scale_val} are {[inst_names[i] for i in min_scale_inst]} "
           f"(encoded as {min_scale_inst}) and those of the largest scale "
           f"{max_scale_val} are {[inst_names[i] for i in max_scale_inst]} "
           f"(encoded as {max_scale_inst}). Now we will cluster the "
           f"instances.")

    # standardize feature columns
    for i in range(base_features):
        features[:, i] = (features[:, i] - np.mean(features[:, i])) \
            / np.std(features[:, i])

    # So now we have the following things:
    # 1. the tuple `inst_names` of instance names
    # 2. the tuple `inst_groups` with the corresponding group names
    # 3. the bi-directional mapping between group names and group IDs
    #    (`group_ids` and `id_groups`)
    # 4. the matrix `features` with the standardized instance features
    # 5. the index sets `min_scale_inst` and `max_scale_inst` of the
    #    smallest and largest-scale instances

    # now we cluster the instances
    model = SpectralClustering(n_clusters=n, n_init=100,
                               random_state=RandomState(
                                   random.integers(2 ** 32)))
    clusters = model.fit_predict(features)
    if len(clusters) != n_instances:
        raise ValueError(f"Invalid number {len(clusters)} of cluster "
                         f"assignments to {n_instances} instances.")
    if (max(clusters) - min(clusters) + 1) != n:
        raise ValueError(f"Expected {n} clusters, but got "
                         f"{max(clusters) - min(clusters) + 1}.")
    logger(f"Found clusters {clusters}. The minimum instances are in "
           f"{[clusters[i] for i in min_scale_inst]}. The maximum "
           f"instances are in {[clusters[i] for i in max_scale_inst]}. "
           f"Now assigning clusters to groups.")

    # find which groups belong to which cluster
    cluster_groups: Final[tuple[tuple[int, ...], ...]] = tuple([
        tuple(sorted({group_ids[inst_groups[j]]
                      for j in np.where(clusters == i)[0]}))
        for i in range(n)])
    logger(f"The groups available per cluster are {cluster_groups}.")

    # Now we need to pick the extreme groups.
    extreme_groups = tuple(tuple(sorted(
        {(clusters[xx], group_ids[inst_groups[xx]]) for xx in ex}))
        for ex in [min_scale_inst, max_scale_inst])
    logger(f"The extreme groups are {extreme_groups}.")

    # With this method, we choose one instance group for each cluster.
    chosen_groups = __optimize_clusters(cluster_groups=cluster_groups,
                                        extreme_groups=extreme_groups,
                                        n_groups=n_groups,
                                        random=random)
    logger(f"The instance groups {chosen_groups} were chosen for the"
           f" {n} clusters.")

    # OK, we have picked one instance group per cluster.
    # Now we need to pick the instance scales from these groups.
    # The goal is to select instances with (jobs, machines) settings as
    # diverse as possible, while adhering to the selected instance groups.
    # For example, a selected group may be of the family "la", but there
    # might be instances with different (jobs, machines) settings in the
    # cluster for this group.
    # In this case, we want to pick those which do not already occur in
    # other clusters.
    # If we can, we will pick the instances with the minimum and the
    # maximum scales.
    # In a first step, we find out which instances and scales can be
    # chosen for each cluster.
    needs_min_scale = True
    needs_max_scale = True
    scale_choices: list[list[tuple[int, int]]] = []
    inst_choices: list[list[str]] = []
    for i in range(n):
        elements = np.where(clusters == i)[0]
        sgroup: str = id_groups[chosen_groups[i]]
        possibility: set[int] = {j for j in elements
                                 if inst_groups[j] == sgroup}
        cur_scale_choices: list[tuple[int, int]] = []
        scale_choices.append(cur_scale_choices)
        cur_inst_choices: list[str] = []
        inst_choices.append(cur_inst_choices)
        can_skip: bool = False

        if needs_min_scale:
            test = possibility.intersection(min_scale_inst)
            if len(test) > 0:
                logger(f"Choosing from instances "
                       f"{[inst_names[t] for t in test]} for cluster {i} "
                       f"to facilitate the minimum-scale instance.")
                sel = list(test)
                seli = sel[random.integers(len(sel))]
                possibility = {seli}
                cur_scale_choices.append(inst_sizes[seli])
                cur_inst_choices.append(inst_names[seli])
                can_skip = True
                needs_min_scale = False
        if needs_max_scale:
            test = possibility.intersection(max_scale_inst)
            if len(test) > 0:
                logger(f"Choosing from instances "
                       f"{[inst_names[t] for t in test]} for cluster {i} "
                       f"to facilitate the maximum-scale instance.")
                needs_max_scale = False
                if can_skip:
                    continue
                sel = list(test)
                seli = sel[random.integers(len(sel))]
                cur_scale_choices.append(inst_sizes[seli])
                cur_inst_choices.append(inst_names[seli])
                continue
        if can_skip:
            continue

        scales: set[tuple[int, int]] = set()
        sel = list(possibility)
        random.shuffle(sel)
        for ii in sel:
            tup = inst_sizes[ii]
            if tup not in scales:
                scales.add(tup)
                cur_inst_choices.append(inst_names[ii])
                cur_scale_choices.append(tup)
    logger(f"Got the scale choices {scale_choices} resulting from the "
           f"possible instances {inst_choices}.")

    # do the actual scale optimization
    final_sel = __optimize_scales(scale_choices, random)
    res_tmp: Final[list[tuple[int, str]]] = \
        [(__scale(scale_choices[i][k][0], scale_choices[i][k][1]),
          inst_choices[i][k]) for i, k in enumerate(final_sel)]
    res_tmp.sort()
    result: Final[list[str]] = [sey[1] for sey in res_tmp]

    # Finally, we return the chosen instances, sorted by scale.
    logger(f"Found final instance selection {result}.")
    return tuple(result)
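

# Usage sketch (illustrative only): proposing eight instances from the
# default instance source, i.e., all moptipy JSSP resources minus the
# too-easy ones:
#
#   chosen = propose_instances(8)
#   print(chosen)  # a tuple with one instance name per cluster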