moptipyapps.dynamic_control package

Examples of dynamic control problems.

Here we have examples for dynamic control problems. An instance of the dynamic control problem is composed of

  • a system of differential equations that govern the state of a system and that also incorporate the output of a controller,

  • a controller blueprint, i.e., a function that can be parameterized and that computes a controller output from the system state, and

  • an objective that rates how well-behaved the system driven by the controller is in a simulation.

Such systems can serve as primitive testbeds of actual dynamic control scenarios such as flight control or other fluid dynamic systems. Different from such more complicated systems, they are much faster and easier to simulate, though. So we can play with them much more easily and quickly.

The initial starting point of the work here were conversations with Prof. Dr. Bernd NOACK and Guy Yoslan CORNEJO MACEDA of the Harbin Institute of Technology in Shenzhen, China (哈尔滨工业大学(深圳)) as well as the following two MSc theses and book:

  1. Yuxiang LI (李宇翔). Jet Mixing Enhancement using Deep Reinforcement Learning (基于深度强化学习的射流混合增强控制). MSc Thesis. Harbin Institute of Technology in Shenzhen, China (哈尔滨工业大学(深圳)). January 2023.

  2. Wei SUN (孙伟). Wake Control of 1-2-3 Fluidic Pinball using Deep Reinforcement Learning (基于深度强化学习方法的 1-2-3 流体弹球尾流控制). MSc Thesis. Harbin Institute of Technology in Shenzhen, China (哈尔滨工业大学(深圳)). January 2023.

  3. Guy Yoslan CORNEJO MACEDA, François LUSSEYRAN, and Bernd R. NOACK. xMLC: A Toolkit for Machine Learning Control, First Edition. Machine Learning Tools in Fluid Mechanics, Vol 2. Shenzhen & Paris; Universitätsbibliothek der Technischen Universität Braunschweig. 2022 https://doi.org/10.24355/dbbs.084-202208220937-0

Subpackages

Submodules

moptipyapps.dynamic_control.controller module

A base class for implementing controllers.

A controller basically is a parameterizable function that receives the current state and time of a system as input and computes one or multiple controller values as output. These controller values are then used to influence how the state of the system changes in the next iteration. In the dynamic systems control optimization task, the goal is to find the right parameterization for the controller such that an objective is minimized.

Examples for different controllers for dynamic systems are given in package controllers.

class moptipyapps.dynamic_control.controller.Controller(name, state_dims, control_dims, param_dims, func=None)[source]

Bases: Component

A class for governing a system via differential equations.

control_dims: Final[int]

the dimensions of the controller output

controller(state, time, params, out)[source]

Compute the control value and store it in out.

Parameters:
  • state (ndarray) – the state vector

  • time (float) – the time value

  • params (ndarray) – the controller variables

  • out (ndarray) – the output array to receive the controller values

Return type:

None

log_parameters_to(logger)[source]

Log all parameters of this component as key-value pairs.

Parameters:

logger (KeyValueLogSection) – the logger for the parameters

Return type:

None

name: Final[str]

the controller name

param_dims: Final[int]

the dimensions of the controller parameter

parameter_space()[source]

Create a vector space to represent the possible parameterizations.

Return type:

VectorSpace

Returns:

a vector space for the possible parameterizations of this controller.

state_dims: Final[int]

the dimensions of the state variable

moptipyapps.dynamic_control.experiment_raw module

An example experiment for dynamic control.

In this experiment, we try to synthesize (i.e., parameterize) controllers (controller) that steer a dynamic system (system) into a state by using a figure of merit (objective) which minimizes both the squared system state and controller effort.

A model-based experiment variant is given in experiment_surrogate.

moptipyapps.dynamic_control.experiment_raw.base_setup(instance)[source]

Create the basic setup.

Parameters:

instance (Instance) – the instance to use

Return type:

Execution

Returns:

the basic execution

moptipyapps.dynamic_control.experiment_raw.cmaes(instance)[source]

Create the Bi-Pop-CMA-ES setup.

Parameters:

instance (Instance) – the problem instance

Return type:

Execution

Returns:

the setup

moptipyapps.dynamic_control.experiment_raw.make_instances()[source]

Create the instances to be used in the dynamic control experiment.

Return type:

Iterable[Callable[[], Instance]]

Returns:

the instances to be used in the dynamic control experiment.

moptipyapps.dynamic_control.experiment_raw.on_completion(instance, log_file, process)[source]

Plot the corresponding figures and print the objective value components.

Parameters:
  • instance (Any) – the problem instance

  • log_file (Path) – the log file

  • process (Process) – the process

Return type:

None

moptipyapps.dynamic_control.experiment_raw.run(base_dir, n_runs=5)[source]

Run the experiment.

Parameters:
  • base_dir (str) – the base directory

  • n_runs (int, default: 5) – the number of runs

Return type:

None

moptipyapps.dynamic_control.experiment_surrogate module

An example experiment for dynamic control using surrogate system models.

In this experiment, we again try to synthesize (i.e., parameterize) controllers (controller) that steer a dynamic system (system) into a state by using a figure of merit (objective) which minimizes both the squared system state and controller effort.

The difference compared to experiment_raw is that we also try to synthesize a system model at the same time. We employ the procedure detailed in surrogate_optimizer for this purpose.

Word of advice: This experiment takes extremely long and needs a lot of memory!

The starting points of the work here were conversations with Prof. Dr. Bernd NOACK and Guy Yoslan CORNEJO MACEDA of the Harbin Institute of Technology in Shenzhen, China (哈尔滨工业大学(深圳)).

moptipyapps.dynamic_control.experiment_surrogate.MAX_FES: Final[int] = 64

the total objective function evaluations

moptipyapps.dynamic_control.experiment_surrogate.base_setup(instance)[source]

Create the basic setup.

Parameters:

instance (Instance) – the instance to use

Return type:

tuple[Execution, FigureOfMerit, VectorSpace]

Returns:

the basic execution

