Make unpacking archives optional #2215

Open · wants to merge 7 commits into master
17 changes: 17 additions & 0 deletions docs/guides/writing-mrjobs.rst
@@ -590,6 +590,23 @@ telling you which file was being read when a task fails.
itself. If your cluster has tightly tuned memory requirements, this can
sometimes cause an out-of-memory error.


Passing raw archive files to your job
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. versionadded:: 0.?.?

By default, files in recognized compressed formats (e.g. ``.gz``, ``.bz2``) are
decompressed after being downloaded. If you want to handle the original,
still-compressed file directly in your mapper, set ``unpack_archives: false``
in your runner configuration. For example, to configure this setting for the
Hadoop runner, use::

    runners:
      hadoop:
        unpack_archives: false


.. _non-hadoop-streaming-jar-steps:

Jar steps
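
To illustrate the documentation added above (this is the editor's sketch, not part of the PR), a job that processes entire files with ``mapper_raw()`` could read the still-compressed input itself once ``unpack_archives`` is disabled. The job name and the gzip handling below are hypothetical::

    import gzip

    from mrjob.job import MRJob


    class MRCountLinesRaw(MRJob):
        """Hypothetical example job: count lines per input file while
        reading the original .gz file directly (assumes
        unpack_archives: false / --no-unpack-archives)."""

        def mapper_raw(self, path, uri):
            # with unpacking disabled, `path` still points at the
            # compressed file, so decompress it ourselves
            with gzip.open(path, 'rt') as lines:
                yield uri, sum(1 for _ in lines)


    if __name__ == '__main__':
        MRCountLinesRaw.run()

Such a job might then be launched with something like ``python mr_count_lines_raw.py -r hadoop --no-unpack-archives hdfs:///data/*.gz`` (the input path is made up for illustration).
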
21 changes: 12 additions & 9 deletions mrjob/bin.py
@@ -139,6 +139,7 @@ def _default_opts(cls):
super(MRJobBinRunner, cls)._default_opts(),
dict(
read_logs=True,
unpack_archives=True,
)
)

@@ -689,15 +690,17 @@ def _manifest_download_content(self):
lines.append('')

# unpack .bz2 and .gz files
lines.append(' # if input file is compressed, unpack it')
lines.append(' case $INPUT_PATH in')
for ext, cmd in self._manifest_uncompress_commands():
lines.append(' *.%s)' % ext)
lines.append(' %s $INPUT_PATH' % cmd)
lines.append(" INPUT_PATH="
r"$(echo $INPUT_PATH | sed -e 's/\.%s$//')" % ext)
lines.append(' ;;')
lines.append(' esac')
if self._opts['unpack_archives']:
lines.append(' # if input file is compressed, unpack it')
lines.append(' case $INPUT_PATH in')
for ext, cmd in self._manifest_uncompress_commands():
lines.append(' *.%s)' % ext)
lines.append(' %s $INPUT_PATH' % cmd)
lines.append(" INPUT_PATH="
r"$(echo $INPUT_PATH | sed -e 's/\.%s$//')" % ext)
lines.append(' ;;')
lines.append(' esac')

lines.append('} 1>&2')
lines.append('')

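
For reference, the following standalone sketch (an editor's illustration, not code from this PR) mirrors what ``_manifest_download_content()`` emits for the task setup script: when ``unpack_archives`` is enabled it produces a bash ``case`` block that decompresses the input and strips the extension from ``$INPUT_PATH``; when disabled, the block is simply omitted. The extension-to-command mapping here is an assumption standing in for ``_manifest_uncompress_commands()``::

    # editor's sketch; the mapping below is assumed, mrjob derives the
    # real one from _manifest_uncompress_commands()
    UNCOMPRESS_COMMANDS = [('bz2', 'bunzip2'), ('gz', 'gunzip')]


    def sketch_uncompress_lines(unpack_archives=True):
        """Return the bash lines this sketch would add to the setup script."""
        lines = []

        if unpack_archives:
            lines.append('    # if input file is compressed, unpack it')
            lines.append('    case $INPUT_PATH in')
            for ext, cmd in UNCOMPRESS_COMMANDS:
                lines.append('    *.%s)' % ext)
                lines.append('        %s $INPUT_PATH' % cmd)
                lines.append("        INPUT_PATH="
                             r"$(echo $INPUT_PATH | sed -e 's/\.%s$//')" % ext)
                lines.append('        ;;')
            lines.append('    esac')

        return lines


    if __name__ == '__main__':
        # prints the case block; with unpack_archives=False it prints
        # nothing, so the task sees the original compressed file
        print('\n'.join(sketch_uncompress_lines()))
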
12 changes: 12 additions & 0 deletions mrjob/options.py
@@ -1411,6 +1411,18 @@ def __call__(self, parser, namespace, value, option_string=None):
)),
],
),
unpack_archives=dict(
switches=[
(['--unpack-archives'], dict(
action='store_true',
help=('Unpack archives when processing entire files (the default).')
)),
(['--no-unpack-archives'], dict(
action='store_false',
help="Don't unpack archive formats when processing entire files."
)),
],
),
upload_archives=dict(
combiner=combine_path_lists,
switches=[
1 change: 1 addition & 0 deletions mrjob/runner.py
@@ -105,6 +105,7 @@ class MRJobRunner(object):
'py_files',
'read_logs',
'setup',
'unpack_archives',
'upload_archives',
'upload_dirs',
'upload_files'