Merge pull request #3 from icgc-dcc/feat/jt-relay
first version with features to run complete workflows with scatter/gather calls
junjun-zhang authored May 6, 2017
2 parents ad27b9b + f318fb4 commit 831393d
Showing 5 changed files with 219 additions and 76 deletions.
12 changes: 7 additions & 5 deletions README.md
@@ -20,7 +20,7 @@ git clone git@github.com:icgc-dcc/jtracker.git
cd jtracker
pipsi install .
# or install from PyPI for latest release
# or install from PyPI for latest release (no need to clone the source)
pipsi install jtracker
# run it
@@ -29,16 +29,18 @@ jt

## Design and create your workflow

Follow this example to develop your workflow: https://github.com/icgc-dcc/ega-file-transfer-to-collab
Follow this example to develop your workflow: https://github.com/icgc-dcc/ega-file-transfer-to-collab-jt

Quick note on workflow development and testing:

1. You can keep the workflow definition and workflow execution in a single Git repo, although separating them into dedicated Git repos is recommended for production use. Here is an example: https://github.com/icgc-dcc/jtracker-example-workflows/tree/master/test.0.0.0.jtracker; note that the job state folders and the workflow folder are in one repo.

2. It is recommended to use a local Git server for development and testing. Follow these instructions to set up a local Git server on a Mac: http://www.tomdalling.com/blog/software-processes/how-to-set-up-a-secure-git-server-at-home-osx/. Once it is set up, you can access it the same way you access GitHub. In my case: `git clone ssh://junjun@localhost:/Users/junjun/mygit/jtracker-demo-workflows.git`


## Create a Git repository to manage workflow execution
## Create a Git repository to manage and track workflow task execution

Here is an example: https://github.com/icgc-dcc/jtracker-example-workflows/tree/master/test.0.1.0.jtracker
Here is an example: https://github.com/icgc-dcc/jtracker-example-workflows/tree/master/ega-file-transfer-to-collab.0.4.0.jtracker

At this time, you will need to set up this Git repository manually. In the near future, the 'jt' CLI tool will be able to set it up for you automatically.
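Based on the folder globs in the `gitracker.py` diff below, the tracking repo records job and task state as nested folders. Here is a sketch of the expected layout; the literal names of the `JOB_STATE.*` and `TASK_STATE.*` folders are assumptions, not confirmed values:

```
<gitracker_home>/
  <job_state.running>/            # JOB_STATE.RUNNING (folder name assumed)
    <job_id>/
      <task_state.queued>/        # TASK_STATE.QUEUED (folder name assumed)
        task.<task_name>/
          task.<task_name>.json   # scatter tasks get a suffix: task.<task_name>.<suffix>
      <task_state.running>/       # TASK_STATE.RUNNING (folder name assumed)
        worker.<worker_id>/
          task.<task_name>/
            task.<task_name>.json
```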

@@ -48,7 +50,7 @@ At this time, you will need to set up this Git repository on your own manually.
On a task execution host, you can start a worker as follows, assuming the workflow definition and job JSON files exist as specified.

```
jt -g 'git@github.com:icgc-dcc/jtracker-example-workflows' -w test -r 0.1.0 worker
jt -g 'git@github.com:icgc-dcc/jtracker-example-workflows' -w ega-file-transfer-to-collab -r 0.4.0 worker
```

You can start multiple workers on the same host if there are enough computing resources. You can also start workers on different hosts at the same time. Workflow jobs/tasks will be picked up by individual workers as needed.
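For example, here is a hedged sketch of launching several workers from one host. It simply spawns the `jt` invocation shown above several times; this launcher is hypothetical and not part of jtracker itself:

```python
# Hypothetical launcher: start 4 workers on this host, each running the
# 'jt ... worker' command shown above, then wait for all of them to exit.
import subprocess

cmd = ["jt", "-g", "git@github.com:icgc-dcc/jtracker-example-workflows",
       "-w", "ega-file-transfer-to-collab", "-r", "0.4.0", "worker"]
workers = [subprocess.Popen(cmd) for _ in range(4)]
for w in workers:
    w.wait()
```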
2 changes: 1 addition & 1 deletion jtracker/__init__.py
@@ -1,4 +1,4 @@
__version__ = '0.1.0rc2'
__version__ = '0.1.0rc3'

from .worker import Worker
from .jtracker import JTracker
162 changes: 139 additions & 23 deletions jtracker/gitracker.py
@@ -135,27 +135,27 @@ def next_job(self, worker=None):

# TODO: create task folders under new_job_path/TASK_STATE.QUEUED
if t_state == TASK_STATE.QUEUED:
for call_name in self.workflow.workflow_calls: # workflow calls defined to call tasks
for task_name in self.workflow.workflow_tasks: # workflow tasks defined to call tools

call_input = self.workflow.workflow_calls[call_name].get('input')
call_input = self.workflow.workflow_tasks[task_name].get('input')

called_task = self.workflow.workflow_calls[call_name].get('task')
depends_on = self.workflow.workflow_calls[call_name].get('depends_on')
called_tool = self.workflow.workflow_tasks[task_name].get('tool')
depends_on = self.workflow.workflow_tasks[task_name].get('depends_on')
task_dict = {
'call': call_name,
'task': task_name,
'input': {},
'task': called_task,
'tool': called_tool,
'depends_on': depends_on,
'command': self.workflow.workflow_dict.get('tasks').get(called_task).get('command'),
'runtime': self.workflow.workflow_dict.get('tasks').get(called_task).get('runtime')
'command': self.workflow.workflow_dict.get('tools').get(called_tool).get('command'),
'runtime': self.workflow.workflow_dict.get('tools').get(called_tool).get('runtime')
}

# need to find out whether this is a task with a scatter call
scatter_setting = self.workflow.workflow_calls[call_name].get('scatter')
scatter_setting = self.workflow.workflow_tasks[task_name].get('scatter')
if scatter_setting:
# TODO: if bad things happen here due to error in workflow definition or input Job JSON
# we will need to be able to move the relevant Job JSON to failed folder
scatter_call_name = scatter_setting.get('name')
scatter_task_name = scatter_setting.get('name')
input_variable = scatter_setting.get('input').keys()[0] # must always have one key (at least for now), will do error checking later
task_suffix_field = scatter_setting.get('input').get(input_variable).get('task_suffix')
if not task_suffix_field: task_suffix_field = input_variable
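To make the renaming above concrete, here is a hedged sketch of a workflow definition in the new vocabulary, where 'tasks' reference 'tools' (the old code had 'calls' referencing 'tasks'). The field names come from this diff; the task, tool, and input values are invented for illustration:

```python
# Hypothetical workflow definition using the renamed schema; values are illustrative.
workflow_dict = {
    'tools': {
        'download': {
            'command': 'download.py',     # read via workflow_dict['tools'][tool]['command']
            'runtime': {'memory': '2g'},  # read via workflow_dict['tools'][tool]['runtime']
        },
    },
    'tasks': {
        'download_files': {
            'tool': 'download',
            'input': {'file_id': '{{file_ids@prepare}}'},  # reference to a parent task's output
            'depends_on': ['completed@prepare'],           # '<state>@<parent_task>'
            'scatter': {                                   # fan out one task per input element
                'name': 'download_files',
                'input': {'file_id': {'task_suffix': 'file_id'}},
            },
        },
    },
}
```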
@@ -189,10 +189,14 @@ def next_job(self, worker=None):
task_suffix = re.sub('[^0-9a-zA-Z]+', '_', task_suffix) # need to avoid special characters
task_suffix_set.add(task_suffix)

task_folder = os.path.join(t_path, 'task.%s.%s' % (call_name, task_suffix)) # one of the scattered tasks
task_folder = os.path.join(t_path, 'task.%s.%s' % (task_name, task_suffix)) # one of the scattered tasks
os.makedirs(task_folder) # create the task folder

original_depends_on = task_dict['depends_on']
if original_depends_on and not isinstance(original_depends_on, list):
# TODO: move the job to failed folder with clear error message that depends_on setting invalid
pass

if original_depends_on:
updated_depends_on = []
for parent_call in original_depends_on:
@@ -215,7 +219,7 @@

task_dict['input'][i] = value

with open(os.path.join(task_folder, 'task.%s.%s.json' % (call_name, task_suffix)), 'w') as f:
with open(os.path.join(task_folder, 'task.%s.%s.json' % (task_name, task_suffix)), 'w') as f:
f.write(json.dumps(task_dict, indent=2))

# reset for the next iteration
@@ -228,7 +232,7 @@
pass

else:
task_folder = os.path.join(t_path, 'task.%s' % call_name)
task_folder = os.path.join(t_path, 'task.%s' % task_name)
os.makedirs(task_folder) # create the task folder

for i in call_input:
@@ -239,7 +243,7 @@

task_dict['input'][i] = value

with open(os.path.join(task_folder, 'task.%s.json' % call_name), 'w') as f:
with open(os.path.join(task_folder, 'task.%s.json' % task_name), 'w') as f:
f.write(json.dumps(task_dict, indent=2))

self._git_cmd(['add', self.gitracker_home]) # stage the change
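For reference, a task file generated by this branch might look like the following. The file name and values are hypothetical, but the keys mirror the `task_dict` assembled in `next_job()`:

```python
# Hypothetical content of a generated task file, e.g. task.download_files.f001.json
task_dict = {
    'task': 'download_files',        # formerly 'call'
    'input': {'file_id': 'f001'},    # for a scatter task, one element of the scattered input
    'tool': 'download',              # formerly 'task'
    'depends_on': ['completed@prepare'],
    'command': 'download.py',
    'runtime': {'memory': '2g'},
}
```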
Expand All @@ -264,7 +268,7 @@ def next_task(self, worker=None, jtracker=None, timeout=None):
with open(task_json, 'r') as f:
task_dict = json.loads(f.read())

if not self._check_task_dependency(worker=worker, task_dict=task_dict, job_id=job_id): continue
if not self._check_task_dependency(worker=worker, task_json=task_json, task_dict=task_dict, job_id=job_id): continue

running_worker_path = os.path.join(root.replace(TASK_STATE.QUEUED, TASK_STATE.RUNNING), worker.worker_id)
if not os.path.isdir(running_worker_path): os.makedirs(running_worker_path)
@@ -367,29 +371,85 @@ def _git_cmd(self, cmds=[]):
os.chdir(origWD)


def _check_task_dependency(self, worker=None, task_dict=None, job_id=None):
def _check_task_dependency(self, worker=None, task_json=None, task_dict=None, job_id=None):
worker_id = worker.worker_id
host_id = worker.host_id

depends_on = task_dict.get('depends_on')
if not depends_on: return True

# constraint can be 'same_host', 'same_worker', 'shared_fs' or None
constraint = self.workflow.workflow_dict.get('workflow',{}).get('execution',{}).get('constraint')

depends_on = task_dict.get('depends_on')
if not depends_on:
# if no task has started for this job and this task does not have depends_on,
# then start it.
started_tasks = []
for t_state in (TASK_STATE.RUNNING, TASK_STATE.COMPLETED, TASK_STATE.FAILED):
path_to_task = os.path.join(
self.gitracker_home,
JOB_STATE.RUNNING,
job_id,
t_state,
'worker.*',
'task.*',
'task.*.json'
)
started_tasks += glob.glob(path_to_task)

if not started_tasks:
return True # this is the first task to run
else:
one_started_worker_id = started_tasks[0].split(os.path.sep)[-3]
if constraint == 'same_worker':
if worker_id == one_started_worker_id:
return True
else:
return False
elif constraint == 'same_host':
if worker_id.split('.host.')[-1] == one_started_worker_id.split('.host.')[-1]:
return True # this worker is on the same host
else:
return False
elif constraint == 'shared_fs':
# needs more thought here; for now 'shared_fs' means no constraint:
# all workers on all hosts can access the same shared file system
return True
else:
return False # constraint must be set, otherwise the task won't run

worker_id_pattern = 'worker.*'
"""
## to be implemented properly, for now every worker can run a task
if constraint and constraint == 'same_worker':
worker_id_pattern = worker_id
elif constraint and constraint == 'same_host': # more work is needed to handle tasks that have no dependencies; they will run without checking the host
worker_id_pattern = 'worker.*.host.%s.*' % host_id
"""

# we will need a better implementation; consider covering 'failed' parent tasks as well
# right now it only checks whether the parent task state is COMPLETED
parent_task_to_file = {}
for d in depends_on:
parent_state, parent_task, execution_constraint = (d.split('@') + [None] * 3)[:3]

# check queued tasks; if the parent task is still queued then return False, i.e., the current task is not ready to run
path_to_parent_task1_in_queue = os.path.join(
self.gitracker_home,
JOB_STATE.RUNNING,
job_id,
TASK_STATE.QUEUED,
'task.%s' % parent_task,
'task.%s.json' % parent_task
)
path_to_parent_task2_in_queue = os.path.join(
self.gitracker_home,
JOB_STATE.RUNNING,
job_id,
TASK_STATE.QUEUED,
'task.%s.*' % parent_task, # scatter task
'task.%s.*.json' % parent_task
)
if glob.glob(path_to_parent_task1_in_queue) + glob.glob(path_to_parent_task2_in_queue):
return False

# non scatter task
path_to_parent_task_file1 = os.path.join(
self.gitracker_home,
JOB_STATE.RUNNING,
@@ -400,6 +460,7 @@ def _check_task_dependency(self, worker=None, task_dict=None, job_id=None):
'task.%s.json' % parent_task
)

# scatter task
path_to_parent_task_file2 = os.path.join(
self.gitracker_home,
JOB_STATE.RUNNING,
@@ -411,16 +472,71 @@ def _check_task_dependency(self, worker=None, task_dict=None, job_id=None):
)

parent_task_files = glob.glob(path_to_parent_task_file1) + glob.glob(path_to_parent_task_file2)
parent_task_states = set()
parent_task_states = set([])
for pt in parent_task_files:
parent_task_states.add(str(pt).split(os.path.sep)[-4])

if not parent_task_states or parent_task_states - set([TASK_STATE.COMPLETED]): # there are other state than 'completed'
parent_task_name = re.sub(r'^task\.', '', str(pt).split(os.path.sep)[-2])
parent_task_to_file[parent_task_name] = str(pt)

if len(parent_task_name.split('.')) == 2: # support one level scatter call only
parent_root_task_name = parent_task_name.split('.')[0]
if not parent_task_to_file.get(parent_root_task_name):
parent_task_to_file[parent_root_task_name] = []

parent_task_to_file[parent_root_task_name].append(str(pt))

# no parent_task_states means the parent_task is still in the queue;
# states other than 'completed' mean the parent_task has run but did not complete
if not parent_task_states or parent_task_states - set([TASK_STATE.COMPLETED]):
return False

# retrieve output from parent task and add it to the 'input' of the current task
task_json_rewrite = False
for i in task_dict.get('input'):
if '@' in str(task_dict.get('input').get(i)) \
and task_dict.get('input').get(i).startswith('{{') \
and task_dict.get('input').get(i).endswith('}}'):
input_ref = task_dict.get('input').get(i)
input_ref = re.sub(r'^\{\{', '', input_ref)
input_ref = re.sub(r'\}\}$', '', input_ref)

parent_output_field, parent_task = input_ref.split('@')

task_dict['input'][i] = self._get_task_output(parent_task_to_file.get(parent_task), parent_output_field)

task_json_rewrite = True

if task_json_rewrite:
with open(task_json, 'w') as f:
f.write(json.dumps(task_dict))

return True


def _get_task_output(self, task_json_file, output_field):
if isinstance(task_json_file, str):
with open(task_json_file, 'r') as f:
task_dict = json.loads(f.read())

if not task_dict.get('output'):
return
else:
return task_dict.get('output')[-1].get(output_field)

ret = []
for tjf in task_json_file:
with open(tjf, 'r') as f:
task_dict = json.loads(f.read())

if not task_dict.get('output'):
ret.append(None)
else:
ret.append(task_dict.get('output')[-1].get(output_field))

return ret


def _sync_local_git_with_server(self):
self._git_cmd(["checkout", "-q", "master"])
self._git_cmd(["reset", "--hard", "origin/master"])
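Two conventions that the new `_check_task_dependency()` relies on are easy to miss in the diff: a worker id embeds its host id after a `.host.` separator, and a task input can reference a parent task's output with the `{{output_field@parent_task}}` syntax. Below is a minimal, self-contained sketch of both, with invented identifiers; the real code locates the parent task's JSON file inside the tracking repo:

```python
import json
import re

def same_host(worker_id_a, worker_id_b):
    # Worker ids are assumed to look like 'worker.<uuid>.host.<host_id>',
    # matching the worker_id.split('.host.')[-1] comparison in the diff.
    return worker_id_a.split('.host.')[-1] == worker_id_b.split('.host.')[-1]

def resolve_input(value, parent_task_json_file):
    # Resolve a '{{field@parent_task}}' reference against the parent task's
    # recorded output, as _check_task_dependency() does once the parent has
    # completed. In the real code the parent task name maps to its task JSON
    # file; here the file path is passed in directly.
    if not (isinstance(value, str) and '@' in value
            and value.startswith('{{') and value.endswith('}}')):
        return value  # a literal input value passes through unchanged
    field, _parent_task = re.sub(r'^\{\{|\}\}$', '', value).split('@')
    with open(parent_task_json_file) as f:
        output = json.load(f).get('output')
    # The diff reads the last entry of the parent task's 'output' list.
    return output[-1].get(field) if output else None

# Usage with invented ids:
assert same_host('worker.1a2b.host.node-1', 'worker.9f8e.host.node-1')
assert not same_host('worker.1a2b.host.node-1', 'worker.9f8e.host.node-2')
```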
2 changes: 1 addition & 1 deletion jtracker/worker.py
@@ -98,7 +98,7 @@ def run(self, retry=None):
# if current task not exists, return False
if not self.task: return False

cmd = "PATH=%s:$PATH %s" % (os.path.join(self.jtracker.workflow_home, 'tasks'), self.task.task_dict.get('command'))
cmd = "PATH=%s:$PATH %s" % (os.path.join(self.jtracker.workflow_home, 'tools'), self.task.task_dict.get('command'))
arg = "'%s'" % json.dumps(self.task.task_dict) if self.task.task_dict else ''

try:
Expand Down