moptipyapps.dynamic_control.experiment_surrogate.cmaes_raw(instance)[source]

Create the Bi-Pop-CMA-ES setup.

Parameters:

instance (Instance) – the problem instance

Return type:

Execution

Returns:

the setup

moptipyapps.dynamic_control.experiment_surrogate.cmaes_surrogate(instance, fes_for_warmup=16, fes_for_training=128, fes_per_model_run=128, fancy_logs=True)[source]

Create the Bi-Pop-CMA-ES setup.

Parameters:
  • instance (SystemModel) – the problem instance

  • fes_for_warmup (int, default: 16) – the FEs to be used for warmup

  • fes_for_training (int, default: 128) – the milliseconds for training

  • fes_per_model_run (int, default: 128) – the milliseconds per model run

  • fancy_logs (bool, default: True) – should we do fancy logging?

Return type:

Execution

Returns:

the setup

moptipyapps.dynamic_control.experiment_surrogate.make_instances()[source]

Create the instances to be used in the dynamic control experiment.

Return type:

Iterable[Callable[[], SystemModel]]

Returns:

the instances to be used in the dynamic control experiment.

moptipyapps.dynamic_control.experiment_surrogate.on_completion(instance, log_file, process)[source]

Plot the corresponding figures and print the objective value components.

Parameters:
  • instance (Any) – the problem instance

  • log_file (Path) – the log file

  • process (Process) – the process

Return type:

None

moptipyapps.dynamic_control.experiment_surrogate.run(base_dir, n_runs=64)[source]

Run the experiment.

Parameters:
  • base_dir (str) – the base directory

  • n_runs (int, default: 64) – the number of runs

Return type:

None

moptipyapps.dynamic_control.instance module

An instance of the dynamic control synthesis problem.

An instance of the dynamic control synthesis problem is comprised of two components: a system of differential equations governing how the state of a system changes over time and a controller that uses the current system state as input and computes a controller value as output that influences the state change. Notice that the controller here is a parametric function. The goal of the dynamic system control is to find the right parameterization of the controller such that an objective is minimized. The objective here usually has the goal to bring the dynamic system into a stable state while using as little controller “energy” as possible.

An instance of the simultaneous control and model synthesis problem is an instance of the class SystemModel, which is a subclass of Instance. It also adds a controller blueprint for modelling the systems response (state differential) based on the system state and controller output.

The starting point of the work here were conversations with Prof. Dr. Bernd NOACK and Guy Yoslan CORNEJO MACEDA of the Harbin Institute of Technology in Shenzhen, China (哈尔滨工业大学(深圳)).

class moptipyapps.dynamic_control.instance.Instance(system, controller, name_base=None)[source]

Bases: Component

An instance of the dynamic control problem.

controller: Final[Controller]

the controller applied to the system

describe_parameterization(title, parameters, base_name, dest_dir)[source]

Describe the performance of a given system of system.

Parameters:
  • title (str | None) – the optional title

  • parameters (ndarray) – the controller parameters

  • base_name (str) – the base name of the file to produce

  • dest_dir (str) – the destination directory

Return type:

tuple[Path, ...]

Returns:

the paths of the generated files

log_parameters_to(logger)[source]

Log all parameters of this component as key-value pairs.

Parameters:

logger (KeyValueLogSection) – the logger for the parameters

Return type:

None

name: Final[str]

the name of this instance

system: Final[System]

the system governing the dynamic system

moptipyapps.dynamic_control.model_objective module

An objective function that can be used to synthesize system models.

We consider the dynamic system to be a function D(s, c) = ds/dt, where

  • s is the state vector (e.g., two-dimensional for the Stuart-Landau system, see stuart_landau, and three-dimensional for the Lorenz system, see lorenz),

  • c is the control vector (one-dimensional in both cases), and

  • ds/dt is the state differential (again two respectively three-dimensional).

The FigureOfMerit objective function allows us to collect tuples of (s,c) and ds/dt vectors. So all we need to do is to train a model M that receives as input a vector x=(s,c) (where (s,c) be the concatenation of s and c) and fill as output a vector ds/dt.

The objective function in this module minimizes the root mean square error over the model-computed ds/dt vectors and the actual ds/dt vectors. The model objective function is used by the surrogate_optimizer algorithm.

class moptipyapps.dynamic_control.model_objective.ModelObjective(real, model)[source]

Bases: Objective

The objective for modeling.

This objective function works on sequences of tuples (s, c) of the system state s and controller output c as well as the corresponding ds/dt differentials. The goal is to train a model M(s, c, q) that will compute the ds/dt values reasonably accurately. The model is parameterized with some vector q, think of M being, e.g., an artificial neural network and q being its weight vector. To find good values of q, this objective here computes the squared differences between the values M(s, c, q) and the expected outputs ds/dt.

These squared differences are then either averaged directly or the expm1(mean(logp1(…)))) hack of the FigureOfMeritLE is used to alleviate the impact of large differentials, depending on which “real” (controller-synthesis) objective is passed into the constructor).

The tuples (s, c) and ds/dt are pulled from the real objective with the method begin(). The ds/dt rows are flattened for performance reasons.

begin()[source]

Begin a model optimization run.

This function pulls the training data from the actual controller-synthesis objective function, which is an instance of FigureOfMerit, via the method get_differentials() and allocates internal data structures accordingly.

Return type:

None

end()[source]

End a model optimization run and free the associated memory.

Return type:

None

evaluate(x)[source]

Evaluate a model parameterization.

Parameters:

x (ndarray) – the model parameterization

Return type:

float

Returns:

the objective value

log_parameters_to(logger)[source]

Log all parameters of this component as key-value pairs.

Parameters:

logger (KeyValueLogSection) – the logger for the parameters

Return type:

None

lower_bound()[source]

Get the lower bound of the model objective, which is 0.

Return type:

float

Returns:

0.0

moptipyapps.dynamic_control.objective module

An objective functions for the dynamic control problem.

