forked from murraylab/PyDDM
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsample.py
490 lines (452 loc) · 23.1 KB
/
sample.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
# Copyright 2018 Max Shinn <[email protected]>
# 2018 Norman Lam <[email protected]>
#
# This file is part of PyDDM, and is available under the MIT license.
# Please see LICENSE.txt in the root directory for more information.
import numpy as np
import itertools
from paranoid.types import NDArray, Number, List, String, Self, Positive, Positive0, Range, Natural0, Unchecked, Dict, Maybe, Nothing, Boolean
from paranoid.decorators import *
from .models.paranoid_types import Conditions
@paranoidclass
class Sample(object):
"""Describes a sample from some (empirical or simulated) distribution.
Similarly to Solution, this is a glorified container for three
items: a list of correct reaction times, a list of error reaction
times, and the number of undecided trials. Each can have
different properties associated with it, known as "conditions"
elsewhere in this codebase. This is to specify the experimental
parameters of the trial, to allow fitting of stimuli by (for
example) color or intensity.
To specify conditions, pass a keyword argument to the constructor.
The name should be the name of the property, and the value should
be a tuple of length two or three. The first element of the tuple
should be a list of length equal to the number of correct trials,
and the second should be equal to the number of error trials. If
there are any undecided trials, the third argument should
contain a list of length equal to `undecided`.
Optionally, additional data can be associated with each
independent data point. These should be passed as keyword
arguments, where the keyword name is the property and the value is
a tuple. The tuple should have either two or three elements: the
first two should be lists of properties for the correct and error
reaction times, where the properties correspond to reaction times
in the correct or error lists. Optionally, a third list of length
equal to the number of undecided trials gives a list of conditions
for these trials. If multiple properties are passed as keyword
arguments, the ordering of the undecided properties (in addition
to those of the correct and error distributions) will correspond
to one another.
"""
@classmethod
def _test(cls, v):
    """Assert that `v` is a structurally valid instance of this class.

    Checks the reaction time arrays, the undecided count, and that
    every condition is a tuple of two or three 1-D arrays whose
    lengths match the correct, error, and (if present) undecided
    trials.
    """
    # The constructor already performs most of these checks and the
    # data is read only, so this is mostly a safety net.
    assert type(v) is cls
    assert v.corr in NDArray(d=1, t=Positive0), "sample_corr not a numpy array with elements greater than 0, it is " + str(type(v.corr))
    assert v.err in NDArray(d=1, t=Positive0), "sample_err not a numpy array with elements greater than 0, it is " + str(type(v.err))
    assert v.undecided in Natural0(), "undecided not a natural number"
    for name, cond in v.conditions.items():
        # Shape and type of each condition entry
        assert name, "Invalid key"
        assert isinstance(cond, tuple)
        assert len(cond) in (2, 3)
        assert cond[0] in NDArray(d=1)
        assert cond[1] in NDArray(d=1)
        assert len(cond[0]) == len(v.corr)
        assert len(cond[1]) == len(v.err)
        if len(cond) != 3:
            # A two-tuple is only acceptable without undecided trials
            assert v.undecided == 0
            continue
        assert len(cond[2]) == v.undecided
        assert cond[2] in NDArray(d=1)
