'''
CCU-ML Project.
Independent Component Analysis (ICA) based on:
A. Bell, T. Sejnowski, 'An information-maximization approach to
blind separation and blind deconvolution', Neural Comput. 1995
Nov;7(6):1129-59.
'''
import numpy as np
from numpy import dot
from numpy.linalg import matrix_rank, inv
from numpy.random import permutation
from scipy.linalg import eigh
from scipy.linalg import norm as mnorm

# Global constants
EPS = 1e-16          # numerical floor to guard divisions
MAX_W = 1e8          # weight magnitude that triggers a restart
ANNEAL = 0.9         # learning-rate annealing factor
MAX_STEP = 10000     # maximum number of infomax iterations
MIN_LRATE = 1e-6     # smallest admissible learning rate
W_STOP = 1e-6        # weight-change threshold for convergence

def norm(x):
    """Computes the norm of a vector or the Frobenius norm of a
    matrix
    """
    return mnorm(x.ravel())

class ica:
    """Thin wrapper that whitens the data and runs infomax ICA."""

    def __init__(self, n_components=10):
        self.n_comp = n_components

    def fit(self, x2d):
        x_white, self.white, self.dewhite = pca_whiten(x2d, self.n_comp)
        # infomax1 infers the number of components from x_white itself;
        # passing self.n_comp here would be taken as the verbose flag.
        self.mix, self.sources, self.unmix = infomax1(x_white)
        return self
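
# A hedged usage sketch (not part of the original module; the helper name
# and the random shapes below are illustrative assumptions): fit the
# wrapper on an observations-by-variables matrix and read the
# decomposition off the fitted attributes.
def _example_fit(seed=0):
    rng = np.random.RandomState(seed)
    data = rng.laplace(size=(30, 2000))  # 30 observations, 2000 variables
    model = ica(n_components=5).fit(data)
    # model.mix is 5 x 5 (whitened space), model.sources is 5 x 2000
    return model.mix, model.sources, model.unmix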

def diagsqrts(w):
    """
    Returns direct and inverse square root normalization matrices
    """
    Di = np.diag(1. / (np.sqrt(w) + np.finfo(float).eps))
    D = np.diag(np.sqrt(w))
    return D, Di

def pca_whiten(x2d, n_comp, verbose=True):
    """ Data whitening
    *Input
    x2d : 2d data matrix of observations by variables
    n_comp : number of components to retain
    *Output
    Xwhite : whitened X
    white : whitening matrix (Xwhite = np.dot(white, X))
    dewhite : dewhitening matrix (X = np.dot(dewhite, Xwhite))
    """
    x2d_demean = x2d - x2d.mean(axis=1).reshape((-1, 1))
    NSUB, NVOX = x2d_demean.shape
    if NSUB > NVOX:
        cov = dot(x2d_demean.T, x2d_demean) / (NSUB - 1)
        # keep only the n_comp leading eigenpairs (scipy >= 1.5:
        # subset_by_index replaces the removed eigvals keyword)
        w, v = eigh(cov, subset_by_index=[NVOX - n_comp, NVOX - 1])
        D, Di = diagsqrts(w)
        u = dot(dot(x2d_demean, v), Di)
        x_white = v.T
        white = dot(Di, u.T)
        dewhite = dot(u, D)
    else:
        cov = dot(x2d_demean, x2d_demean.T) / (NVOX - 1)
        w, u = eigh(cov, subset_by_index=[NSUB - n_comp, NSUB - 1])
        D, Di = diagsqrts(w)
        white = dot(Di, u.T)
        x_white = dot(white, x2d_demean)
        dewhite = dot(u, D)
    return (x_white, white, dewhite)
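
# A minimal self-check sketch (an illustrative assumption, not original
# code): for the NSUB <= NVOX branch the whitened rows should have
# near-identity covariance. The helper name _check_whitening and the
# shapes are hypothetical.
def _check_whitening(n_sub=20, n_vox=5000, n_comp=5, seed=0):
    rng = np.random.RandomState(seed)
    data = rng.randn(n_sub, n_vox)
    x_white, white, dewhite = pca_whiten(data, n_comp)
    # covariance of the whitened rows should be close to the identity
    cov_w = np.cov(x_white)
    assert np.allclose(cov_w, np.eye(n_comp), atol=1e-8)
    # dewhite @ white projects onto the retained n_comp-dimensional
    # subspace; return the relative residual of that projection
    demeaned = data - data.mean(axis=1, keepdims=True)
    recon = dot(dewhite, dot(white, demeaned))
    return norm(recon - demeaned) / norm(demeaned)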

def w_update(weights, x_white, bias1, lrate1):
    """ Update rule for infomax
    This function receives parameters to update the unmixing matrix
    * Input
    weights : unmixing matrix (must be a square matrix)
    x_white : whitened data
    bias1 : current estimated bias
    lrate1 : current learning rate
    * Output
    weights : updated unmixing matrix
    bias1 : updated bias
    lrate1 : updated learning rate
    error : 1 if the update blew up, 0 otherwise
    """
    NCOMP, NVOX = x_white.shape
    block1 = int(np.floor(np.sqrt(NVOX / 3)))
    permute1 = permutation(NVOX)
    for start in range(0, NVOX, block1):
        if start + block1 < NVOX:
            tt2 = start + block1
        else:
            tt2 = NVOX
            block1 = NVOX - start

        unmixed = dot(weights, x_white[:, permute1[start:tt2]]) + bias1
        logit = 1 - (2 / (1 + np.exp(-unmixed)))
        weights = weights + lrate1 * dot(block1 * np.eye(NCOMP) +
                                         dot(logit, unmixed.T), weights)
        bias1 = bias1 + lrate1 * logit.sum(axis=1).reshape(bias1.shape)
        # Checking if W blows up
        if (np.isnan(weights)).any() or np.max(np.abs(weights)) > MAX_W:
            print("Numeric error! restarting with lower learning rate")
            lrate1 = lrate1 * ANNEAL
            weights = np.eye(NCOMP)
            bias1 = np.zeros((NCOMP, 1))
            error = 1
            if lrate1 > MIN_LRATE and matrix_rank(x_white) < NCOMP:
                print("Data is rank deficient. Cannot compute %d components."
                      % NCOMP)
                return (None, None, None, 1)
            if lrate1 < MIN_LRATE:
                print("Weight matrix may not be invertible...")
                return (None, None, None, 1)
            break
        else:
            error = 0
    return (weights, bias1, lrate1, error)
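
# Illustrative sketch (an assumption, not original code): a single
# w_update call on toy data, checking the error flag before trusting the
# returned weights.
def _example_w_update(seed=0):
    rng = np.random.RandomState(seed)
    toy_white = rng.randn(3, 1000)  # stands in for whitened data
    weights0 = np.eye(3)
    bias0 = np.zeros((3, 1))
    weights1, bias1, lrate1, err = w_update(weights0, toy_white, bias0, 0.005)
    if err == 0:
        return weights1, bias1, lrate1
    raise RuntimeError("w_update reported a numeric error")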

# infomax1: single modality infomax
def infomax1(x_white, verbose=False):
    """Computes ICA infomax in whitened data
    Decomposes x_white as x_white = AS
    *Input
    x_white : whitened data (use pca_whiten)
    verbose : flag to print optimization updates
    *Output
    A : mixing matrix
    S : source matrix
    W : unmixing matrix
    """
    NCOMP = x_white.shape[0]
    # Initialization
    weights = np.eye(NCOMP)
    old_weights = np.eye(NCOMP)
    d_weights = np.zeros(NCOMP)
    old_d_weights = np.zeros(NCOMP)
    lrate = 0.005 / np.log(NCOMP)
    bias = np.zeros((NCOMP, 1))
    change = 1
    angle_delta = 0
    if verbose:
        print("Beginning ICA training...")
    step = 1
    while step < MAX_STEP and change > W_STOP:
        (weights, bias, lrate, error) = w_update(weights, x_white, bias, lrate)
        if error != 0:
            # numeric blow-up: anneal the learning rate and restart
            step = 1
            error = 0
            lrate = lrate * ANNEAL
            weights = np.eye(NCOMP)
            old_weights = np.eye(NCOMP)
            d_weights = np.zeros(NCOMP)
            old_d_weights = np.zeros(NCOMP)
            bias = np.zeros((NCOMP, 1))
        else:
            d_weights = weights - old_weights
            change = norm(d_weights)**2
            if step > 2:
                # angle between successive weight updates, in degrees
                angle_delta = np.arccos(
                    np.sum(d_weights * old_d_weights) /
                    (norm(d_weights) * norm(old_d_weights) + 1e-8)
                ) * 180 / np.pi
            old_weights = np.copy(weights)
            if angle_delta > 60:
                lrate = lrate * ANNEAL
                old_d_weights = np.copy(d_weights)
            elif step == 1:
                old_d_weights = np.copy(d_weights)
            if verbose and change < W_STOP:
                print("Step %d: Lrate %.1e, "
                      "Wchange %.1e, "
                      "Angle %.2f" % (step, lrate, change, angle_delta))
            step = step + 1
    # A, S, W
    return (inv(weights), dot(weights, x_white), weights)
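
# A hedged sanity-check sketch (not in the original file): since infomax1
# returns A = inv(W) and S = W @ x_white, the product A @ S should
# reconstruct the whitened data up to floating-point error.
def _check_infomax_reconstruction(x_white):
    mixer, sources, unmixer = infomax1(x_white)
    assert np.allclose(dot(mixer, sources), x_white, atol=1e-6)
    return mixer, sources, unmixer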

# Single modality ICA
def ica1(x_raw, ncomp, verbose=False):
    '''
    Single modality Independent Component Analysis
    '''
    if verbose:
        print("Whitening data...")
    x_white, _, dewhite = pca_whiten(x_raw, ncomp)
    if verbose:
        print('x_white shape: %d, %d' % x_white.shape)
        print("Done.")
    if verbose:
        print("Running INFOMAX-ICA ...")
    mixer, sources, unmixer = infomax1(x_white, verbose)
    mixer = dot(dewhite, mixer)
    # Normalize each source to unit standard deviation and absorb the
    # scale into the mixing matrix, leaving mixer @ sources unchanged
    scale = sources.std(axis=1).reshape((-1, 1))
    sources = sources / scale
    scale = scale.reshape((1, -1))
    mixer = mixer * scale
    if verbose:
        print("Done.")
    return (mixer, sources, unmixer)

def icax(x_raw, ncomp, verbose=True):
    """Runs infomax ICA 10 times on the same whitened data, matches the
    components across runs, and returns the run-averaged mixer and sources.
    """
    if verbose:
        print("Whitening data...")
    x_white, _, dewhite = pca_whiten(x_raw, ncomp)
    mixer_list = []
    sources_list = []
    for it in range(10):
        if verbose:
            print('Run number %d' % it)
            print("Running INFOMAX-ICA ...")
        mixer, sources, unmix = infomax1(x_white, verbose)
        mixer_list.append(mixer)
        sources_list.append(sources)
    # Reorder all sources to the order of the first run
    S1 = sources_list[0]
    for it in range(1, 10):
        S2 = sources_list[it]
        A2 = mixer_list[it]
        # match each component of run 0 with its most correlated
        # counterpart in run `it`
        cor_m = np.corrcoef(S1, S2)[:ncomp, ncomp:]
        idx = np.argmax(np.abs(cor_m), axis=1)
        S2 = S2[idx, :]
        A2 = A2[:, idx]
        # flip signs so matched components correlate positively
        cor_m = np.corrcoef(S1, S2)[:ncomp, ncomp:]
        S2 = S2 * np.sign(np.diag(cor_m)).reshape((ncomp, 1))
        A2 = A2 * np.sign(np.diag(cor_m)).reshape((1, ncomp))
        sources_list[it] = S2
        mixer_list[it] = A2
    # Average sources and mixers across the 10 runs
    temp_sources = np.zeros(sources.shape)
    temp_mixer = np.zeros(mixer.shape)
    for sources, mixer in zip(sources_list, mixer_list):
        temp_sources = temp_sources + sources
        temp_mixer = temp_mixer + mixer
    temp_sources = temp_sources / 10.0
    temp_mixer = temp_mixer / 10.0
    return (temp_mixer, temp_sources)
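
# A minimal end-to-end sketch (an illustrative assumption, not part of the
# original project): mix two synthetic independent sources, unmix them
# with ica1, and report how well the recovered sources correlate with the
# ground truth.
if __name__ == "__main__":
    rng = np.random.RandomState(42)
    n_samples = 5000
    # plain infomax assumes super-Gaussian sources, so Laplace noise is a
    # reasonable toy signal
    true_sources = rng.laplace(size=(2, n_samples))
    mixing = rng.randn(2, 2)
    mixed = dot(mixing, true_sources)
    est_mixer, est_sources, _ = ica1(mixed, 2, verbose=True)
    # each estimated source should match one true source up to sign and order
    cor = np.corrcoef(true_sources, est_sources)[:2, 2:]
    print("abs correlation with ground truth:")
    print(np.abs(cor))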