diff --git a/jtracker/gitracker.py b/jtracker/gitracker.py index 2472307..10c7888 100644 --- a/jtracker/gitracker.py +++ b/jtracker/gitracker.py @@ -375,20 +375,53 @@ def _check_task_dependency(self, worker=None, task_json=None, task_dict=None, jo worker_id = worker.worker_id host_id = worker.host_id - depends_on = task_dict.get('depends_on') - if not depends_on: return True - # constraint can be 'same_host', 'same_worker', 'shared_fs' or None constraint = self.workflow.workflow_dict.get('workflow',{}).get('execution',{}).get('constraint') + depends_on = task_dict.get('depends_on') + if not depends_on: + # if there is no task running for this job, and this task does not have depends_on + # then start it. + started_tasks = [] + for t_state in (TASK_STATE.RUNNING, TASK_STATE.COMPLETED, TASK_STATE.FAILED): + path_to_task = os.path.join( + self.gitracker_home, + JOB_STATE.RUNNING, + job_id, + t_state, + 'worker.*', + 'task.*', + 'task.*.json' + ) + started_tasks += glob.glob(path_to_task) + + if not started_tasks: + return True # this is the first task to run + else: + one_started_worker_id = started_tasks[0].split(os.path.sep)[-3] + if constraint == 'same_worker': + if worker_id == one_started_worker_id: + return True + else: + return False + elif constraint == 'same_host': + if worker_id.split('.host.')[-1] == one_started_worker_id.split('.host.')[-1]: + return True # this worker is on the same host + else: + return False + elif constraint == 'shared_fs': + # need more thoughts here, for now 'shared_fs' means no constraint + # all worker on all host can access the same shared file system + return True + else: + return False # constraint must set, otherwise won't run + worker_id_pattern = 'worker.*' - """ ## to be implemented properly, for now every worker can run a task if constraint and constraint == 'same_worker': worker_id_pattern = worker_id elif constraint and constraint == 'same_host': # more work is needed to handle tasks do not have dependencies, they will run without checking host worker_id_pattern = 'worker.*.host.%s.*' % host_id - """ # we will need better implementation, consider covering 'failed' parent task as well # right now it only check whether parent task state is COMPLETED @@ -414,7 +447,6 @@ def _check_task_dependency(self, worker=None, task_json=None, task_dict=None, jo 'task.%s.*.json' % parent_task ) if glob.glob(path_to_parent_task1_in_queue) + glob.glob(path_to_parent_task2_in_queue): - print glob.glob(path_to_parent_task2_in_queue) return False # non scatter task