Coverage for moptipy / api / _process_base.py: 86%
321 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-29 10:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-29 10:36 +0000
1"""An internal module with the base class for implementing Processes."""
2from io import StringIO
3from math import inf, isfinite
4from threading import Lock, Timer
5from time import time_ns
6from traceback import print_tb
7from typing import Any, Callable, Final, Iterable, cast
9from numpy.random import Generator
10from pycommons.io.path import Path
11from pycommons.types import type_error, type_name_of
13from moptipy.api.algorithm import Algorithm, check_algorithm
14from moptipy.api.improvement_logger import ImprovementLogger
15from moptipy.api.logging import (
16 _ALL_SECTIONS,
17 ERROR_SECTION_PREFIX,
18 KEY_BEST_F,
19 KEY_EXCEPTION_STACK_TRACE,
20 KEY_EXCEPTION_TYPE,
21 KEY_EXCEPTION_VALUE,
22 KEY_GOAL_F,
23 KEY_LAST_IMPROVEMENT_FE,
24 KEY_LAST_IMPROVEMENT_TIME_MILLIS,
25 KEY_MAX_FES,
26 KEY_MAX_TIME_MILLIS,
27 KEY_RAND_BIT_GENERATOR_TYPE,
28 KEY_RAND_GENERATOR_TYPE,
29 KEY_RAND_SEED,
30 KEY_TOTAL_FES,
31 KEY_TOTAL_TIME_MILLIS,
32 SCOPE_ALGORITHM,
33 SCOPE_OBJECTIVE_FUNCTION,
34 SCOPE_PROCESS,
35 SCOPE_SOLUTION_SPACE,
36 SECTION_ERROR_BEST_F,
37 SECTION_ERROR_IN_CONTEXT,
38 SECTION_ERROR_IN_LOG,
39 SECTION_ERROR_IN_RUN,
40 SECTION_ERROR_INVALID_X,
41 SECTION_ERROR_INVALID_Y,
42 SECTION_ERROR_TIMING,
43 SECTION_FINAL_STATE,
44 SECTION_RESULT_Y,
45 SECTION_SETUP,
46)
47from moptipy.api.objective import Objective, check_objective
48from moptipy.api.process import (
49 Process,
50 check_goal_f,
51 check_max_fes,
52 check_max_time_millis,
53)
54from moptipy.api.space import Space, check_space
55from moptipy.utils.logger import (
56 SECTION_END,
57 SECTION_START,
58 FileLogger,
59 KeyValueLogSection,
60 Logger,
61)
62from moptipy.utils.nputils import (
63 rand_generator,
64 rand_seed_check,
65 rand_seed_generate,
66)
67from moptipy.utils.sys_info import log_sys_info
70def _error_1(logger: Logger, title: str, exception_type,
71 exception_value, traceback,
72 error_repl: str = f"{ERROR_SECTION_PREFIX!r}") -> None:
73 """
74 Create a text section with error information as from a contextmanager.
76 :param logger: the logger to write to
77 :param title: the title of the section with error information to be
78 created
79 :param exception_type: the exception type
80 :param exception_value: the exception value
81 :param traceback: the traceback
82 :param error_repl: a replacement for the error section prefix
83 """
84 if exception_type or exception_value or traceback:
85 with logger.text(title=title) as ts:
86 wt: Final[Callable[[str], None]] = ts.write
87 if exception_type:
88 if isinstance(exception_type, str):
89 if exception_type.startswith("<class '"):
90 exception_type = exception_type[8:-2]
91 else:
92 exception_type = type_name_of(exception_type)
93 wt(f"{KEY_EXCEPTION_TYPE}: {str.strip(exception_type)}")
94 if exception_value:
95 exception_value = str.strip(str(exception_value))
96 wt(f"{KEY_EXCEPTION_VALUE}: {exception_value}")
97 if traceback:
98 got: Final[list[str]] = []
99 sio: Final[StringIO] = StringIO()
100 print_tb(traceback, file=sio)
101 for line in str.splitlines(sio.getvalue()):
102 ll: str = str.strip(line)
103 if str.__len__(ll) <= 0:
104 continue
105 got.append(str.replace(
106 ll, ERROR_SECTION_PREFIX, error_repl))
107 if list.__len__(got) > 0:
108 wt(f"{KEY_EXCEPTION_STACK_TRACE}:")
109 for ll in got:
110 wt(ll)
113def _error_2(logger: Logger, title: str, exception: Exception) -> None:
114 """
115 Log an exception.
117 :param logger: the logger to write to
118 :param title: the title of the section with error information to be
119 created
120 :param exception: the exception
122 >>> from moptipy.utils.logger import Logger
123 >>> def __do_print(s: str) -> None:
124 ... s = str.strip(s)
125 ... if "~~^~~" not in s:
126 ... print(s)
127 >>> ime = Logger("pl", __do_print)
128 >>> def k():
129 ... 1 / 0
130 >>> try:
131 ... k()
132 ... except Exception as be:
133 ... _error_2(ime, "ERROR", be)
134 BEGIN_ERROR
135 exceptionType: ZeroDivisionError
136 exceptionValue: division by zero
137 exceptionStackTrace:
138 File "<doctest moptipy.api._process_base._error_2[4]>", line 2, in \
139<module>
140 k()
141 File "<doctest moptipy.api._process_base._error_2[3]>", line 2, in k
142 1 / 0
143 END_ERROR
144 """
145 _error_1(logger, title, exception_type=exception,
146 exception_value=str(exception),
147 traceback=exception.__traceback__)
150#: the function used to get the time
151_TIME_IN_NS: Final[Callable[[], int]] = time_ns
154def _ns_to_ms(nanos: int) -> int:
155 """
156 Convert nanoseconds to milliseconds by rounding up.
158 :param int nanos: the nanoseconds
159 :returns: the corresponding milliseconds, rounded up
161 >>> _ns_to_ms(0)
162 0
163 >>> _ns_to_ms(1)
164 1
165 >>> _ns_to_ms(999_999)
166 1
167 >>> _ns_to_ms(1_000_000)
168 1
169 >>> _ns_to_ms(1_000_001)
170 2
171 >>> _ns_to_ms(1_999_999)
172 2
173 >>> _ns_to_ms(2_000_000)
174 2
175 >>> _ns_to_ms(2_000_001)
176 3
177 """
178 return (nanos + 999_999) // 1_000_000
181class _ProcessBase(Process):
182 """The internal base class for implementing optimization processes."""
184 def __init__(self,
185 solution_space: Space,
186 objective: Objective,
187 algorithm: Algorithm,
188 log_file: Path | None = None,
189 rand_seed: int | None = None,
190 max_fes: int | None = None,
191 max_time_millis: int | None = None,
192 goal_f: int | float | None = None,
193 improvement_logger: ImprovementLogger | None = None) -> None:
194 """
195 Perform the internal initialization. Do not call directly.
197 :param solution_space: the search- and solution space.
198 :param objective: the objective function
199 :param algorithm: the optimization algorithm
200 :param log_file: the optional log file
201 :param rand_seed: the optional random seed
202 :param max_fes: the maximum permitted function evaluations
203 :param max_time_millis: the maximum runtime in milliseconds
204 :param goal_f: the goal objective value. if it is reached, the process
205 is terminated
206 :param improvement_logger: an improvement logger, whose
207 :meth:`~ImprovementLogger.log_improvement` method will be invoked
208 whenever the process has registered an improvement
209 """
210 super().__init__()
211 #: This will be `True` after :meth:`terminate` has been called.
212 self._terminated: bool = False
213 #: This becomes `True` when :meth:`should_terminate` returned `True`.
214 self._knows_that_terminated: bool = False
215 #: The internal lock, needed to protect :meth:`terminate`.
216 self.__lock: Final[Lock] = Lock()
217 #: The maximum FEs.
218 self._max_fes: Final[int | None] = check_max_fes(max_fes, True)
219 #: A version of :attr:`_max_fes` that can be used in comparisons.
220 self._end_fes: Final[int] = 9_223_372_036_854_775_800 \
221 if (self._max_fes is None) else self._max_fes
222 #: The goal objective value.
223 self._goal_f: Final[int | float | None] = \
224 check_goal_f(goal_f, True)
225 #: A comparable version of :attr:`self._goal_f`.
226 self._end_f: int | float = \
227 -inf if (self._goal_f is None) else self._goal_f
228 #: The currently consumed nanoseconds.
229 self._current_time_nanos: int = 0
230 #: The currently consumed objective function evaluations (FEs).
231 self._current_fes: int = 0
232 #: The time (in nanoseconds) when the last improvement was made.
233 self._last_improvement_time_nanos: int = -1
234 #: The FE when the last improvement was made.
235 self._last_improvement_fe: int = -1
237 #: The solution space, i.e., the data structure of possible solutions.
238 self._solution_space: Final[Space] = check_space(solution_space)
239 #: The objective function rating candidate solutions.
240 self.__objective: Final[Objective] = check_objective(objective)
241 #: the internal invoker for the objective function
242 self._f: Final[Callable[[Any], int | float]] = \
243 self.__objective.evaluate
244 #: The algorithm to be applied.
245 self.__algorithm: Final[Algorithm] = check_algorithm(algorithm)
246 #: The random seed.
247 self.__rand_seed: Final[int] = rand_seed_generate() \
248 if rand_seed is None \
249 else rand_seed_check(rand_seed)
250 #: The random number generator.
251 self.__random: Final[Generator] = rand_generator(self.__rand_seed)
252 #: The current best solution.
253 self._current_best_y = solution_space.create()
254 #: The current best objective value
255 self._current_best_f: int | float = inf
256 #: The log file, or `None` is needed
257 if (log_file is not None) and (not isinstance(log_file, Path)):
258 raise type_error(log_file, "log_file", Path)
259 self.__log_file: Final[Path | None] = log_file
260 #: the method for copying y
261 self._copy_y: Final[Callable] = solution_space.copy
262 #: set up the method forwards
263 self.lower_bound = self.__objective.lower_bound # type: ignore
264 if self._end_f <= -inf:
265 self._end_f = check_goal_f(self.lower_bound())
266 self.lower_bound = lambda: self._end_f # type: ignore
267 self.upper_bound = objective.upper_bound # type: ignore
268 self.is_always_integer = objective.is_always_integer # type: ignore
269 self.create = solution_space.create # type: ignore
270 self.copy = solution_space.copy # type: ignore
271 self.to_str = solution_space.to_str # type: ignore
272 self.is_equal = solution_space.is_equal # type: ignore
273 self.from_str = solution_space.from_str # type: ignore
274 self.n_points = solution_space.n_points # type: ignore
275 self.validate = solution_space.validate # type: ignore
276 #: the internal section logger
277 self.__sections: dict[str, str] | None = \
278 None if log_file is None else {}
280 #: The time when the process was started, in nanoseconds.
281 self._start_time_nanos: Final[int] = _TIME_IN_NS()
282 #: The maximum runtime in milliseconds.
283 self._max_time_millis: Final[int | None] = \
284 check_max_time_millis(max_time_millis, True)
285 #: A comparable version of :attr:`_max_time_millis`, but representing
286 #: the end time in nanoseconds rounded to the next highest
287 #: millisecond.
288 self._end_time_nanos: Final[float | int] = \
289 inf if (self._max_time_millis is None) else \
290 _ns_to_ms(int(self._start_time_nanos
291 + (1_000_000 * self._max_time_millis))) \
292 * 1_000_000
293 #: The timer until the end-of-run, or `None` if there is no end time.
294 self.__timer: Final[Timer | None] = None \
295 if (self._max_time_millis is None) else \
296 Timer(interval=self._max_time_millis / 1_000.0,
297 function=self.terminate)
299 #: an internal base exception caught by the algorithm execution
300 self._caught: Exception | None = None
302 #: the internal improvement logger
303 self._log_improvement: Final[Callable[[
304 Callable[[Logger], None]], None] | None] = \
305 improvement_logger.log_improvement if improvement_logger else None
307 def _after_init(self) -> None:
308 """
309 Finish initialization, start timer for termination if needed.
311 Internal method that must be called after __init__ is completed.
312 """
313 if self.__timer is not None:
314 self.__timer.start()
316 def get_log_basename(self) -> str | None:
317 lf: Final[str | None] = self.__log_file
318 if lf is None:
319 return None
320 lid = lf.rfind(".")
321 lis = lf.rfind("/")
322 return lf[:lid] if (lid > 0) and (lid > lis) else lf
324 def get_random(self) -> Generator:
325 return self.__random
327 def should_terminate(self) -> bool:
328 if self._terminated:
329 self._knows_that_terminated = True
330 return True
331 return False
333 def get_consumed_fes(self) -> int:
334 return self._current_fes
336 def get_consumed_time_millis(self) -> int:
337 if not self._terminated:
338 self._current_time_nanos = time = _TIME_IN_NS()
339 if time >= self._end_time_nanos:
340 self.terminate()
341 return _ns_to_ms(self._current_time_nanos - self._start_time_nanos)
343 def get_max_time_millis(self) -> int | None:
344 return self._max_time_millis
346 def get_max_fes(self) -> int | None:
347 return self._max_fes
349 def get_last_improvement_fe(self) -> int:
350 if self._last_improvement_fe < 0:
351 raise ValueError("Did not perform FE yet, cannot query "
352 "last improvement FE.")
353 return self._last_improvement_fe
355 def get_last_improvement_time_millis(self) -> int:
356 if self._last_improvement_time_nanos < 0:
357 raise ValueError("Did not perform FE yet, cannot query "
358 "last improvement time.")
359 return _ns_to_ms(self._last_improvement_time_nanos
360 - self._start_time_nanos)
362 def has_best(self) -> bool:
363 return self._current_fes > 0
365 def get_best_f(self) -> int | float:
366 if self._current_fes > 0:
367 return self._current_best_f
368 raise ValueError("No current best available.")
370 def get_copy_of_best_x(self, x) -> None:
371 if self._current_fes > 0:
372 return self._copy_y(x, self._current_best_y)
373 raise ValueError("No current best available.")
375 def terminate(self) -> None:
376 with self.__lock:
377 old_terminated = self._terminated
378 self._terminated = True
379 if old_terminated:
380 return
381 if self.__timer is not None:
382 self.__timer.cancel()
383 del self.__timer
384 self._current_time_nanos = _TIME_IN_NS()
386 def get_copy_of_best_y(self, y) -> None:
387 """
388 Get a copy of the current best point in the solution space.
390 This method in this internal class just forwards to
391 :meth:`get_copy_of_best_x`.
393 :param y: the destination data structure to be overwritten
394 """
395 return self.get_copy_of_best_x(y)
397 def _log_own_parameters(self, logger: KeyValueLogSection) -> None:
398 """
399 Write the parameters of this process to the logger.
401 This includes the limits on runtime and FEs.
403 :param logger: the logger
404 """
405 super().log_parameters_to(logger)
406 if self._max_fes is not None:
407 logger.key_value(KEY_MAX_FES, self._max_fes)
408 if self._max_time_millis is not None:
409 logger.key_value(KEY_MAX_TIME_MILLIS, self._max_time_millis)
410 if self._goal_f is not None:
411 logger.key_value(KEY_GOAL_F, self._goal_f)
412 logger.key_value(KEY_RAND_SEED, self.__rand_seed, True)
413 logger.key_value(KEY_RAND_GENERATOR_TYPE, type_name_of(self.__random))
414 logger.key_value(KEY_RAND_BIT_GENERATOR_TYPE,
415 type_name_of(self.__random.bit_generator))
417 def log_parameters_to(self, logger: KeyValueLogSection) -> None:
418 """
419 Write the standard parameters of this process to the logger.
421 This includes the limits on runtime and FEs.
423 :param logger: the logger
424 """
425 with logger.scope(SCOPE_PROCESS) as sc:
426 self._log_own_parameters(sc)
427 with logger.scope(SCOPE_ALGORITHM) as sc:
428 self.__algorithm.log_parameters_to(sc)
429 with logger.scope(SCOPE_SOLUTION_SPACE) as sc:
430 self._solution_space.log_parameters_to(sc)
431 with logger.scope(SCOPE_OBJECTIVE_FUNCTION) as sc:
432 self.__objective.log_parameters_to(sc)
434 def add_log_section(self, title: str, text: str) -> None:
435 """
436 Add a section to the log, if a log is written (otherwise ignore it).
438 :param title: the title of the log section
439 :param text: the text to log
440 """
441 if not isinstance(title, str):
442 raise type_error(title, "title", str)
443 t = title.strip()
444 if (len(t) != len(title)) or (len(t) <= 0) or (" " in t) \
445 or ("\n" in t) or ("\t" in t):
446 raise ValueError("section title must not be empty or contain "
447 f"white space, but {title!r} is/does.")
448 if (t in _ALL_SECTIONS) or (SECTION_START in t) or (SECTION_END in t):
449 raise ValueError(f"title {t!r} is a reserved section title")
450 if t.upper() != t:
451 raise ValueError("section titles must be in upper case,"
452 f"but yours is {t!r} (vs. {t.upper()!r}.")
453 for ch in t: # check all character codes in t
454 code: int = ord(ch) # we will only permit A-Z, 0-9, and _
455 if not ((65 <= code <= 90) or (48 <= code <= 57) or (code == 95)):
456 raise ValueError(
457 f"{ch!r} forbidden in section title, but got {t!r}.")
458 if not isinstance(text, str):
459 raise type_error(text, "text", str)
460 if (SECTION_START in text) or (SECTION_END in text):
461 raise ValueError(
462 f"text of section {t!r} must not contain {SECTION_START!r} or"
463 f" {SECTION_END!r} but is {text!r}")
464 if self.__sections is not None:
465 if title in self.__sections:
466 raise ValueError(f"section {title!r} already logged.")
467 self.__sections[title] = text.strip()
469 def _log_best(self, kv: KeyValueLogSection) -> None:
470 """
471 Log the best solution.
473 :param kv: the key values logger
474 """
475 kv.key_value(KEY_BEST_F, self._current_best_f)
476 kv.key_value(KEY_LAST_IMPROVEMENT_FE,
477 self._last_improvement_fe)
478 kv.key_value(KEY_LAST_IMPROVEMENT_TIME_MILLIS,
479 _ns_to_ms(self._last_improvement_time_nanos
480 - self._start_time_nanos))
482 def _write_result(self, logger: Logger) -> None:
483 """
484 Write the end result into the log.
486 :param logger: the logger
487 """
488 with logger.text(SECTION_RESULT_Y) as txt:
489 txt.write(self._solution_space.to_str(self._current_best_y))
491 def _write_state_and_setup(
492 self, logger: Logger,
493 more_state: Iterable[tuple[str, str]] | None = None) -> None:
494 """
495 Write the current state and algorithm setup into the log.
497 :param logger: the logger
498 :param more_state: additional key-value pairs, or `None`
499 """
500 with logger.key_values(SECTION_FINAL_STATE) as kv:
501 kv.key_value(KEY_TOTAL_FES, self._current_fes)
502 kv.key_value(KEY_TOTAL_TIME_MILLIS,
503 _ns_to_ms(self._current_time_nanos
504 - self._start_time_nanos))
505 if self._current_fes > 0:
506 self._log_best(kv)
507 if more_state:
508 for key, value in more_state:
509 kv.key_value(key, value)
510 with logger.key_values(SECTION_SETUP) as kv:
511 self.log_parameters_to(kv)
512 log_sys_info(logger)
514 def _write_log(self, logger: Logger) -> None:
515 """
516 Write the information gathered during optimization into the log.
518 :param logger: the logger
519 """
520 self._write_state_and_setup(logger)
521 if self._current_fes > 0:
522 self._write_result(logger)
524 def _validate_x(self) -> None:
525 """Validate x, if it exists."""
527 def _check_timing(self) -> None:
528 """
529 Check whether there has been any timing errors.
531 :raises ValueError: if there is any timing error
532 """
533 if self._current_time_nanos < self._start_time_nanos:
534 raise ValueError(
535 f"current_time_nanos={self._current_time_nanos} < "
536 f"start_time_nanos={self._start_time_nanos}")
537 if self._current_fes <= 0:
538 raise ValueError("no FE was performed")
539 if self._current_fes < self._last_improvement_fe:
540 raise ValueError(
541 f"current_fe={self._current_fes} < "
542 f"last_improvement_fe={self._last_improvement_fe}")
543 if self._current_time_nanos < self._last_improvement_time_nanos:
544 raise ValueError(
545 f"current_time_nanos={self._current_time_nanos} < "
546 "last_improvement_time_nanos="
547 f"{self._last_improvement_time_nanos}")
549 def _validate_best_f(self) -> None:
550 """
551 Validate the best encountered objective value.
553 :raises ValueError: if there is an error
554 """
555 ff: Final[int | float] = self._f(self._current_best_y)
556 if ff != self._current_best_f:
557 raise ValueError( # noqa
558 "We re-computed the objective value of the best solution"
559 f" and got {ff}, but it has been registered as "
560 f"{self._current_best_f}!") # noqa
561 if not isfinite(ff):
562 raise ValueError( # noqa
563 f"The objective value {ff} of " # noqa
564 "the best solution is not finite?")
565 lb: Final[int | float] = self.__objective.lower_bound()
566 ub: Final[int | float] = self.__objective.upper_bound()
567 if not (lb <= ff <= ub):
568 raise ValueError( # noqa
569 f"The objective value {ff} of " # noqa
570 "the best solution is not within the lower and "
571 f"upper bound, i.e., [{lb}, {ub}]?") # noqa
573 def has_log(self) -> bool:
574 """
575 Check if this log has an associated log file.
577 :retval `True`: if the process is associated with a log output
578 :retval `False`: if no information is stored in a log output
579 """
580 return self.__log_file is not None
582 def __exit__(self, exception_type, exception_value, traceback) -> None:
583 """Exit the process and write the log if necessary."""
584 # noinspection PyProtectedMember
585 super().__exit__(exception_type, exception_value, traceback)
587 # Update the total consumed time, but not include the error checks
588 # below.
589 self._current_time_nanos = _TIME_IN_NS()
591 y_error: Exception | None = None # error in solution?
592 v_error: Exception | None = None # error in objective value?
593 x_error: Exception | None = None # error in search space?
594 t_error: Exception | None = None # error in timing?
595 log_error: Exception | None = None # error while logging?
596 try:
597 self._solution_space.validate(self._current_best_y)
598 except Exception as be: # noqa
599 y_error = be
600 if self._current_fes > 0:
601 try:
602 self._validate_best_f()
603 except Exception as be: # noqa
604 v_error = be
605 try:
606 self._validate_x()
607 except Exception as be: # noqa
608 x_error = be
609 try:
610 self._check_timing()
611 except Exception as be: # noqa
612 t_error = be
614 if self.__log_file is not None:
615 with FileLogger(self.__log_file) as logger:
616 try:
617 self._write_log(logger)
618 except Exception as be: # noqa
619 log_error = be
621 if self._caught is not None:
622 _error_2(logger, SECTION_ERROR_IN_RUN,
623 self._caught)
624 if exception_type or exception_value or traceback:
625 _error_1(logger, SECTION_ERROR_IN_CONTEXT,
626 exception_type, exception_value, traceback)
627 if y_error:
628 _error_2(logger, SECTION_ERROR_INVALID_Y, y_error)
629 if v_error:
630 _error_2(logger, SECTION_ERROR_BEST_F, v_error)
631 if x_error:
632 _error_2(logger, SECTION_ERROR_INVALID_X, x_error)
633 if t_error:
634 _error_2(logger, SECTION_ERROR_TIMING, t_error)
635 if log_error:
636 _error_2(logger, SECTION_ERROR_IN_LOG, log_error)
638 # flush all the additional log sections at the end
639 for t in sorted(self.__sections.keys()):
640 with logger.text(t) as sec:
641 sec.write(self.__sections[t])
642 del self.__sections
644 if not exception_type:
645 # if no error happened when closing the process, raise any error
646 # caught during validation.
647 if self._caught is not None:
648 raise self._caught # pylint: disable=[E0702]
649 if y_error:
650 raise y_error
651 if v_error:
652 raise v_error
653 if x_error:
654 raise x_error
655 if t_error:
656 raise t_error
657 if log_error:
658 raise log_error
660 def __str__(self) -> str:
661 """
662 Get the name of this process implementation.
664 :return: "baseProcess"
665 """
666 return "baseProcess"
669def _check_log_time(start_time: int, current_time: int,
670 log: list[list]) -> None:
671 """
672 Check the time inside the log.
674 :param start_time: the start time
675 :param current_time: the current time
676 :param log: the log
677 :raises ValueError: if there is a timing error in the log
678 """
679 last_time: int = -1
680 last_fe: int = -1
681 for row in log:
682 fes: int = cast("int", row[0])
683 time: int = cast("int", row[1])
684 if fes < last_fe:
685 raise ValueError(f"fe={fes} after fe={last_fe}?")
686 if time < last_time:
687 raise ValueError(
688 f"time={time} of fe={fes} is less than "
689 f"last_time={last_time} of last_fe={last_fe}")
690 if time < start_time:
691 raise ValueError(
692 f"time={time} of fe={fes} is less than "
693 f"start_time_nanos={start_time}")
694 if time > current_time:
695 raise ValueError(
696 f"time={time} of fe={fes} is greater than "
697 f"current_time_nanos={current_time}")
698 last_time = time
699 last_fe = fes