The dynamic control problem means that our system starts in a given state and we try to move it to a stable state by using control effort. Controllers are trained over several training states, for each of which we can compute a figure of merit.

We offer two different approaches for this:

  • FigureOfMerit computes the arithmetic mean z over the separate figures of merit J of the training cases.

  • FigureOfMeritLE tries to smooth out the impact of bad starting states by computing exp(mean[log(J + 1)]) - 1.

These objective functions also offer a way to collect the state+control and corresponding differential vectors.

class moptipyapps.dynamic_control.objective.FigureOfMerit(instance, supports_model_mode=False)[source]

Bases: Objective

A base class for figures of merit.

evaluate(x)[source]

Evaluate the parameterization of a controller.

Parameters:

x (ndarray) – the controller parameters

Return type:

float

Returns:

the figure of merit

get_differentials()[source]

Get the collected differentials.

If supports_model_mode was set to True in the creating of this objective function, then the system will gather tuples (s, c) and ds/dt when in raw mode (see set_raw()) and make them available here to train system models (see set_model()). Notice that gathering training data is a very memory intense process.

Return type:

tuple[ndarray, ndarray]

Returns:

the collected differentials

initialize()[source]

Initialize the objective for use.

Return type:

None

instance: Final[Instance]

the dynamic control instance

log_parameters_to(logger)[source]

Log all parameters of this component as key-value pairs.

Parameters:

logger (KeyValueLogSection) – the logger for the parameters

Return type:

None

lower_bound()[source]

Get the lower bound of the figure of merit, which is 0.

Return type:

float

Returns:

0.0

set_model(equations)[source]

Set the model-driven mode for the evaluation.

In this modus, the internal system equations are replaced by the callable equations passed into this function and the data collection is stopped. The idea is that equations could be a model synthesized on the data gathered (see get_differentials()) and thus does not represent the actual dynamic system but a model thereof. We could synthesize a controller for this model and for this purpose would use the exactly same objective function – just instead of using the actual system equations, we use the system model. Of course, we then need to deactivate the data gathering mechanism (see again get_differentials()), because the data would then not be real system data. You can toggle back to the actual system using set_raw().

Parameters:

equations (Callable[[ndarray, float, ndarray, ndarray], None]) – the equations to be used instead of the actual system’s differential equations.

Return type:

None

set_raw()[source]

Let this objective work on the original system equations.

The objective function here can be used in two modi: a) based on the original systems model, as given in system, or b) on a learned model of the system. This function here toggles to the former mode, i.e., to the actual system mode. In this modus, training data for training the system model will be gathered if the objective function is configured to do so. In that case, you can toggle to model mode via set_model().

Return type:

None

sum_up_results(results)[source]

Compute the final objective value from several single J values.

When synthesizing controllers, we do not just apply them to a single simulation run. Instead, we use multiple training cases (see training_starting_states) and perform training_steps simulation steps on each of them. Each such training starting state will result in a single J value, which is the sum of squared state and control values. We now compute the end objective value from these different J values by using this function here.

This will destroy the contents of results.

Parameters:

results (ndarray) – the array of J values

Return type:

float

Returns:

the final result

class moptipyapps.dynamic_control.objective.FigureOfMeritLE(instance, supports_model_mode=False)[source]

Bases: FigureOfMerit

Compute a exp(mean(log(z+1)))-1 over the figures of merit z.

Different from FigureOfMerit, we compute the mean of log(z + 1) where z be the figures of merit of the single training cases. We then return exp(mean[log(z + 1)]) - 1 as final result. The goal is to reduce the impact of training cases that require more control effort.

If we solve the dynamic control problem for diverse training cases, then we may have some very easy cases, where the system just needs a small control impulse to move into a stable and cheap state. Others may have very far out and expensive starting states that require lots of control efforts to be corrected. If we simply average over all states, then these expensive states will dominate whatever good we are doing in the cheap states. Averaging over the log(J+1) reduces such impact. We then compute exp[…]-1 of the result as cosmetics to get back into the original range of the figure of merits.

sum_up_results(results)[source]

Compute the final objective value from several single J values.

For each training case, there is one basic figure of merit J and here we compute exp(mean[log(J + 1)]) - 1 over all of these values.

Parameters:

results (ndarray) – the array of J values

Return type:

float

Returns:

the final result

moptipyapps.dynamic_control.ode module

A primitive integrator for systems of ordinary differential equations.

Many dynamic systems can be modeled as systems of ordinary differential equations that govern their progress over time. Trying to find out in which state such systems are at a given point in time means to integrate these equations until that point in time (starting from a starting state).

What we want to play around with, however, is synthesizing controllers. In this case, the differential equations also merge the output of the controller with the current state. If the controller behaves inappropriately, this may make the system diverge, i.e., some of its state variables go to infinity over time or sometimes rather quickly.

Using ODE integrators that compute the system state at pre-defined time steps is thus cumbersome, as the system may have already exploded at these goal times. Therefore, we perform ODE integration in several steps. First, we try it the “normal” way. However, as soon as the system escapes the sane parameter range, we stop. We then use the last point where the system was stable and the first point where it escaped the reasonable range to estimate a new reasonable end time for the integration. We do this until we finally succeed.

Thus, we can simulate a well-behaved system over a long time and an ill-behaved system for a shorter time period. Neither system will diverge.

The following functions are provided:

  • run_ode() executes a set of differential system equations and controller equations and returns an array with the system state, controller output, and time at the different interpolation steps.

  • t_from_ode() returns the total time over which the result of run_ode() was simulated.

  • j_from_ode() returns a figure of merit, i.e., a weighted sum of (a part of) the system state and the controller output from the result of run_ode().

  • diff_from_ode() extracts state difference information from the result of run_ode().

class moptipyapps.dynamic_control.ode.T

the type variable for ODE controller parameters

alias of TypeVar(‘T’)

moptipyapps.dynamic_control.ode.diff_from_ode(ode, state_dim)[source]

