# bfgs.py
# Fit a generalized logistic curve to Italian COVID-19 case counts with BFGS.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.optimize import line_search, BFGS

print("Importing data...")
# Import data into a list of pandas DataFrames: df
df = []  # list of pandas DataFrames
prefix = 'COVID-19/dati-andamento-nazionale/dpc-covid19-ita-andamento-nazionale-2020'
for d in range(24, 30):  # 24-29 February
    filename = prefix + '02{}.csv'.format(d)
    df.append(pd.read_csv(filename))
for d in range(1, 32):  # 1-31 March
    filename = prefix + '03{:02d}.csv'.format(d)
    df.append(pd.read_csv(filename))
print("Imported.")
# Array with the current positive cases: current
current = np.array([d.loc[0, 'totale_positivi'] for d in df])
# Array of days after 24 February: X
X = np.arange(len(current))
# Rescale current to [0, 1] to keep the fit well conditioned: scaled
scaled = (current - np.min(current)) / (np.max(current) - np.min(current))
# Model: f(x; theta) = theta[0]/(theta[1] + theta[2]*exp(-theta[3]*(x - theta[4]))) + theta[5]
# Loss and gradient on the scaled data: loss(), gradient()
# loss_c() evaluates the same fit in the original (non-scaled) units
def loss(theta):
    return np.sum((theta[0]/(theta[1]+theta[2]*np.exp(-theta[3]*(X-theta[4])))+theta[5] - scaled)**2)/(2*len(X))

def loss_c(theta):
    return np.sum(((theta[0]/(theta[1]+theta[2]*np.exp(-theta[3]*(X-theta[4])))+theta[5])*(np.max(current)-np.min(current))+np.min(current) - current)**2)/(2*len(X))

def gradient(theta):
    prediction = theta[0]/(theta[1]+theta[2]*np.exp(-theta[3]*(X-theta[4])))+theta[5]
    # Partial derivatives of the model with respect to each parameter
    da = 1/(theta[1]+theta[2]*np.exp(-theta[3]*(X-theta[4])))
    db = -(theta[0]/(theta[1]+theta[2]*np.exp(-theta[3]*(X-theta[4])))**2)
    dc = -(theta[0]*np.exp(theta[3]*(X - theta[4])))/((theta[1]*np.exp(theta[3]*(X - theta[4])) + theta[2])**2)
    dd = -(theta[0]*theta[2]*(theta[4] - X)*np.exp(theta[3]*(X - theta[4])))/((theta[1]*np.exp(theta[3]*(X - theta[4])) + theta[2])**2)
    de = -(theta[0]*theta[2]*theta[3]*np.exp(theta[3]*(X - theta[4])))/((theta[1]*np.exp(theta[3]*(X - theta[4])) + theta[2])**2)
    # The derivative with respect to theta[5] is 1
    Da = np.dot(prediction - scaled, da)
    Db = np.dot(prediction - scaled, db)
    Dc = np.dot(prediction - scaled, dc)
    Dd = np.dot(prediction - scaled, dd)
    De = np.dot(prediction - scaled, de)
    Df = np.sum(prediction - scaled)
    # Divide by len(X) so this is the exact gradient of loss(), which carries
    # a 1/(2*len(X)) factor; the line search needs the two to be consistent
    return np.array([Da, Db, Dc, Dd, De, Df]) / len(X)
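
# Sanity check (not in the original script): compare the analytic gradient
# above against a central finite-difference approximation. check_gradient()
# and the step size h are illustrative choices introduced here; a small
# return value (roughly 1e-7 or below) suggests the hand-derived partials
# are correct.
def check_gradient(theta, h=1e-6):
    fd = np.zeros(len(theta))
    for i in range(len(theta)):
        e = np.zeros(len(theta))
        e[i] = h
        fd[i] = (loss(theta + e) - loss(theta - e)) / (2*h)
    return np.max(np.abs(fd - gradient(theta)))
# Example: check_gradient(np.ones(6))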
# Definition of the BFGS algorithm: BFGS_algorithm()
def BFGS_algorithm(obj_fun, theta0, max_iter=20000, epsilon=0):
    print("Starting BFGS algorithm.")
    # Inverse-Hessian approximation object: bfgs
    bfgs = BFGS()
    bfgs.initialize(6, "inv_hess")
    # Lists to store the history of theta (th) and of the cost (c)
    th, c = [], []
    th.append(theta0)
    c.append(obj_fun(theta0))
    niter = max_iter
    success = (False, "max_iter reached.")
    # Iteration
    for n in range(max_iter):
        th_0 = th[n]
        g0 = gradient(th_0)
        # If loss < epsilon, declare convergence
        # (epsilon = 0 disables the check)
        if (epsilon > 0) and (obj_fun(th_0) < epsilon):
            niter = n
            success = (True, "Loss = {}".format(obj_fun(th_0)))
            break
        # Search direction: step along -d, with d = H^(-1) g0
        d = bfgs.dot(g0)
        # Step size from a Wolfe line search along -d
        alpha = line_search(obj_fun, gradient, th_0, -d)[0]
        if alpha is None:
            alpha = 1e-4  # line search failed to converge; take a small fallback step
        # Update theta and the gradient
        th_1 = th_0 - alpha*d
        g1 = gradient(th_1)
        # Update the theta history and the cost history
        th.append(th_1)
        c.append(obj_fun(th_1))
        # Update the inverse-Hessian approximation
        bfgs.update(th_1 - th_0, g1 - g0)
    print("Exiting.")
    return th, c, niter, success
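
# Optional cross-check (not part of the original script): scipy's built-in
# BFGS minimizer, driven by the same loss and analytic gradient, should
# reach a comparable optimum. scipy_bfgs_reference() is a name introduced
# here for illustration only.
def scipy_bfgs_reference(theta0, max_iter=20000):
    from scipy.optimize import minimize
    res = minimize(loss, theta0, jac=gradient, method='BFGS',
                   options={'maxiter': max_iter})
    return res.x, res.fun
# Example: theta_ref, loss_ref = scipy_bfgs_reference(np.ones(6))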
# Inputs: theta0, max_iter, epsilon
theta0 = np.ones((6,))
max_iter = 20000
epsilon = 0
# Compute the results: theta_history, cost_history, niter, success
theta_history, cost_history, niter, success = BFGS_algorithm(loss, theta0, max_iter, epsilon)
theta = theta_history[-1]
# Print out the results
print("\n----------\nResults:\n")
print("Number of iterations: ", niter)
print("Theta: ", theta)
print("Loss (original data): ", loss_c(theta))
print("Loss (rescaled data): ", loss(theta))
print("Converged: {}, {}".format(*success))
# Plot the results
print("\nPlotting...")
# Fit, mapped back to the original units
plt.scatter(X, current, color='orange', label='Data points')
plt.plot(X, (theta[0]/(theta[1]+theta[2]*np.exp(-theta[3]*(X-theta[4])))+theta[5])*(np.max(current)-np.min(current))+np.min(current), label='Logistic fit')
plt.title("BFGS logistic fit")
plt.xlabel("Days after 24 February")
plt.ylabel("Current positive cases")
plt.legend()
plt.show()
# Cost history on a log scale
plt.plot(range(niter+1), cost_history)
plt.title("Cost history")
plt.xlabel("Iteration")
plt.ylabel("Loss function value")
plt.yscale('log')
plt.show()
print("Terminated.")