@staticmethod
def _generate():
aa = lambda x : np.asarray(x)
yield Sample(aa([.1, .2, .3]), aa([.2, .3, .4]), undecided=0)
yield Sample(aa([.1, .2, .3]), aa([]), undecided=0)
yield Sample(aa([]), aa([.2, .3, .4]), undecided=0)
yield Sample(aa([.1, .2, .3]), aa([.2, .3, .4]), undecided=5)
def __init__(self, sample_corr, sample_err, undecided=0, **kwargs):
    """Create a sample from correct RTs, error RTs, and undecided count.

    `sample_corr` and `sample_err` should be 1-D numeric numpy
    arrays, and `undecided` a non-negative integer.  Each keyword
    argument is a condition: a tuple of two or three elements giving
    the condition values for the correct, error, and (optionally)
    undecided trials.

    Note that the arrays passed in are marked read-only in place.
    """
    assert sample_corr in NDArray(d=1, t=Number), "sample_corr not a numpy array, it is " + str(type(sample_corr))
    assert sample_err in NDArray(d=1, t=Number), "sample_err not a numpy array, it is " + str(type(sample_err))
    assert undecided in Natural0(), "undecided not a natural number"
    self.corr = sample_corr
    self.err = sample_err
    self.undecided = undecided
    # The reaction times must never change after construction
    for rts in (self.corr, self.err):
        rts.flags.writeable = False
    # Validate the format of every condition passed as a keyword
    for name, cond in kwargs.items():
        assert name, "Invalid key"
        assert isinstance(cond, tuple)
        assert len(cond) in (2, 3)
        assert cond[0] in NDArray(d=1)
        assert cond[1] in NDArray(d=1)
        assert len(cond[0]) == len(self.corr)
        assert len(cond[1]) == len(self.err)
        # Condition values are read-only too
        cond[0].flags.writeable = False
        cond[1].flags.writeable = False
        if len(cond) == 3:
            assert len(cond[2]) == undecided
        else:
            assert undecided == 0
    self.conditions = kwargs
def __len__(self):
"""The number of samples"""
return len(self.corr) + len(self.err) + self.undecided
def __iter__(self):
"""Iterate through each reaction time, with no regard to whether it was a correct or error trial."""
return np.concatenate([self.corr, self.err]).__iter__()
def __eq__(self, other):
if not np.allclose(self.corr, other.corr) or \
not np.allclose(self.err, other.err) or \
self.undecided != other.undecided:
return False
for k in self.conditions:
if k not in other.conditions:
return False
if np.issubdtype(self.conditions[k][0].dtype, np.floating) and \
np.issubdtype(self.conditions[k][0].dtype, np.floating):
compare_func = np.allclose
else:
compare_func = lambda x,y: np.all(x == y)
if not compare_func(self.conditions[k][0], other.conditions[k][0]) or \
not compare_func(self.conditions[k][1], other.conditions[k][1]):
return False
if len(self.conditions[k]) == 3 and \
len(other.conditions[k]) == 3 and \
not compare_func(self.conditions[k][2], other.conditions[k][2]):
return False
return True
def __add__(self, other):
assert sorted(self.conditions.keys()) == sorted(other.conditions.keys()), "Canot add with unlike conditions"
corr = np.concatenate([self.corr, other.corr])
err = np.concatenate([self.err, other.err])
undecided = self.undecided + other.undecided
conditions = {}
for k in self.conditions.keys():
sc = self.conditions
oc = other.conditions
bothc = np.concatenate([sc[k][0], oc[k][0]])
bothe = np.concatenate([sc[k][1], oc[k][1]])
bothn = np.concatenate([sc[k][2] if len(sc[k]) == 3 else [],
oc[k][2] if len(oc[k]) == 3 else []])
conditions[k] = (bothc, bothe, bothn)
return Sample(corr, err, undecided, **conditions)
@staticmethod
@accepts(NDArray(d=2), List(String))
@returns(Self)
@requires('data.shape[1] >= 2')
@requires('set(list(data[:,1])) - {0, 1} == set()')
@requires('all(data[:,0].astype("float") == data[:,0])')
@requires('data.shape[1] - 2 == len(column_names)')
@ensures('len(column_names) == len(return.condition_names())')
def from_numpy_array(data, column_names):
    """Generate a Sample object from a numpy array.

    `data` should be an n x m array (n rows, m columns) where
    m>=2.  The first column should be the response times, and the
    second column should be whether the trial was correct or an
    error (1 == correct, 0 == error).  Any remaining columns
    should be conditions.  `column_names` should be a list of
    length m-2 of strings indicating the names of the conditions,
    i.e. one name for each column after the first two.  The order
    of the names should correspond to the order of the columns.
    This function does not yet work with undecided trials.
    """
    c = data[:,1].astype(bool)
    nc = (1-data[:,1]).astype(bool)
    # Abstract numpy scalar types are used instead of the np.float_
    # alias, which no longer exists as of NumPy 2.0.
    numeric_types = (float, int, np.floating, np.integer)
    def pt(x): # Pythonic types
        arr = np.asarray(x, dtype=object)
        # The following is somewhat of a hack to get rid of object arrays
        # when a condition is not a number (e.g. string or tuple)
        if len(arr) > 0 and not isinstance(arr[0], numeric_types):
            return arr
        arr = np.asarray(arr.tolist())
        try:
            # Store whole-number values as integers
            if np.all(arr == np.round(arr)):
                arr = arr.astype(np.int64)
        except TypeError:
            pass
        return arr

    conditions = {k: (pt(data[c,i+2]), pt(data[nc,i+2]), np.asarray([])) for i,k in enumerate(column_names)}
    return Sample(pt(data[c,0]), pt(data[nc,0]), 0, **conditions)