Compute all the state+control vectors and the resulting differentials.

This function returns two matrices. Each row of both matrices corresponds to a time slot. Each row in the first matrix holds the state vector and the control vector (that was computed by the controller). The corresponding row in the second matrix then holds the state differential resulting from the control vector being applied in the differential equations that govern the system state change.

The idea is that this function basically provides the data that we would like to learn when training a surrogate model for a system: From the current state and the computed control vector, we want that our model can give us the resulting system differential. If we have such a model and it works reasonably well, then we could essentially plug this model into run_ode() instead of the original equations parameter.

What this function does to compute the differential is to basically “invert” the dynamic weighting done by run_ode(). run_ode() starts in a given starting state s. It then computes the control vector c as a function of s, i.e., c(s). Then, the equations of the dynamic system (see module system) to compute the state differential D=ds/dt as a function of c(s) and s, i.e., as something like D(s, c(s)). The next step would be to update the state, i.e., to set s=s+D(s, c(s)). Unfortunately, this can make s go to infinity. So run_ode() will compute a dynamic weight w and do s=s+w*D(s, c(s)), where w is chosen such that the state vector s does not grow unboundedly. While s and c(s) and w are stored in one row of the result matrix of run_ode(), s+w*D(s,c(s)) is stored as state s in the next row. So what this function here basically does is to subtract the old state from the next state and divide the result by w to get D(s, c(s)). s and c(s) are already available directly in the ODE result and w is not needed anymore.

We then get the rows s, c(s) and D(s, c(s)) in the first and second result matrix, respectively. This can then be used to train a system model as proposed in model system_model.

Parameters:
Return type:

tuple[ndarray, ndarray]

Returns:

a tuple of the state+control vectors and the resulting state differential vectors

>>> od = np.array([
...     [0, 0, 0, 0, 0, 0],  # state 0,0,0; control 0,0; time 0
...     [1, 2, 3, 4, 5, 1],  # state 1,2,3; control 4,5; time 1
...     [2, 3, 4, 5, 6, 3],  # state 2,3,4; control 5,6; time 3
...     [4, 6, 8, 7, 7, 7]])    # state 4,6,8; control 7,7, time 7
>>> res = diff_from_ode(od, 3)
>>> res[0]  # state and control vectors, time col and last row removed
array([[0, 0, 0, 0, 0],
       [1, 2, 3, 4, 5],
       [2, 3, 4, 5, 6]])
>>> res[1]  # (state[i + 1] - state[i]) / (time[i + 1] / time[i])
array([[1.  , 2.  , 3.  ],
       [0.5 , 0.5 , 0.5 ],
       [0.5 , 0.75, 1.  ]])
moptipyapps.dynamic_control.ode.j_from_ode(ode, state_dim, use_state_dims=-1, gamma=0.1)[source]

Compute the original figure of merit from an ODE array.

The figure of merit is the sum of state variable squares plus 0.1 times the control variable squares. We disregard the state variable values of the starting states (because they are the same for all controllers on a given training case and because the control cannot influence them) and we also disregard the final state and final controller output (as there is no time slice associated with them, i.e., we only “arrive” in them but basically spent 0 time in them in our simulation).

Parameters:
  • ode (ndarray) – the array returned by the ODE function, i.e., run_ode()

  • state_dim (int) – the state dimension

  • use_state_dims (int, default: -1) – the dimension until which the state is used, -1 for using the complete state

  • gamma (float, default: 0.1) – the weight of the controller input

Return type:

float

Returns:

the figure of merit

>>> od = np.array([[1, 2, 3, 4, 0],
...                [5, 6, 7, 8, 1],
...                [9, 6, 4, 3, 3],
...                [7, 4, 2, 1, 7]])
>>> sta_dim = 3
>>> print(f"{j_from_ode(od, sta_dim):.10f}")
110.0000000000
>>> print((1.6 + 12.8 + 98 + 72 + 50 + 3.6 + 64 + 144 + 324) / 7)
110.0
>>> sta_dim = 3
>>> print(f"{j_from_ode(od, 3, 2, 0.5):.10f}")
97.1428571429
>>> print((8 + 64 + 72 + 50 + 18 + 144 + 324) / 7)
97.14285714285714
moptipyapps.dynamic_control.ode.multi_run_ode(test_starting_states, training_starting_states, collector, equations, controller, parameters, controller_dim=1, test_steps=5000, test_time=50.0, training_steps=5000, training_time=50.0, use_state_dims=-1, gamma=0.1)[source]

Invoke run_ode() multiple times and pass the result to collector.

This function allows us to perform multiple runs of the differential equation simulator, using different starting points. It also allows us to distinguish training and test points and to assign them different numbers of steps. For each of them, run_ode() will be applied and the returned matrix is passed to the collector function.

Parameters:
  • test_starting_states (Iterable[ndarray]) – the iterable of test starting states

  • training_starting_states (Iterable[ndarray]) – the iterable of training starting states

  • collector (Union[Callable[[int, ndarray, float, float], None], Iterable[Callable[[int, ndarray, float, float], None]]]) – the destination to receive the results, in the form of index, ode array, j, and t.

  • equations (Callable[[ndarray, float, ndarray, ndarray], None]) – the differential system

  • controller (Callable[[ndarray, float, TypeVar(T), ndarray], None]) – the controller function

  • parameters (TypeVar(T)) – the controller parameters

  • controller_dim (int, default: 1) – the dimension of the controller result

  • test_steps (int, default: 5000) – the number of test steps to simulate

  • test_time (float, default: 50.0) – the time limit for tests

  • training_steps (int, default: 5000) – the number of training steps to simulate

  • training_time (float, default: 50.0) – the time limit for training

  • use_state_dims (int, default: -1) – the dimension until which the state is used, -1 for using the complete state

  • gamma (float, default: 0.1) – the weight of the controller input

Return type:

None

