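"""optuna_surrogate.py

Multi-objective Optuna hyperparameter search for a LightGBM surrogate of a
malware classifier (EMBER, SOREL LightGBM, or a SOREL feed-forward network).
Each trial trains a surrogate on labelled EMBER/SOREL data combined with
observations collected from the malware RL environment, weighted by ``alpha``,
and reports three objectives: overlap of the top-10 and top-20 SHAP feature
rankings with the target model, and label agreement with the target at a
fixed false-positive-rate threshold.
"""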
import ember
import numpy as np
from sklearn.utils import shuffle
from sklearn.metrics import hamming_loss, roc_curve
import shap
import lightgbm as lgb
import os
import sys
from typing import Any, Dict, Tuple
import optuna
# from optuna.pruners import MedianPruner
from optuna.samplers import TPESampler
from sorel_net import SorelFFNN
N_TRIALS = 20 # Maximum number of trials
N_JOBS = 1 # Number of jobs to run in parallel
N_STARTUP_TRIALS = 5 # Stop random sampling after N_STARTUP_TRIALS
# N_EVALUATIONS = 5 # Number of evaluations during the training
# N_TIMESTEPS = 5000 # Training budget
# EVAL_FREQ = int(N_TIMESTEPS / N_EVALUATIONS)
# N_EVAL_ENVS = 5
# N_EVAL_EPISODES = 10
# TIMEOUT = int(60 * 15 * 10) # 15 minutes
module_path = os.path.split(os.path.abspath(sys.modules[__name__].__file__))[0]
DATA_PATH = os.path.join(module_path, "malware_rl/envs/utils")
FPR_TARGET = 0.002  # false-positive rate at which the surrogate's decision threshold is set
TARGET = 'sorel'  # one of 'ember', 'sorel', 'sorelFFNN'
def find_threshold(y_true, y_pred, fpr_target):
    """
    Given the true labels and the predicted probabilities of the model,
    calculate the decision threshold that achieves a given FPR level.
    """
    fpr, _, thresh = roc_curve(y_true, y_pred)
    return np.interp(fpr_target, fpr, thresh)
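# Note: roc_curve returns FPR values in increasing order, so np.interp can look
# up the threshold at fpr_target directly. The resulting threshold is used in
# objective() to binarize the surrogate's scores before comparing them with the
# target model's decisions.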
def get_ember_predictions(X_test):
    model = lgb.Booster(model_file='/data/mari/ember2018/ember_model_2018.txt')
    y_proba = model.predict(X_test)
    y_pred = [int(i > 0.8336) for i in y_proba]
    return y_pred
def get_sorel_predictions(X_test):
    model = lgb.Booster(model_file=os.path.join('malware_rl/envs/utils', 'sorel.model'))
    y_proba = model.predict(X_test)
    y_pred = np.where(y_proba > 0.5, 1, 0)
    return y_pred
def get_sorelFFNN_predictions(X_test):
    model = SorelFFNN(model_file=os.path.join('malware_rl/envs/utils', 'sorelFFNN.pt'))
    y_proba = model.predict(X_test)
    y_pred = np.where(y_proba > 0.5, 1, 0)
    return y_pred
def get_sorel_data(data_dir, seed=42):
    ndim = 2381
    # The SOREL validation split is used as the surrogate's training data.
    X_train_path = os.path.join(data_dir, "X_val.dat")
    y_train_path = os.path.join(data_dir, "y_val.dat")
    y_train = np.memmap(y_train_path, dtype=np.float32, mode="r")
    N = y_train.shape[0]
    X_train = np.memmap(X_train_path, dtype=np.float32, mode="r", shape=(N, ndim))
    X_test_path = os.path.join(data_dir, "X_test.dat")
    y_test_path = os.path.join(data_dir, "y_test.dat")
    y_test = np.memmap(y_test_path, dtype=np.float32, mode="r")
    N = y_test.shape[0]
    X_test = np.memmap(X_test_path, dtype=np.float32, mode="r", shape=(N, ndim))
    X_train, y_train = shuffle(X_train, y_train, random_state=seed)
    return X_train, X_test, y_train, y_test
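# The feature dimension (ndim = 2381) corresponds to the EMBER v2 feature
# vectors used by the SOREL-20M models; the .dat files are raw float32 memmaps
# with one row per sample, so the full splits stay on disk until the shuffle
# materializes the training portion.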
def get_ember_data(data_dir, seed=42):
    emberdf = ember.read_metadata(data_dir)
    X_train, y_train, X_test, y_test = ember.read_vectorized_features(data_dir)
    # Recover labels for the "unlabeled" (label == -1) samples from the avclass
    # column: samples with an avclass family are treated as malicious, samples
    # without one as benign. This is only required for the dual-input DNN but
    # is done for all models for the sake of uniformity.
    idx_unlab_mal = emberdf.query("label==-1")["avclass"].dropna().index
    idx_unlab_ben = emberdf[emberdf.avclass.isnull()].query("label == -1").index
    # Select the unlabeled instances
    X_train_mal = X_train[idx_unlab_mal]
    X_train_ben = X_train[idx_unlab_ben]
    y_train_ben = np.zeros(X_train_ben.shape[0])
    y_train_mal = np.ones(X_train_mal.shape[0])
    # Create the training dataset
    y_train = np.concatenate((y_train_ben, y_train_mal))
    X_train = np.vstack((X_train_ben, X_train_mal))
    X_train_unlab, y_train_unlab = shuffle(X_train, y_train, random_state=seed)
    return X_train_unlab, X_test, y_train_unlab, y_test
def eval_explainability(model, target_model, X_test, num_test_samples=2000):
    idx = np.random.choice(np.arange(X_test.shape[0]), num_test_samples)
    surr_ind = get_shapley_indices(model, X_test[idx], 20)
    if TARGET in ['ember', 'sorel']:
        target_ind = get_shapley_indices(target_model, X_test[idx], 20)
    else:
        # Hard-coded top-20 feature indices used when the target is not a tree
        # model (presumably precomputed for the SorelFFNN target).
        target_ind = [2360, 810, 693, 2357, 692, 827, 615, 822, 698, 2378,
                      2358, 697, 736, 2380, 721, 742, 2359, 625, 691, 727]
    exp10 = find_num_common_elements(surr_ind[:10], target_ind[:10]) / 10
    exp20 = find_num_common_elements(surr_ind, target_ind) / 20
    return exp10, exp20
def find_num_common_elements(a, b):
    return len(np.intersect1d(a, b))
def get_shapley_indices(model, X_test, k):
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_test)[0]
    sort_inds = np.argsort(np.sum(np.abs(shap_values), axis=0))
    return sort_inds[::-1][:k]
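# exp10 and exp20 measure how closely the surrogate mimics the target's
# explanations: the fraction of the top-10 (resp. top-20) features, ranked by
# mean absolute SHAP value, that the surrogate shares with the target model.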
def create_dataset(data_path: str, alpha: float):
    # X_train, X_test, y_train, y_test = get_ember_data('/data/mari/ember2018')
    X_train, X_test, y_train, y_test = get_sorel_data('/data/mari/sorel-data')
    # Observations and scores collected from the RL environment.
    X_rl = np.load(os.path.join(data_path, 'observations.npy'))
    y_rl = np.load(os.path.join(data_path, 'scores.npy'))
    t = X_rl.shape[0]
    m = X_train.shape[0]
    # The RL samples are weighted by alpha relative to the original training data.
    w_rl = np.ones(t) * alpha
    w_train = np.ones(m)
    X = np.vstack((X_train, X_rl))
    y = np.concatenate((y_train, y_rl))
    w = np.concatenate((w_train, w_rl))
    train_data = lgb.Dataset(X, label=y, weight=w)
    return train_data, X_test, y_test
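# In LightGBM, a per-sample weight of alpha scales that sample's gradient and
# hessian contributions, so for integer alpha it behaves like duplicating the
# sample alpha times. Larger alpha therefore pulls the surrogate towards
# matching the target's scores on the RL observations, at the cost of fit on
# the original training data.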
def sample_params(trial: optuna.Trial) -> Dict[str, Any]:
    alpha = trial.suggest_float("alpha", 1.0, 1000, log=True)
    num_boosting_rounds = trial.suggest_int("num_boosting_rounds", 100, 2000)
    # Earlier/alternative search spaces, kept for reference:
    # learning_rate = trial.suggest_float("learning_rate", 0.001, 0.1, log=True)
    # num_leaves = trial.suggest_int("num_leaves", 128, 2048)
    # max_depth = trial.suggest_int("max_depth", 10, 16)
    # min_child_samples = trial.suggest_int("min_child_samples", 5, 100)
    # feature_fraction = trial.suggest_float('feature_fraction', 0.4, 1.0)
    """
    'lambda_l1': trial.suggest_float('lambda_l1', 1e-8, 10.0, log=True),
    'lambda_l2': trial.suggest_float('lambda_l2', 1e-8, 10.0, log=True),
    'num_leaves': trial.suggest_int('num_leaves', 2, 256),
    'feature_fraction': trial.suggest_float('feature_fraction', 0.4, 1.0),
    'bagging_fraction': trial.suggest_float('bagging_fraction', 0.4, 1.0),
    'bagging_freq': trial.suggest_int('bagging_freq', 1, 7),
    'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
    """
    lgb_params = {
        "boosting_type": "gbdt",
        "objective": "binary",
        "learning_rate": trial.suggest_float("learning_rate", 0.001, 0.1, log=True),
        "num_leaves": trial.suggest_int("num_leaves", 128, 2048),
        "max_depth": trial.suggest_int("max_depth", 5, 16),
        "min_child_samples": trial.suggest_int("min_child_samples", 5, 100),
        "feature_fraction": trial.suggest_float("feature_fraction", 0.4, 1.0),
        "verbose": -1,
    }
    params = {
        "alpha": alpha,
        "lgb_params": lgb_params,
        "num_boosting_rounds": num_boosting_rounds,
    }
    return params
def objective(trial: optuna.Trial) -> Tuple[float, float, float]:
    params = sample_params(trial)
    train_data, X_test, y_test = create_dataset(DATA_PATH, params["alpha"])
    model = lgb.train(params["lgb_params"],
                      train_data,
                      num_boost_round=params["num_boosting_rounds"])
    if TARGET == 'sorel':
        target_model = lgb.Booster(model_file=os.path.join('malware_rl/envs/utils', 'sorel.model'))
        target_model.params["objective"] = 'binary'
    elif TARGET == 'ember':
        target_model = lgb.Booster(model_file=os.path.join('malware_rl/envs/utils', 'ember_model.txt'))
        target_model.params["objective"] = 'binary'
    elif TARGET == 'sorelFFNN':
        target_model = SorelFFNN(model_file=os.path.join('malware_rl/envs/utils', 'sorelFFNN.pt'))
    y_proba = model.predict(X_test)
    # Target predictions; keep the call consistent with TARGET above.
    # y_pred_target = get_ember_predictions(X_test)
    y_pred_target = get_sorel_predictions(X_test)
    # y_pred_target = get_sorelFFNN_predictions(X_test)
    thresh = find_threshold(y_test, y_proba, FPR_TARGET)
    y_pred = np.where(y_proba > thresh, 1, 0)
    # y_pred = [int(i > thresh) for i in y_proba]
    # Agreement with the target model's decisions (1 - Hamming loss).
    agg_score = 1.0 - hamming_loss(y_pred_target, y_pred)
    exp10, exp20 = eval_explainability(model, target_model, X_test, num_test_samples=1000)
    return exp10, exp20, agg_score
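# The study below maximizes all three values returned by objective(): top-10
# SHAP overlap, top-20 SHAP overlap, and label agreement with the target.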
sampler = TPESampler(n_startup_trials=N_STARTUP_TRIALS)
# Do not prune early
# pruner = MedianPruner(
# n_startup_trials=N_STARTUP_TRIALS, n_warmup_steps=10
# )
# Create the study and start the hyperparameter optimization
study = optuna.create_study(study_name="surrogate_9",
                            storage="sqlite:///surrogate_trials.db",
                            sampler=sampler,
                            # pruner=pruner,
                            directions=["maximize", "maximize", "maximize"])
try:
    study.optimize(objective, n_trials=N_TRIALS, n_jobs=N_JOBS)
except KeyboardInterrupt:
    pass
print("Number of finished trials: ", len(study.trials))
# print("Best trial:")
# trial = study.best_trial
# print(f" Value: {trial.value}")
# print(" Params: ")
# for key, value in trial.params.items():
# print(f" {key}: {value}")