Skip to content

Commit

Permalink
now properly support "same_host" / "same_worker" execution constraints
Browse files Browse the repository at this point in the history
  • Loading branch information
junjun-zhang committed May 6, 2017
1 parent 587222b commit a275a05
Showing 1 changed file with 38 additions and 6 deletions.
44 changes: 38 additions & 6 deletions jtracker/gitracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,20 +375,53 @@ def _check_task_dependency(self, worker=None, task_json=None, task_dict=None, jo
worker_id = worker.worker_id
host_id = worker.host_id

depends_on = task_dict.get('depends_on')
if not depends_on: return True

# constraint can be 'same_host', 'same_worker', 'shared_fs' or None
constraint = self.workflow.workflow_dict.get('workflow',{}).get('execution',{}).get('constraint')

depends_on = task_dict.get('depends_on')
if not depends_on:
# if no task of this job has started yet (none is running, completed or failed)
# and this task does not have depends_on, then start it.
started_tasks = []
for t_state in (TASK_STATE.RUNNING, TASK_STATE.COMPLETED, TASK_STATE.FAILED):
path_to_task = os.path.join(
self.gitracker_home,
JOB_STATE.RUNNING,
job_id,
t_state,
'worker.*',
'task.*',
'task.*.json'
)
started_tasks += glob.glob(path_to_task)

if not started_tasks:
return True # this is the first task to run
else:
one_started_worker_id = started_tasks[0].split(os.path.sep)[-3]
if constraint == 'same_worker':
if worker_id == one_started_worker_id:
return True
else:
return False
elif constraint == 'same_host':
if worker_id.split('.host.')[-1] == one_started_worker_id.split('.host.')[-1]:
return True # this worker is on the same host
else:
return False
elif constraint == 'shared_fs':
# needs more thought; for now 'shared_fs' means no constraint:
# all workers on all hosts can access the same shared file system
return True
else:
return False # constraint must set, otherwise won't run

worker_id_pattern = 'worker.*'
"""
## to be implemented properly, for now every worker can run a task
if constraint and constraint == 'same_worker':
worker_id_pattern = worker_id
elif constraint and constraint == 'same_host': # more work is needed to handle tasks do not have dependencies, they will run without checking host
worker_id_pattern = 'worker.*.host.%s.*' % host_id
"""

# we will need a better implementation; consider covering 'failed' parent tasks as well
# right now it only checks whether the parent task state is COMPLETED
Expand All @@ -414,7 +447,6 @@ def _check_task_dependency(self, worker=None, task_json=None, task_dict=None, jo
'task.%s.*.json' % parent_task
)
if glob.glob(path_to_parent_task1_in_queue) + glob.glob(path_to_parent_task2_in_queue):
print glob.glob(path_to_parent_task2_in_queue)
return False

# non scatter task
Expand Down

0 comments on commit a275a05

Please sign in to comment.