moptipyapps.dynamic_control.ode.run_ode(starting_state, equations, controller, parameters, controller_dim=1, steps=5000, max_time=50.0)[source]

Simulate a set of controlled differential system.

The system begins in the starting state stored in the vector starting_state. In each time step, first, the controller is invoked. It receives the current system state vector as input, the current time t, its parameters (parameters), and an output array to store its computed control values into. This array has dimension controller_dim, which usually is 1. Then, the function system will be called and receives the current state vector, the time step t, and the controller output as input, as well as an output array to store the result of the differential into. This output array has the same dimension as the state vector.

Now the run_ode function simulates such a system over steps time steps over the closed interval [0, max_time]. If both the system and the controller are well-behaved, then the output array will contain steps rows with the state, controller, and time information of each step. If the system diverges at some point in time but we can simulate it reasonably well before that, then we try to simulate steps, but on a shorter time frame. If even that fails, you will get a single row output with 1e100 as the controller value.

This function returns a matrix where each row corresponds to a simulated time step. Each row contains three components in a concatenated fashion: 1. the state vector, 2. the control vector, 3. the time value

Parameters:
Return type:

ndarray

Returns:

a matrix where each row represents a point in time, composed of the current state, the controller output, and the length of the time slice

If we simulate the flight of a projectile with our ODE execution, then both the flight time as well as the flight length are about 0.12% off from what the mathematical solution of the flight system prescribe. That’s actually not bad for a crude and fast integration method…

>>> v = 100.0
>>> angle = np.deg2rad(45.0)
>>> v_x = v * np.cos(angle)
>>> print(f"{v_x:.10f}")
70.7106781187
>>> v_y = v * np.sin(angle)
>>> print(f"{v_y:.10f}")
70.7106781187
>>> def projectile(position, ttime, ctrl, out):
...     out[0] = 70.71067811865474
...     out[1] = 70.71067811865474 - ttime * 9.80665
>>> param = 0.0   # ignore
>>> def contrl(position, ttime, params, dest):
...     dest[0] = 0.0  #  controller that does nothing
>>> strt = np.array([0.0, 1.0])
>>> ode = run_ode(strt, projectile, contrl, param, 1, 10000)
>>> print(len(ode))
10000
>>> time_of_flight = 2 * v_y / 9.80665
>>> print(f"{time_of_flight:.10f}")
14.4209649817
>>> travel_distance_x = time_of_flight * v_x
>>> print(f"{travel_distance_x:.10f}")
1019.7162129779
>>> idx = np.argwhere(ode[:, 1] <= 0.0)[0][0]
>>> print(idx)
2887
>>> print(f"{ode[idx - 1, 0]:.10f}")
1020.4571309653
>>> print(f"{ode[idx, 0]:.10f}")
1020.8107197148
>>> print(f"{ode[idx - 1, -1]:.10f}")
14.4314431443
>>> print(f"{ode[idx, -1]:.10f}")
14.4364436444
>>> print(ode[-1, -1])
50.0
>>> def contrl2(position, ttime, params, dest):
...     dest[0] = 1e50  #  controller that is ill-behaved
>>> run_ode(strt, projectile, contrl2, param, 1, 10000)
array([[0.e+000, 1.e+000, 1.e+100, 0.e+000]])
>>> def contrl3(position, ttime, params, dest):
...     dest[0] = 1e50 if ttime > 10 else 0.0  # diverging controller
>>> ode = run_ode(strt, projectile, contrl3, param, 1, 10000)
>>> print(len(ode))
10000
>>> print(ode[-1])
[690.10677249 224.06765771   0.           9.75958357]
>>> def projectile2(position, ttime, ctrl, out):
...     out[:] = 0
>>> ode = run_ode(strt, projectile2, contrl, param, 1, 10000)
>>> print(len(ode))
10000
>>> print(ode[-1])
[ 0.  1.  0. 50.]
>>> ode = run_ode(strt, projectile2, contrl3, param, 1, 10000)
>>> print(len(ode))
10000
>>> print(ode[-1])
[0.         1.         0.         9.41234557]
moptipyapps.dynamic_control.ode.t_from_ode(ode)[source]

Get the time sum from an ODE solution.

The total time that we simulate a system depends on the behavior of the system.

Parameters:

ode (ndarray) – the ODE solution, as return from run_ode().

Return type:

float

Returns:

the total consumed time

>>> od = np.array([[1, 2, 3, 4, 0.1],
...                [5, 6, 7, 8, 0.2],
...                [9, 6, 4, 3, 0.3],
...                [7, 4, 2, 1, 0.4]])
>>> print(t_from_ode(od))
0.4

moptipyapps.dynamic_control.results_log module

A logger for results gathered from ODE integration to a text file.

class moptipyapps.dynamic_control.results_log.ResultsLog(state_dim, out)[source]

Bases: AbstractContextManager

A class for logging results via multi_run_ode.

Function moptipyapps.dynamic_control.ode.multi_run_ode() can pass its results to various output generating procedures. This class here offers a procedure for writing them to a log file.

>>> def projectile(position, ttime, ctrl, out):
...     out[0] = 70.71067811865474
...     out[1] = 70.71067811865474 - ttime * 9.80665
>>> param: np.ndarray = np.array([1])   # ignore
>>> def contrl(position, ttime, params, dest):
...     dest[0] = params[0]  #  controller that returns param
>>> from io import StringIO
>>> from moptipyapps.dynamic_control.ode import multi_run_ode
>>> with StringIO() as sio:
...     with ResultsLog(2, sio) as log:
...         multi_run_ode([np.array([0.0, 1.0])],
...                       [np.array([1.0, 1.0])],
...                       log.collector, projectile, contrl, param,
...                       1, 10000, 14.890, 10000, 14.961)
...         x=sio.getvalue()
>>> tt = x.split()
>>> print(tt[0])
figureOfMerit;totalTime;nSteps;start0;start1;end0;end1
>>> for y in tt[1:]:
...     print(";".join(f"{float(v):.3f}" for v in y.split(";")))
403374.924;14.890;10000.000;0.000;1.000;1052.882;-33.244
407810.750;14.961;10000.000;1.000;1.000;1058.902;-38.616
collector(index, ode, j, time)[source]