@staticmethod
@accepts(Unchecked, String, String) # TODO change unchecked to pandas
@returns(Self)
@requires('df.shape[1] >= 2')
@requires('rt_column_name in df')
@requires('correct_column_name in df')
@requires('not np.any(df.isnull())')
@requires('len(np.setdiff1d(df[correct_column_name], [0, 1])) == 0')
@requires('all(df[rt_column_name].astype("float") == df[rt_column_name])')
@ensures('len(df) == len(return)')
def from_pandas_dataframe(df, rt_column_name, correct_column_name):
    """Generate a Sample object from a pandas dataframe.

    `df` should be a pandas dataframe.  `rt_column_name` and
    `correct_column_name` should be strings, and `df` should
    contain columns by these names. The column with the name
    `rt_column_name` should be the response times, and the column
    with the name `correct_column_name` should be whether the
    trial was correct or an error (1 == correct, 0 == error).  Any
    remaining columns should be conditions.  This function does
    not yet work with undecided trials.

    A warning is printed if the dataframe is empty or if the mean
    response time is greater than 50 (which suggests milliseconds
    instead of seconds).  Raises ValueError if a column holds lists
    or ndarrays.
    """
    if len(df) == 0:
        print("Warning: Empty DataFrame")
    if np.mean(df[rt_column_name]) > 50:
        print("Warning: RTs should be specified in seconds, not milliseconds")
    for _,col in df.items():
        if len(df) > 0 and isinstance(col.iloc[0], (list, np.ndarray)):
            raise ValueError("Conditions should not be lists or ndarrays.  Please convert to a tuple instead.")
    c = df[correct_column_name].astype(bool)
    nc = (1-df[correct_column_name]).astype(bool)
    # Abstract numpy scalar types are used instead of the np.float_
    # alias, which no longer exists as of NumPy 2.0.
    numeric_types = (float, int, np.floating, np.integer)
    def pt(x): # Pythonic types
        arr = np.asarray(x, dtype=object)
        # The following is somewhat of a hack to get rid of object arrays
        # when a condition is not a number (e.g. string or tuple)
        if len(arr) > 0 and not isinstance(arr[0], numeric_types):
            return arr
        arr = np.asarray(arr.tolist())
        try:
            # Store whole-number values as integers
            if np.all(arr == np.round(arr)):
                arr = arr.astype(np.int64)
        except TypeError:
            pass
        return arr

    column_names = [e for e in df.columns if not e in [rt_column_name, correct_column_name]]
    conditions = {k: (pt(df[c][k]), pt(df[nc][k]), np.asarray([])) for k in column_names}
    return Sample(pt(df[c][rt_column_name]), pt(df[nc][rt_column_name]), 0, **conditions)
