
Commit

convert usages of the ExecuteProcess helper into simple @rules to simplify snapshot consumption for process execution (pantsbuild#5703)
cosmicexplorer authored and illicitonion committed Apr 18, 2018
1 parent 19f7aa8 commit 3f49af5
Showing 10 changed files with 325 additions and 440 deletions.
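
This change replaces the ExecuteProcess/SnapshottedProcess helper classes with plain @rules that build an ExecuteProcessRequest directly. As a rough sketch of the resulting style (illustrative only: JavacCompileRequest and javac_compile_process are hypothetical names, not code from this commit; the @rule, Select, and ExecuteProcessRequest.create_from_snapshot APIs are the ones visible in the diff below):

from pants.engine.isolated_process import ExecuteProcessRequest
from pants.engine.rules import rule
from pants.engine.selectors import Select
from pants.util.objects import datatype


class JavacCompileRequest(datatype('JavacCompileRequest', ['argv', 'snapshot'])):
  """Hypothetical request type: binary arguments plus a Snapshot of the input sources."""


@rule(ExecuteProcessRequest, [Select(JavacCompileRequest)])
def javac_compile_process(compile_request):
  # Rather than wiring input/output conversion functions through
  # ExecuteProcess.create_in/create_out, the conversion is now an ordinary
  # @rule that returns an ExecuteProcessRequest built from a Snapshot.
  return ExecuteProcessRequest.create_from_snapshot(
    argv=compile_request.argv,
    env=tuple(),
    snapshot=compile_request.snapshot,
  )

The _Snapshots singleton and the tar-extraction helpers removed below are no longer needed, because the request now carries only the snapshot's fingerprint and digest length rather than extracting a tar archive in Python.
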
8 changes: 2 additions & 6 deletions src/python/pants/engine/BUILD
@@ -90,14 +90,10 @@ python_library(
name='isolated_process',
sources=['isolated_process.py'],
dependencies=[
'3rdparty/python/twitter/commons:twitter.common.collections',
':addressable',
':fs',
':nodes',
':struct',
'src/python/pants/build_graph',
':rules',
':selectors',
'src/python/pants/util:objects',
'src/python/pants/util:process_handler',
]
)

17 changes: 17 additions & 0 deletions src/python/pants/engine/fs.py
@@ -59,6 +59,11 @@ class Snapshot(datatype('Snapshot', ['fingerprint', 'digest_length', 'path_stats
sandboxes.
"""

def __new__(cls, fingerprint, digest_length, path_stats):
# We get a unicode instance when this is instantiated, so ensure it is
# converted to a str.
return super(Snapshot, cls).__new__(cls, str(fingerprint), digest_length, path_stats)

@property
def dirs(self):
return [p for p in self.path_stats if type(p.stat) == Dir]
@@ -85,6 +90,18 @@ def __str__(self):
FilesContent = Collection.of(FileContent)


# TODO(cosmicexplorer): don't recreate this in python, get this from
# fs::EMPTY_DIGEST somehow.
_EMPTY_FINGERPRINT = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'


EMPTY_SNAPSHOT = Snapshot(
fingerprint=_EMPTY_FINGERPRINT,
digest_length=0,
path_stats=[],
)


@rule(Snapshot, [Select(PathGlobs)])
def snapshot_noop(*args):
raise Exception('This task is replaced intrinsically, and should never run.')
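
A note on the _EMPTY_FINGERPRINT constant added above: it is the SHA-256 digest of zero bytes, which is presumably what the TODO means by getting the value from fs::EMPTY_DIGEST instead of hard-coding it in Python. A standalone snippet (not part of the diff) to check the value:

import hashlib

# The hex digest of empty input matches the hard-coded _EMPTY_FINGERPRINT.
assert hashlib.sha256(b'').hexdigest() == \
  'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
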
216 changes: 28 additions & 188 deletions src/python/pants/engine/isolated_process.py
@@ -5,205 +5,33 @@
from __future__ import (absolute_import, division, generators, nested_scopes, print_function,
unicode_literals, with_statement)

import functools
import logging
import os
from abc import abstractproperty
from binascii import hexlify

from pants.engine.rules import RootRule, SingletonRule, TaskRule, rule
from pants.engine.fs import EMPTY_SNAPSHOT
from pants.engine.rules import RootRule, rule
from pants.engine.selectors import Select
from pants.util.contextutil import open_tar, temporary_dir
from pants.util.dirutil import safe_mkdir
from pants.util.objects import datatype
from pants.util.process_handler import subprocess


logger = logging.getLogger(__name__)


def _run_command(binary, sandbox_dir, process_request):
command = binary.prefix_of_command() + tuple(process_request.args)
logger.debug('Running command: "{}" in {}'.format(command, sandbox_dir))
popen = subprocess.Popen(command,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
cwd=sandbox_dir)
# TODO: At some point, we may want to replace this blocking wait with a timed one that returns
# some kind of in progress state.
popen.wait()
logger.debug('Done running command in {}'.format(sandbox_dir))
return popen


def _snapshot_path(snapshot, archive_root):
"""TODO: This is an abstraction leak... see _Snapshots."""
fingerprint_hex = hexlify(snapshot.fingerprint)
snapshot_dir = os.path.join(archive_root, fingerprint_hex[0:2], fingerprint_hex[2:4])
safe_mkdir(snapshot_dir)
return os.path.join(snapshot_dir, '{}.tar'.format(fingerprint_hex))


def _extract_snapshot(snapshot_archive_root, snapshot, sandbox_dir):
with open_tar(_snapshot_path(snapshot, snapshot_archive_root), errorlevel=1) as tar:
tar.extractall(sandbox_dir)


def _snapshotted_process(input_conversion,
output_conversion,
snapshot_directory,
binary,
*args):
"""A pickleable top-level function to execute a process.
Receives two conversion functions, some required inputs, and the user-declared inputs.
"""
process_request = input_conversion(*args)

# TODO resolve what to do with output files, then make these tmp dirs cleaned up.
with temporary_dir(cleanup=False) as sandbox_dir:
if process_request.snapshots:
for snapshot in process_request.snapshots:
_extract_snapshot(snapshot_directory.root, snapshot, sandbox_dir)

# All of the snapshots have been checked out now.
if process_request.directories_to_create:
for d in process_request.directories_to_create:
safe_mkdir(os.path.join(sandbox_dir, d))

popen = _run_command(binary, sandbox_dir, process_request)

process_result = SnapshottedProcessResult(popen.stdout.read(), popen.stderr.read(), popen.returncode)
if process_result.exit_code != 0:
raise Exception('Running {} failed with non-zero exit code: {}'.format(binary,
process_result.exit_code))

return output_conversion(process_result, sandbox_dir)


def _setup_process_execution(input_conversion, *args):
"""A pickleable top-level function to setup pre-execution.
"""
return input_conversion(*args)


def _post_process_execution(output_conversion, *args):
"""A pickleable top-level function to execute a process.
Receives two conversion functions, some required inputs, and the user-declared inputs.
"""
return output_conversion(*args)


class Binary(object):
"""Binary in the product graph.
TODO these should use BinaryUtil to find binaries.
"""

@abstractproperty
def bin_path(self):
pass

def prefix_of_command(self):
return tuple([self.bin_path])


class SnapshottedProcessRequest(datatype('SnapshottedProcessRequest',
['args', 'snapshots', 'directories_to_create'])):
"""Request for execution with binary args and snapshots to extract."""

def __new__(cls, args, snapshots=tuple(), directories_to_create=tuple(), **kwargs):
"""
:param args: Arguments to the binary being run.
:param snapshot_subjects: Subjects used to request snapshots that will be checked out into the sandbox.
:param directories_to_create: Directories to ensure exist in the sandbox before execution.
"""
if not isinstance(args, tuple):
raise ValueError('args must be a tuple.')
if not isinstance(snapshots, tuple):
raise ValueError('snapshots must be a tuple.')
if not isinstance(directories_to_create, tuple):
raise ValueError('directories_to_create must be a tuple.')
return super(SnapshottedProcessRequest, cls).__new__(cls, args, snapshots, directories_to_create, **kwargs)


class SnapshottedProcessResult(datatype('SnapshottedProcessResult', ['stdout', 'stderr', 'exit_code'])):
"""Contains the stdout, stderr and exit code from executing a process."""


class _Snapshots(datatype('_Snapshots', ['root'])):
"""Private singleton value to expose the snapshot directory (managed by rust) to python.
TODO: This is an abstraction leak, but it's convenient to be able to pipeline the input/output
conversion tasks into a single Task node.
"""


class SnapshottedProcess(object):
"""A static helper for defining a task rule to execute a snapshotted process."""

def __new__(cls, *args):
raise ValueError('Use `create` to declare a task function representing a process.')

@staticmethod
def create(product_type, binary_type, input_selectors, input_conversion, output_conversion):
"""TODO: Not clear that `binary_type` needs to be separate from the input selectors."""

# Select the concatenation of the snapshot directory, binary, and input selectors.
inputs = [Select(_Snapshots), Select(binary_type)] + list(input_selectors)

# Apply the input/output conversions to a top-level process-execution function which
# will receive all inputs, convert in, execute, and convert out.
func = functools.partial(_snapshotted_process,
input_conversion,
output_conversion)
func.__name__ = '{}_and_then_snapshotted_process_and_then_{}'.format(
input_conversion.__name__, output_conversion.__name__
)

# Return a task triple that executes the function to produce the product type.
return TaskRule(product_type, inputs, func)


def create_snapshot_rules():
"""Intrinsically replaced on the rust side."""
return [
SingletonRule(_Snapshots, _Snapshots('/dev/null'))
]


class ExecuteProcess(object):
"""A static helper for defining a task rule to execute a process."""

def __new__(cls, *args):
raise ValueError('Use `create` to declare a task function representing a process.')

@staticmethod
def create_in(product_type, input_selectors, input_conversion):
# TODO: combine create_in/create_out functions
func = functools.partial(_setup_process_execution, input_conversion)
func.__name__ = '{}_and_then_execute_process'.format(input_conversion.__name__)
inputs = list(input_selectors)

# Return a task triple that executes the function to produce the product type.
return TaskRule(product_type, inputs, func)

@staticmethod
def create_out(product_type, input_selectors, output_conversion):
func = functools.partial(_post_process_execution,
output_conversion)
func.__name__ = 'execute_process_and_then_{}'.format(output_conversion.__name__)
inputs = list(input_selectors)

# Return a task triple that executes the function to produce the product type.
return TaskRule(product_type, inputs, func)


class ExecuteProcessRequest(datatype('ExecuteProcessRequest', ['argv', 'env', 'input_files_digest', 'digest_length'])):
"""Request for execution with args and snapshots to extract."""

@classmethod
def create_from_snapshot(cls, argv, env, snapshot):
return ExecuteProcessRequest(
argv=argv,
env=env,
input_files_digest=snapshot.fingerprint,
digest_length=snapshot.digest_length,
)

@classmethod
def create_with_empty_snapshot(cls, argv, env):
return cls.create_from_snapshot(argv, env, EMPTY_SNAPSHOT)

def __new__(cls, argv, env, input_files_digest, digest_length):
"""
@@ -212,7 +40,19 @@ def __new__(cls, argv, env, input_files_digest, digest_length):
"""
if not isinstance(argv, tuple):
raise ValueError('argv must be a tuple.')
return super(ExecuteProcessRequest, cls).__new__(cls, argv, tuple(env), input_files_digest, digest_length)

if not isinstance(env, tuple):
raise ValueError('env must be a tuple.')

if not isinstance(input_files_digest, str):
raise ValueError('input_files_digest must be a str.')

if not isinstance(digest_length, int):
raise ValueError('digest_length must be an int.')
if digest_length < 0:
raise ValueError('digest_length must be >= 0.')

return super(ExecuteProcessRequest, cls).__new__(cls, argv, env, input_files_digest, digest_length)


class ExecuteProcessResult(datatype('ExecuteProcessResult', ['stdout', 'stderr', 'exit_code'])):
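
With the validation added to ExecuteProcessRequest.__new__ above, argv and env must already be tuples, input_files_digest must be a str, and digest_length a non-negative int. A small usage sketch of the two classmethods shown in the diff (the /bin/echo command line is illustrative only):

from pants.engine.fs import EMPTY_SNAPSHOT
from pants.engine.isolated_process import ExecuteProcessRequest

# Build a request against the empty snapshot; create_from_snapshot fills in
# the digest fields from EMPTY_SNAPSHOT.
req = ExecuteProcessRequest.create_with_empty_snapshot(
  argv=('/bin/echo', 'hello'),
  env=tuple(),
)
assert req.input_files_digest == EMPTY_SNAPSHOT.fingerprint
assert req.digest_length == 0

# Non-tuple arguments now fail fast with a ValueError (previously env was
# silently coerced with tuple()).
try:
  ExecuteProcessRequest(argv=['/bin/echo'],
                        env=tuple(),
                        input_files_digest=EMPTY_SNAPSHOT.fingerprint,
                        digest_length=0)
except ValueError as e:
  print(e)  # prints: argv must be a tuple.
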
6 changes: 0 additions & 6 deletions src/python/pants/engine/native.py
@@ -183,8 +183,6 @@
Function,
Function,
Function,
Function,
TypeConstraint,
TypeConstraint,
TypeConstraint,
TypeConstraint,
@@ -720,7 +718,6 @@ def new_scheduler(self,
work_dir,
ignore_patterns,
construct_snapshot,
construct_snapshots,
construct_file_content,
construct_files_content,
construct_path_stat,
@@ -733,7 +730,6 @@ def tc(constraint):
constraint_variants,
constraint_path_globs,
constraint_snapshot,
constraint_snapshots,
constraint_files_content,
constraint_dir,
constraint_file,
@@ -752,7 +748,6 @@ def tc(constraint):
tasks,
# Constructors/functions.
func(construct_snapshot),
func(construct_snapshots),
func(construct_file_content),
func(construct_files_content),
func(construct_path_stat),
@@ -766,7 +761,6 @@ def tc(constraint):
tc(constraint_variants),
tc(constraint_path_globs),
tc(constraint_snapshot),
tc(constraint_snapshots),
tc(constraint_files_content),
tc(constraint_dir),
tc(constraint_file),
7 changes: 2 additions & 5 deletions src/python/pants/engine/scheduler.py
@@ -16,8 +16,7 @@
from pants.build_graph.address import Address
from pants.engine.addressable import SubclassesOf
from pants.engine.fs import FileContent, FilesContent, Path, PathGlobs, Snapshot
from pants.engine.isolated_process import (ExecuteProcessRequest, ExecuteProcessResult, _Snapshots,
create_snapshot_rules)
from pants.engine.isolated_process import ExecuteProcessRequest, ExecuteProcessResult
from pants.engine.native import Function, TypeConstraint, TypeId
from pants.engine.nodes import Return, State, Throw
from pants.engine.rules import RuleIndex, SingletonRule, TaskRule
@@ -91,7 +90,6 @@ def __init__(self, native, build_root, work_dir, ignore_patterns, rule_index):
work_dir,
ignore_patterns,
Snapshot,
_Snapshots,
FileContent,
FilesContent,
Path,
@@ -104,7 +102,6 @@ def __init__(self, native, build_root, work_dir, ignore_patterns, rule_index):
constraint_for(Variants),
constraint_for(PathGlobs),
constraint_for(Snapshot),
constraint_for(_Snapshots),
constraint_for(FilesContent),
constraint_for(Dir),
constraint_for(File),
@@ -326,7 +323,7 @@ def __init__(self,
self._execution_request = None

# Validate and register all provided and intrinsic tasks.
rules = list(rules) + create_snapshot_rules()
rules = list(rules)
rule_index = RuleIndex.create(rules)
self._scheduler = WrappedNativeScheduler(native,
project_tree.build_root,
4 changes: 0 additions & 4 deletions src/rust/engine/src/lib.rs
@@ -187,7 +186,6 @@ pub extern "C" fn externs_val_for(key: Key) -> Value {
pub extern "C" fn scheduler_create(
tasks_ptr: *mut Tasks,
construct_snapshot: Function,
construct_snapshots: Function,
construct_file_content: Function,
construct_files_content: Function,
construct_path_stat: Function,
@@ -200,7 +199,6 @@ pub extern "C" fn scheduler_create(
type_has_variants: TypeConstraint,
type_path_globs: TypeConstraint,
type_snapshot: TypeConstraint,
type_snapshots: TypeConstraint,
type_files_content: TypeConstraint,
type_dir: TypeConstraint,
type_file: TypeConstraint,
@@ -226,7 +224,6 @@ pub extern "C" fn scheduler_create(
tasks,
Types {
construct_snapshot: construct_snapshot,
construct_snapshots: construct_snapshots,
construct_file_content: construct_file_content,
construct_files_content: construct_files_content,
construct_path_stat: construct_path_stat,
@@ -239,7 +236,6 @@ pub extern "C" fn scheduler_create(
has_variants: type_has_variants,
path_globs: type_path_globs,
snapshot: type_snapshot,
snapshots: type_snapshots,
files_content: type_files_content,
dir: type_dir,
file: type_file,
(Diffs for the remaining 4 changed files are not shown here.)
