Skip to content

Commit

Permalink
NEW: cfg tests
Browse files Browse the repository at this point in the history
- testing single and chained call plus selections
- roadblock atm: #304
  • Loading branch information
tensionhead committed Jul 11, 2022
1 parent 7c10765 commit b75d4b0
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 0 deletions.
3 changes: 3 additions & 0 deletions syncopy/nwanalysis/connectivity_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ def connectivityanalysis(data, method="coh", keeptrials=False, output="abs",
av_compRoutine.pre_check() # make sure we got a trial_average
av_compRoutine.compute(st_out, out, parallel=False, log_dict=log_dict)

# attach potential older cfg's from the input
# to support chained frontend calls..
out.cfg.update(data.cfg)
# attach frontend parameters for replay
out.cfg.update({'connectivityanalysis': new_cfg})
return out
5 changes: 5 additions & 0 deletions syncopy/preproc/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ def preprocessing(
filtered, rectified, parallel=kwargs.get("parallel"), log_dict=log_dict
)
del filtered
rectified.cfg.update(data.cfg)
rectified.cfg.update({'preprocessing': new_cfg})
return rectified

Expand All @@ -321,10 +322,14 @@ def preprocessing(
filtered, htrafo, parallel=kwargs.get("parallel"), log_dict=log_dict
)
del filtered
htrafo.cfg.update(data.cfg)
htrafo.cfg.update({'preprocessing': new_cfg})
return htrafo

# no post-processing
else:
# attach potential older cfg's from the input
# to support chained frontend calls..
filtered.cfg.update(data.cfg)
filtered.cfg.update({'preprocessing': new_cfg})
return filtered
2 changes: 2 additions & 0 deletions syncopy/preproc/resampledata.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,5 +222,7 @@ def resampledata(data,
resampleMethod.compute(
data, resampled, parallel=kwargs.get("parallel"), log_dict=log_dict
)

resampled.cfg.update(data.cfg)
resampled.cfg.update({'resampledata': new_cfg})
return resampled
4 changes: 4 additions & 0 deletions syncopy/specest/freqanalysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,10 @@ def freqanalysis(data, method='mtmfft', output='pow',
keeptrials=keeptrials)
specestMethod.compute(data, out, parallel=kwargs.get("parallel"), log_dict=log_dct)

# attach potential older cfg's from the input
# to support chained frontend calls..
out.cfg.update(data.cfg)

# attach frontend parameters for replay
out.cfg.update({'freqanalysis': new_cfg})
return out
115 changes: 115 additions & 0 deletions syncopy/tests/test_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
#
# Test cfg structure to replay frontend calls
#

import pytest
import numpy as np
import inspect

# Local imports
import syncopy as spy
from syncopy import __acme__
if __acme__:
import dask.distributed as dd

import syncopy.tests.synth_data as synth_data

# Decorator to decide whether or not to run dask-related tests
skip_without_acme = pytest.mark.skipif(not __acme__, reason="acme not available")

availableFrontend_cfgs = {'freqanalysis': {'method': 'mtmconvol', 't_ftimwin': 0.1},
'preprocessing': {'freq': 10, 'filter_class': 'firws', 'filter_type': 'hp'},
'resampledata': {'resamplefs': 125, 'lpfreq': 100},
'connectivityanalysis': {'method': 'coh', 'tapsmofrq': 5}
}


class TestCfg:

nSamples = 100
nChannels = 3
nTrials = 10
fs = 200
fNy = fs / 2

# -- use flat white noise as test data --

adata = synth_data.white_noise(nTrials,
nSamples=nSamples,
nChannels=nChannels,
samplerate=fs)

# for toi tests, -1s offset
time_span = [-.9, -.6]
flow, fhigh = 0.3 * fNy, 0.4 * fNy

def test_single_frontends(self):

for frontend in availableFrontend_cfgs.keys():

res = getattr(spy, frontend)(self.adata, cfg=availableFrontend_cfgs[frontend])
# now replay with cfg from preceding frontend call
res2 = getattr(spy, frontend)(self.adata, res.cfg)

print(frontend)
# same results
assert np.all(res.data[:] == res2.data[:])
assert res.cfg == res2.cfg

# check that it's not just the defaults
if frontend == 'freqanalysis':
res3 = getattr(spy, frontend)(self.adata)
assert np.any(res.data[:] != res3.data[:])
assert res.cfg != res3.cfg

def test_selection(self):

select = {'toilim': self.time_span, 'trials': [1, 2, 3], 'channel': [2, 0]}
for frontend in availableFrontend_cfgs.keys():
res = getattr(spy, frontend)(self.adata,
cfg=availableFrontend_cfgs[frontend],
select=select)

# now replay with cfg from preceding frontend call
res2 = getattr(spy, frontend)(self.adata, res.cfg)

# same results
assert 'select' in res.cfg[frontend]
assert 'select' in res2.cfg[frontend]
assert np.all(res.data[:] == res2.data[:])
assert res.cfg == res2.cfg

def test_chaining_frontends(self):

# only preprocessing makes sense to chain atm
res_pp = spy.preprocessing(self.adata, cfg=availableFrontend_cfgs['preprocessing'])

for frontend in availableFrontend_cfgs.keys():
res = getattr(spy, frontend)(res_pp,
cfg=availableFrontend_cfgs[frontend])

# now replay with cfg from preceding frontend calls
# note we can use the final results `res.cfg` for both calls!
res_pp2 = spy.preprocessing(self.adata, res.cfg)
res2 = getattr(spy, frontend)(res_pp2, res.cfg)

# same results
assert np.all(res.data[:] == res2.data[:])
assert res.cfg == res2.cfg

@skip_without_acme
def test_parallel(self, testcluster=None):

client = dd.Client(testcluster)
all_tests = [attr for attr in self.__dir__()
if (inspect.ismethod(getattr(self, attr)) and 'parallel' not in attr)]

for test_name in all_tests:
test_method = getattr(self, test_name)
test_method()
client.close()


if __name__ == '__main__':
T1 = TestCfg()

0 comments on commit b75d4b0

Please sign in to comment.