def to_pandas_dataframe(self, rt_column_name='RT', correct_column_name='correct', drop_undecided=False):
"""Convert the sample to a Pandas dataframe.
`correct_column_name` is the column label for the response
time, and `rt_column_name` is the column label for whether a
trial is correct or incorrect.
Because undecided trials do not have an RT or correct/error, they are
cannot be added to the data frame. To ignore them, thereby creating a
dataframe which is smaller than the sample, set `drop_undecided` to
True.
"""
import pandas
all_trials = []
if self.undecided != 0 and drop_undecided is False:
raise ValueError("The sample object has undecided trials. These do not have an RT or a P(correct), so they cannot be converted to a data frame. Please use the 'drop_undecided' flag when calling this function.")
conditions = list(self.condition_names())
columns = [correct_column_name, rt_column_name] + conditions
for trial in self.items(correct=True):
all_trials.append([1, trial[0]] + [trial[1][c] for c in conditions])
for trial in self.items(correct=False):
all_trials.append([0, trial[0]] + [trial[1][c] for c in conditions])
return pandas.DataFrame(all_trials, columns=columns)
def items(self, correct):
    """Iterate through the trials of one response type.

    The single argument `correct` is a boolean: True to iterate
    through the correct trials, False to iterate through the error
    trials.

    Each iteration gives a two-tuple.  The first element is the
    reaction time, and the second is a dictionary of the conditions
    associated with that reaction time.

    If only the RTs are needed, iterate directly through
    "sample.corr" and "sample.err" instead.
    """
    trial_iterator = _Sample_Iter_Wraper(self, correct=correct)
    return trial_iterator
@accepts(Self)
@returns(Self)
def subset(self, **kwargs):
    """Subset the data by filtering based on specified properties.

    Each keyword argument should be the name of a property.  These
    keyword arguments may have one of three values:

    - A list (or numpy array): For each element in the returned
      subset, the specified property is in this list of values.
    - A function: For each element in the returned subset, the
      specified property causes the function to evaluate to True.
    - Anything else: Each element in the returned subset must have
      this value for the specified property.

    When several keyword arguments are given, a trial must satisfy
    all of them.

    Return a sample object representing the filtered sample.
    """
    no_undecided = self.undecided == 0
    mask_corr = np.ones(len(self.corr), dtype=bool)
    mask_err = np.ones(len(self.err), dtype=bool)
    mask_undec = np.ones(self.undecided, dtype=bool)
    for k,v in kwargs.items():
        cond = self.conditions[k]
        # Build one matcher per filter, then apply it to each trial type
        if callable(v):
            matches = lambda values, v=v: [v(i) for i in values]
        elif isinstance(v, (list, np.ndarray)):
            matches = lambda values, v=v: [i in v for i in values]
        else:
            # Create a zero-dimensional array so this will work with tuples too
            val = np.array(None)
            val[()] = v
            matches = lambda values, val=val: val == values
        mask_corr = np.logical_and(mask_corr, matches(cond[0]))
        mask_err = np.logical_and(mask_err, matches(cond[1]))
        if no_undecided:
            # Conditions may be two-tuples here, so cond[2] is not touched
            mask_undec = np.asarray([], dtype=bool)
        else:
            mask_undec = np.logical_and(mask_undec, matches(cond[2]))
    # Sanity checks on the masks before they are used for indexing
    assert mask_corr.dtype == bool
    for cond in self.conditions.values():
        assert len(cond[0]) == len(mask_corr)
        assert len(cond[1]) == len(mask_err)
        if len(cond) == 3:
            assert len(cond[2]) == len(mask_undec)
    filtered_conditions = {k : (v[0][mask_corr.astype(bool)],
                                v[1][mask_err.astype(bool)],
                                (v[2][mask_undec.astype(bool)] if len(v) == 3 else np.asarray([])))
                           for k,v in self.conditions.items()}
    return Sample(self.corr[mask_corr],
                  self.err[mask_err],
                  int(np.count_nonzero(mask_undec)),
                  **filtered_conditions)
@accepts(Self)
@returns(List(String))
def condition_names(self):
    """The names of all conditions stored in this sample, as a list."""
    return [name for name in self.conditions.keys()]
