# repackage.py -- executable script, 190 lines (159 loc), 6.61 KB
#!/usr/bin/env python
"""
SDM tar archive creation utility.
example usage:
create_archive.py 8311_17_1_dicoms -f dicoms newarchive
"""
from __future__ import print_function
import os
import glob
import json
import tarfile
import calendar
import datetime
def datetime_encoder(o):
    """JSON ``default`` hook: encode datetimes as MongoDB-style ``{"$date": ms}``.

    Aware datetimes are normalized to UTC first; anything that is not a
    datetime raises TypeError, as json.dump expects from a default hook.
    """
    if not isinstance(o, datetime.datetime):
        raise TypeError(repr(o) + " is not JSON serializable")
    offset = o.utcoffset()
    if offset is not None:
        # strip the timezone by shifting the wall-clock time back to UTC
        o = o - offset
    millis = calendar.timegm(o.timetuple()) * 1000 + o.microsecond / 1000
    return {"$date": int(millis)}
def datetime_decoder(dct):
    """JSON ``object_hook``: turn ``{"$date": ms}`` dicts back into naive UTC datetimes.

    Any other dict is passed through unchanged.
    """
    try:
        millis = dct["$date"]
    except KeyError:
        return dct
    return datetime.datetime.utcfromtimestamp(float(millis) / 1000.0)
def create_archive(path, content, arcname, metadata=None, **kwargs):
    """Create a gzipped tar archive of the *content* directory at *path*.

    Writes (or merges into) a METADATA.json file and a DIGEST.txt file inside
    *content* before archiving, so both are included in the archive.  Files are
    added in a fixed order: .json first, then .txt, then the rest sorted by name.

    :param path: output archive filename (opened with 'w:gz')
    :param content: directory whose entries are archived (non-recursive)
    :param arcname: top-level directory name inside the archive
    :param metadata: optional dict merged with any existing METADATA.json
                     (existing values win); not mutated by this call
    :param kwargs: passed through to tarfile.open (e.g. compresslevel)
    """
    # Copy so we never mutate the caller's dict (the old mutable-default
    # `metadata={}` leaked state across calls).
    metadata = dict(metadata) if metadata else {}
    # write metadata file, merging any metadata already present on disk
    metadata_filepath = os.path.join(content, 'METADATA.json')
    if os.path.exists(metadata_filepath):
        with open(metadata_filepath) as json_file:
            existing_metadata = json.load(json_file, object_hook=datetime_decoder)
        metadata.update(existing_metadata)
    with open(metadata_filepath, 'w') as json_file:
        json.dump(metadata, json_file, default=datetime_encoder)
        json_file.write('\n')
    # write digest file
    digest_filepath = os.path.join(content, 'DIGEST.txt')
    open(digest_filepath, 'w').close()  # touch file, so that it's included in the digest

    def _digest_key(fn):
        # Tuple keys sort cleanly on Python 3 (the old int-or-str key raised
        # TypeError there): .json first, .txt second, the rest alphabetical.
        if fn.endswith('.json'):
            return (0, '')
        if fn.endswith('.txt'):
            return (1, '')
        return (2, fn)

    filenames = sorted(os.listdir(content), key=_digest_key)
    with open(digest_filepath, 'w') as digest_file:
        digest_file.write('\n'.join(filenames) + '\n')
    # create archive
    with tarfile.open(path, 'w:gz', **kwargs) as archive:
        archive.add(content, arcname, recursive=False)  # add the top-level directory
        for fn in filenames:
            archive.add(os.path.join(content, fn), os.path.join(arcname, fn))
def repackage(dcmtgz, outdir=None, args=None):
    """Extract the tgz archive *dcmtgz* and re-create it (same basename) with
    METADATA.json/DIGEST.txt via create_archive.

    :param dcmtgz: path to the input .tgz archive
    :param outdir: optional output directory, created if missing; default is
                   to write next to the current working directory
    :param args: optional argparse namespace; if it carries a truthy ``group``,
                 group/project overwrite info is embedded in the metadata
    :raises ValueError: if the archive contains no top-level entry
    """
    if outdir and not os.path.exists(outdir):
        os.makedirs(outdir)
    outname = os.path.basename(dcmtgz)
    if outdir:
        outname = os.path.join(outdir, outname)
    if os.path.exists(outname):
        print('%s exists! We will replace it.' % outname)
    with TemporaryDirectory() as tempdir_path:
        with tarfile.open(dcmtgz) as archive:
            # NOTE(review): extractall on an untrusted archive allows path
            # traversal ("../" members); restrict members if inputs are untrusted.
            archive.extractall(path=tempdir_path)
        extracted = glob.glob(os.path.join(tempdir_path, '*'))
        if not extracted:
            # clearer than the bare IndexError the old [0] raised
            raise ValueError('%s contains no top-level directory' % dcmtgz)
        dcm_dir = extracted[0]
        metadata = {'filetype': 'dicom'}
        # args is optional (default None): only attach overwrite info when a
        # group was actually supplied — the old code crashed on args.group.
        if args is not None and getattr(args, 'group', None):
            if not getattr(args, 'project', None):
                args.project = 'unknown'
            metadata['overwrite'] = {'group_name': args.group, 'project_name': args.project}
        basename = os.path.basename(dcm_dir)
        print('repackaging %s to %s' % (dcmtgz, outname))
        create_archive(outname, dcm_dir, basename, metadata, compresslevel=6)
