From e35c12dc51bc7bd2d94cbfcc9b71fc86383e4e95 Mon Sep 17 00:00:00 2001
From: Junjun Zhang
Date: Fri, 5 May 2017 00:05:31 -0400
Subject: [PATCH 1/4] refactor to change tasks to tools, calls to tasks

---
 jtracker/gitracker.py | 32 ++++++++-------
 jtracker/worker.py    |  2 +-
 jtracker/workflow.py  | 92 +++++++++++++++++++++----------------------
 3 files changed, 65 insertions(+), 61 deletions(-)

diff --git a/jtracker/gitracker.py b/jtracker/gitracker.py
index 8010ff6..0000519 100644
--- a/jtracker/gitracker.py
+++ b/jtracker/gitracker.py
@@ -135,27 +135,27 @@ def next_job(self, worker=None):

         # TODO: create task folders under new_job_path/TASK_STATE.QUEUED
         if t_state == TASK_STATE.QUEUED:
-            for call_name in self.workflow.workflow_calls:  # workflow calls defined to call tasks
+            for task_name in self.workflow.workflow_tasks:  # workflow tasks defined to call tools

-                call_input = self.workflow.workflow_calls[call_name].get('input')
+                call_input = self.workflow.workflow_tasks[task_name].get('input')

-                called_task = self.workflow.workflow_calls[call_name].get('task')
-                depends_on = self.workflow.workflow_calls[call_name].get('depends_on')
+                called_tool = self.workflow.workflow_tasks[task_name].get('tool')
+                depends_on = self.workflow.workflow_tasks[task_name].get('depends_on')

                 task_dict = {
-                    'call': call_name,
+                    'task': task_name,
                     'input': {},
-                    'task': called_task,
+                    'tool': called_tool,
                     'depends_on': depends_on,
-                    'command': self.workflow.workflow_dict.get('tasks').get(called_task).get('command'),
-                    'runtime': self.workflow.workflow_dict.get('tasks').get(called_task).get('runtime')
+                    'command': self.workflow.workflow_dict.get('tools').get(called_tool).get('command'),
+                    'runtime': self.workflow.workflow_dict.get('tools').get(called_tool).get('runtime')
                 }

                 # need to find out whether this is a task with scatter call
-                scatter_setting = self.workflow.workflow_calls[call_name].get('scatter')
+                scatter_setting = self.workflow.workflow_tasks[task_name].get('scatter')
                 if scatter_setting:
                     # TODO: if bad things happen here due to error in workflow definition or input Job JSON
                     # we will need to be able to move the relevant Job JSON to failed folder
-                    scatter_call_name = scatter_setting.get('name')
+                    scatter_task_name = scatter_setting.get('name')
                     input_variable = scatter_setting.get('input').keys()[0]  # must always have one key (at least for now), will do error checking later
                     task_suffix_field = scatter_setting.get('input').get(input_variable).get('task_suffix')
                     if not task_suffix_field: task_suffix_field = input_variable
@@ -189,10 +189,14 @@ def next_job(self, worker=None):
                         task_suffix = re.sub('[^0-9a-zA-Z]+', '_', task_suffix)  # need to avoid special character
                         task_suffix_set.add(task_suffix)

-                        task_folder = os.path.join(t_path, 'task.%s.%s' % (call_name, task_suffix))  # one of the scattered tasks
+                        task_folder = os.path.join(t_path, 'task.%s.%s' % (task_name, task_suffix))  # one of the scattered tasks
                         os.makedirs(task_folder)  # create the task folder

                         original_depends_on = task_dict['depends_on']
+                        if original_depends_on and not isinstance(original_depends_on, list):
+                            # TODO: move the job to failed folder with clear error message that depends_on setting invalid
+                            pass
+
                         if original_depends_on:
                             updated_depends_on = []
                             for parent_call in original_depends_on:
@@ -215,7 +219,7 @@ def next_job(self, worker=None):

                             task_dict['input'][i] = value

-                        with open(os.path.join(task_folder, 'task.%s.%s.json' % (call_name, task_suffix)), 'w') as f:
+                        with open(os.path.join(task_folder, 'task.%s.%s.json' % (task_name, task_suffix)), 'w') as f:
                             f.write(json.dumps(task_dict, indent=2))

                         # reset for the next iteration
@@ -228,7 +232,7 @@ def next_job(self, worker=None):
                         pass

                 else:
-                    task_folder = os.path.join(t_path, 'task.%s' % call_name)
+                    task_folder = os.path.join(t_path, 'task.%s' % task_name)
                     os.makedirs(task_folder)  # create the task folder

                     for i in call_input:
@@ -239,7 +243,7 @@ def next_job(self, worker=None):

                         task_dict['input'][i] = value

-                    with open(os.path.join(task_folder, 'task.%s.json' % call_name), 'w') as f:
+                    with open(os.path.join(task_folder, 'task.%s.json' % task_name), 'w') as f:
                         f.write(json.dumps(task_dict, indent=2))

         self._git_cmd(['add', self.gitracker_home])  # stage the change

diff --git a/jtracker/worker.py b/jtracker/worker.py
index abd2fe9..adab51d 100644
--- a/jtracker/worker.py
+++ b/jtracker/worker.py
@@ -98,7 +98,7 @@ def run(self, retry=None):
         # if current task not exists, return False
         if not self.task: return False

-        cmd = "PATH=%s:$PATH %s" % (os.path.join(self.jtracker.workflow_home, 'tasks'), self.task.task_dict.get('command'))
+        cmd = "PATH=%s:$PATH %s" % (os.path.join(self.jtracker.workflow_home, 'tools'), self.task.task_dict.get('command'))
         arg = "'%s'" % json.dumps(self.task.task_dict) if self.task.task_dict else ''

         try:

diff --git a/jtracker/workflow.py b/jtracker/workflow.py
index 83bb471..c581922 100644
--- a/jtracker/workflow.py
+++ b/jtracker/workflow.py
@@ -9,10 +9,10 @@ def __init__(self, workflow_yaml_file=None):
         self._name = self.workflow_dict.get('workflow').get('name')
         self._version = self.workflow_dict.get('workflow').get('version')

-        self._get_workflow_calls()
-        #print json.dumps(self.workflow_calls)  # debug
+        self._get_workflow_tasks()
+        #print json.dumps(self.workflow_tasks)  # debug

-        self._add_default_runtime_to_tasks()
+        self._add_default_runtime_to_tools()


     @property
@@ -31,68 +31,68 @@ def workflow_dict(self):


     @property
-    def workflow_calls(self):
-        return self._workflow_calls
+    def workflow_tasks(self):
+        return self._workflow_tasks


-    def _get_workflow_calls(self):
-        calls = self.workflow_dict.get('workflow', {}).get('calls', {})
+    def _get_workflow_tasks(self):
+        tasks = self.workflow_dict.get('workflow', {}).get('tasks', {})

-        # converting sub_calls under scatter calls to top level, this way
-        # it's easier to just handle flattened one level calls, at runtime
-        # sub_calls will be instantiated with multiple parallel calls with
+        # converting sub_tasks under scatter tasks to top level, this way
+        # it's easier to just handle flattened one level tasks, at runtime
+        # sub_tasks will be instantiated with multiple parallel tasks with
         # previously defined interations (ie, field defined in 'with_items')
-        scatter_calls = []
-        sub_calls = {}
-        for c in calls:
-            if '.' in c or '@' in c:
-                print "Workflow definition error: call name canot contain '.' or '@', offending name: '%s'" % c
+        scatter_tasks = []
+        sub_tasks = {}
+        for t in tasks:
+            if '.' in t or '@' in t:
+                print "Workflow definition error: task name canot contain '.' or '@', offending name: '%s'" % t
                 raise

-            if calls[c].get('scatter'):  # this is a scatter call
-                scatter_input = calls[c].get('scatter', {}).get('input', {})
+            if tasks[t].get('scatter'):  # this is a scatter task
+                scatter_input = tasks[t].get('scatter', {}).get('input', {})
                 # quick way to verify the syntax is correct, more thorough validation is needed
-                # it's possible to support two level of nested scatter calls,
+                # it's possible to support two level of nested scatter tasks,
                 # for now one level only
                 if not len(scatter_input) == 1 and 'with_items' in scatter_input:
-                    print "Workflow definition error: invalid scatter call definition in '%s'" % c
+                    print "Workflow definition error: invalid scatter task definition in '%s'" % t
                     raise  # better exception handle is needed

-                scatter_calls.append(c)
-                # expose sub calls in scatter call to top level
-                for sc in calls[c].get('calls', {}):
-                    if '.' in c or '@' in sc:
-                        print "Workflow definition error: call name canot contain '.' or '@', offending name: '%s'" % sc
+                scatter_tasks.append(t)
+                # expose sub tasks in scatter task to top level
+                for st in tasks[t].get('tasks', {}):
+                    if '.' in t or '@' in st:
+                        print "Workflow definition error: task name canot contain '.' or '@', offending name: '%s'" % st
                         raise
-                    if sub_calls.get(sc):
-                        print "Workflow definition error: call name duplication detected '%s'" % sc
+                    if sub_tasks.get(st):
+                        print "Workflow definition error: task name duplication detected '%s'" % st
                         raise
-                    sub_calls[sc] = calls[c]['calls'][sc]
-                    sub_calls[sc]['scatter'] = calls[c]['scatter']  # assign scatter definition to under each sub call
-                    sub_calls[sc]['scatter']['name'] = c  # add name of the scatter call here
+                    sub_tasks[st] = tasks[t]['tasks'][st]
+                    sub_tasks[st]['scatter'] = tasks[t]['scatter']  # assign scatter definition to under each sub task
+                    sub_tasks[st]['scatter']['name'] = t  # add name of the scatter task here

-        # now delete the top level scatter calls
-        for sc in scatter_calls:
-            calls.pop(sc)
+        # now delete the top level scatter tasks
+        for st in scatter_tasks:
+            tasks.pop(st)

-        # merge sub_calls into top level calls
-        duplicated_calls = set(calls).intersection(set(sub_calls))
-        if duplicated_calls:
-            print "Workflow definition error: call name duplication detected '%s'" % ', '.join(duplicated_calls)
+        # merge sub_tasks into top level tasks
+        duplicated_tasks = set(tasks).intersection(set(sub_tasks))
+        if duplicated_tasks:
+            print "Workflow definition error: task name duplication detected '%s'" % ', '.join(duplicated_tasks)
             raise

-        calls.update(sub_calls)
+        tasks.update(sub_tasks)

-        for c in calls:
-            task_called = calls[c].get('task')
-            if not task_called:
-                calls[c]['task'] = c
+        for t in tasks:
+            tool_tasked = tasks[t].get('tool')
+            if not tool_tasked:
+                tasks[t]['tool'] = t

-        self._workflow_calls = calls
+        self._workflow_tasks = tasks


-    def _add_default_runtime_to_tasks(self):
-        for t in self.workflow_dict.get('tasks', {}):
-            if not 'runtime' in t:  # no runtime defined in the task, add the default one
-                self.workflow_dict['tasks'][t]['runtime'] = self.workflow_dict.get('workflow', {}).get('runtime')
+    def _add_default_runtime_to_tools(self):
+        for t in self.workflow_dict.get('tools', {}):
+            if not 'runtime' in t:  # no runtime defined in the tool, add the default one
+                self.workflow_dict['tools'][t]['runtime'] = self.workflow_dict.get('workflow', {}).get('runtime')
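Patch 1 above renames the workflow vocabulary: the top-level `tasks` section (reusable commands) becomes `tools`, and `workflow.calls` becomes `workflow.tasks`. For orientation, a minimal workflow YAML sketch using the renamed schema; every name and value below is hypothetical, inferred only from the fields the refactored code reads (`tool`, `input`, `depends_on`, `scatter`, `command`, `runtime`):

```yaml
workflow:
  name: demo                  # hypothetical workflow, not from the patch
  version: 0.1.0
  runtime: default_runtime    # copied onto any tool that defines no runtime
  tasks:                      # formerly 'calls'; each task calls a tool
    align:
      tool: bwa_mem           # defaults to the task name when omitted
      input:
        fastq: input_fastq

tools:                        # formerly 'tasks'
  bwa_mem:
    command: bwa_mem.py       # executed with PATH=<workflow_home>/tools:$PATH
```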
From 587222bbd9ea7399f2749c81a3436cc2e297615e Mon Sep 17 00:00:00 2001
From: Junjun Zhang
Date: Sat, 6 May 2017 12:11:26 -0400
Subject: [PATCH 2/4] output from parent task is now copied over to child task with needed fields

---
 jtracker/gitracker.py | 88 +++++++++++++++++++++++++++++++++++++++++--
 jtracker/workflow.py  | 27 ++++++++++++-
 2 files changed, 110 insertions(+), 5 deletions(-)

diff --git a/jtracker/gitracker.py b/jtracker/gitracker.py
index 0000519..2472307 100644
--- a/jtracker/gitracker.py
+++ b/jtracker/gitracker.py
@@ -268,7 +268,7 @@ def next_task(self, worker=None, jtracker=None, timeout=None):
                     with open(task_json, 'r') as f:
                         task_dict = json.loads(f.read())

-                    if not self._check_task_dependency(worker=worker, task_dict=task_dict, job_id=job_id): continue
+                    if not self._check_task_dependency(worker=worker, task_json=task_json, task_dict=task_dict, job_id=job_id): continue

                     running_worker_path = os.path.join(root.replace(TASK_STATE.QUEUED, TASK_STATE.RUNNING), worker.worker_id)
                     if not os.path.isdir(running_worker_path): os.makedirs(running_worker_path)
@@ -371,7 +371,7 @@ def _git_cmd(self, cmds=[]):
         os.chdir(origWD)


-    def _check_task_dependency(self, worker=None, task_dict=None, job_id=None):
+    def _check_task_dependency(self, worker=None, task_json=None, task_dict=None, job_id=None):
         worker_id = worker.worker_id
         host_id = worker.host_id

@@ -392,8 +392,32 @@ def _check_task_dependency(self, worker=None, task_dict=None, job_id=None):
         # we will need better implementation, consider covering 'failed' parent task as well
         # right now it only check whether parent task state is COMPLETED
+        parent_task_to_file = {}
         for d in depends_on:
             parent_state, parent_task, execution_constraint = (d.split('@') + [None] * 3)[:3]
+
+            # check queued tasks, if parent task is still queued then return False, ie, current task not ready to run
+            path_to_parent_task1_in_queue = os.path.join(
+                self.gitracker_home,
+                JOB_STATE.RUNNING,
+                job_id,
+                TASK_STATE.QUEUED,
+                'task.%s' % parent_task,
+                'task.%s.json' % parent_task
+            )
+            path_to_parent_task2_in_queue = os.path.join(
+                self.gitracker_home,
+                JOB_STATE.RUNNING,
+                job_id,
+                TASK_STATE.QUEUED,
+                'task.%s.*' % parent_task,  # scatter task
+                'task.%s.*.json' % parent_task
+            )
+            if glob.glob(path_to_parent_task1_in_queue) + glob.glob(path_to_parent_task2_in_queue):
+                print glob.glob(path_to_parent_task2_in_queue)
+                return False
+
+            # non scatter task
             path_to_parent_task_file1 = os.path.join(
                 self.gitracker_home,
                 JOB_STATE.RUNNING,
                 job_id,
@@ -404,6 +428,7 @@ def _check_task_dependency(self, worker=None, task_json=None, task_dict=None, jo
                 'task.%s.json' % parent_task
             )

+            # scatter task
             path_to_parent_task_file2 = os.path.join(
                 self.gitracker_home,
                 JOB_STATE.RUNNING,
                 job_id,
@@ -415,16 +440,71 @@ def _check_task_dependency(self, worker=None, task_json=None, task_dict=None, jo
             )

             parent_task_files = glob.glob(path_to_parent_task_file1) + glob.glob(path_to_parent_task_file2)
-            parent_task_states = set()
+            parent_task_states = set([])
             for pt in parent_task_files:
                 parent_task_states.add(str(pt).split(os.path.sep)[-4])
-            if not parent_task_states or parent_task_states - set([TASK_STATE.COMPLETED]):  # there are other state than 'completed'
+                parent_task_name = re.sub(r'^task\.', '', str(pt).split(os.path.sep)[-2])
+                parent_task_to_file[parent_task_name] = str(pt)
+
+                if len(parent_task_name.split('.')) == 2:  # support one level scatter call only
+                    parent_root_task_name = parent_task_name.split('.')[0]
+                    if not parent_task_to_file.get(parent_root_task_name):
+                        parent_task_to_file[parent_root_task_name] = []
+
+                    parent_task_to_file[parent_root_task_name].append(str(pt))
+
+            # if no parent_task_state indicates parent_task is still in the queue
+            # when there are other states than 'completed', parent_task has run but not in completed state
+            if not parent_task_states or parent_task_states - set([TASK_STATE.COMPLETED]):
                 return False

+        # retrieve output from parent task and add it to the 'input' of the current task
+        task_json_rewrite = False
+        for i in task_dict.get('input'):
+            if '@' in str(task_dict.get('input').get(i)) \
+                    and task_dict.get('input').get(i).startswith('{{') \
+                    and task_dict.get('input').get(i).endswith('}}'):
+                input_ref = task_dict.get('input').get(i)
+                input_ref = re.sub(r'^\{\{', '', input_ref)
+                input_ref = re.sub(r'\}\}$', '', input_ref)
+
+                parent_output_field, parent_task = input_ref.split('@')
+
+                task_dict['input'][i] = self._get_task_output(parent_task_to_file.get(parent_task), parent_output_field)
+
+                task_json_rewrite = True
+
+        if task_json_rewrite:
+            with open(task_json, 'w') as f:
+                f.write(json.dumps(task_dict))
+
         return True


+    def _get_task_output(self, task_json_file, output_field):
+        if isinstance(task_json_file, str):
+            with open(task_json_file, 'r') as f:
+                task_dict = json.loads(f.read())
+
+            if not task_dict.get('output'):
+                return
+            else:
+                return task_dict.get('output')[-1].get(output_field)
+
+        ret = []
+        for tjf in task_json_file:
+            with open(tjf, 'r') as f:
+                task_dict = json.loads(f.read())
+
+            if not task_dict.get('output'):
+                ret.append(None)
+            else:
+                ret.append(task_dict.get('output')[-1].get(output_field))
+
+        return ret
+
+
     def _sync_local_git_with_server(self):
         self._git_cmd(["checkout", "-q", "master"])
         self._git_cmd(["reset", "--hard", "origin/master"])

diff --git a/jtracker/workflow.py b/jtracker/workflow.py
index c581922..faac528 100644
--- a/jtracker/workflow.py
+++ b/jtracker/workflow.py
@@ -10,9 +10,10 @@ def __init__(self, workflow_yaml_file=None):
         self._version = self.workflow_dict.get('workflow').get('version')

         self._get_workflow_tasks()
-        #print json.dumps(self.workflow_tasks)  # debug

         self._add_default_runtime_to_tools()
+        self._update_dependency()
+        #print json.dumps(self.workflow_tasks, indent=2)  # debug


     @property
@@ -96,3 +97,27 @@ def _add_default_runtime_to_tools(self):
         for t in self.workflow_dict.get('tools', {}):
             if not 'runtime' in t:  # no runtime defined in the tool, add the default one
                 self.workflow_dict['tools'][t]['runtime'] = self.workflow_dict.get('workflow', {}).get('runtime')
+
+
+    def _update_dependency(self):
+        for task in self.workflow_tasks:
+            input_tasks = set([])
+
+            for input_key in self.workflow_tasks.get(task).get('input'):
+                input_ = self.workflow_tasks.get(task).get('input').get(input_key)
+                if len(input_.split('@')) == 2:
+                    input_tasks.add('completed@%s' % input_.split('@')[1])
+
+            existing_dependency = set([])
+            if self.workflow_tasks.get(task).get('depends_on'):
+                for parent_task in self.workflow_tasks.get(task).get('depends_on', []):
+                    existing_dependency.add('@'.join(parent_task.split('@')[:2]))
+
+            dependency_to_add = input_tasks - existing_dependency
+
+            if dependency_to_add:
+                if self.workflow_tasks.get(task).get('depends_on'):
+                    self.workflow_tasks.get(task)['depends_on'] += list(dependency_to_add)
+                else:
+                    self.workflow_tasks.get(task)['depends_on'] = list(dependency_to_add)
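The convention patch 2 builds on is the `{{output_field@parent_task}}` input reference: `_update_dependency()` derives an implicit `completed@parent_task` dependency from each reference, and `_check_task_dependency()` later replaces the reference with the most recent entry of the parent's `output` list in its task JSON. A self-contained sketch of that parsing, combining the two checks the patch applies (the `{{...}}` delimiters and the two-part split on `@`); the task and field names are hypothetical:

```python
import re

def implicit_dependencies(task_input):
    # Derive 'completed@<parent>' entries from '{{field@task}}' input
    # references, mirroring _update_dependency / _check_task_dependency.
    deps = set([])
    for value in task_input.values():
        value = str(value)
        if value.startswith('{{') and value.endswith('}}'):
            ref = re.sub(r'\}\}$', '', re.sub(r'^\{\{', '', value))
            if len(ref.split('@')) == 2:
                output_field, parent_task = ref.split('@')
                deps.add('completed@%s' % parent_task)
    return deps

# one reference to a hypothetical parent task 'align', plus one literal input
print(implicit_dependencies({'bam': '{{aligned_bam@align}}', 'ref_genome': 'GRCh37'}))
# prints the single derived dependency: completed@align
```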
From a275a05f5fe324411b268bdb4c733aca1fc7fd96 Mon Sep 17 00:00:00 2001
From: Junjun Zhang
Date: Sat, 6 May 2017 13:32:56 -0400
Subject: [PATCH 3/4] now properly support "same_host" / "same_worker" execution constraints

---
 jtracker/gitracker.py | 44 +++++++++++++++++++++++++++++++++++++------
 1 file changed, 38 insertions(+), 6 deletions(-)

diff --git a/jtracker/gitracker.py b/jtracker/gitracker.py
index 2472307..10c7888 100644
--- a/jtracker/gitracker.py
+++ b/jtracker/gitracker.py
@@ -375,20 +375,53 @@ def _check_task_dependency(self, worker=None, task_json=None, task_dict=None, jo
         worker_id = worker.worker_id
         host_id = worker.host_id

-        depends_on = task_dict.get('depends_on')
-        if not depends_on: return True
-
         # constraint can be 'same_host', 'same_worker', 'shared_fs' or None
         constraint = self.workflow.workflow_dict.get('workflow',{}).get('execution',{}).get('constraint')

+        depends_on = task_dict.get('depends_on')
+        if not depends_on:
+            # if there is no task running for this job, and this task does not have depends_on
+            # then start it.
+            started_tasks = []
+            for t_state in (TASK_STATE.RUNNING, TASK_STATE.COMPLETED, TASK_STATE.FAILED):
+                path_to_task = os.path.join(
+                    self.gitracker_home,
+                    JOB_STATE.RUNNING,
+                    job_id,
+                    t_state,
+                    'worker.*',
+                    'task.*',
+                    'task.*.json'
+                )
+                started_tasks += glob.glob(path_to_task)
+
+            if not started_tasks:
+                return True  # this is the first task to run
+            else:
+                one_started_worker_id = started_tasks[0].split(os.path.sep)[-3]
+                if constraint == 'same_worker':
+                    if worker_id == one_started_worker_id:
+                        return True
+                    else:
+                        return False
+                elif constraint == 'same_host':
+                    if worker_id.split('.host.')[-1] == one_started_worker_id.split('.host.')[-1]:
+                        return True  # this worker is on the same host
+                    else:
+                        return False
+                elif constraint == 'shared_fs':
+                    # need more thoughts here, for now 'shared_fs' means no constraint
+                    # all worker on all host can access the same shared file system
+                    return True
+                else:
+                    return False  # constraint must set, otherwise won't run
+
         worker_id_pattern = 'worker.*'
-        """ ## to be implemented properly, for now every worker can run a task
         if constraint and constraint == 'same_worker':
             worker_id_pattern = worker_id
         elif constraint and constraint == 'same_host':
             # more work is needed to handle tasks do not have dependencies, they will run without checking host
             worker_id_pattern = 'worker.*.host.%s.*' % host_id
-        """

         # we will need better implementation, consider covering 'failed' parent task as well
         # right now it only check whether parent task state is COMPLETED
@@ -414,7 +447,6 @@ def _check_task_dependency(self, worker=None, task_json=None, task_dict=None, jo
                 'task.%s.*.json' % parent_task
             )
             if glob.glob(path_to_parent_task1_in_queue) + glob.glob(path_to_parent_task2_in_queue):
-                print glob.glob(path_to_parent_task2_in_queue)
                 return False

             # non scatter task
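With patch 3, the value read from `workflow.execution.constraint` gates which worker may start a job's tasks: `same_worker` pins them to the worker that ran the first task, `same_host` compares the host portion embedded in worker IDs after `.host.`, `shared_fs` currently behaves as unconstrained, and a missing or unrecognized value prevents further no-dependency tasks from starting once the job has begun (the final `else` branch). A hypothetical workflow-definition snippet opting in to host affinity:

```yaml
workflow:
  execution:
    constraint: same_host   # recognized values: same_worker, same_host, shared_fs
```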
From f318fb4cb010843704bf80aeb17207cd9b0d742f Mon Sep 17 00:00:00 2001
From: Junjun Zhang
Date: Sat, 6 May 2017 15:26:04 -0400
Subject: [PATCH 4/4] bump to 0.1.0rc3

---
 README.md            | 12 +++++++-----
 jtracker/__init__.py |  2 +-
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/README.md b/README.md
index 32e81b0..22e487c 100644
--- a/README.md
+++ b/README.md
@@ -20,7 +20,7 @@ git clone git@github.com:icgc-dcc/jtracker.git
 cd jtracker
 pipsi install .

-# or install from PyPI for latest release
+# or install from PyPI for latest release (no need to clone the source)
 pipsi install jtracker

 # run it
@@ -29,16 +29,18 @@ jt

 ## Design and create your workflow

-Follow this example to develop your workflow: https://github.com/icgc-dcc/ega-file-transfer-to-collab
+Follow this example to develop your workflow: https://github.com/icgc-dcc/ega-file-transfer-to-collab-jt

 Quick note on workflow development and testing:
+
 1. You can have the workflow definition and workflow execution in one single Git repo, although separating them in dedicated Git repos is recommended for production use. Here is an example: https://github.com/icgc-dcc/jtracker-example-workflows/tree/master/test.0.0.0.jtracker, note that the job state folders and the workflow folder are in one repo.
+
 2. It is recommended to use a local Git server for development and testing. Follow these instructions to set up a local Git server on a Mac: http://www.tomdalling.com/blog/software-processes/how-to-set-up-a-secure-git-server-at-home-osx/. Once set up, you can access it the same way as you access GitHub. In my case, `git clone ssh://junjun@localhost:/Users/junjun/mygit/jtracker-demo-workflows.git`

-## Create a Git repository to manage workflow execution
+## Create a Git repository to manage and track workflow task execution

-Here is an example: https://github.com/icgc-dcc/jtracker-example-workflows/tree/master/test.0.1.0.jtracker
+Here is an example: https://github.com/icgc-dcc/jtracker-example-workflows/tree/master/ega-file-transfer-to-collab.0.4.0.jtracker

 At this time, you will need to set up this Git repository on your own manually. In the near future, the 'jt' CLI tool will be able to set it up automatically for you.
@@ -48,7 +50,7 @@ At this time, you will need to set up this Git repository on your own manually.

 On a task execution host, you can start a worker as follows, assuming the workflow definition and job JSON files exist as specified.

 ```
-jt -g 'git@github.com:icgc-dcc/jtracker-example-workflows' -w test -r 0.1.0 worker
+jt -g 'git@github.com:icgc-dcc/jtracker-example-workflows' -w ega-file-transfer-to-collab -r 0.4.0 worker
 ```

 You can start multiple workers on the same host if there are enough computing resources. You can also start workers on different hosts at the same time. Workflow jobs/tasks will be picked up by individual workers as needed.

diff --git a/jtracker/__init__.py b/jtracker/__init__.py
index a9c1b3d..a8ff662 100644
--- a/jtracker/__init__.py
+++ b/jtracker/__init__.py
@@ -1,4 +1,4 @@
-__version__ = '0.1.0rc2'
+__version__ = '0.1.0rc3'

 from .worker import Worker
 from .jtracker import JTracker
\ No newline at end of file
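For reference, the job-tracking repo layout that the dependency checks in patches 2 and 3 glob against can be pieced together from their `os.path.join` patterns; the placeholders below stand for the state-constant values and generated IDs, which are not spelled out in the patches themselves:

```
<gitracker_home>/
  <JOB_STATE.RUNNING>/
    <job_id>/
      <TASK_STATE.QUEUED>/
        task.<task_name>/task.<task_name>.json      # scatter tasks: task.<name>.<suffix>/...
      <TASK_STATE.RUNNING|COMPLETED|FAILED>/
        worker.<worker_id>.host.<host_id>/          # worker level appears once a task starts
          task.<task_name>/task.<task_name>.json
```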