@accepts(Self, String)
@requires('cond in self.condition_names()')
@returns(List(Unchecked))
def condition_values(self, cond):
    """The distinct values of a condition observed in the sample.

    `cond` is the name of the condition.  Returns a sorted list of
    the values which occur in at least one correct, error, or
    undecided trial.
    """
    entry = self.conditions[cond]
    groups = [entry[0], entry[1]]
    if len(entry) == 3:
        groups.append(entry[2])
    observed = set()
    for values in groups:
        observed = observed.union(set(values))
    return sorted(list(observed))
    # Saved in case we later come across a bug with sets not working for mutable condition values
    # if len(cs[cond]) == 3:
    #     grouped = itertools.groupby(sorted(list(cs[cond][0])+list(cs[cond][1])+list(cs[cond][2])))
    # elif len(cs[cond]) == 2:
    #     grouped = itertools.groupby(sorted(list(cs[cond][0])+list(cs[cond][1])))
    # return [g for g,_ in grouped]
@accepts(Self, Maybe(List(String)))
@returns(List(Conditions))
def condition_combinations(self, required_conditions=None):
    """Get every combination of condition values observed in the sample.

    Since PDFs of solved models in general depend on all of the
    conditions, this returns a list of dictionaries.  The keys of
    each dictionary are the names of conditions, and each value is
    the value of that condition for at least one trial in the
    sample.  Every combination which occurs in some trial appears
    exactly once.

    If `required_conditions` is not None, only the conditions with
    names found within `required_conditions` will be included.

    If there are no combinations (e.g. no conditions), a list
    containing a single empty dictionary is returned.
    """
    names = self.condition_names()
    if required_conditions is not None:
        names = [n for n in names if n in required_conditions]
    # One array per condition, holding its value for every trial
    per_condition = []
    for name in names:
        entry = self.conditions[name]
        if len(entry) == 3:
            undecided = entry[2]
        else:
            undecided = np.asarray([])
        per_condition.append(np.concatenate([entry[0], entry[1], undecided]))
    # Transpose to one tuple per trial, then remove duplicates
    per_trial = list(zip(*per_condition))
    # Saved in case we later come across a bug with sets not working for mutable condition values
    # combs = [g for g,_ in itertools.groupby(sorted(alljoined))]
    unique_trials = list(set(per_trial))
    if len(unique_trials) == 0: # Generally not needed since itertools.product does this
        return [{}]
    return [dict(zip(names, values)) for values in unique_trials]
@staticmethod
@accepts(dt=Positive, T_dur=Positive)
@returns(NDArray(d=1, t=Positive0))
#@requires('T_dur/dt < 1e5') # Too large of a number
def t_domain(dt=.01, T_dur=2):
    """The time points (support) corresponding to the pdf/cdf_corr/err arrays.

    Returns int(T_dur/dt)+1 evenly spaced points from 0 to `T_dur`
    inclusive.
    """
    n_points = int(T_dur/dt) + 1
    return np.linspace(0, T_dur, n_points)
@accepts(Self, dt=Positive, T_dur=Positive)
@returns(NDArray(d=1, t=Positive0))
#@requires('T_dur/dt < 1e5') # Too large of a number
@ensures('len(return) == len(self.t_domain(dt=dt, T_dur=T_dur))')
def pdf_corr(self, dt=.01, T_dur=2):
    """The correct component of the joint PDF.

    This is a histogram of the correct RTs, normalized by the total
    number of trials in the sample and by the bin width `dt`.
    """
    n_bins = int(T_dur/dt) + 1
    # The bin edges are shifted by dt/2 as a continuity correction
    edges = (0-dt/2, T_dur+dt/2)
    counts = np.histogram(self.corr, bins=n_bins, range=edges)[0]
    return counts/len(self)/dt
@accepts(Self, dt=Positive, T_dur=Positive)
@returns(NDArray(d=1, t=Positive0))
#@requires('T_dur/dt < 1e5') # Too large of a number
@ensures('len(return) == len(self.t_domain(dt=dt, T_dur=T_dur))')
def pdf_err(self, dt=.01, T_dur=2):
    """The error (incorrect) component of the joint PDF.

    This is a histogram of the error RTs, normalized by the total
    number of trials in the sample and by the bin width `dt`.
    """
    n_bins = int(T_dur/dt) + 1
    # The bin edges are shifted by dt/2 as a continuity correction
    edges = (0-dt/2, T_dur+dt/2)
    counts = np.histogram(self.err, bins=n_bins, range=edges)[0]
    return counts/len(self)/dt