Log the result of a multi-ode run.

Parameters:
  • index (int) – the index of the result

  • ode (ndarray) – the ode result matrix

  • j (float) – the figure of merit

  • time (float) – the time value

Return type:

None

moptipyapps.dynamic_control.results_plot module

An illustrator for results gathered from ODE integration to multi-plots.

This evaluator will create figures where each sub-plot corresponds to one evolution of the system over time. The starting state is marked with a blue cross, the final state with a red one. Both states are connected with a line that marks the progress over time. The line begins with blue color and changes its color over violet/pinkish towards yellow the farther in time the system progresses.

moptipyapps.dynamic_control.results_plot.END_COLOR: Final[str] = 'red'

the end point color

moptipyapps.dynamic_control.results_plot.END_MARKER: Final[str] = 'o●'

the end marker

class moptipyapps.dynamic_control.results_plot.ResultsPlot(dest_file, sup_title, state_dims, plot_indices=(0,), state_dim_mod=0)[source]

Bases: AbstractContextManager

A class for plotting results via multi_run_ode.

Function moptipyapps.dynamic_control.ode.multi_run_ode() can pass its results to various output generating procedures. This class here offers a procedure for plotting them to a file.

collector(index, ode, j, time)[source]

Plot the result of a multi-ode run.

Parameters:
  • index (int) – the index of the result

  • ode (ndarray) – the ode result matrix

  • j (float) – the figure of merit

  • time (float) – the time value

Return type:

None

moptipyapps.dynamic_control.results_plot.START_COLOR: Final[str] = 'blue'

the start point color

moptipyapps.dynamic_control.results_plot.START_MARKER: Final[str] = '++'

the start marker

moptipyapps.dynamic_control.starting_points module

Synthesize some interesting starting points.

Here we have some basic functionality to synthesize starting points, i.e., training cases, for the dynamic systems control task. The points synthesized here by function make_interesting_starting_points() try to fulfill two goals:

  1. the points should be as far away from each other as possible in the state space,

  2. there should be points of many different distances from the state space origin, and

  3. compared to the coordinates of the other points, the coordinates of the synthesized points should be sometimes smaller and sometimes larger (where sometimes should ideally mean “equally often”).

These two goals are slightly contradicting and are achieved by forcing the points to be located on rings of increasing distance from the origin via interesting_point_transform() while maximizing their mean distance to each other via interesting_point_objective(). Since make_interesting_starting_points() is a bit slow, it makes sense to pre-compute the points and then store them as array constants.

moptipyapps.dynamic_control.starting_points.interesting_point_objective(x, other, max_radius, dim)[source]

Compute the point diversity. :rtype: float

  1. the distance between the points

  2. the distance to the other points

  3. the coordinates in the different dimensions should be equally often smaller and larger than those of the starting points

moptipyapps.dynamic_control.starting_points.interesting_point_transform(x, max_radius, dim)[source]

Transform interesting points.

>>> xx = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12.0])
>>> ppp = interesting_point_transform(xx, 10.0, 3)
>>> print(ppp)
[[0.6681531  1.33630621 2.00445931]
 [2.27921153 2.84901441 3.41881729]
 [3.76928033 4.30774895 4.84621757]
 [5.23423923 5.75766315 6.28108707]]
>>> print(xx)
[0.6681531  1.33630621 2.00445931 2.27921153 2.84901441 3.41881729
 3.76928033 4.30774895 4.84621757 5.23423923 5.75766315 6.28108707]
>>> print([np.sqrt(np.square(pppp).sum()) for pppp in ppp])
[2.5, 5.0, 7.5, 10.0]
>>> ppp = interesting_point_transform(xx, 10.0, 3)
>>> print(ppp)
[[0.6681531  1.33630621 2.00445931]
 [2.27921153 2.84901441 3.41881729]
 [3.76928033 4.30774895 4.84621757]
 [5.23423923 5.75766315 6.28108707]]
>>> print([np.sqrt(np.square(pppp).sum()) for pppp in ppp])
[2.5, 5.0, 7.5, 10.0]
>>> ppp = interesting_point_transform(xx, 10.0, 2)
>>> print(ppp)
[[0.74535599 1.49071198]
 [2.20132119 2.50305736]
 [3.200922   3.8411064 ]
 [4.39003072 5.01717796]
:rtype: :sphinx_autodoc_typehints_type:`\:py\:class\:\`\~numpy.ndarray\``

[5.66154475 6.11484713] [6.75724629 7.3715414 ]]

moptipyapps.dynamic_control.starting_points.make_interesting_starting_points(n, other, log=True)[source]

Create some reasonably diverse starting points.

Parameters:
  • n (int) – the number of starting points

  • other (Iterable[Iterable[float]]) – the other points

  • log (bool, default: True) – write log output

Return type:

ndarray

Returns:

the starting points

>>> p = make_interesting_starting_points(
...     3, np.array([[1.0, 2.0], [3.0, 2.9]]), False)
>>> print(",".join(";".join(f"{x:.5f}" for x in row) for row in p))
1.77767;0.33029,1.10482;3.44328,3.18515;-4.39064
>>> p = make_interesting_starting_points(
...     3, np.array([[1.0, 2.0, 7.0], [3.0, 2.9, 1.1]]), False)
>>> print(",".join(";".join(f"{x:.5f}" for x in row) for row in p))
1.51392;-2.66567;0.86153,3.26154;-5.07757;2.03484,3.75627;5.88214;6.52310

moptipyapps.dynamic_control.surrogate_optimizer module

A surrogate system model-based Optimization approach.

