# %% [markdown]
# # Inequality constraints
# %%
import numpy as np
import tensorflow as tf
np.random.seed(1793)
tf.random.set_seed(1793)
# %% [markdown]
# ## The problem
#
# In this tutorial, we replicate one of the results of <cite data-cite="gardner14">[Gardner et al.](http://proceedings.mlr.press/v32/gardner14.html)</cite>, specifically their synthetic experiment "simulation 1", which consists of an objective function with a single constraint, defined over a two-dimensional input domain. We'll start by defining the problem parameters. The constraint is satisfied when `constraint(input_data) <= threshold`.
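#
# Written out, the problem we'll tackle is
#
# $$\min_{x \in [0, 6]^2} \; \text{objective}(x) \quad \text{subject to} \quad \text{constraint}(x) \le 0.5.$$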
# %%
from trieste.space import Box
class Sim:
    threshold = 0.5

    @staticmethod
    def objective(input_data):
        x, y = input_data[..., -2], input_data[..., -1]
        z = tf.cos(2.0 * x) * tf.cos(y) + tf.sin(x)
        return z[:, None]

    @staticmethod
    def constraint(input_data):
        x, y = input_data[:, -2], input_data[:, -1]
        z = tf.cos(x) * tf.cos(y) - tf.sin(x) * tf.sin(y)
        return z[:, None]


search_space = Box([0, 0], [6, 6])
# %% [markdown]
# The objective and constraint functions are accessible as methods on the `Sim` class. Let's visualise these functions, as well as the constrained objective. We get the constrained objective by masking out regions where the constraint function is above the threshold.
# %%
import trieste
import matplotlib.pyplot as plt
from util.inequality_constraints_utils import plot_objective_and_constraints
plot_objective_and_constraints(search_space, Sim)
plt.show()
# %% [markdown]
# We'll make an observer that outputs both the objective and constraint data. Since the observer is outputting multiple datasets, we have to label them so that the optimization process knows which is which.
# %%
from trieste.data import Dataset
OBJECTIVE = "OBJECTIVE"
CONSTRAINT = "CONSTRAINT"
def observer(query_points):
    return {
        OBJECTIVE: Dataset(query_points, Sim.objective(query_points)),
        CONSTRAINT: Dataset(query_points, Sim.constraint(query_points)),
    }
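# %% [markdown]
# As a quick sanity check, we can query the observer at a single random point and confirm that each tag maps to a `Dataset` whose query points and observations line up:
# %%
for tag, dataset in observer(search_space.sample(1)).items():
    print(tag, dataset.query_points.shape, dataset.observations.shape)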
# %% [markdown]
# Let's randomly sample some initial data from the observer ...
# %%
num_initial_points = 5
initial_data = observer(search_space.sample(num_initial_points))
# %% [markdown]
# ... and visualise those points on the constrained objective. Note how, like the observer output, the generated data is labelled with the objective and constraint tags.
# %%
from util.inequality_constraints_utils import plot_init_query_points
plot_init_query_points(
    search_space,
    Sim,
    initial_data[OBJECTIVE].astuple(),
    initial_data[CONSTRAINT].astuple(),
)
plt.show()
# %% [markdown]
# ## Modelling the two functions
#
# We'll model the objective and constraint data with their own Gaussian process regression models.
# %%
import gpflow
from trieste.models.gpflow import GPflowModelConfig
def create_bo_model(data):
    # use the variance of the data passed in, so each tag gets its own model
    variance = tf.math.reduce_variance(data.observations)
    lengthscale = 1.0 * np.ones(2, dtype=gpflow.default_float())
    kernel = gpflow.kernels.Matern52(variance=variance, lengthscales=lengthscale)
    jitter = gpflow.kernels.White(1e-12)
    gpr = gpflow.models.GPR(data.astuple(), kernel + jitter, noise_variance=1e-5)
    gpflow.set_trainable(gpr.likelihood, False)
    return trieste.models.create_model(GPflowModelConfig(**{
        "model": gpr,
        "optimizer": gpflow.optimizers.Scipy(),
        "optimizer_args": {
            "minimize_args": {"options": dict(maxiter=100)},
        },
    }))
initial_models = trieste.utils.map_values(create_bo_model, initial_data)
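# %% [markdown]
# `map_values` applies `create_bo_model` to every dataset in the dictionary, so we end up with one independent model per tag:
# %%
print(initial_models.keys())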
# %% [markdown]
# ## Define the acquisition process
#
# We can construct the _expected constrained improvement_ acquisition function defined in <cite data-cite="gardner14">[Gardner et al.](http://proceedings.mlr.press/v32/gardner14.html)</cite>, where they use the probability of feasibility with respect to the constraint model.
# %%
from trieste.acquisition.rule import EfficientGlobalOptimization
pof = trieste.acquisition.ProbabilityOfFeasibility(threshold=Sim.threshold)
eci = trieste.acquisition.ExpectedConstrainedImprovement(
    OBJECTIVE, pof.using(CONSTRAINT)
)
rule = EfficientGlobalOptimization(eci) # type: ignore
# %% [markdown]
# ## Run the optimization loop
#
# We can now run the optimization loop. We obtain the final objective and constraint data using `.try_get_final_datasets()`.
# %%
num_steps = 20
bo = trieste.bayesian_optimizer.BayesianOptimizer(observer, search_space)
data = bo.optimize(
    num_steps, initial_data, initial_models, rule, track_state=False
).try_get_final_datasets()
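# %% [markdown]
# A minimal way to read off the best feasible point found so far, using only the returned datasets (both share the same query points, since they come from the same observer): keep the objective observations whose corresponding constraint observation is below the threshold, and take the smallest.
# %%
feasible = data[CONSTRAINT].observations[:, 0] <= Sim.threshold
feasible_query_points = tf.boolean_mask(data[OBJECTIVE].query_points, feasible)
feasible_observations = tf.boolean_mask(data[OBJECTIVE].observations, feasible)
best_index = int(tf.argmin(feasible_observations[:, 0]))
print("best feasible point:", feasible_query_points[best_index].numpy())
print("best feasible objective:", feasible_observations[best_index].numpy())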
# %% [markdown]
# To conclude this section, we visualise the resulting data. Orange dots show the new points queried during optimization. Notice the concentration of these points in regions near the local minima.
# %%
constraint_data = data[CONSTRAINT]
new_query_points = constraint_data.query_points[-num_steps:]
new_observations = constraint_data.observations[-num_steps:]
new_data = (new_query_points, new_observations)
plot_init_query_points(
    search_space,
    Sim,
    initial_data[OBJECTIVE].astuple(),
    initial_data[CONSTRAINT].astuple(),
    new_data,
)
plt.show()
# %% [markdown]
# ## Batch-sequential strategy
#
# We'll now look at a batch-sequential approach to the same problem. Sometimes it's beneficial to query several points at a time instead of one. The acquisition function we used earlier, built by `ExpectedConstrainedImprovement`, only supports a batch size of 1, so we'll need a new acquisition function builder for larger batch sizes. We can implement this using the reparametrization trick with the Monte-Carlo sampler `BatchReparametrizationSampler`. Note that when we do this, we must initialise the sampler *outside* the acquisition function (here `batch_efi`). This is crucial: a given instance of a sampler produces repeatable, continuous samples, and we can use this to create a repeatable continuous acquisition function. Using a new sampler on each call would not result in a repeatable continuous acquisition function.
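#
# Concretely, for a candidate batch $\{x_1, \dots, x_B\}$, the builder below draws $S$ joint Monte-Carlo samples $(f^{(s)}, c^{(s)})$ from the objective and constraint models and estimates
#
# $$\frac{1}{S} \sum_{s=1}^{S} \max_{b=1,\dots,B} \mathbb{1}\!\left[c^{(s)}(x_b) < \text{threshold}\right] \max\!\left(\eta - f^{(s)}(x_b),\, 0\right),$$
#
# where $\eta$ is the smallest objective model mean among the query points currently predicted to be feasible (probability of feasibility at least $0.5$). The `[N, S, B]` comments in the code track these sample and batch dimensions.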
# %%
class BatchExpectedConstrainedImprovement(
    trieste.acquisition.AcquisitionFunctionBuilder
):
    def __init__(self, sample_size, threshold):
        self._sample_size = sample_size
        self._threshold = threshold

    def prepare_acquisition_function(self, models, datasets):
        objective_model = models[OBJECTIVE]
        objective_dataset = datasets[OBJECTIVE]

        samplers = {
            tag: trieste.acquisition.BatchReparametrizationSampler(
                self._sample_size, model
            )
            for tag, model in models.items()
        }

        pf = trieste.acquisition.probability_of_feasibility(
            models[CONSTRAINT], self._threshold
        )(tf.expand_dims(objective_dataset.query_points, 1))
        is_feasible = pf >= 0.5

        mean, _ = objective_model.predict(objective_dataset.query_points)
        eta = tf.reduce_min(tf.boolean_mask(mean, is_feasible), axis=0)

        def batch_efi(at):
            samples = {
                tag: tf.squeeze(sampler.sample(at), -1)
                for tag, sampler in samplers.items()
            }
            feasible_mask = samples[CONSTRAINT] < self._threshold  # [N, S, B]
            improvement = tf.where(
                feasible_mask,
                tf.maximum(eta - samples[OBJECTIVE], 0.),
                0.
            )  # [N, S, B]
            batch_improvement = tf.reduce_max(improvement, axis=-1)  # [N, S]
            return tf.reduce_mean(
                batch_improvement, axis=-1, keepdims=True
            )  # [N, 1]

        return batch_efi
num_query_points = 4
batch_eci = BatchExpectedConstrainedImprovement(50, Sim.threshold)
batch_rule = EfficientGlobalOptimization(  # type: ignore
    batch_eci, num_query_points=num_query_points
)
# %% [markdown]
# We can now run the BO loop as before; note that here we also query twenty points, but in five batches of four points.
# %%
initial_models = trieste.utils.map_values(create_bo_model, initial_data)
num_steps = 5
batch_data = bo.optimize(
    num_steps, initial_data, initial_models, batch_rule, track_state=False
).try_get_final_datasets()
# %% [markdown]
# We visualise the resulting data as before.
# %%
batch_constraint_data = batch_data[CONSTRAINT]
new_batch_data = (
    batch_constraint_data.query_points[-num_query_points * num_steps:],
    batch_constraint_data.observations[-num_query_points * num_steps:],
)
plot_init_query_points(
    search_space,
    Sim,
    initial_data[OBJECTIVE].astuple(),
    initial_data[CONSTRAINT].astuple(),
    new_batch_data,
)
plt.show()
# %% [markdown]
# Finally, we compare the regret from the non-batch strategy (left panel) to the regret from the batch strategy (right panel).
# In the following plots each marker represents a query point. The x-axis is the index of the query point (where the first queried point has index 0), and the y-axis is the observed value. The vertical blue line denotes the end of initialisation/start of optimisation. Green points satisfy the constraint, red points do not.
# %%
from util.plotting import plot_regret
mask_fail = constraint_data.observations.numpy() > Sim.threshold
batch_mask_fail = batch_constraint_data.observations.numpy() > Sim.threshold
fig, ax = plt.subplots(1, 2, sharey="all")
plot_regret(
    data[OBJECTIVE].observations.numpy(),
    ax[0],
    num_init=num_initial_points,
    mask_fail=mask_fail.flatten(),
)
plot_regret(
    batch_data[OBJECTIVE].observations.numpy(),
    ax[1],
    num_init=num_initial_points,
    mask_fail=batch_mask_fail.flatten(),
)
# %% [markdown]
# ## Constrained optimization with more than one constraint
#
# We'll now show how to use a reducer to combine multiple constraints. The new problem `Sim2` inherits its objective and first constraint from `Sim`, and adds a second constraint. We start by adding an output to our observer, and creating a set of three models.
# %%
class Sim2(Sim):
    threshold2 = 0.5

    @staticmethod
    def constraint2(input_data):
        x, y = input_data[:, -2], input_data[:, -1]
        z = tf.sin(x) * tf.cos(y) - tf.cos(x) * tf.sin(y)
        return z[:, None]


CONSTRAINT2 = "CONSTRAINT2"


def observer_two_constraints(query_points):
    return {
        OBJECTIVE: Dataset(query_points, Sim2.objective(query_points)),
        CONSTRAINT: Dataset(query_points, Sim2.constraint(query_points)),
        CONSTRAINT2: Dataset(query_points, Sim2.constraint2(query_points)),
    }
num_initial_points = 10
initial_data = observer_two_constraints(search_space.sample(num_initial_points))
initial_models = trieste.utils.map_values(create_bo_model, initial_data)
# %% [markdown]
# Since the two constraints are modelled by independent GPs, the probability that both are satisfied is the product of the two probabilities of feasibility. Hence, we combine the two `ProbabilityOfFeasibility` functions into one quantity using a `Product` `Reducer`:
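#
# That is, writing $c_1$ and $c_2$ for the two constraint functions,
#
# $$\Pr\!\left(c_1(x) \le 0.5 \text{ and } c_2(x) \le 0.5\right) = \Pr\!\left(c_1(x) \le 0.5\right)\Pr\!\left(c_2(x) \le 0.5\right).$$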
# %%
from trieste.acquisition.combination import Product
pof1 = trieste.acquisition.ProbabilityOfFeasibility(threshold=Sim2.threshold)
pof2 = trieste.acquisition.ProbabilityOfFeasibility(threshold=Sim2.threshold2)
pof = Product(pof1.using(CONSTRAINT), pof2.using(CONSTRAINT2)) # type: ignore
# %% [markdown]
# We can now run the BO loop as before, and visualize the results:
# %%
eci = trieste.acquisition.ExpectedConstrainedImprovement(OBJECTIVE, pof) # type: ignore
rule = EfficientGlobalOptimization(eci)
num_steps = 20
bo = trieste.bayesian_optimizer.BayesianOptimizer(observer_two_constraints, search_space)
data = bo.optimize(
    num_steps, initial_data, initial_models, rule, track_state=False
).try_get_final_datasets()
constraint_data = data[CONSTRAINT]
new_query_points = constraint_data.query_points[-num_steps:]
new_observations = constraint_data.observations[-num_steps:]
new_data = (new_query_points, new_observations)
def masked_objective(x):
    # mask out any point that violates either constraint
    mask_nan = np.logical_or(
        Sim2.constraint(x) > Sim2.threshold,
        Sim2.constraint2(x) > Sim2.threshold2,
    )
    y = np.array(Sim2.objective(x))
    y[mask_nan] = np.nan
    return tf.convert_to_tensor(y.reshape(-1, 1), x.dtype)


mask_fail1 = data[CONSTRAINT].observations.numpy().flatten() > Sim2.threshold
mask_fail2 = data[CONSTRAINT2].observations.numpy().flatten() > Sim2.threshold2
mask_fail = np.logical_or(mask_fail1, mask_fail2)
from util.plotting import plot_function_2d, plot_bo_points

fig, ax = plot_function_2d(
    masked_objective,
    search_space.lower,
    search_space.upper,
    grid_density=50,
    contour=True,
)
plot_bo_points(
    data[OBJECTIVE].query_points.numpy(),
    ax=ax[0, 0],
    num_init=num_initial_points,
    mask_fail=mask_fail,
)
plt.show()
# %% [markdown]
# ## LICENSE
#
# [Apache License 2.0](https://github.com/secondmind-labs/trieste/blob/develop/LICENSE)