@accepts(Self, dt=Positive, T_dur=Positive)
@returns(NDArray(d=1, t=Positive0))
#@requires('T_dur/dt < 1e5') # Too large of a number
@ensures('len(return) == len(self.t_domain(dt=dt, T_dur=T_dur))')
def cdf_corr(self, dt=.01, T_dur=2):
    """The correct component of the joint CDF.

    Computed as the cumulative sum of `pdf_corr` multiplied by `dt`.
    """
    density = self.pdf_corr(dt=dt, T_dur=T_dur)
    return np.cumsum(density)*dt
@accepts(Self, dt=Positive, T_dur=Positive)
@returns(NDArray(d=1, t=Positive0))
@ensures('len(return) == len(self.t_domain(dt=dt, T_dur=T_dur))')
def cdf_err(self, dt=.01, T_dur=2):
    """The error (incorrect) component of the joint CDF.

    Computed as the cumulative sum of `pdf_err` multiplied by `dt`.
    """
    density = self.pdf_err(dt=dt, T_dur=T_dur)
    return np.cumsum(density)*dt
@accepts(Self)
@returns(Range(0, 1))
@requires("len(self) > 0")
def prob_correct(self):
    """The fraction of all trials (including undecided) which were correct."""
    n_correct = len(self.corr)
    return n_correct/len(self)
@accepts(Self)
@returns(Range(0, 1))
@requires("len(self) > 0")
def prob_error(self):
    """The fraction of all trials (including undecided) which were errors."""
    n_error = len(self.err)
    return n_error/len(self)
@accepts(Self)
@returns(Range(0, 1))
@requires("len(self) > 0")
def prob_undecided(self):
    """The fraction of all trials in which neither response was selected."""
    n_undecided = self.undecided
    return n_undecided/len(self)
@accepts(Self)
@returns(Range(0, 1))
@requires("len(self) > 0")
def prob_correct_forced(self):
    """The probability of a correct response if a response is forced.

    Undecided trials are split evenly between correct and error.
    """
    p_correct = self.prob_correct()
    p_undecided = self.prob_undecided()
    return p_correct + p_undecided/2.
@accepts(Self)
@returns(Range(0, 1))
@requires("len(self) > 0")
def prob_error_forced(self):
    """The probability of an error response if a response is forced.

    Undecided trials are split evenly between correct and error.
    """
    p_error = self.prob_error()
    p_undecided = self.prob_undecided()
    return p_error + p_undecided/2.
@accepts(Self)
@requires("len(self.corr) > 0")
@returns(Positive0)
def mean_decision_time(self):
    """The mean reaction time over the correct trials only."""
    correct_rts = self.corr
    return np.mean(correct_rts)
class _Sample_Iter_Wraper(object):
    """Iterator over the trials of a sample object.

    `sample_obj` is the Sample to iterate over.  `correct` selects
    which trials are visited: True for the correct responses, False
    for the error responses.

    Every step gives a two-tuple: the reaction time of the trial,
    and a dictionary mapping each condition name to the value of
    that condition for the trial.
    """
    def __init__(self, sample_obj, correct):
        self.sample = sample_obj
        self.correct = correct
        self.i = 0  # Position of the next trial to return
        # `ind` is the position of this trial type within each
        # condition tuple: 0 for correct, 1 for error
        if correct:
            self.rt, self.ind = sample_obj.corr, 0
        else:
            self.rt, self.ind = sample_obj.err, 1

    def __iter__(self):
        return self

    def __next__(self):
        pos = self.i
        if pos == len(self.rt):
            raise StopIteration
        self.i = pos + 1
        rt = self.rt[pos]
        trial_conditions = {}
        for name, values in self.sample.conditions.items():
            trial_conditions[name] = values[self.ind][pos]
        return (rt, trial_conditions)