In the real world, we want to synthesize a controller c(s, p) that can drive a dynamic system into a good state. The controller receives as input the current state s, say, from sensor readings. It can also be parameterized by a vector p, imagine c to be, for example, an artificial neural network and then p would be its weight vector. The output of c will influence the system in some way. In our example experiments, this is done by becoming part of the state differential ds/dt. Anyway, in the real world, the controller may steer the rotation speed of a rotor or the angles of rotor blades or whatever. Now you can imagine that doing real-world experiments is costly and takes a long time. Everytime we want to test a parameterization p, some form experiment, maybe in a wind tunnel, has to be done.

So it would be beneficial if we could replace the actual experiment by a simulation. This would mean that we learn a model M that can compute the state change ds/dt based on the current state s and controller output c at reasonable accuracy. If we had such a computational model, then we could run the controller optimization process on that model. Once finished, we could apply and evaluate the best controller that we have discovered in a real experiment. With the observed behavior of the actual controller system, we may even be able to update and improve our system model to become more accurate. So we could alternate between real experiments and optimization runs on the simulation. Of course, we would always need to do some real experiments at first to gather the data to obtain our initial model M. But if we can get this to work, then we could probably get much better controllers with fewer actual experiments.

This here is an algorithm that tries to implement the above pattern. This algorithm employs three different sub-algorithms:

  1. For sampling the initial fes_for_warmup FEs, it uses a warm-up algorithm - by default, this is done by random_sampling.

  2. Then, it spends fes_for_training steps on the collected data to train the model using the model training algorithm, which, by default, is BiPopCMAES.

  3. Then, it spends fes_per_model_run steps to train controllers on the model, by default again with a BiPopCMAES.

For the model and controller optimization, it thus uses by default the BiPop-CMA-ES offered by moptipy (BiPopCMAES). But it uses two instances of this algorithm, namely one to optimize the controller parameters and one that optimizes the model parameters. And, as said, a random sampling method to gather the initial samples.

The idea is that we divide the computational budget into a warmup and a model-based phase. In the warmup phase, we use CMA-ES to normally optimize the controller based on the actual simulation of the dynamic system. However, while doing so, for every time step in the simulation, we collect three things: The current state vector s, the control vector c, and the resulting state differential ds/dt. Now if we have such data, we can look at the dynamic system as a function D(s, c) = ds/dt. If we consider the dynamic system to be such a function and we have collected the vectors (s, c) and ds/dt, then we may as well attempt to learn this system. So after the warmup phase, our algorithm does the following: In a loop (for the rest of the computational budget), it first tries to learn a model M of D. Then, it replaces the actual differential equations of the system in ODE solution approach of the objective function with M. In other words, we kick out the actual system and instead use the learned system model M. We replace the differential equations that describe the system using M. We can now run an entire optimization process on this learned model only, with ODE integration and all. This optimization process gives us one new solution which we then evaluate on the real objective function (which costs 1 FE and gives us a new heap of (s, c) and ds/dt vectors). With this new data, we again learn a new and hopefully more accurate model M. This process is iterated until the rest of the computational budget is exhausted.

This approach hopefully allows us to learn a model of a dynamic system while synthesizing a controller for it. Since we can have infinitely more time to synthesize the controller on a learned system model compared to an actual model, this may give us much better results.

The starting points of the work here were conversations with Prof. Dr. Bernd NOACK and Guy Yoslan CORNEJO MACEDA of the Harbin Institute of Technology in Shenzhen, China (哈尔滨工业大学(深圳)).

class moptipyapps.dynamic_control.surrogate_optimizer.SurrogateOptimizer(system_model, controller_space, objective, fes_for_warmup, fes_for_training=None, ms_for_training=None, fes_per_model_run=None, ms_per_model_run=None, fancy_logs=False, warmup_algorithm=<function SurrogateOptimizer.<lambda>>, model_training_algorithm=<function _bpcmaes>, controller_training_algorithm=<function _bpcmaes>)[source]

Bases: Algorithm

A surrogate model-based CMA-ES algorithm.

fancy_logs: Final[bool]

should we do fancy logging?

fes_for_training: Final[int | None]

the FEs for training the model

fes_for_warmup: Final[int]

the number of objective function evaluations to be used for warmup

fes_per_model_run: Final[int | None]

the FEs for each run on the model

initialize()[source]

Initialize.

Return type:

None

log_parameters_to(logger)[source]

Log all parameters of this component as key-value pairs.

Parameters:

logger (KeyValueLogSection) – the logger for the parameters

Return type:

None

ms_for_training: Final[int | None]

the ms for training the model

ms_per_model_run: Final[int | None]

the ms for each run on the model

solve(process)[source]

Solve the modelling problem.

This function begins by spending fes_for_warmup objective function evaluations (FEs) on the actual problem, i.e., by trying to synthesize controllers for the “real” system, using process to evaluate the controller performance. The objective function passed to the constructor (an instance of FigureOfMerit) must be used by process as well. This way, during the warm-up phase, we can collect tuples of the (system state, controller output) and the resulting system differential for each simulated time step. This is done via set_raw().

After the warm-up phase, we can obtain these collected data via get_differentials(). The data is then used to train a model via the model objective function model_objective. The system model is again basically a Controller which is parameterized appropriately. For this, we use a CMA-ES algorithm for fes_for_training FEs.

Once the model training is completed, we switch the objective function to use the model instead of the actual system for evaluating controllers, which is done via set_model(). We then train a completely new controller on the model objective function. Notice that now, the actual system is not involved at all. We do this again using a CMA-ES algorithm for fes_per_model_run FEs.

After training the controller, we can evaluate it on the real system using the evaluate() method of the actual process (after switching back to the real model via set_raw()). This nets us a) the actual controller performance and b) a new set of (system state, controller output) + system state differential tuples.

Since we now have more data, we can go back and train a new system model and then use this model for another model-based optimization run. And so on, and so on. Until the budget is exhausted.

Parameters:

process (Process) – the original optimization process, which must use the objective function (an instance of FigureOfMerit) as its objective function.

Return type:

None

