Skip to content

Commit

Permalink
Enforce max files per job
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew-McNab-UK committed Nov 29, 2024
1 parent 48c5eb9 commit a6e844a
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 10 deletions.
12 changes: 6 additions & 6 deletions agents/justin-finder
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,7 @@ def findStalledAbortedJobs():
# Find jobs which have
# (1) not sent a heartbeat in jobStalledSeconds, or
# (2) which have reported they have aborted or that outputting failed
# and still have has_allocations set
# and still have allocated_files
# and then reset their file allocations
#
# We handle bad waiting jobs in findStalledCondorJobs()
Expand All @@ -1263,7 +1263,7 @@ def findStalledAbortedJobs():
'(heartbeat_time < DATE_SUB(NOW(),INTERVAL %d SECOND))) OR '
'((job_state="aborted" OR '
' job_state="jobscript_error" OR '
' job_state="outputting_failed") AND has_allocations) '
' job_state="outputting_failed") AND allocated_files) '
'ORDER BY justin_job_id' % justin.jobStallSeconds)

justin.cur.execute(query)
Expand All @@ -1284,7 +1284,7 @@ def findStalledAbortedJobs():
# If not here due to an abort, we mark the job as stalled
try:
query = ('UPDATE jobs SET job_state="stalled",finished_time=NOW(),'
'has_allocations=FALSE '
'allocated_files=0 '
'WHERE justin_job_id=%d' % jobRow['justin_job_id'])

justin.cur.execute(query)
Expand All @@ -1303,14 +1303,14 @@ def findStalledAbortedJobs():
siteID = jobRow['site_id'],
entryID = jobRow['entry_id'])
else:
# An aborted, error, failed state, so just reset has_allocations
# An aborted, error, failed state, so just reset allocated_files
try:
query = ('UPDATE jobs SET has_allocations=FALSE '
query = ('UPDATE jobs SET allocated_files=0 '
'WHERE justin_job_id=%d' % jobRow['justin_job_id'])

justin.cur.execute(query)
except Exception as e:
justin.logLine('Failed to set job %s to has_allocations=FALSE: %s'
justin.logLine('Failed to set job %s to allocated_files=0: %s'
% (jobRow['jobsub_id'], str(e)))
continue

Expand Down
4 changes: 2 additions & 2 deletions database/justindb-create-tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ CREATE TABLE IF NOT EXISTS `jobs` (
'finished','notused','aborted','stalled','jobscript_error',
'outputting_failed', 'none_processed')
NOT NULL DEFAULT 'submitted',
`has_allocations` tinyint(1) NOT NULL DEFAULT 0,
`allocated_files` tinyint(1) unsigned NOT NULL DEFAULT 0,
`sent_get_file` tinyint(1) NOT NULL DEFAULT 0,
`allocator_name` varchar(255) NOT NULL DEFAULT '',
`allocation_error` varchar(255) NOT NULL DEFAULT '',
Expand Down Expand Up @@ -92,7 +92,7 @@ CREATE TABLE IF NOT EXISTS `jobs` (
INDEX `job_state_site_id` (`job_state`,`site_id`,
`submitted_time`),
INDEX `workflow_stage_allocation` (`workflow_id`,`stage_id`,`job_state`),
INDEX `has_allocations` (`has_allocations`,`job_state`),
INDEX `allocated_files` (`allocated_files`,`job_state`),
INDEX `heartbeat_time` (`job_state`,`heartbeat_time`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Expand Down
1 change: 1 addition & 0 deletions modules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
+ jobStatesTerminal

jobStallSeconds = 3660
maxFilesPerJob = 20

filesPerNumberedDestination = 1000

Expand Down
13 changes: 11 additions & 2 deletions services/justin-wsgi-allocator
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ def makeJobDict(jsonDict, jobscriptSecret = None):
'jobs.requested_wall_seconds,'
'jobs.has_inner_apptainer,'
'jobs.jobscript_exit,'
'jobs.allocated_files,'
'sites.site_name,'
'sites.running_jobs,'
'sites.enabled,'
Expand Down Expand Up @@ -361,6 +362,7 @@ def makeJobDict(jsonDict, jobscriptSecret = None):
"requested_wall_seconds" : job['requested_wall_seconds'],
"has_inner_apptainer" : job['has_inner_apptainer'],
"jobscript_exit" : job['jobscript_exit'],
"allocated_files" : job['allocated_files'],
"scope_name" : job['scope_name']
if job['scope_name'] else '',
"wlcg_group_name" : job['wlcg_group_name']
Expand Down Expand Up @@ -1302,6 +1304,12 @@ def getFileMethod(startResponse, jsonDict):
'Job in wrong state to find file (%s)' %
jobDict['job_state'])

if jobDict['allocated_files'] >= justin.maxFilesPerJob:
return httpError(startResponse,
'404 Not Found',
'Only %d files can be allocated to each job'
% justin.maxFilesPerJob)

# Create a stage dictionary with the next file in this stage
oneFile = findBestFile(jobDict)

Expand All @@ -1314,7 +1322,8 @@ def getFileMethod(startResponse, jsonDict):
try:
# Update heartbeat_time and make sure job is set to processing now
justin.insertUpdate('UPDATE jobs SET job_state="processing",'
'heartbeat_time=NOW(),has_allocations=TRUE,'
'heartbeat_time=NOW(),'
'allocated_files=allocated_files+1,'
'sent_get_file=TRUE '
'WHERE jobsub_id="' + jsonDict['jobsub_id'] + '"'
)
Expand Down Expand Up @@ -1358,7 +1367,7 @@ def getFileMethod(startResponse, jsonDict):
'Unable to update job to sent_get_file=TRUE: ' + str(e))

# Use httpOK() since this does not do a rollback
return httpOK(startResponse,
return httpOK(startResponse,
'No eligible file found',
codeStr = '404 Not Found')

Expand Down

0 comments on commit a6e844a

Please sign in to comment.