Merge pull request #46 from forsyth2/refactor
Refactor
forsyth2 authored Feb 18, 2020
2 parents bbc748d + 0d47156 commit c37fe34
Showing 9 changed files with 313 additions and 801 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -2,3 +2,5 @@ build/
dist/
zstash.egg-info/
*.pyc
*~
\#*
783 changes: 150 additions & 633 deletions tests/test.py

Large diffs are not rendered by default.

38 changes: 13 additions & 25 deletions zstash/create.py
@@ -10,9 +10,9 @@
import tarfile
from subprocess import Popen, PIPE
from .hpss import hpss_put
from .utils import addfiles, excludeFiles
from .hpss_utils import add_files
from .settings import config, logger, CACHE, BLOCK_SIZE, DB_FILENAME

from .utils import exclude_files, run_command

def create():

@@ -68,28 +68,16 @@ def create():
if config.hpss != 'none':
# Create target HPSS directory if needed
logger.debug('Creating target HPSS directory')
p1 = Popen(['hsi', '-q', 'mkdir', '-p', config.hpss],
stdout=PIPE, stderr=PIPE)
(stdout, stderr) = p1.communicate()
status = p1.returncode
if status != 0:
logger.error('Could not create HPSS directory: %s', config.hpss)
logger.debug('stdout:\n%s', stdout)
logger.debug('stderr:\n%s', stderr)
raise Exception
command = 'hsi -q mkdir -p {}'.format(config.hpss)
error_str = 'Could not create HPSS directory: {}'.format(config.hpss)
run_command(command, error_str)

# Make sure it is empty
logger.debug('Making sure target HPSS directory exists and is empty')
cmd = 'hsi -q "cd %s; ls -l"' % (config.hpss)
p1 = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
(stdout, stderr) = p1.communicate()
status = p1.returncode
if status != 0 or len(stdout) != 0 or len(stderr) != 0:
logger.error('Target HPSS directory is not empty')
logger.debug('stdout:\n%s', stdout)
logger.debug('stderr:\n%s', stderr)
raise Exception


command = 'hsi -q "cd {}; ls -l"'.format(config.hpss)
error_str = 'Target HPSS directory is not empty'
run_command(command, error_str)

# Create cache directory
logger.debug('Creating local cache directory')
@@ -163,10 +151,10 @@ def create():

# Eliminate files based on exclude pattern
if args.exclude is not None:
files = excludeFiles(args.exclude, files)
files = exclude_files(args.exclude, files)

# Add files to archive
failures = addfiles(cur, con, -1, files)
failures = add_files(cur, con, -1, files)

# Close database and transfer to HPSS. Always keep local copy
con.commit()
@@ -176,5 +164,5 @@ def create():
# List failures
if len(failures) > 0:
logger.warning('Some files could not be archived')
for file in failures:
logger.error('Archiving %s' % (file))
for file_path in failures:
logger.error('Archiving %s' % (file_path))
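
The two hsi call sites above are collapsed into a shared run_command helper, now imported from zstash.utils. Its body is not shown in this diff, but it is presumably the function removed from zstash/hpss.py further down; a minimal sketch of what zstash/utils.py gains, under that assumption:

import shlex
import subprocess

from .settings import logger


def run_command(command, error_str):
    # Split the shell-style command string and run it, capturing output.
    p1 = subprocess.Popen(shlex.split(command),
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (stdout, stderr) = p1.communicate()
    status = p1.returncode
    if status != 0:
        # Log the caller-supplied message plus the captured output, then fail.
        logger.error(error_str)
        logger.debug('stdout:\n%s', stdout)
        logger.debug('stderr:\n%s', stderr)
        raise Exception

One subtlety: the removed "make sure it is empty" block in create.py also treated any output on stdout or stderr as an error, while a return-code-only helper would not. Whether utils.run_command preserves that stricter check is not visible in this diff.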
34 changes: 17 additions & 17 deletions zstash/extract.py
@@ -160,8 +160,8 @@ def extract(keep_files=True):

# Find matching files
matches = []
for file in args.files:
cur.execute(u"select * from files where name GLOB ? or tar GLOB ?", (file, file))
for args_file in args.files:
cur.execute(u"select * from files where name GLOB ? or tar GLOB ?", (args_file, args_file))
matches = matches + cur.fetchall()

# Sort by the filename, tape (so the tar archive),
@@ -266,17 +266,17 @@ def extractFiles(files, keep_files, keep_tars, multiprocess_worker=None):
for i in range(nfiles):
# The current structure of each of the db row, `file`, is:
# (id, name, size, mtime, md5, tar, offset)
file = files[i]
file_tuple = files[i]

# Open new tar archive
if newtar:
newtar = False
tfname = os.path.join(CACHE, file[5])
tfname = os.path.join(CACHE, file_tuple[5])
# Every time we're extracting a new tar, if running in parallel,
# let the process know.
# This is to synchronize the print statements.
if multiprocess_worker:
multiprocess_worker.set_curr_tar(file[5])
multiprocess_worker.set_curr_tar(file_tuple[5])

if not os.path.exists(tfname):
# Will need to retrieve from HPSS
@@ -287,24 +287,24 @@ def extractFiles(files, keep_files, keep_tars, multiprocess_worker=None):

# Extract file
cmd = 'Extracting' if keep_files else 'Checking'
logger.info(cmd + ' %s' % (file[1]))
logger.info(cmd + ' %s' % (file_tuple[1]))
# if multiprocess_worker:
# print('{} is {} {} from {}'.format(multiprocess_worker, cmd, file[1], file[5]))

if keep_files and not should_extract_file(file):
if keep_files and not should_extract_file(file_tuple):
# If we were going to extract, but aren't
# because a matching file is on disk
msg = 'Not extracting {}, because it'
msg += ' already exists on disk with the same'
msg += ' size and modification date.'
logger.info(msg.format(file[1]))
logger.info(msg.format(file_tuple[1]))

# True if we should actually extract the file from the tar
extract_this_file = keep_files and should_extract_file(file)
extract_this_file = keep_files and should_extract_file(file_tuple)

try:
# Seek file position
tar.fileobj.seek(file[6])
tar.fileobj.seek(file_tuple[6])

# Get next member
tarinfo = tar.tarinfo.fromtarfile(tar)
@@ -346,16 +346,16 @@ def extractFiles(files, keep_files, keep_tars, multiprocess_worker=None):
tar.chmod(tarinfo, fname)
tar.utime(tarinfo, fname)
# Verify size
if os.path.getsize(fname) != file[2]:
if os.path.getsize(fname) != file_tuple[2]:
logger.error('size mismatch for: %s' % (fname))

# Verify md5 checksum
if md5 != file[4]:
if md5 != file_tuple[4]:
logger.error('md5 mismatch for: %s' % (fname))
logger.error('md5 of extracted file: %s' % (md5))
logger.error('md5 of original file: %s' % (file[4]))
logger.error('md5 of original file: %s' % (file_tuple[4]))

failures.append(file)
failures.append(file_tuple)
else:
logger.debug('Valid md5: %s %s' % (md5, fname))

@@ -373,8 +373,8 @@ def extractFiles(files, keep_files, keep_tars, multiprocess_worker=None):

except:
traceback.print_exc()
logger.error('Retrieving %s' % (file[1]))
failures.append(file)
logger.error('Retrieving %s' % (file_tuple[1]))
failures.append(file_tuple)

if multiprocess_worker:
multiprocess_worker.print_contents()
@@ -386,7 +386,7 @@ def extractFiles(files, keep_files, keep_tars, multiprocess_worker=None):
tar.close()

if multiprocess_worker:
multiprocess_worker.done_enqueuing_output_for_tar(file[5])
multiprocess_worker.done_enqueuing_output_for_tar(file_tuple[5])

# Open new archive next time
newtar = True
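The file → file_tuple rename throughout extractFiles makes it explicit that each database row is the tuple (id, name, size, mtime, md5, tar, offset) described in the comment above. A hypothetical illustration (not part of this commit) of what the positional indices refer to:

from collections import namedtuple

# Illustration only: name the columns of a `files` row instead of
# indexing positionally as extractFiles does.
FileRow = namedtuple('FileRow', 'id name size mtime md5 tar offset')

row = FileRow(1, 'run/atm.nc', 1024, '2020-02-18 00:00:00',
              'd41d8cd98f00b204e9800998ecf8427e', '000000.tar', 512)
print(row.tar)     # '000000.tar' -- what file_tuple[5] reads
print(row.offset)  # 512          -- where file_tuple[6] seeks in the tar
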
12 changes: 1 addition & 11 deletions zstash/hpss.py
@@ -4,17 +4,7 @@
import shlex
import subprocess
from .settings import DB_FILENAME, logger


def run_command(command, error_str):
p1 = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = p1.communicate()
status = p1.returncode
if status != 0:
logger.error(error_str)
logger.debug('stdout:\n%s', stdout)
logger.debug('stderr:\n%s', stderr)
raise Exception
from .utils import run_command


def hpss_transfer(hpss, file_path, transfer_type, keep=None):
100 changes: 100 additions & 0 deletions zstash/hpss_utils.py
@@ -0,0 +1,100 @@
from __future__ import print_function, absolute_import

import hashlib
import os.path
import tarfile
import traceback
from datetime import datetime
from .hpss import hpss_put
from .settings import config, logger, CACHE, BLOCK_SIZE, DB_FILENAME


def add_files(cur, con, itar, files):

# Now, perform the actual archiving
failures = []
newtar = True
nfiles = len(files)
for i in range(nfiles):

# New tar archive in the local cache
if newtar:
newtar = False
archived = []
tarsize = 0
itar += 1
tname = "{0:0{1}x}".format(itar, 6)
tfname = "%s.tar" % (tname)
logger.info('Creating new tar archive %s' % (tfname))
tar = tarfile.open(os.path.join(CACHE, tfname), "w")

# Add current file to tar archive
current_file = files[i]
logger.info('Archiving %s' % (current_file))
try:
offset, size, mtime, md5 = add_file(tar, current_file)
archived.append((current_file, size, mtime, md5, tfname, offset))
tarsize += size
except:
traceback.print_exc()
logger.error('Archiving %s' % (current_file))
failures.append(current_file)

# Close tar archive if current file is the last one or adding one more
# would push us over the limit.
next_file_size = tar.gettarinfo(current_file).size
if (i == nfiles-1 or tarsize+next_file_size > config.maxsize):

# Close current temporary file
logger.debug('Closing tar archive %s' % (tfname))
tar.close()

# Transfer tar archive to HPSS
hpss_put(config.hpss, os.path.join(CACHE, tfname), config.keep)

# Update database with files that have been archived
cur.executemany(u"insert into files values (NULL,?,?,?,?,?,?)",
archived)
con.commit()

# Open new archive next time
newtar = True

return failures


# Add file to tar archive while computing its hash
# Return file offset (in tar archive), size and md5 hash
def add_file(tar, file_name):
offset = tar.offset
tarinfo = tar.gettarinfo(file_name)
# Change the size of any hardlinks from 0 to the size of the actual file
if tarinfo.islnk():
tarinfo.size = os.path.getsize(file_name)
tar.addfile(tarinfo)

# Only add files or hardlinks.
# So don't add directories or softlinks.
if tarinfo.isfile() or tarinfo.islnk():
f = open(file_name, "rb")
hash_md5 = hashlib.md5()
while True:
s = f.read(BLOCK_SIZE)
if len(s) > 0:
tar.fileobj.write(s)
hash_md5.update(s)
if len(s) < BLOCK_SIZE:
blocks, remainder = divmod(tarinfo.size, tarfile.BLOCKSIZE)
if remainder > 0:
tar.fileobj.write(tarfile.NUL *
(tarfile.BLOCKSIZE - remainder))
blocks += 1
tar.offset += blocks * tarfile.BLOCKSIZE
break
f.close()
md5 = hash_md5.hexdigest()
else:
md5 = None
size = tarinfo.size
mtime = datetime.utcfromtimestamp(tarinfo.mtime)
return offset, size, mtime, md5
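
add_file streams each file through an MD5 hash while writing it into the tar by hand, then rounds the member up to a whole number of 512-byte tar blocks. A short sketch of that padding arithmetic, assuming tarfile's standard BLOCKSIZE of 512 bytes:

import tarfile

# Padding math for a hypothetical 1000-byte member, as in add_file above.
size = 1000
blocks, remainder = divmod(size, tarfile.BLOCKSIZE)  # 1 block, 488 bytes over
if remainder > 0:
    blocks += 1  # the partial block is padded out with tarfile.NUL bytes
print(blocks * tarfile.BLOCKSIZE)  # 1024 bytes of data blocks
# tar.addfile() has already advanced tar.offset past the (typically
# 512-byte) header, so the next member starts 1536 bytes later in total.

Keeping tar.offset in sync this way is what lets extractFiles seek straight to a member with tar.fileobj.seek(offset) instead of scanning the whole archive.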
4 changes: 2 additions & 2 deletions zstash/ls.py
@@ -66,8 +66,8 @@ def ls():

# Find matching files
matches = []
for file in args.files:
cur.execute(u"select * from files where name GLOB ? or tar GLOB ?", (file, file))
for args_file in args.files:
cur.execute(u"select * from files where name GLOB ? or tar GLOB ?", (args_file, args_file))
matches = matches + cur.fetchall()

# Remove duplicates
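As in extract.py, the loop variable is renamed so it no longer reuses the name file (a builtin in Python 2); the SQLite GLOB query itself is unchanged. A self-contained illustration of that query against made-up data:

import sqlite3

con = sqlite3.connect(':memory:')
cur = con.cursor()
cur.execute('create table files (id integer primary key, name text, tar text)')
cur.executemany('insert into files values (NULL,?,?)',
                [('run/atm.nc', '000000.tar'), ('run/ocn.nc', '000001.tar')])
# GLOB performs case-sensitive Unix-style wildcard matching.
cur.execute('select * from files where name GLOB ? or tar GLOB ?',
            ('run/*', 'run/*'))
print(cur.fetchall())  # both rows match on name
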
24 changes: 12 additions & 12 deletions zstash/update.py
@@ -8,9 +8,9 @@
import sys
from datetime import datetime
from .hpss import hpss_get, hpss_put
from .utils import addfiles, excludeFiles
from .hpss_utils import add_files
from .settings import config, logger, CACHE, BLOCK_SIZE, DB_FILENAME, TIME_TOL

from .utils import exclude_files

def update():

@@ -100,13 +100,13 @@ def update():

# Eliminate files based on exclude pattern
if args.exclude is not None:
files = excludeFiles(args.exclude, files)
files = exclude_files(args.exclude, files)

# Eliminate files that are already archived and up to date
newfiles = []
for file in files:
for file_path in files:

statinfo = os.lstat(file)
statinfo = os.lstat(file_path)
mdtime_new = datetime.utcfromtimestamp(statinfo.st_mtime)
mode = statinfo.st_mode
# For symbolic links or directories, size should be 0
@@ -115,7 +115,7 @@ def update():
else:
size_new = statinfo.st_size

cur.execute(u"select * from files where name = ?", (file,))
cur.execute(u"select * from files where name = ?", (file_path,))
new = True
while True:
match = cur.fetchone()
@@ -131,7 +131,7 @@ def update():
break
# print(file,size_new,size,mdtime_new,mdtime)
if (new):
newfiles.append(file)
newfiles.append(file_path)

# Anything to do?
if len(newfiles) == 0:
@@ -141,8 +141,8 @@ def update():
# --dry-run option
if args.dry_run:
print("List of files to be updated")
for file in newfiles:
print(file)
for file_path in newfiles:
print(file_path)
return

# Find last used tar archive
@@ -153,7 +153,7 @@ def update():
itar = max(itar, int(tfile[0][0:6], 16))

# Add files
failures = addfiles(cur, con, itar, newfiles)
failures = add_files(cur, con, itar, newfiles)

# Close database and transfer to HPSS. Always keep local copy
con.commit()
@@ -163,5 +163,5 @@ def update():
# List failures
if len(failures) > 0:
logger.warning('Some files could not be archived')
for file in failures:
logger.error('Archiving %s' % (file))
for file_path in failures:
logger.error('Archiving %s' % (file_path))
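
update.py imports TIME_TOL from settings, which suggests the elided comparison treats a file as already archived when its size matches and its modification time agrees within that tolerance. A hedged sketch of the test implied here; the exact condition lives in the elided hunk, and the TIME_TOL value below is assumed for illustration:

from datetime import datetime

TIME_TOL = 1.0  # assumed value in seconds; the real one comes from settings


def is_up_to_date(size_new, mdtime_new, size, mdtime):
    # A file needs no re-archiving if its size matches and its mtime is
    # within TIME_TOL seconds of the archived copy's.
    return (size_new == size
            and abs((mdtime_new - mdtime).total_seconds()) <= TIME_TOL)


print(is_up_to_date(100, datetime(2020, 2, 18, 0, 0, 0),
                    100, datetime(2020, 2, 18, 0, 0, 0, 500000)))  # True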