Replace pickle with shelve #4

Merged
merged 18 commits on Apr 2, 2019
1 change: 1 addition & 0 deletions .gitignore
@@ -4,4 +4,5 @@ __*
*.bak
!HCP_1200/.gitkeep
!HCP_1200/meta_data.csv
!HCP_1200/README.md
*.pyc
68 changes: 67 additions & 1 deletion HCP_1200/README.md
@@ -1 +1,67 @@
The file `metadata.csv` contains publicly accessible metadata regarding the scans in the HCP S1200 release. In using this repository and the data contained in it you are agreeing to the [Open Access Terms](https://www.humanconnectome.org/study/hcp-young-adult/document/wu-minn-hcp-consortium-open-access-data-use-terms) established by the Connectome Organization.
The file metadata.csv contains publicly accessible metadata regarding the scans in the HCP S1200 release. In using this repository and the data contained in it you are agreeing to the Open Access Terms established by the Connectome Organization.

# How to use shelved Py object

On a successful run, the code should place three files named `hcp_data.dat`, `hcp_data.bak`, and `hcp_data.dir` in the `HCP_1200` folder. You can access their contents as follows:

```python
import zipshelve
from pathlib import Path

fin = Path('HCP_1200/hcp_data')

with zipshelve.open(fin, mode='r') as shelf:
print('Have subjects:')

# Can also call shelf.ls() instead of for loop
for key in shelf.keys():
print(key)

for key, scans in shelf.items():
print('\nFor subject: ' + str(key) + '\thave:')
for scan, data_dict in scans.items():
if scan != 'metadata':
print('Scan: \t', scan)
print('With ROIs:\n', '\n'.join(list(data_dict.keys())))
else:
print('Subject age:\t', data_dict.loc['Age'])
```

The [`shelve`](https://docs.python.org/3/library/shelve.html) module wraps around pickling but loads only the requested key from disk (as opposed to `pickle.load()`, which loads the whole data set into memory). We moved from `pickle` to `shelve` because pickling the whole data set will likely cause memory issues. Using disk rather than RAM to hold the data will create some parallelization challenges, but those remain to be worked out.
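
A minimal sketch of the difference in access patterns (the file names and the subject key are placeholders, and the standard library `shelve`/`pickle` are used here rather than the repo's `zipshelve`/gzip wrappers):

```python
import pickle
import shelve

# pickle: the entire data set is deserialized into RAM before any key is used
with open('hcp_data.bin', 'rb') as stream:
    data = pickle.load(stream)
    one_subject = data['100610']

# shelve: only the entry for the requested key is unpickled from disk
with shelve.open('hcp_data', flag='r') as shelf:
    one_subject = shelf['100610']
```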

# A `gdbm` issue

On Linux machines, if the `gdbm` module is not installed, the read-only flag of the `shelve` module has no effect when opening a shelf. The issue is that the Anaconda environment uses its own `lib-dynload` directory, so even running

`sudo apt-get install python3-gdbm`

may not fix the issue. To get around this (after running the above command) run:

`dpkg -L python3-gdbm`

and examine the output for a file with a name of the form `_gdbm.cpython-3Xm-x86_64-linux-gnu.so`. This is the dynamic library we need. Then, with the Anaconda environment activated, run:

`cd $(python -c 'import sys; [print(x) for x in sys.path if "lib-dynload" in x]')`

to `cd` into the Anaconda environment's `lib-dynload` folder. Next, copy the file from above (the `36m` or `37m` variant, matching your Python version) into this folder. This should fix the issue. To confirm, run:

```Python
import shelve

with shelve.open('test_shelf.gdb') as s:
s['key1'] = {'int': 10,'float': 9.5, 'string': 'Sample data'}

with shelve.open('test_shelf.gdb', flag='r') as s:
existing = s['key1']

with shelve.open('test_shelf.gdb', flag='r') as s:
print('Existing:', s['key1'])
s['key1'] = 'new value'

with shelve.open('test_shelf.gdb', flag='r') as s:
print('Existing:', s['key1'])
```

at the Python prompt and confirm that you get the error `_gdbm.error: Reader can't store`. If the problem is **not** fixed, the output will be `Existing: new value`, which means the read-only flag did **not** work.
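
To double-check that the library ends up in the right place, here is a minimal sketch (standard library only) that prints the active environment's `lib-dynload` directory and any `_gdbm` extension modules found in it:

```python
import glob
import os
import sys

# Locate the active environment's lib-dynload directory on sys.path
dynload = next(p for p in sys.path if 'lib-dynload' in p)
print('lib-dynload directory:', dynload)

# List any _gdbm extension modules already present there
print('gdbm extensions found:', glob.glob(os.path.join(dynload, '_gdbm*.so')))
```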


92 changes: 35 additions & 57 deletions README.md
@@ -37,19 +37,20 @@ This is a short explanation of the inner workings of the code in this repositor
#### Testing 1
If you have Amazon S3 and `boto` set up correctly with your credentials, you should be able to activate your environment, fire up python, and run

from download_hcp import *
dfiles = download_subject('100610')

```Python
from download_hcp import *
dfiles = download_subject('100610')
```
and get no errors.


#### Testing 2
If you have workbench installed and correctly added to path, then in your conda environment, you should be able to fire up python and say

import subprocess
subprocess.run(['wb_command','-help'])

```Python
import subprocess
subprocess.run(['wb_command','-help'])
```
and get meaningful output.

## Parallelizing
@@ -61,17 +62,16 @@ Prof. YMB suggested that having large amounts of RAM even with just a few cores

Note how `do_subject` really only does:

clean_subject(idx, process_subject(*download_subject(idx)))
```clean_subject(idx, process_subject(*download_subject(idx)))```

and parallelization only involves:

with mp.Pool(N) as pool:
result = pool.map(do_subject, subject_ids)

```Python
with mp.Pool(N) as pool:
result = pool.map(do_subject, subject_ids)
```
where `N` is the number of parallel processes. That's so clean even I am surprised that it worked out this way.
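
For completeness, here is a minimal end-to-end sketch of that flow (the subject ids are placeholders; `do_subject` is imported from `download_hcp` exactly as `automate.py` does below, and the `__main__` guard is good practice with `multiprocessing`):

```Python
import multiprocessing as mp

from download_hcp import do_subject

if __name__ == '__main__':
    subject_ids = ['100610', '102311']  # placeholder subject ids
    N = 4                               # number of parallel processes

    with mp.Pool(N) as pool:
        result = pool.map(do_subject, subject_ids)
```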


---

# Using rpy2 for CIFTI2
@@ -80,61 +80,39 @@ where `N` is the number of parallel processes. That's so clean even I am surpris

We utilize an R module in this repo. If you set up the environment using the provided `.yml` file and it worked without errors, you should be good. Otherwise, first install rpy2 for conda using:

conda install rpy2
```conda install rpy2```

in your active environment. That should install the R packages needed to use R from within Python. Next, install the `cifti` package from CRAN:

# import rpy2's package module
import rpy2.robjects.packages as rpackages

# import R's utility package
utils = rpackages.importr('utils')
utils.install_packages('cifti')

```Python
# import rpy2's package module
import rpy2.robjects.packages as rpackages

# import R's utility package
utils = rpackages.importr('utils')
utils.install_packages('cifti')
```
It should prompt you to pick a CRAN server for the session. If the installation is successful, it should end with

.
.
** building package indices
** installing vignettes
** testing if installed package can be loaded
* DONE (cifti)
.
.
** building package indices
** installing vignettes
** testing if installed package can be loaded
* DONE (cifti)
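
If you prefer to skip the interactive CRAN mirror prompt, you can select a mirror programmatically before installing (a sketch using rpy2's wrapper of R's `utils` package; the mirror index `1` is an arbitrary choice):

```Python
import rpy2.robjects.packages as rpackages

utils = rpackages.importr('utils')
utils.chooseCRANmirror(ind=1)  # pick the first mirror in R's list non-interactively
utils.install_packages('cifti')
```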

You can confirm successful installation by opening python and running:

from rpy2.robjects.packages import importr
importr('cifti')


```Python
from rpy2.robjects.packages import importr
importr('cifti')
```
which should return:

>>> importr('cifti')
rpy2.robjects.packages.Package as a <module 'cifti'>
>>> importr('cifti')
rpy2.robjects.packages.Package as a <module 'cifti'>

You may have to install development packages on your system for `xml2`, etc. Just use `sudo apt-get install libxml2-dev` or whatever is missing.

Alternatively, you can set up the environment using the `environment.yml` file.

# How to use pickled Py object

The code on successful run should place a file called `hcp_data.bin` in the `HCP_1200` folder. You can access the content of this file as follows:

import gzip, pickle
import numpy as np

with gzip.open('hcp_data.bin') as s:
data = pickle.load(s)

print('Have subjects:')

for key in data.keys():
print('\t' + str(key))

for key, scans in data.items():
print('\nFor subject: ' + str(key) + '\thave:')
for scan, data_dict in scans.items():
if scan != 'metadata':
print('Scan: \t', scan)
print('With ROIs:\n', '\n'.join(list(data_dict.keys())))
else:
print('Subject age:\t', data_dict.loc['Age'])

40 changes: 22 additions & 18 deletions automate.py
@@ -1,7 +1,9 @@
from download_hcp import do_subject
import multiprocessing as mp
import gzip, pickle
from rpy2.robjects.packages import importr
import zipshelve
from pickle import HIGHEST_PROTOCOL
from datetime import datetime


def main():
@@ -16,35 +18,37 @@ def main():
utils = importr('utils')
utils.install_packages('cifti')


with open('subjectlist.txt') as stream:
subject_ids = stream.readlines()


# Strip newline characters
subject_ids = [idx.strip() for idx in subject_ids]
fin = 'HCP_1200/hcp_data'

# Download and process. `procs` is # of processors
procs = 4

print(datetime.now())
with mp.Pool(procs) as pool:
result = pool.map(do_subject, subject_ids)

print('Pickling returned data')

# `result` is a list of dictionaries, but has no subject
# identifier so we make a new dictionary

data = dict(zip(map(int,subject_ids), result))

with gzip.open('HCP_1200/hcp_data.bin', 'wb') as stream:
pickle.dump(data, stream)
result = zip(subject_ids, pool.map(do_subject, subject_ids))

print('Shelving returned data')

with zipshelve.open(fin, protocol=HIGHEST_PROTOCOL) as shelf:
for key, value in result:
shelf[key] = value

print(datetime.now())

# Serial instead of parallel?

# for idx in subject_ids:
# datum = do_subject(idx)
# breakpoint()
# datum = dict()
# for idx in subject_ids:
# datum[idx] = do_subject(idx)
#
# with zipshelve.open(fin, protocol=HIGHEST_PROTOCOL) as shelf:
# for key, value in datum:
# shelf[key] = value



if __name__ == "__main__":
26 changes: 17 additions & 9 deletions download_hcp.py
@@ -30,20 +30,22 @@ def download_subject(sname):
bucket = s3.Bucket(BUCKET_NAME)

# Append all keys( file names with full path) to a list
key_list = [str(key.key) for key in bucket.objects.filter(Prefix='HCP_1200/' + sname)]
key_list = (str(key.key) for key in bucket.objects.filter(Prefix='HCP_1200/' + sname))

print('-'*10, ' Downloading data for ...', sname)

keyword1='Atlas_MSMAll_hp2000_clean.dtseries.nii'
keyword2='aparc.32k_fs_LR.dlabel.nii'
filtered_list1 = list(filter(lambda x: (keyword1 in x ), key_list))
filtered_list2 = list(filter(lambda x: (keyword2 in x ), key_list))
filtered_list = filtered_list1 + filtered_list2
filtered_list = filter(lambda x: (keyword1 in x or keyword2 in x), key_list)
dense_time_series, parcel_labels = list(), list()

# filtered_list2 = list(filter(lambda x: (keyword2 in x ), key_list))
# filtered_list = filtered_list1 + filtered_list2

# Loop through keys and use download_file to download each key (file) to the directory where this code is running.
for key in filtered_list:
for key in filtered_list:
try:
# Respect the directory structure
# Respect the directory structure
os.makedirs(os.path.dirname(key), exist_ok=True)
if not Path(key).is_file():
s3.Bucket(BUCKET_NAME).download_file(key, key)
@@ -55,8 +57,15 @@
else:
raise KeyError

#return dense_time_series, parcellation_labels, subject name
return filtered_list1, filtered_list2, sname
if keyword1 in key:
dense_time_series.append(key)
elif keyword2 in key:
parcel_labels.append(key)
else:
raise LookupError

# return dense_time_series, parcellation_labels, subject name
return dense_time_series, parcel_labels, sname


def process_subject(dtseries, dlabels, sid):
@@ -111,7 +120,6 @@ def du(path):
return subprocess.check_output(['powershell', win_string])



def process_ptseries(ptseries):
""" Process the XML format ptseries to extract the ROI/time series file.
