-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmorse.py
155 lines (107 loc) · 6.11 KB
/
morse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import gurobipy as gp
from gurobipy import GRB, quicksum
import random as rm
import pandas as pd
from collections.abc import Iterable
from math import lcm
from fractions import Fraction
class Morse():
def __init__(self, instance, seed = None, weights = None, orig_obj_val = None):
self.instance = instance
self.orig_model = gp.read(instance)
self.orig_obj = self.orig_model.getObjective()
self.orig_obj_coeffs = [self.orig_obj.getCoeff(i) for i in range(self.orig_obj.size())]
self.orig_obj_val = orig_obj_val
self.model = self.orig_model.copy()
self.obj_function = self.model.getObjective()
self.obj_vars = [self.obj_function.getVar(i) for i in range(self.obj_function.size())]
self.obj_coeffs = [self.obj_function.getCoeff(i) for i in range(self.obj_function.size())]
self.contains_continuous = any([var.vtype == 'C' for var in self.obj_vars])
self.integer_coeffs = all([int(coeff) == coeff for coeff in self.obj_coeffs])
if seed:
assert isinstance(seed, int), 'Seed must be an integer.'
self.seed = seed
if weights:
assert not isinstance(weights, str), 'Perturbation vector must be a non-string iterable'
assert isinstance(weights, Iterable), 'Perturbation vector is not iterable.'
assert len(weights) == len(self.obj_vars), f'Perturbation vector has length {len(weights)}, expected length {len(self.obj_vars)}.'
assert all(isinstance(x, (int, float)) for x in weights), 'Perturbation vector elements must be float or int'
self.weights = weights
def scale_objective(self) -> None:
''' Scale objective function to eliminate non-integral coefficients.
'''
# Convert each objective function coefficient to a fraction and store the denominators
denoms = [Fraction(coeff).limit_denominator().denominator for coeff in self.obj_coeffs]
# Define the scaling factor as the least common multiple of the denominators
scaler = lcm(*denoms)
# Scale the objective function coefficients
self.obj_coeffs = [scaler * coeff for coeff in self.obj_coeffs]
# Update the objective function with the scaled coefficients
self.model.setObjective(quicksum(coeff * var for coeff, var in zip(self.obj_coeffs, self.obj_vars)))
def generate_epsilon(self) -> float:
''' Generate value of epsilon.
Returns:
-------
epsilon: value of epsilon to be used for the perturbation
'''
# Define S as the sum of max(abs(upper bound), abs(lower bound)) for all binary/integer variables in the objective function
S = sum([abs(coeff) * max(abs(var.ub), abs(var.lb)) for coeff, var in zip(self.obj_coeffs, self.obj_vars) if var.vtype != 'C'])
# Define epsilon as 1/2S
return 1 / (2 * S)
def parse_objective(self, epsilon) -> None:
''' Apply perturbation to objective function and update model's objective.
Args:
-------
epsilon: value of epsilon
'''
if not self.weights:
if self.seed:
rm.seed(self.seed)
# Generate random weights for binary/integer variables, generate uniform weights of 1 for continuous variables
self.weights = [rm.uniform(1 - epsilon, 1 + epsilon) if var.vtype != 'C' else 1 for var in self.obj_vars]
# Map weights to coefficients in the objective function, set new model objective
self.model.setObjective(quicksum(weight * coeff * var for weight, coeff, var in zip(self.weights, self.obj_coeffs, self.obj_vars)))
def solve(self) -> None:
''' Generate and apply perturbation to the model, optimize the resulting instance.
'''
if self.seed:
# Set Gurobi seed
self.model.setParam('Seed', self.seed)
# Scale the objective function to eliminate non-integral coefficients
self.scale_objective()
# Generate value of epsilon for perturbation
epsilon = self.generate_epsilon()
# Apply perturbation to model objective function
self.parse_objective(epsilon)
# Optimize the perturbed model
self.model.optimize()
# If the objective function contains continuous variables, check that the solution found with MORSE is optimal for the original problem
if self.contains_continuous or not self.integer_coeffs:
# While the MORSE solution is not optimal, decrease epsilon by a factor of 10, reperturb the objective function, and reoptimize
while not self.check_optimality():
self.__init__(self.instance, orig_obj_val = self.orig_obj_val)
epsilon /= 10
self.parse_objective(epsilon)
self.model.optimize()
def check_optimality(self) -> bool:
''' Check that a solution found with MORSE is optimal to the original problem.
'''
if not self.orig_obj_val:
self.orig_model.optimize()
self.orig_obj_val = self.orig_model.ObjVal
morse_obj_val = sum([var.X * coeff for var, coeff in zip(self.obj_vars, self.orig_obj_coeffs)])
relative_diff = abs(morse_obj_val - self.orig_obj_val) / abs(self.orig_obj_val)
return relative_diff < self.orig_model.Params.OptimalityTol
def record_sols(self, filepath: str) -> None:
''' Write objective function variables, values, and weights to csv file.
Args:
-------
filepath: filepath to csv file
'''
# Raise error if the model is not optimized
if not self.model.status == GRB.OPTIMAL:
raise Exception('Model is not optimized')
# Write dataframe with variable names, values, and weights, to csv at user's specified filepath
pd.DataFrame({'VarName': [v.VarName for v in self.obj_vars],
'Value': list(map(lambda var: round(var.x) if abs(round(var.x) - var.x) < 0.0001 else var.x, self.obj_vars)),
'Weight': self.weights}).to_csv(filepath, index = False)