From a6e844a78c1fb0cd21d23b916e2593f778d9bcb8 Mon Sep 17 00:00:00 2001 From: Andrew McNab Date: Fri, 29 Nov 2024 17:13:15 +0000 Subject: [PATCH] Enforce max files per job --- agents/justin-finder | 12 ++++++------ database/justindb-create-tables.sql | 4 ++-- modules/__init__.py | 1 + services/justin-wsgi-allocator | 13 +++++++++++-- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/agents/justin-finder b/agents/justin-finder index 9669e53..97816e0 100755 --- a/agents/justin-finder +++ b/agents/justin-finder @@ -1246,7 +1246,7 @@ def findStalledAbortedJobs(): # Find jobs which have # (1) not sent a heartbeat in jobStalledSeconds, or # (2) which have reported they have aborted or that outputting failed - # and still have has_allocations set + # and still have allocated_files # and then reset their file allocations # # We handle bad waiting jobs in findStalledCondorJobs() @@ -1263,7 +1263,7 @@ def findStalledAbortedJobs(): '(heartbeat_time < DATE_SUB(NOW(),INTERVAL %d SECOND))) OR ' '((job_state="aborted" OR ' ' job_state="jobscript_error" OR ' - ' job_state="outputting_failed") AND has_allocations) ' + ' job_state="outputting_failed") AND allocated_files) ' 'ORDER BY justin_job_id' % justin.jobStallSeconds) justin.cur.execute(query) @@ -1284,7 +1284,7 @@ def findStalledAbortedJobs(): # If not here due to an abort, we mark the job as stalled try: query = ('UPDATE jobs SET job_state="stalled",finished_time=NOW(),' - 'has_allocations=FALSE ' + 'allocated_files=0 ' 'WHERE justin_job_id=%d' % jobRow['justin_job_id']) justin.cur.execute(query) @@ -1303,14 +1303,14 @@ def findStalledAbortedJobs(): siteID = jobRow['site_id'], entryID = jobRow['entry_id']) else: - # An aborted, error, failed state, so just reset has_allocations + # An aborted, error, failed state, so just reset allocated_files try: - query = ('UPDATE jobs SET has_allocations=FALSE ' + query = ('UPDATE jobs SET allocated_files=0 ' 'WHERE justin_job_id=%d' % jobRow['justin_job_id']) justin.cur.execute(query) except Exception as e: - justin.logLine('Failed to set job %s to has_allocations=FALSE: %s' + justin.logLine('Failed to set job %s to allocated_files=0: %s' % (jobRow['jobsub_id'], str(e))) continue diff --git a/database/justindb-create-tables.sql b/database/justindb-create-tables.sql index e7c293b..72d683e 100644 --- a/database/justindb-create-tables.sql +++ b/database/justindb-create-tables.sql @@ -56,7 +56,7 @@ CREATE TABLE IF NOT EXISTS `jobs` ( 'finished','notused','aborted','stalled','jobscript_error', 'outputting_failed', 'none_processed') NOT NULL DEFAULT 'submitted', - `has_allocations` tinyint(1) NOT NULL DEFAULT 0, + `allocated_files` tinyint(1) unsigned NOT NULL DEFAULT 0, `sent_get_file` tinyint(1) NOT NULL DEFAULT 0, `allocator_name` varchar(255) NOT NULL DEFAULT '', `allocation_error` varchar(255) NOT NULL DEFAULT '', @@ -92,7 +92,7 @@ CREATE TABLE IF NOT EXISTS `jobs` ( INDEX `job_state_site_id` (`job_state`,`site_id`, `submitted_time`), INDEX `workflow_stage_allocation` (`workflow_id`,`stage_id`,`job_state`), - INDEX `has_allocations` (`has_allocations`,`job_state`), + INDEX `allocated_files` (`allocated_files`,`job_state`), INDEX `heartbeat_time` (`job_state`,`heartbeat_time`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; diff --git a/modules/__init__.py b/modules/__init__.py index 5b88f3b..f02b0d5 100644 --- a/modules/__init__.py +++ b/modules/__init__.py @@ -127,6 +127,7 @@ + jobStatesTerminal jobStallSeconds = 3660 +maxFilesPerJob = 20 filesPerNumberedDestination = 1000 diff --git a/services/justin-wsgi-allocator b/services/justin-wsgi-allocator index 1e7f9cf..8c1915e 100755 --- a/services/justin-wsgi-allocator +++ b/services/justin-wsgi-allocator @@ -302,6 +302,7 @@ def makeJobDict(jsonDict, jobscriptSecret = None): 'jobs.requested_wall_seconds,' 'jobs.has_inner_apptainer,' 'jobs.jobscript_exit,' + 'jobs.allocated_files,' 'sites.site_name,' 'sites.running_jobs,' 'sites.enabled,' @@ -361,6 +362,7 @@ def makeJobDict(jsonDict, jobscriptSecret = None): "requested_wall_seconds" : job['requested_wall_seconds'], "has_inner_apptainer" : job['has_inner_apptainer'], "jobscript_exit" : job['jobscript_exit'], + "allocated_files" : job['allocated_files'], "scope_name" : job['scope_name'] if job['scope_name'] else '', "wlcg_group_name" : job['wlcg_group_name'] @@ -1302,6 +1304,12 @@ def getFileMethod(startResponse, jsonDict): 'Job in wrong state to find file (%s)' % jobDict['job_state']) + if jobDict['allocated_files'] >= justin.maxFilesPerJob: + return httpError(startResponse, + '404 Not Found', + 'Only %d files can be allocated to each job' + % justin.maxFilesPerJob) + # Create a stage dictionary with the next file in this stage oneFile = findBestFile(jobDict) @@ -1314,7 +1322,8 @@ def getFileMethod(startResponse, jsonDict): try: # Update heartbeat_time and make sure job is set to processing now justin.insertUpdate('UPDATE jobs SET job_state="processing",' - 'heartbeat_time=NOW(),has_allocations=TRUE,' + 'heartbeat_time=NOW(),' + 'allocated_files=allocated_files+1,' 'sent_get_file=TRUE ' 'WHERE jobsub_id="' + jsonDict['jobsub_id'] + '"' ) @@ -1358,7 +1367,7 @@ def getFileMethod(startResponse, jsonDict): 'Unable to update job to sent_get_file=TRUE: ' + str(e)) # Use httpOK() since this does not do a rollback - return httpOK(startResponse, + return httpOK(startResponse, 'No eligible file found', codeStr = '404 Not Found')