system_model: Final[SystemModel]

the system model

moptipyapps.dynamic_control.system module

A class to model a dynamic system governed by differential equations.

A system has a current state vector at any point in time. The state changes over time based on differential equations. These equations can be influenced by the output of a controller. Our System presents the state dimension and differential equations. It also presents several starting states for simulating the system. The starting states are divided into training and testing states. The training states can be used for, well, training controllers to learn how to handle the system. The testing states are then used to verify whether a synthesized controller actually works.

Examples for different dynamic systems are given in package systems.

class moptipyapps.dynamic_control.system.System(name, state_dims, control_dims, state_dim_mod, state_dims_in_j, gamma, test_starting_states, training_starting_states, test_steps=5000, test_time=50.0, training_steps=1000, training_time=50.0, plot_examples=(0,))[source]

Bases: Component

A class for governing a system via differential system.

control_dims: Final[int]

the dimensions of the controller output

describe_system(title, controller, parameters, base_name, dest_dir, skip_if_exists=False)[source]

Describe the performance of a given system of system.

Parameters:
  • title (str | None) – the title

  • controller (Callable[[ndarray, float, TypeVar(T), ndarray], None]) – the controller to simulate

  • parameters (TypeVar(T)) – the controller parameters

  • base_name (str) – the base name of the file to produce

  • dest_dir (str) – the destination directory

  • skip_if_exists (bool, default: False) – if the file already exists

Return type:

tuple[Path, ...]

Returns:

the paths of the generated files

describe_system_without_control(dest_dir, skip_if_exists=True)[source]

Describe the performance of a given system of system.

Parameters:
  • dest_dir (str) – the destination directory

  • skip_if_exists (bool, default: True) – if the file already exists

Return type:

tuple[Path, ...]

Returns:

the paths of the generated files

equations(state, time, control, out)[source]

Compute the values of the differential equations to be simulated.

Parameters:
  • state (ndarray) – the state of the system

  • time (float) – the time index

  • control (ndarray) – the output of the controller

  • out (ndarray) – the differential, i.e., the output of this function

Return type:

None

gamma: Final[float]

The Weight of the control values in the figure of merit computation.

log_parameters_to(logger)[source]

Log all parameters of this component as key-value pairs.

Parameters:

logger (KeyValueLogSection) – the logger for the parameters

Return type:

None

name: Final[str]

the name of the model

plot_examples: Final[tuple[int, ...]]

the plot examples

plot_points(dest_dir, skip_if_exists=True)[source]

Plot the training and testing points of the equation.

Parameters:
  • dest_dir (str) – the destination directory

  • skip_if_exists (bool, default: True) – True to skip existing files, False otherwise

Return type:

Path

Returns:

the plotted file

state_dim_mod: Final[int]

The modulus for the state dimensions for plotting

state_dims: Final[int]

the dimensions of the state variable

state_dims_in_j: Final[int]

The number of dimensions used in the J computation

test_starting_states: Final[ndarray]

the test starting states

test_steps: Final[int]

the test simulation steps

test_time: Final[float]

the test time

training_starting_states: Final[ndarray]

the test starting states

training_steps: Final[int]

the training simulation steps

training_time: Final[float]

the training time

class moptipyapps.dynamic_control.system.T

the type variable for ODE controller parameters

alias of TypeVar(‘T’)

moptipyapps.dynamic_control.system_model module

An extended dynamic control problem Instance with a model for the dynamics.

An Instance is a combination of

  • a set of differential equations (system) that govern the change D=ds/dt of the state s of a dynamic system (based on its current state s and the controller output c(s)), i.e., ds/dt=D(s,c(s)) and

  • a blueprint c(s, p) of the controller c(s) (controller).

The blueprint of the controller is basically a function that can be parameterized, so it is actually a function c(s, p) and the goal of optimization is to parameterize it in such a way that the figure of merit, i.e., the objective value (objective) of the system (usually the sum of squared state values and squared controller outputs) is minimized. Parameterization means finding good values p such that the above goal is reached. In other words, we want to synthesize a controller (by finding good values of p) in such a way that the state equations drive the system into a stable state, usually ideally the origin of the coordinate system.

Now here this Instance is extended to a SystemModel by adding a parameterized model M(s, c(s), q) to the mix. The idea is to develop the parameterization q of the model M that can replace the actual system equations D. Such a model receives as input the current system state vector s and the controller output c(s) for the state vector s. It will return the differential D of the system state, i.e., ds/dt. In other words, a properly constructed model can replace the equations parameter in the ODE integrator run_ode(). The input used for training is provided by diff_from_ode().

What we do here is to re-use the same function models as used in controllers (controller) and learn their parameterizations from the observed data. If successful, we can wrap everything together into a Callable and plug it into the system instead of the original equations.

The thing that SystemModel offers is thus a blueprint of the model M. Obviously, we can conceive many different such blueprints. We could have a linear model, a quadratic model, or maybe neural network (in which case, we need to decide about the number of layers and layer sizes). So an instance of this surrogate model based approach has an equation system, a controller blueprint, and a model blueprint.

An example implementation of the concept of synthesizing models for a dynamic system in order to synthesize controllers is given as the surrogate_optimizer algorithm. Examples for different dynamic systems controllers (which we here also use to model the systems themselves) are given in package controllers.

The starting points of the work here were conversations with Prof. Dr. Bernd NOACK and Guy Yoslan CORNEJO MACEDA of the Harbin Institute of Technology in Shenzhen, China (哈尔滨工业大学(深圳)).

class moptipyapps.dynamic_control.system_model.SystemModel(system, controller, model)[source]

Bases: Instance

A dynamic system control Instance with a system model blueprint.

log_parameters_to(logger)[source]

Log all parameters of this component as key-value pairs.

Parameters:

logger (KeyValueLogSection) – the logger for the parameters

Return type:

None

model: Final[Controller]

the model blueprint that can be trained to hopefully replace the system in the ODE integration / system simulation procedure