
Commit

Import
Andrew-McNab-UK committed Oct 4, 2024
1 parent 4d36abf commit 4661892
Showing 6 changed files with 443 additions and 122 deletions.
118 changes: 60 additions & 58 deletions agents/justin-finder
@@ -771,64 +771,64 @@ def processSubmittedWorkflow(workflow):
except:
condorGroupID = 0

stages = justin.select('SELECT stage_id,processors,jobscript_git,'
'jobscript_image,wall_seconds,rss_bytes '
'FROM stages WHERE workflow_id=%d'
% workflowID)

for stage in stages:
stageID = stage['stage_id']
justin.logLine('Processing submitted w%ds%d' % (workflowID, stageID))

stagesOutputStorages = justin.select(
'SELECT rse_name FROM stages_output_storages '
'LEFT JOIN storages '
'ON storages.rse_id=stages_output_storages.rse_id '
'WHERE stages_output_storages.workflow_id=%d '
'AND stages_output_storages.stage_id=%s '
% (workflowID, stageID))

rseNames = []
for stageOutputStorage in stagesOutputStorages:
rseNames.append(stageOutputStorage['rse_name'])

stagesOutputs = justin.select(
'SELECT destination,lifetime_seconds,pattern_id,file_pattern '
'FROM stages_outputs '
'WHERE workflow_id=%d AND stage_id=%d' % (workflowID, stageID))

for stageOutput in stagesOutputs:
if not stageOutput['destination'].startswith('https://'):
justin.logLine('Processing submitted w%ds%dp%d'
% (workflowID, stageID, stageOutput['pattern_id']))
# A Rucio dataset

# Metadata for the main dataset in MetaCat
metadataDict = { "dune.workflow" :
{ "workflow_id" : workflowID,
"stage_id" : stageID,
"pattern_id" : stageOutput['pattern_id'],
"file_pattern" : stageOutput['file_pattern'],
"user" : workflow['principal_name'],
"processors" : stage['processors'],
"rss_bytes" : stage['rss_bytes'],
"wall_seconds" : stage['wall_seconds'],
"jobscript_image" : stage['jobscript_image']
}
}
# stages = justin.select('SELECT stage_id,processors,jobscript_git,'
# 'jobscript_image,wall_seconds,rss_bytes '
# 'FROM stages WHERE workflow_id=%d'
# % workflowID)
#
# for stage in stages:
# stageID = stage['stage_id']
# justin.logLine('Processing submitted w%ds%d' % (workflowID, stageID))
#
# stagesOutputStorages = justin.select(
# 'SELECT rse_name FROM stages_output_storages '
# 'LEFT JOIN storages '
# 'ON storages.rse_id=stages_output_storages.rse_id '
# 'WHERE stages_output_storages.workflow_id=%d '
# 'AND stages_output_storages.stage_id=%s '
# % (workflowID, stageID))
#
# rseNames = []
# for stageOutputStorage in stagesOutputStorages:
# rseNames.append(stageOutputStorage['rse_name'])
#
# stagesOutputs = justin.select(
# 'SELECT destination,lifetime_seconds,pattern_id,file_pattern '
# 'FROM stages_outputs '
# 'WHERE workflow_id=%d AND stage_id=%d' % (workflowID, stageID))
#
# for stageOutput in stagesOutputs:
# if not stageOutput['destination'].startswith('https://'):
# justin.logLine('Processing submitted w%ds%dp%d'
# % (workflowID, stageID, stageOutput['pattern_id']))
# # A Rucio dataset
#
# # Metadata for the main dataset in MetaCat
# metadataDict = { "dune.workflow" :
# { "workflow_id" : workflowID,
# "stage_id" : stageID,
# "pattern_id" : stageOutput['pattern_id'],
# "file_pattern" : stageOutput['file_pattern'],
# "user" : workflow['principal_name'],
# "processors" : stage['processors'],
# "rss_bytes" : stage['rss_bytes'],
# "wall_seconds" : stage['wall_seconds'],
# "jobscript_image" : stage['jobscript_image']
# }
# }
# ADD RCDS/Git repos in here too with tags and commit hashes

if stage['jobscript_git']:
metadataDict['dune.workflow']['jobscript_git'] = stage['jobscript_git']

createOutputDatasets(workflowID = workflowID,
stageID = stageID,
patternID = stageOutput['pattern_id'],
metadataDict = metadataDict,
scopeName = workflow['scope_name'],
destination = stageOutput['destination'],
lifetimeSeconds = stageOutput['lifetime_seconds'],
rseNames = rseNames)
#
# if stage['jobscript_git']:
# metadataDict['dune.workflow']['jobscript_git'] = stage['jobscript_git']
#
# createOutputDatasets(workflowID = workflowID,
# stageID = stageID,
# patternID = stageOutput['pattern_id'],
# metadataDict = metadataDict,
# scopeName = workflow['scope_name'],
# destination = stageOutput['destination'],
# lifetimeSeconds = stageOutput['lifetime_seconds'],
# rseNames = rseNames)

# Successful so mark workflow as running
# Define refind_next_time as now to force at least one finding cycle
@@ -843,9 +843,11 @@ def processSubmittedWorkflow(workflow):

justin.insertUpdate(query)

# THIS FUNCTION IS NO LONGER USED AND CAN BE REMOVED ONCE FINISHED PUTTING
# THIS FUNCTIONALITY IN THE WRAPPER JOBS
def createOutputDatasets(workflowID, stageID, patternID, metadataDict,
scopeName, destination, lifetimeSeconds, rseNames):
# Create missing main pattern dataset and per-RSE datasets
# Create missing main pattern datasets (per-RSEs done in wrapper job)
# All exceptions will be caught by the caller

justin.logLine('Try to create DID and Rule Rucio clients')
48 changes: 40 additions & 8 deletions agents/justin-wrapper-job
@@ -476,6 +476,14 @@ def updateMetadataTmp(fileName,

with open('tmp.json', 'w') as f:
f.write(json.dumps(metadata, indent = 4, sort_keys = True))

def createDataset(dataset, jobscriptDict):

# Create dataset in MetaCat

# Create dataset in Rucio

# Create Rule

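The createDataset stub added above lists its three steps only as comments. Below is a minimal, hedged sketch of what those steps could look like, assuming the Rucio client API (DIDClient.add_dataset, RuleClient.add_replication_rule) and a MetaCat web-API client are available to the wrapper job; the METACAT_SERVER_URL variable, the exact MetaCat call signature, the placeholder RSE expression, and all jobscriptDict keys other than 'scope' are illustrative assumptions, not the committed implementation.

# Hedged sketch only -- not the code committed here.
import os
from rucio.client.didclient import DIDClient
from rucio.client.ruleclient import RuleClient
from rucio.common.exception import DataIdentifierAlreadyExists, DuplicateRule
from metacat.webapi import MetaCatClient

def createDataset(dataset, jobscriptDict):
  scope = jobscriptDict['scope']

  # Create dataset in MetaCat (method name/signature assumed)
  metacat = MetaCatClient(os.environ['METACAT_SERVER_URL'])
  try:
    metacat.create_dataset('%s:%s' % (scope, dataset))
  except Exception:
    pass  # treat "already exists" as success

  # Create dataset in Rucio, tolerating an existing DID
  didClient = DIDClient()
  try:
    didClient.add_dataset(scope = scope, name = dataset)
  except DataIdentifierAlreadyExists:
    pass

  # Create Rule pinning one copy of the dataset to the target storage
  # ('ANY_DUNE_DISK' is a placeholder RSE expression, not a real default)
  ruleClient = RuleClient()
  try:
    ruleClient.add_replication_rule(
      dids = [{'scope': scope, 'name': dataset}],
      copies = 1,
      rse_expression = jobscriptDict.get('rse_expression', 'ANY_DUNE_DISK'),
      lifetime = jobscriptDict.get('lifetime_seconds'))
  except DuplicateRule:
    pass
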
########################################################################
os.environ['TZ'] = 'UTC0'
@@ -821,10 +829,20 @@ outputFiles = []

# Find files matching output patterns specified for this stage
for (patternType, pattern, patternID) in jobscriptDict['patterns']:
matches = glob.glob('home/workspace/' + pattern)
try:
matches = glob.glob('home/workspace/' + pattern)
except Exception as e:
logLine('Got exception from glob matching of patterns: ' + str(e))
continue

for match in matches:
matchingFile = match.split('/')[-1]
fileSize = os.path.getsize(match)
try:
fileSize = os.path.getsize(match)
except Exception as e:
logLine('Got exception from os.path.getsize: ' + str(e))
continue

outputFiles.append((matchingFile, fileSize, patternID))
recordResultsDict['output_files'].append((matchingFile, patternID))

@@ -940,9 +958,10 @@ logLine('Output RSEs: ' + str(resultsResponseDict['output_rses']))
# f.write(resultsResponseDict['user_access_token'])

confirmResultsDict = { 'method' : 'confirm_results',
'output_files' : [] }
'output_files' : {} }

rucioUploadedDIDs = []
createdDatasets = set()

# Go through the list of output files
for (fileName, fileSize, patternID) in outputFiles:
@@ -961,14 +980,28 @@ for (fileName, fileSize, patternID) in outputFiles:
uploadEndTime = time.time()
except:
jobAborted(317, 'webdav_upload', '')

confirmResultsDict['output_files'].append(
{ 'fileName' : fileName,

confirmResultsDict['output_files'][fileName] = {
'pattern_id' : patternID,
'seconds' : uploadEndTime - uploadStartTime,
'size_bytes' : fileSize }
)
elif resultsResponseDict['output_rses']:
# Create per-RSE dataset if needed
try:
createDataset('w%ds%dp%d-%s' % (workflowID, stageID, patternID, rse),
jobscriptDict)
except Exception as e:
logLine('createDataset fails for per-RSE dataset: ' + str(e))
continue

# Create destination dataset if needed
try:
createDataset(destination, jobscriptDict)
except Exception as e:
logLine('createDataset fails for destination dataset: ' + str(e))
continue

# Uploading to Rucio managed storage

# Create tmp.json metadata file for this output file
@@ -1052,8 +1085,7 @@ for (fileName, fileSize, patternID) in outputFiles:
rucioUploadedDIDs.append(jobscriptDict['scope'] + ':' + fileName)

# Add to list of uploaded files for confirm results
confirmResultsDict['output_files'].append(
{ 'fileName' : fileName,
confirmResultsDict['output_files'][fileName] = {
'pattern_id' : patternID,
'rse_name' : rse,
'seconds' : uploadEndTime - uploadStartTime,
9 changes: 9 additions & 0 deletions database/justindb-create-tables.sql
@@ -311,6 +311,7 @@ CREATE TABLE IF NOT EXISTS `stages_outputs` (
`lifetime_seconds` int(10) unsigned NOT NULL DEFAULT 86400,
`file_pattern` varchar(255) NOT NULL,
`destination` varchar(512) NOT NULL,
`number_files` int(10) unsigned NOT NULL DEFAULT 0,
`for_next_stage` tinyint(1) NOT NULL DEFAULT '0',
UNIQUE KEY `workflow_stage_pattern` (`workflow_id`,`stage_id`,`pattern_id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
@@ -324,6 +325,14 @@ CREATE TABLE IF NOT EXISTS `stages_output_storages` (
UNIQUE KEY `workflow_stage_rse` (`workflow_id`,`stage_id`,`rse_id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

CREATE TABLE IF NOT EXISTS `workflows_output_datasets` (
`workflow_id` mediumint(8) unsigned NOT NULL,
`dataset_did` varchar(255) NOT NULL,
`closed` tinyint(1) NOT NULL DEFAULT '0',
`created` datetime NOT NULL DEFAULT '1970-01-01 00:00:00',
UNIQUE KEY `workflow_dataset` (`workflow_id`,`closed`,`dataset_did`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
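
The new workflows_output_datasets table keeps one row per Rucio output dataset created for a workflow, with a closed flag recording whether the dataset has been closed. A hedged sketch of how an agent could use the table via the justin.select and justin.insertUpdate helpers seen in justin-finder; the query strings and the datasetDID variable are illustrative and not part of this commit (no input escaping shown).

# Record a newly created dataset for a workflow (no-op if already recorded)
justin.insertUpdate(
  'INSERT INTO workflows_output_datasets '
  '(workflow_id, dataset_did, closed, created) '
  'VALUES (%d, "%s", 0, NOW()) '
  'ON DUPLICATE KEY UPDATE workflow_id=workflow_id'
  % (workflowID, datasetDID))

# Later, find the still-open datasets of a finished workflow so they can be
# closed in Rucio and then marked closed here
openDatasets = justin.select(
  'SELECT dataset_did FROM workflows_output_datasets '
  'WHERE workflow_id=%d AND closed=0' % workflowID)

for row in openDatasets:
  justin.insertUpdate(
    'UPDATE workflows_output_datasets SET closed=1 '
    'WHERE workflow_id=%d AND dataset_did="%s"'
    % (workflowID, row['dataset_did']))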

CREATE TABLE IF NOT EXISTS `stages_input_storages` (
`workflow_id` mediumint(8) unsigned NOT NULL,
`stage_id` tinyint(3) unsigned NOT NULL,