"""This is a backport of TemporaryDirectory from Python 3.3."""
import warnings as _warnings
import sys as _sys
import os as _os
from tempfile import mkdtemp
template = "tmp"
# entire contents of tempfile copied here for portability
# entire contents of tempfile copied here for portability
class TemporaryDirectory(object):
    """Create and return a temporary directory.  This has the same
    behavior as mkdtemp but can be used as a context manager.  For
    example:

        with TemporaryDirectory() as tmpdir:
            ...

    Upon exiting the context, the directory and everything contained
    in it are removed.
    """

    def __init__(self, suffix="", prefix=template, dir=None):
        self._closed = False
        self.name = None        # Handle mkdtemp raising an exception
        self.name = mkdtemp(suffix, prefix, dir)

    def __repr__(self):
        return "<{} {!r}>".format(self.__class__.__name__, self.name)

    def __enter__(self):
        # The context manager yields the directory *path*, not self.
        return self.name

    def cleanup(self, _warn=False):
        """Remove the directory tree; safe to call more than once."""
        if self.name and not self._closed:
            try:
                self._rmtree(self.name)
            except (TypeError, AttributeError) as ex:
                # Issue #10188: Emit a warning on stderr
                # if the directory could not be cleaned
                # up due to missing globals
                if "None" not in str(ex):
                    raise
                print("ERROR: {!r} while cleaning up {!r}".format(ex, self,),
                      file=_sys.stderr)
                return
            self._closed = True
            if _warn:
                # Reached when cleanup happens implicitly via __del__.
                self._warn("Implicitly cleaning up {!r}".format(self),
                           ResourceWarning)

    def __exit__(self, exc, value, tb):
        self.cleanup()

    def __del__(self):
        # Issue a ResourceWarning if implicit cleanup needed
        self.cleanup(_warn=True)

    # XXX (ncoghlan): The following code attempts to make
    # this class tolerant of the module nulling out process
    # that happens during CPython interpreter shutdown
    # Alas, it doesn't actually manage it. See issue #10188
    _listdir = staticmethod(_os.listdir)
    _path_join = staticmethod(_os.path.join)
    _isdir = staticmethod(_os.path.isdir)
    _islink = staticmethod(_os.path.islink)
    _remove = staticmethod(_os.remove)
    _rmdir = staticmethod(_os.rmdir)
    _os_error = OSError
    _warn = _warnings.warn

    def _rmtree(self, path):
        # Essentially a stripped down version of shutil.rmtree.  We can't
        # use globals because they may be None'ed out at shutdown.
        for name in self._listdir(path):
            fullname = self._path_join(path, name)
            try:
                # symlinks are removed, never followed into
                isdir = self._isdir(fullname) and not self._islink(fullname)
            except self._os_error:
                isdir = False
            if isdir:
                self._rmtree(fullname)
            else:
                try:
                    self._remove(fullname)
                except self._os_error:
                    # best-effort: skip entries we cannot delete
                    pass
        try:
            self._rmdir(path)
        except self._os_error:
            pass
if __name__ == '__main__':
    import argparse
    ap = argparse.ArgumentParser()
    # typo fixes: stray ')' and "doesn not" in the original help strings
    ap.add_argument('target', help='input tgz or dir to walk')
    ap.add_argument('-o', '--output_dir', help='output into this directory, will create if does not exist')
    ap.add_argument('-g', '--group', type=str, help='name of group to sort data into')
    ap.add_argument('-p', '--project', type=str, help='name of project to sort data into')
    args = ap.parse_args()
    outdir = None
    if args.output_dir:
        outdir = os.path.abspath(args.output_dir)
        print('outputting to %s' % outdir)
    if os.path.isdir(args.target):
        # repackage every .tgz directly inside the target directory
        for f in glob.glob(os.path.join(args.target, '*.tgz')):
            # pass the abspath actually printed above, not the raw argument
            repackage(f, outdir, args)
    elif os.path.isfile(args.target):
        repackage(args.target, outdir, args)
    else:
        # previously a silent no-op for a missing/invalid target
        ap.error('target %s is neither a file nor a directory' % args.target)