moptipyapps.dynamic_control package¶
Examples of dynamic control problems.
Here we have examples for dynamic control problems. An instance of the dynamic control problem is composed of

- a system of differential equations that govern the state of a system and that also incorporate the output of a controller,
- a controller blueprint, i.e., a function that can be parameterized and that computes a controller output from the system state, and
- an objective that rates how well-behaved the system driven by the controller is in a simulation.
Such systems can serve as primitive testbeds for actual dynamic control scenarios such as flight control or other fluid dynamics systems. Unlike those more complicated systems, however, they are much faster and easier to simulate, so we can experiment with them much more easily and quickly.
The initial starting points of the work here were conversations with Prof. Dr. Bernd NOACK and Guy Yoslan CORNEJO MACEDA of the Harbin Institute of Technology in Shenzhen, China (哈尔滨工业大学(深圳)) as well as the following two MSc theses and one book:
Yuxiang LI (李宇翔). Jet Mixing Enhancement using Deep Reinforcement Learning (基于深度强化学习的射流混合增强控制). MSc Thesis. Harbin Institute of Technology in Shenzhen, China (哈尔滨工业大学(深圳)). January 2023.
Wei SUN (孙伟). Wake Control of 1-2-3 Fluidic Pinball using Deep Reinforcement Learning (基于深度强化学习方法的 1-2-3 流体弹球尾流控制). MSc Thesis. Harbin Institute of Technology in Shenzhen, China (哈尔滨工业大学(深圳)). January 2023.
Guy Yoslan CORNEJO MACEDA, François LUSSEYRAN, and Bernd R. NOACK. xMLC: A Toolkit for Machine Learning Control, First Edition. Machine Learning Tools in Fluid Mechanics, Vol 2. Shenzhen & Paris; Universitätsbibliothek der Technischen Universität Braunschweig. 2022. https://doi.org/10.24355/dbbs.084-202208220937-0
Subpackages¶
- moptipyapps.dynamic_control.controllers package
- Submodules
- moptipyapps.dynamic_control.controllers.ann module
- moptipyapps.dynamic_control.controllers.codegen module
- moptipyapps.dynamic_control.controllers.cubic module
- moptipyapps.dynamic_control.controllers.linear module
- moptipyapps.dynamic_control.controllers.min_ann module
- moptipyapps.dynamic_control.controllers.partially_linear module
- moptipyapps.dynamic_control.controllers.peaks module
- moptipyapps.dynamic_control.controllers.predefined module
- moptipyapps.dynamic_control.controllers.quadratic module
- moptipyapps.dynamic_control.systems package
Submodules¶
moptipyapps.dynamic_control.controller module¶
A base class for implementing controllers.
A controller is basically a parameterizable function that receives the current state and time of a system as input and computes one or multiple controller values as output. These controller values are then used to influence how the state of the system changes in the next iteration. In the dynamic systems control optimization task, the goal is to find the right parameterization for the controller such that an objective is minimized.

Examples of different controllers for dynamic systems are given in package controllers.
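As a minimal illustration (a sketch, not part of the package), a linear controller for a two-dimensional state that computes a single control value could look as follows. The (state, time, params, out) signature follows the controller callables used by run_ode() in module ode, and the Controller constructor arguments follow the signature shown below:

import numpy as np
from moptipyapps.dynamic_control.controller import Controller

def linear_control(state: np.ndarray, time: float,
                   params: np.ndarray, out: np.ndarray) -> None:
    """Compute the control value as a weighted sum of the state variables."""
    out[0] = params[0] * state[0] + params[1] * state[1]

# name, state_dims=2, control_dims=1, param_dims=2, and the function itself
controller = Controller("linear_2d", 2, 1, 2, linear_control)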
- class moptipyapps.dynamic_control.controller.Controller(name, state_dims, control_dims, param_dims, func=None)[source]¶
Bases: Component

A base class for parameterizable controllers.
- log_parameters_to(logger)[source]¶
Log all parameters of this component as key-value pairs.
- Parameters:
  logger (KeyValueLogSection) – the logger for the parameters
- Return type:
  None
moptipyapps.dynamic_control.experiment_raw module¶
An example experiment for dynamic control.
In this experiment, we try to synthesize (i.e., parameterize) controllers (controller) that steer a dynamic system (system) into a stable state by using a figure of merit (objective) which minimizes both the squared system state and the controller effort.

A model-based experiment variant is given in experiment_surrogate.
- moptipyapps.dynamic_control.experiment_raw.make_instances()[source]¶
Create the instances to be used in the dynamic control experiment.
moptipyapps.dynamic_control.experiment_surrogate module¶
An example experiment for dynamic control using surrogate system models.
In this experiment, we again try to synthesize (i.e., parameterize) controllers (controller) that steer a dynamic system (system) into a stable state by using a figure of merit (objective) which minimizes both the squared system state and the controller effort.

The difference compared to experiment_raw is that we also try to synthesize a system model at the same time. We employ the procedure detailed in surrogate_optimizer for this purpose.
Word of advice: This experiment takes an extremely long time and needs a lot of memory!
The starting points of the work here were conversations with Prof. Dr. Bernd NOACK and Guy Yoslan CORNEJO MACEDA of the Harbin Institute of Technology in Shenzhen, China (哈尔滨工业大学(深圳)).
- moptipyapps.dynamic_control.experiment_surrogate.MAX_FES: Final[int] = 64¶
the total number of objective function evaluations
- moptipyapps.dynamic_control.experiment_surrogate.base_setup(instance)[source]¶
Create the basic setup.
- Parameters:
  instance (Instance) – the instance to use
- Returns:
  the basic execution
- moptipyapps.dynamic_control.experiment_surrogate.cmaes_raw(instance)[source]¶
Create the Bi-Pop-CMA-ES setup.
- moptipyapps.dynamic_control.experiment_surrogate.cmaes_surrogate(instance, fes_for_warmup=16, fes_for_training=128, fes_per_model_run=128, fancy_logs=True)[source]¶
Create the Bi-Pop-CMA-ES setup.
- Parameters:
  - instance (SystemModel) – the problem instance
  - fes_for_warmup (int, default: 16) – the FEs to be used for warmup
  - fes_for_training (int, default: 128) – the FEs for model training
  - fes_per_model_run (int, default: 128) – the FEs per model run
  - fancy_logs (bool, default: True) – should we do fancy logging?
- Returns:
  the setup
- moptipyapps.dynamic_control.experiment_surrogate.make_instances()[source]¶
Create the instances to be used in the dynamic control experiment.
- Return type:
  Iterable[Callable[[], SystemModel]]
- Returns:
  the instances to be used in the dynamic control experiment.
moptipyapps.dynamic_control.instance module¶
An instance of the dynamic control synthesis problem.
An instance of the dynamic control synthesis problem is comprised of two components: a system of differential equations governing how the state of a system changes over time and a controller that uses the current system state as input and computes a controller value as output that influences the state change. Notice that the controller here is a parametric function. The goal of the dynamic system control is to find the right parameterization of the controller such that an objective is minimized. The objective here usually has the goal of bringing the dynamic system into a stable state while using as little controller “energy” as possible.
An instance of the simultaneous control and model synthesis problem is an instance of the class SystemModel, which is a subclass of Instance. It also adds a controller blueprint for modeling the system's response (state differential) based on the system state and controller output.
The starting point of the work here were conversations with Prof. Dr. Bernd NOACK and Guy Yoslan CORNEJO MACEDA of the Harbin Institute of Technology in Shenzhen, China (哈尔滨工业大学(深圳)).
- class moptipyapps.dynamic_control.instance.Instance(system, controller, name_base=None)[source]¶
Bases: Component

An instance of the dynamic control problem.

- controller: Final[Controller]¶
  the controller applied to the system
- describe_parameterization(title, parameters, base_name, dest_dir)[source]¶
Describe the performance of a given controller parameterization.
- log_parameters_to(logger)[source]¶
Log all parameters of this component as key-value pairs.
- Parameters:
  logger (KeyValueLogSection) – the logger for the parameters
- Return type:
  None
moptipyapps.dynamic_control.model_objective module¶
An objective function that can be used to synthesize system models.
We consider the dynamic system to be a function D(s, c) = ds/dt, where

- s is the state vector (e.g., two-dimensional for the Stuart-Landau system, see stuart_landau, and three-dimensional for the Lorenz system, see lorenz),
- c is the control vector (one-dimensional in both cases), and
- ds/dt is the state differential (again two respectively three-dimensional).
The FigureOfMerit objective function allows us to collect tuples of (s, c) and ds/dt vectors. So all we need to do is to train a model M that receives as input a vector x=(s, c) (where (s, c) is the concatenation of s and c) and returns as output a vector ds/dt.
The objective function in this module minimizes the root mean square error between the model-computed ds/dt vectors and the actual ds/dt vectors. The model objective function is used by the surrogate_optimizer algorithm.
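As a rough sketch (a hypothetical helper, not the actual class), the error computation could look as follows, assuming models use the same (x, time, params, out) call signature as controllers, inputs holds the concatenated (s, c) rows, and differentials holds the matching ds/dt rows:

import numpy as np

def model_rmse(model, params, inputs: np.ndarray,
               differentials: np.ndarray) -> float:
    """Root mean square error of a parameterized model over collected data."""
    out = np.empty(differentials.shape[1])
    total = 0.0
    for x, dsdt in zip(inputs, differentials):
        model(x, 0.0, params, out)  # model estimate for this (s, c) input
        total += float(np.square(out - dsdt).mean())
    return float(np.sqrt(total / len(inputs)))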
- class moptipyapps.dynamic_control.model_objective.ModelObjective(real, model)[source]¶
Bases: Objective

The objective for modeling.
This objective function works on sequences of tuples (s, c) of the system state s and controller output c as well as the corresponding ds/dt differentials. The goal is to train a model M(s, c, q) that will compute the ds/dt values reasonably accurately. The model is parameterized with some vector q, think of M being, e.g., an artificial neural network and q being its weight vector. To find good values of q, this objective here computes the squared differences between the values M(s, c, q) and the expected outputs ds/dt.
These squared differences are then either averaged directly, or the expm1(mean(log1p(…))) hack of the FigureOfMeritLE is used to alleviate the impact of large differentials, depending on which “real” (controller-synthesis) objective is passed into the constructor.

The tuples (s, c) and ds/dt are pulled from the real objective with the method begin(). The ds/dt rows are flattened for performance reasons.

- begin()[source]¶
Begin a model optimization run.
This function pulls the training data from the actual controller-synthesis objective function, which is an instance of FigureOfMerit, via the method get_differentials() and allocates internal data structures accordingly.

- Return type:
  None
- log_parameters_to(logger)[source]¶
Log all parameters of this component as key-value pairs.
- Parameters:
  logger (KeyValueLogSection) – the logger for the parameters
- Return type:
  None
moptipyapps.dynamic_control.objective module¶
Objective functions for the dynamic control problem.
The dynamic control problem means that our system starts in a given state and we try to move it to a stable state by using control effort. Controllers are trained over several training states, for each of which we can compute a figure of merit.
We offer two different approaches for this:

- FigureOfMerit computes the arithmetic mean z over the separate figures of merit J of the training cases.
- FigureOfMeritLE tries to smooth out the impact of bad starting states by computing exp(mean[log(J + 1)]) - 1.
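The difference between the two aggregations can be seen in a small numerical sketch (illustrative values only):

import numpy as np

# figures of merit J of five hypothetical training cases, one of them
# starting from a much more expensive state than the others
j = np.array([1.0, 2.0, 1.5, 2.5, 400.0])

mean_j = j.mean()                          # FigureOfMerit style: ≈ 81.4
smooth_j = np.expm1(np.mean(np.log1p(j)))  # FigureOfMeritLE style: ≈ 6.3

The plain mean is dominated by the single expensive case, while the log-based aggregation still reflects improvements achieved on the cheap cases.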
These objective functions also offer a way to collect the state+control and corresponding differential vectors.
- class moptipyapps.dynamic_control.objective.FigureOfMerit(instance, supports_model_mode=False)[source]¶
Bases: Objective

A base class for figures of merit.
- get_differentials()[source]¶
Get the collected differentials.
If supports_model_mode was set to True in the creation of this objective function, then the system will gather tuples of (s, c) and ds/dt when in raw mode (see set_raw()) and make them available here to train system models (see set_model()). Notice that gathering training data is a very memory-intensive process.
- log_parameters_to(logger)[source]¶
Log all parameters of this component as key-value pairs.
- Parameters:
  logger (KeyValueLogSection) – the logger for the parameters
- Return type:
  None
- lower_bound()[source]¶
Get the lower bound of the figure of merit, which is 0.
- Return type:
  float
- Returns:
  0.0
- set_model(equations)[source]¶
Set the model-driven mode for the evaluation.

In this mode, the internal system equations are replaced by the callable equations passed into this function, and the data collection is stopped. The idea is that equations could be a model synthesized from the data gathered (see get_differentials()) and thus does not represent the actual dynamic system but a model thereof. We could synthesize a controller for this model, and for this purpose we would use the exact same objective function – just instead of using the actual system equations, we use the system model. Of course, we then need to deactivate the data gathering mechanism (see again get_differentials()), because the data would then not be real system data. You can toggle back to the actual system using set_raw().
- set_raw()[source]¶
Let this objective work on the original system equations.
The objective function here can be used in two modes: a) based on the actual system equations, as given in system, or b) on a learned model of the system. This function toggles to the former, i.e., to the actual system mode. In this mode, training data for training the system model will be gathered if the objective function is configured to do so. In that case, you can toggle to model mode via set_model().

- Return type:
  None
- sum_up_results(results)[source]¶
Compute the final objective value from several single J values.
When synthesizing controllers, we do not just apply them to a single simulation run. Instead, we use multiple training cases (see training_starting_states) and perform training_steps simulation steps on each of them. Each such training starting state results in a single J value, which is the sum of squared state and control values. We then compute the final objective value from these different J values using this function.

This will destroy the contents of results.
- class moptipyapps.dynamic_control.objective.FigureOfMeritLE(instance, supports_model_mode=False)[source]¶
Bases: FigureOfMerit

Compute exp(mean(log(z+1)))-1 over the figures of merit z.

Different from FigureOfMerit, we compute the mean of log(z + 1), where z are the figures of merit of the single training cases. We then return exp(mean[log(z + 1)]) - 1 as the final result. The goal is to reduce the impact of training cases that require more control effort.

If we solve the dynamic control problem for diverse training cases, then we may have some very easy cases, where the system just needs a small control impulse to move into a stable and cheap state. Others may have starting states that are very far out and expensive, requiring a lot of control effort to be corrected. If we simply average over all states, then these expensive states will dominate whatever good we are doing in the cheap states. Averaging over log(J+1) reduces this impact. We then compute exp[…]-1 of the result as cosmetics, to get back into the original range of the figures of merit.
moptipyapps.dynamic_control.ode module¶
A primitive integrator for systems of ordinary differential equations.
Many dynamic systems can be modeled as systems of ordinary differential equations that govern their progress over time. Trying to find out in which state such systems are at a given point in time means to integrate these equations until that point in time (starting from a starting state).
What we want to play around with, however, is synthesizing controllers. In this case, the differential equations also merge the output of the controller with the current state. If the controller behaves inappropriately, this may make the system diverge, i.e., some of its state variables go to infinity over time, sometimes rather quickly.
Using ODE integrators that compute the system state at pre-defined time steps is thus cumbersome, as the system may have already exploded at these goal times. Therefore, we perform ODE integration in several steps. First, we try it the “normal” way. However, as soon as the system escapes the sane parameter range, we stop. We then use the last point where the system was stable and the first point where it escaped the reasonable range to estimate a new reasonable end time for the integration. We do this until we finally succeed.
Thus, we can simulate a well-behaved system over a long time and an ill-behaved system for a shorter time period. Neither system will diverge.
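The end-time shrinking described above can be sketched as follows (illustrative pseudocode, not the actual implementation in run_ode(); simulate and is_sane are assumed helpers):

def integrate_with_shrinking(simulate, t_end: float, is_sane):
    """Shrink the integration end time until all states stay sane.

    simulate(t_end) returns a list of (time, state) pairs up to t_end;
    is_sane(state) checks whether a state is still in a reasonable range.
    """
    while True:
        trajectory = simulate(t_end)
        bad = next((i for i, (_, s) in enumerate(trajectory)
                    if not is_sane(s)), None)
        if bad is None:
            return trajectory  # the whole run stayed in the sane range
        # estimate a shorter end time between the last stable point and
        # the first point that escaped the reasonable range, then retry
        t_last_ok = trajectory[bad - 1][0] if bad > 0 else 0.0
        t_end = 0.5 * (t_last_ok + trajectory[bad][0])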
The following functions are provided:
- run_ode() executes a set of differential system equations and controller equations and returns an array with the system state, controller output, and time at the different interpolation steps.
- t_from_ode() returns the total time over which the result of run_ode() was simulated.
- j_from_ode() returns a figure of merit, i.e., a weighted sum of (a part of) the system state and the controller output, from the result of run_ode().
- diff_from_ode() extracts state difference information from the result of run_ode().
- class moptipyapps.dynamic_control.ode.T¶
the type variable for ODE controller parameters
alias of TypeVar(‘T’)
- moptipyapps.dynamic_control.ode.diff_from_ode(ode, state_dim)[source]¶
Compute all the state+control vectors and the resulting differentials.
This function returns two matrices. Each row of both matrices corresponds to a time slot. Each row in the first matrix holds the state vector and the control vector (that was computed by the controller). The corresponding row in the second matrix then holds the state differential resulting from the control vector being applied in the differential equations that govern the system state change.
The idea is that this function basically provides the data that we would like to learn when training a surrogate model for a system: from the current state and the computed control vector, we want our model to give us the resulting system differential. If we have such a model and it works reasonably well, then we could essentially plug this model into run_ode() instead of the original equations parameter.
instead of the original equations parameter.What this function does to compute the differential is to basically “invert” the dynamic weighting done by
run_ode()
.run_ode()
starts in a given starting state s. It then computes the control vector c as a function of s, i.e., c(s). Then, the equations of the dynamic system (see modulesystem
) to compute the state differential D=ds/dt as a function of c(s) and s, i.e., as something like D(s, c(s)). The next step would be to update the state, i.e., to set s=s+D(s, c(s)). Unfortunately, this can make s go to infinity. Sorun_ode()
will compute a dynamic weight w and do s=s+w*D(s, c(s)), where w is chosen such that the state vector s does not grow unboundedly. While s and c(s) and w are stored in one row of the result matrix ofrun_ode()
, s+w*D(s,c(s)) is stored as state s in the next row. So what this function here basically does is to subtract the old state from the next state and divide the result by w to get D(s, c(s)). s and c(s) are already available directly in the ODE result and w is not needed anymore.We then get the rows s, c(s) and D(s, c(s)) in the first and second result matrix, respectively. This can then be used to train a system model as proposed in model
system_model
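In formula form (consistent with the doctest below, where the weight w_i equals the length of the i-th time slice):

D(s_i, c(s_i)) ≈ (s_{i+1} - s_i) / w_i, with w_i = t_{i+1} - t_i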
- Parameters:
  - ode (ndarray) – the result matrix of run_ode()
  - state_dim (int) – the state dimension
- Return type:
  tuple[ndarray, ndarray]
- Returns:
  a tuple of the state+control vectors and the resulting state differential vectors
>>> od = np.array([
...     [0, 0, 0, 0, 0, 0],  # state 0,0,0; control 0,0; time 0
...     [1, 2, 3, 4, 5, 1],  # state 1,2,3; control 4,5; time 1
...     [2, 3, 4, 5, 6, 3],  # state 2,3,4; control 5,6; time 3
...     [4, 6, 8, 7, 7, 7]])  # state 4,6,8; control 7,7; time 7
>>> res = diff_from_ode(od, 3)
>>> res[0]  # state and control vectors, time col and last row removed
array([[0, 0, 0, 0, 0],
       [1, 2, 3, 4, 5],
       [2, 3, 4, 5, 6]])
>>> res[1]  # (state[i + 1] - state[i]) / (time[i + 1] - time[i])
array([[1.  , 2.  , 3.  ],
       [0.5 , 0.5 , 0.5 ],
       [0.5 , 0.75, 1.  ]])
- moptipyapps.dynamic_control.ode.j_from_ode(ode, state_dim, use_state_dims=-1, gamma=0.1)[source]¶
Compute the original figure of merit from an ODE array.
The figure of merit is the sum of state variable squares plus 0.1 times the control variable squares. We disregard the state variable values of the starting states (because they are the same for all controllers on a given training case and because the control cannot influence them) and we also disregard the final state and final controller output (as there is no time slice associated with them, i.e., we only “arrive” in them but basically spend 0 time in them in our simulation).
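Reconstructed from the doctest below (with s_i and c_i denoting the state and control vector of row i, t_i the time values, n the number of rows, and gamma the weight of the control term), the computation amounts to:

J = [ sum_{i=1}^{n-2} (t_{i+1}-t_i)*||s_i||² + gamma * sum_{i=0}^{n-2} (t_{i+1}-t_i)*||c_i||² ] / (t_{n-1}-t_0)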
- Parameters:
  - ode (ndarray) – the result matrix of run_ode()
  - state_dim (int) – the state dimension
  - use_state_dims (int, default: -1) – the dimension until which the state is used, -1 for using the complete state
  - gamma (float, default: 0.1) – the weight of the controller input
- Return type:
  float
- Returns:
  the figure of merit
>>> od = np.array([[1, 2, 3, 4, 0],
...                [5, 6, 7, 8, 1],
...                [9, 6, 4, 3, 3],
...                [7, 4, 2, 1, 7]])
>>> sta_dim = 3
>>> print(f"{j_from_ode(od, sta_dim):.10f}")
110.0000000000
>>> print((1.6 + 12.8 + 98 + 72 + 50 + 3.6 + 64 + 144 + 324) / 7)
110.0
>>> print(f"{j_from_ode(od, 3, 2, 0.5):.10f}")
97.1428571429
>>> print((8 + 64 + 72 + 50 + 18 + 144 + 324) / 7)
97.14285714285714
- moptipyapps.dynamic_control.ode.multi_run_ode(test_starting_states, training_starting_states, collector, equations, controller, parameters, controller_dim=1, test_steps=5000, test_time=50.0, training_steps=5000, training_time=50.0, use_state_dims=-1, gamma=0.1)[source]¶
Invoke run_ode() multiple times and pass the results to a collector.

This function allows us to perform multiple runs of the differential equation simulator, using different starting points. It also allows us to distinguish training and test points and to assign them different numbers of steps. For each of them, run_ode() will be applied and the returned matrix is passed to the collector function.

- Parameters:
  - test_starting_states (Iterable[ndarray]) – the iterable of test starting states
  - training_starting_states (Iterable[ndarray]) – the iterable of training starting states
  - collector (Union[Callable[[int, ndarray, float, float], None], Iterable[Callable[[int, ndarray, float, float], None]]]) – the destination to receive the results, in the form of index, ode array, j, and t
  - equations (Callable[[ndarray, float, ndarray, ndarray], None]) – the differential system
  - controller (Callable[[ndarray, float, T, ndarray], None]) – the controller function
  - parameters (T) – the controller parameters
  - controller_dim (int, default: 1) – the dimension of the controller result
  - test_steps (int, default: 5000) – the number of test steps to simulate
  - test_time (float, default: 50.0) – the time limit for tests
  - training_steps (int, default: 5000) – the number of training steps to simulate
  - training_time (float, default: 50.0) – the time limit for training
  - use_state_dims (int, default: -1) – the dimension until which the state is used, -1 for using the complete state
  - gamma (float, default: 0.1) – the weight of the controller input
- Return type:
  None
- moptipyapps.dynamic_control.ode.run_ode(starting_state, equations, controller, parameters, controller_dim=1, steps=5000, max_time=50.0)[source]¶
Simulate a controlled system of differential equations.
The system begins in the starting state stored in the vector starting_state. In each time step, first, the controller is invoked. It receives the current system state vector as input, the current time t, its parameters (parameters), and an output array to store its computed control values into. This array has dimension controller_dim, which usually is 1. Then, the equations function (the differential system) will be called; it receives the current state vector, the time t, and the controller output as input, as well as an output array to store the result of the differential into. This output array has the same dimension as the state vector.

Now the run_ode function simulates such a system over steps time steps over the closed interval [0, max_time]. If both the system and the controller are well-behaved, then the output array will contain steps rows with the state, controller, and time information of each step. If the system diverges at some point in time but we can simulate it reasonably well before that, then we try to simulate the full number of steps, but over a shorter time frame. If even that fails, you will get a single-row output with 1e100 as the controller value.

This function returns a matrix where each row corresponds to a simulated time step. Each row contains three components in a concatenated fashion: 1. the state vector, 2. the control vector, 3. the time value.
- Parameters:
  - starting_state (ndarray) – the starting state
  - equations (Callable[[ndarray, float, ndarray, ndarray], None]) – the differential system
  - controller (Callable[[ndarray, float, T, ndarray], None]) – the controller function
  - parameters (T) – the controller parameters
  - controller_dim (int, default: 1) – the dimension of the controller result
  - steps (int, default: 5000) – the number of steps to simulate
  - max_time (float, default: 50.0) – the maximum time to simulate for
- Return type:
  ndarray
- Returns:
  a matrix where each row represents a point in time, composed of the current state, the controller output, and the length of the time slice
If we simulate the flight of a projectile with our ODE execution, then both the flight time as well as the flight length are about 0.12% off from what the mathematical solution of the flight system prescribes. That’s actually not bad for a crude and fast integration method…
>>> v = 100.0
>>> angle = np.deg2rad(45.0)
>>> v_x = v * np.cos(angle)
>>> print(f"{v_x:.10f}")
70.7106781187
>>> v_y = v * np.sin(angle)
>>> print(f"{v_y:.10f}")
70.7106781187
>>> def projectile(position, ttime, ctrl, out):
...     out[0] = 70.71067811865474
...     out[1] = 70.71067811865474 - ttime * 9.80665
>>> param = 0.0  # ignore
>>> def contrl(position, ttime, params, dest):
...     dest[0] = 0.0  # controller that does nothing
>>> strt = np.array([0.0, 1.0])
>>> ode = run_ode(strt, projectile, contrl, param, 1, 10000)
>>> print(len(ode))
10000
>>> time_of_flight = 2 * v_y / 9.80665
>>> print(f"{time_of_flight:.10f}")
14.4209649817
>>> travel_distance_x = time_of_flight * v_x
>>> print(f"{travel_distance_x:.10f}")
1019.7162129779
>>> idx = np.argwhere(ode[:, 1] <= 0.0)[0][0]
>>> print(idx)
2887
>>> print(f"{ode[idx - 1, 0]:.10f}")
1020.4571309653
>>> print(f"{ode[idx, 0]:.10f}")
1020.8107197148
>>> print(f"{ode[idx - 1, -1]:.10f}")
14.4314431443
>>> print(f"{ode[idx, -1]:.10f}")
14.4364436444
>>> print(ode[-1, -1])
50.0
>>> def contrl2(position, ttime, params, dest):
...     dest[0] = 1e50  # controller that is ill-behaved
>>> run_ode(strt, projectile, contrl2, param, 1, 10000)
array([[0.e+000, 1.e+000, 1.e+100, 0.e+000]])
>>> def contrl3(position, ttime, params, dest):
...     dest[0] = 1e50 if ttime > 10 else 0.0  # diverging controller
>>> ode = run_ode(strt, projectile, contrl3, param, 1, 10000)
>>> print(len(ode))
10000
>>> print(ode[-1])
[690.10677249 224.06765771 0. 9.75958357]
>>> def projectile2(position, ttime, ctrl, out):
...     out[:] = 0
>>> ode = run_ode(strt, projectile2, contrl, param, 1, 10000)
>>> print(len(ode))
10000
>>> print(ode[-1])
[ 0. 1. 0. 50.]
>>> ode = run_ode(strt, projectile2, contrl3, param, 1, 10000)
>>> print(len(ode))
10000
>>> print(ode[-1])
[0. 1. 0. 9.41234557]
- moptipyapps.dynamic_control.ode.t_from_ode(ode)[source]¶
Get the time sum from an ODE solution.
The total time that we simulate a system depends on the behavior of the system.
- Parameters:
  ode (ndarray) – the ODE result matrix, as computed by run_ode()
- Return type:
  float
- Returns:
  the total consumed time
>>> od = np.array([[1, 2, 3, 4, 0.1],
...                [5, 6, 7, 8, 0.2],
...                [9, 6, 4, 3, 0.3],
...                [7, 4, 2, 1, 0.4]])
>>> print(t_from_ode(od))
0.4
moptipyapps.dynamic_control.results_log module¶
A logger that writes results gathered from ODE integration to a text file.
- class moptipyapps.dynamic_control.results_log.ResultsLog(state_dim, out)[source]¶
Bases: AbstractContextManager

A class for logging results via multi_run_ode.

Function moptipyapps.dynamic_control.ode.multi_run_ode() can pass its results to various output generating procedures. This class here offers a procedure for writing them to a log file.

>>> def projectile(position, ttime, ctrl, out):
...     out[0] = 70.71067811865474
...     out[1] = 70.71067811865474 - ttime * 9.80665
>>> param: np.ndarray = np.array([1])  # ignore
>>> def contrl(position, ttime, params, dest):
...     dest[0] = params[0]  # controller that returns param
>>> from io import StringIO
>>> from moptipyapps.dynamic_control.ode import multi_run_ode
>>> with StringIO() as sio:
...     with ResultsLog(2, sio) as log:
...         multi_run_ode([np.array([0.0, 1.0])],
...                       [np.array([1.0, 1.0])],
...                       log.collector, projectile, contrl, param,
...                       1, 10000, 14.890, 10000, 14.961)
...     x = sio.getvalue()
>>> tt = x.split()
>>> print(tt[0])
figureOfMerit;totalTime;nSteps;start0;start1;end0;end1
>>> for y in tt[1:]:
...     print(";".join(f"{float(v):.3f}" for v in y.split(";")))
403374.924;14.890;10000.000;0.000;1.000;1052.882;-33.244
407810.750;14.961;10000.000;1.000;1.000;1058.902;-38.616
moptipyapps.dynamic_control.results_plot module¶
An illustrator for results gathered from ODE integration to multi-plots.
This evaluator will create figures where each sub-plot corresponds to one evolution of the system over time. The starting state is marked with a blue cross, the final state with a red one. Both states are connected with a line that marks the progress over time. The line begins with blue color and changes its color over violet/pinkish towards yellow the farther in time the system progresses.
- class moptipyapps.dynamic_control.results_plot.ResultsPlot(dest_file, sup_title, state_dims, plot_indices=(0,), state_dim_mod=0)[source]¶
Bases: AbstractContextManager

A class for plotting results via multi_run_ode.

Function moptipyapps.dynamic_control.ode.multi_run_ode() can pass its results to various output generating procedures. This class here offers a procedure for plotting them to a file.
moptipyapps.dynamic_control.starting_points module¶
Synthesize some interesting starting points.
Here we have some basic functionality to synthesize starting points, i.e., training cases, for the dynamic systems control task. The points synthesized here by function make_interesting_starting_points() try to fulfill the following goals:

- the points should be as far away from each other as possible in the state space,
- there should be points of many different distances from the state space origin, and
- compared to the coordinates of the other points, the coordinates of the synthesized points should be sometimes smaller and sometimes larger (where “sometimes” should ideally mean “equally often”).

These goals are slightly contradictory and are achieved by forcing the points to be located on rings of increasing distance from the origin via interesting_point_transform() while maximizing their mean distance to each other via interesting_point_objective(). Since make_interesting_starting_points() is a bit slow, it makes sense to pre-compute the points and then store them as array constants.
- moptipyapps.dynamic_control.starting_points.interesting_point_objective(x, other, max_radius, dim)[source]¶
Compute the point diversity.

The diversity measure considers:
- the distance between the points,
- the distance to the other points, and
- that the coordinates in the different dimensions should be equally often smaller and larger than those of the starting points.

- Return type:
  float
- moptipyapps.dynamic_control.starting_points.interesting_point_transform(x, max_radius, dim)[source]¶
Transform interesting points.
- Return type:
  ndarray

>>> xx = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12.0])
>>> ppp = interesting_point_transform(xx, 10.0, 3)
>>> print(ppp)
[[0.6681531  1.33630621 2.00445931]
 [2.27921153 2.84901441 3.41881729]
 [3.76928033 4.30774895 4.84621757]
 [5.23423923 5.75766315 6.28108707]]
>>> print(xx)
[0.6681531  1.33630621 2.00445931 2.27921153 2.84901441 3.41881729
 3.76928033 4.30774895 4.84621757 5.23423923 5.75766315 6.28108707]
>>> print([np.sqrt(np.square(pppp).sum()) for pppp in ppp])
[2.5, 5.0, 7.5, 10.0]
>>> ppp = interesting_point_transform(xx, 10.0, 3)
>>> print(ppp)
[[0.6681531  1.33630621 2.00445931]
 [2.27921153 2.84901441 3.41881729]
 [3.76928033 4.30774895 4.84621757]
 [5.23423923 5.75766315 6.28108707]]
>>> print([np.sqrt(np.square(pppp).sum()) for pppp in ppp])
[2.5, 5.0, 7.5, 10.0]
>>> ppp = interesting_point_transform(xx, 10.0, 2)
>>> print(ppp)
[[0.74535599 1.49071198]
 [2.20132119 2.50305736]
 [3.200922   3.8411064 ]
 [4.39003072 5.01717796]
 [5.66154475 6.11484713]
 [6.75724629 7.3715414 ]]
- moptipyapps.dynamic_control.starting_points.make_interesting_starting_points(n, other, log=True)[source]¶
Create some reasonably diverse starting points.
- Parameters:
  - n (int) – the number of starting points to synthesize
  - other (ndarray) – a matrix of other points to keep distance from
  - log (bool, default: True) – should progress be logged?
- Return type:
  ndarray
- Returns:
  the starting points
>>> p = make_interesting_starting_points(
...     3, np.array([[1.0, 2.0], [3.0, 2.9]]), False)
>>> print(",".join(";".join(f"{x:.5f}" for x in row) for row in p))
1.77767;0.33029,1.10482;3.44328,3.18515;-4.39064
>>> p = make_interesting_starting_points(
...     3, np.array([[1.0, 2.0, 7.0], [3.0, 2.9, 1.1]]), False)
>>> print(",".join(";".join(f"{x:.5f}" for x in row) for row in p))
1.51392;-2.66567;0.86153,3.26154;-5.07757;2.03484,3.75627;5.88214;6.52310
moptipyapps.dynamic_control.surrogate_optimizer module¶
A surrogate system model-based optimization approach.
In the real world, we want to synthesize a controller c(s, p) that can drive a dynamic system into a good state. The controller receives as input the current state s, say, from sensor readings. It can also be parameterized by a vector p; imagine c to be, for example, an artificial neural network and then p would be its weight vector. The output of c will influence the system in some way. In our example experiments, this is done by becoming part of the state differential ds/dt. Anyway, in the real world, the controller may steer the rotation speed of a rotor or the angles of rotor blades or whatever. Now you can imagine that doing real-world experiments is costly and takes a long time. Every time we want to test a parameterization p, some form of experiment, maybe in a wind tunnel, has to be done.
So it would be beneficial if we could replace the actual experiment by a simulation. This would mean that we learn a model M that can compute the state change ds/dt based on the current state s and controller output c at reasonable accuracy. If we had such a computational model, then we could run the controller optimization process on that model. Once finished, we could apply and evaluate the best controller that we have discovered in a real experiment. With the observed behavior of the actual controller system, we may even be able to update and improve our system model to become more accurate. So we could alternate between real experiments and optimization runs on the simulation. Of course, we would always need to do some real experiments at first to gather the data to obtain our initial model M. But if we can get this to work, then we could probably get much better controllers with fewer actual experiments.
This here is an algorithm that tries to implement the above pattern. This algorithm employs three different sub-algorithms:

- For sampling the initial fes_for_warmup FEs, it uses a warm-up algorithm – by default, this is done by random_sampling.
- Then, it spends fes_for_training steps on the collected data to train the model using the model training algorithm, which, by default, is BiPopCMAES.
- Then, it spends fes_per_model_run steps to train controllers on the model, by default again with a BiPopCMAES.

For the model and controller optimization, it thus uses by default the BiPop-CMA-ES offered by moptipy (BiPopCMAES). But it uses two instances of this algorithm, namely one to optimize the controller parameters and one that optimizes the model parameters. And, as said, a random sampling method to gather the initial samples.
The idea is that we divide the computational budget into a warmup and a model-based phase. In the warmup phase, we use CMA-ES to normally optimize the controller based on the actual simulation of the dynamic system. However, while doing so, for every time step in the simulation, we collect three things: the current state vector s, the control vector c, and the resulting state differential ds/dt. Now if we have such data, we can look at the dynamic system as a function D(s, c) = ds/dt. If we consider the dynamic system to be such a function and we have collected the vectors (s, c) and ds/dt, then we may as well attempt to learn this system. So after the warmup phase, our algorithm does the following: In a loop (for the rest of the computational budget), it first tries to learn a model M of D. Then, it replaces the actual differential equations of the system in the ODE solution approach of the objective function with M. In other words, we kick out the actual system and instead use the learned system model M. We can now run an entire optimization process on this learned model only, with ODE integration and all. This optimization process gives us one new solution, which we then evaluate on the real objective function (which costs 1 FE and gives us a new heap of (s, c) and ds/dt vectors). With this new data, we again learn a new and hopefully more accurate model M. This process is iterated until the rest of the computational budget is exhausted.
This approach hopefully allows us to learn a model of a dynamic system while synthesizing a controller for it. Since we can have infinitely more time to synthesize the controller on a learned system model compared to the actual system, this may give us much better results.
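The overall loop can be summarized in pseudocode (illustrative names; set_raw(), set_model(), and get_differentials() are the documented methods of FigureOfMerit, everything else is an assumed helper):

def surrogate_optimization(objective, warmup, train_model,
                           train_controller, evaluate_real,
                           fes_for_warmup: int, max_fes: int):
    """Sketch of the warm-up + model-based optimization loop."""
    objective.set_raw()                    # collect (s, c) -> ds/dt tuples
    best_j, best = warmup(fes_for_warmup)  # e.g., random sampling
    fes = fes_for_warmup
    while fes < max_fes:
        data = objective.get_differentials()  # all data gathered so far
        model = train_model(data)      # fit M(s, c) to approximate ds/dt
        objective.set_model(model)     # evaluations now use the model only
        candidate = train_controller()  # full optimization run on the model
        objective.set_raw()            # switch back to the real system
        j = evaluate_real(candidate)   # costs 1 FE, adds new training data
        fes += 1
        if j < best_j:
            best_j, best = j, candidate
    return best_j, best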
The starting points of the work here were conversations with Prof. Dr. Bernd NOACK and Guy Yoslan CORNEJO MACEDA of the Harbin Institute of Technology in Shenzhen, China (哈尔滨工业大学(深圳)).
- class moptipyapps.dynamic_control.surrogate_optimizer.SurrogateOptimizer(system_model, controller_space, objective, fes_for_warmup, fes_for_training=None, ms_for_training=None, fes_per_model_run=None, ms_per_model_run=None, fancy_logs=False, warmup_algorithm=<function SurrogateOptimizer.<lambda>>, model_training_algorithm=<function _bpcmaes>, controller_training_algorithm=<function _bpcmaes>)[source]¶
Bases: Algorithm

A surrogate model-based CMA-ES algorithm.
- log_parameters_to(logger)[source]¶
Log all parameters of this component as key-value pairs.
- Parameters:
  logger (KeyValueLogSection) – the logger for the parameters
- Return type:
  None
- solve(process)[source]¶
Solve the modelling problem.
This function begins by spending fes_for_warmup objective function evaluations (FEs) on the actual problem, i.e., by trying to synthesize controllers for the “real” system, using process to evaluate the controller performance. The objective function passed to the constructor (an instance of FigureOfMerit) must be used by process as well. This way, during the warm-up phase, we can collect tuples of the (system state, controller output) and the resulting system differential for each simulated time step. This is done via set_raw().

After the warm-up phase, we can obtain these collected data via get_differentials(). The data is then used to train a model via the model objective function model_objective. The system model is again basically a Controller which is parameterized appropriately. For this, we use a CMA-ES algorithm for fes_for_training FEs.

Once the model training is completed, we switch the objective function to use the model instead of the actual system for evaluating controllers, which is done via set_model(). We then train a completely new controller on the model objective function. Notice that now, the actual system is not involved at all. We do this again using a CMA-ES algorithm for fes_per_model_run FEs.

After training the controller, we can evaluate it on the real system using the evaluate() method of the actual process (after switching back to the real system via set_raw()). This nets us a) the actual controller performance and b) a new set of (system state, controller output) + system state differential tuples.

Since we now have more data, we can go back and train a new system model and then use this model for another model-based optimization run. And so on, and so on, until the budget is exhausted.
- Parameters:
  process (Process) – the original optimization process, which must use the objective function (an instance of FigureOfMerit) as its objective function
- Return type:
  None
- system_model: Final[SystemModel]¶
  the system model
moptipyapps.dynamic_control.system module¶
A class to model a dynamic system governed by differential equations.
A system has a current state vector at any point in time. The state changes over time based on differential equations. These equations can be influenced by the output of a controller. Our System provides the state dimension and the differential equations. It also provides several starting states for simulating the system. The starting states are divided into training and testing states. The training states can be used for, well, training controllers to learn how to handle the system. The testing states are then used to verify whether a synthesized controller actually works.

Examples of different dynamic systems are given in package systems.
- class moptipyapps.dynamic_control.system.System(name, state_dims, control_dims, state_dim_mod, state_dims_in_j, gamma, test_starting_states, training_starting_states, test_steps=5000, test_time=50.0, training_steps=1000, training_time=50.0, plot_examples=(0,))[source]¶
Bases: Component

A class for governing a system via differential equations.
- describe_system(title, controller, parameters, base_name, dest_dir, skip_if_exists=False)[source]¶
Describe the performance of the system when simulated with a given controller.
- Parameters:
  - controller (Callable[[ndarray, float, T, ndarray], None]) – the controller to simulate
  - parameters (T) – the controller parameters
  - base_name (str) – the base name of the file to produce
  - dest_dir (str) – the destination directory
  - skip_if_exists (bool, default: False) – skip the computation if the file already exists
- Returns:
  the paths of the generated files
- describe_system_without_control(dest_dir, skip_if_exists=True)[source]¶
Describe the performance of the system without any controller.
- equations(state, time, control, out)[source]¶
Compute the values of the differential equations to be simulated.
- log_parameters_to(logger)[source]¶
Log all parameters of this component as key-value pairs.
- Parameters:
  logger (KeyValueLogSection) – the logger for the parameters
- Return type:
  None
- class moptipyapps.dynamic_control.system.T¶
the type variable for ODE controller parameters
alias of TypeVar(‘T’)
moptipyapps.dynamic_control.system_model module¶
An extended dynamic control problem Instance with a model for the dynamics.
An Instance is a combination of

- a set of differential equations (system) that govern the change D=ds/dt of the state s of a dynamic system (based on its current state s and the controller output c(s)), i.e., ds/dt=D(s,c(s)), and
- a blueprint c(s, p) of the controller c(s) (controller).
The blueprint of the controller is basically a function that can be parameterized, so it is actually a function c(s, p), and the goal of optimization is to parameterize it in such a way that the figure of merit, i.e., the objective value (objective) of the system (usually the sum of squared state values and squared controller outputs), is minimized. Parameterization means finding good values p such that the above goal is reached. In other words, we want to synthesize a controller (by finding good values of p) in such a way that the state equations drive the system into a stable state, ideally the origin of the coordinate system.
Now this Instance is extended to a SystemModel by adding a parameterized model M(s, c(s), q) to the mix. The idea is to develop the parameterization q of the model M such that it can replace the actual system equations D. Such a model receives as input the current system state vector s and the controller output c(s) for the state vector s. It will return the differential D of the system state, i.e., ds/dt. In other words, a properly constructed model can replace the equations parameter in the ODE integrator run_ode(). The input used for training is provided by diff_from_ode().
What we do here is to re-use the same function models as used in controllers (controller) and learn their parameterizations from the observed data. If successful, we can wrap everything together into a Callable and plug it into the system instead of the original equations, as sketched below.
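A minimal sketch of such a wrapper (hypothetical names; model_func is a trained controller-style callable and q its parameter vector):

import numpy as np

def make_model_equations(model_func, q):
    """Wrap a trained model as an equations(state, time, control, out) callable."""
    def equations(state: np.ndarray, time: float,
                  control: np.ndarray, out: np.ndarray) -> None:
        # the model receives the concatenated (state, control) vector as
        # input and writes its estimate of ds/dt into out
        model_func(np.concatenate((state, control)), time, q, out)
    return equations

The resulting callable has the same signature as the equations parameter of run_ode() and could be plugged in directly.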
What SystemModel offers is thus a blueprint of the model M. Obviously, we can conceive many different such blueprints. We could have a linear model, a quadratic model, or maybe a neural network (in which case we need to decide about the number of layers and layer sizes). So an instance of this surrogate model-based approach has an equation system, a controller blueprint, and a model blueprint.
An example implementation of the concept of synthesizing models for a dynamic system in order to synthesize controllers is given as the surrogate_optimizer algorithm. Examples of different dynamic system controllers (which we here also use to model the systems themselves) are given in package controllers.
The starting points of the work here were conversations with Prof. Dr. Bernd NOACK and Guy Yoslan CORNEJO MACEDA of the Harbin Institute of Technology in Shenzhen, China (哈尔滨工业大学(深圳)).
- class moptipyapps.dynamic_control.system_model.SystemModel(system, controller, model)[source]¶
Bases: Instance

A dynamic system control Instance with a system model blueprint.
- log_parameters_to(logger)[source]¶
Log all parameters of this component as key-value pairs.
- Parameters:
  logger (KeyValueLogSection) – the logger for the parameters
- Return type:
  None
- model: Final[Controller]¶
  the model blueprint that can be trained to hopefully replace the system in the ODE integration / system simulation procedure