Skip to content

Commit

Permalink
Showing 10 changed files with 75 additions and 35 deletions.
16 changes: 9 additions & 7 deletions agents/justin-finder
Original file line number Diff line number Diff line change
@@ -167,18 +167,20 @@ def findFilesMetaCat(workflowID, mql):
logLine("Parsing line from MetaCat fails with: " + str(e))
return

if 'namespace' not in fileDict or not fileDict['namespace'] \
or 'name' not in fileDict or not fileDict['name']:
logLine('Ignore file with invalid SCOPE:NAME from MetaCat: '
try:
namespace = fileDict['namespace']
name = fileDict['name']
size = int(fileDict['size'])
except:
logLine('Ignore file with invalid SCOPE:NAME/SIZE from MetaCat: '
+ str(fileDict))
continue

try:
query = ('INSERT INTO files SET '
'workflow_id=' + str(workflowID) + ',' +
'file_did="' + fileDict['namespace'] + ':'
+ fileDict['name'] + '" '
'workflow_id=%d,file_did="%s:%s",size_bytes=%d '
'ON DUPLICATE KEY UPDATE workflow_id=workflow_id'
% (workflowID, namespace, name, size)
)

justin.cur.execute(query)
28 changes: 15 additions & 13 deletions agents/justin-wrapper-job
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@ import shutil
import base64
import urllib
import urllib.request
import random
import subprocess

awtWorkflowID = 1
@@ -424,7 +425,7 @@ for i in sorted(os.environ):

jobsubJobID = os.environ['JUSTIN_JOBSUB_ID']
siteName = os.environ.get('GLIDEIN_DUNESite', 'XX_UNKNOWN')
entryName = os.environ.get('GLIDEIN_Entry_Name', 'UNKNOWN')
entryName = os.environ.get('GLIDEIN_Entry_Name', 'XX_UNKNOWN')
jobPID = os.getpid()

justinWorkdir = os.environ['PWD']
@@ -739,7 +740,8 @@ for (forNextStage, destination, scope, pattern) in \
matches = glob.glob('home/workspace/' + pattern)
for match in matches:
matchingFile = match.split('/')[-1]
outputFiles.append((destination, scope, matchingFile))
fileSize = os.path.getsize(match)
outputFiles.append((destination, scope, matchingFile, fileSize))

if scope == '::URL::':
justinRecordResultsDict['output_urls'].append(destination + '/' +
@@ -851,7 +853,7 @@ confirmResultsDict = { 'method' : 'confirm_results',
rucioUploadedDIDs = []

# Go through the list of output files
for (destination, fileScope, fileName) in outputFiles:
for (destination, fileScope, fileName, fileSize) in outputFiles:

if fileScope == '::URL::':
# Uploading to user scratch
@@ -873,7 +875,6 @@ for (destination, fileScope, fileName) in outputFiles:
getJobscriptDict, jobscriptDict, justinRecordResultsDict)

fileAdler32 = calculateAdler32('home/workspace/' + fileName)
fileSize = os.path.getsize('home/workspace/' + fileName)
except Exception as e:
logLine('updateMetadataTmp() fails ' + str(e))
jobAborted(900, 'update_metadata', '')
@@ -884,15 +885,10 @@ for (destination, fileScope, fileName) in outputFiles:
'"%s:%s"'
% (fileSize, fileAdler32,
'dune', 'all'
# we now use the above hard-coded dataset in MetaCat for everything :-(
# we now use the above hard-coded dataset in MetaCat for everything :-(
# fileScope, destination
))

# # IF DEPLOYED METACAT SERVER DOES NOT NOW MATCH CVMFS VERSION !!!!!
# ret = 0
# print(open('tmp.json','r').read())
# # END CLOWN WORLD STUFF

if ret:
logLine('Failed to register %s:%s in MetaCat' % (fileScope, fileName))
jobAborted(900, 'metacat_registration', '')
@@ -901,7 +897,8 @@ for (destination, fileScope, fileName) in outputFiles:
for (rse,scheme) in jobscriptDict['output_rses'][:3]:
logLine('Try %s:%s to %s/%s'
% (fileScope, fileName, rse, scheme))


uploadStartTime = int(time.time())
ret = executeJustinRucioUpload('--rse %s '
'--protocol %s '
'--scope %s '
@@ -913,8 +910,10 @@ for (destination, fileScope, fileName) in outputFiles:
fileScope,
destination,
fileName))
uploadEndTime = int(time.time())
if ret == 0:
logLine('Uploaded %s:%s to %s' % (fileScope, fileName, rse))
logLine('Uploaded %s:%s to %s in %ds' %
(fileScope, fileName, rse, (uploadEndTime - uploadStartTime))
break
else:
logLine('Failed to upload %s:%s to %s' % (fileScope, fileName, rse))
@@ -936,7 +935,10 @@ for (destination, fileScope, fileName) in outputFiles:
rucioUploadedDIDs.append(fileScope + ':' + fileName)

# Add to list of uploaded files for confirm results
confirmResultsDict['output_dids'][fileScope + ':' + fileName] = rse
confirmResultsDict['output_dids'][fileScope + ':' + fileName] = \
{ "rse_name" : rse,
"seconds" : uploadEndTime - uploadStartTime,
"size_bytes" : fileSize }

# If all ok, then confirm that to the Workflow Allocator

6 changes: 5 additions & 1 deletion containers/start-justin-finder
Original file line number Diff line number Diff line change
@@ -31,4 +31,8 @@ if [ $? != 0 ] ; then
fi

# Run daemon in the foreground
/usr/sbin/justin-finder --container
if [ -x /var/run/justin/overrides/justin-finder ] ; then
/var/run/justin/overrides/justin-finder --container
else
/usr/sbin/justin-finder --container
fi
7 changes: 6 additions & 1 deletion containers/start-justin-finder~
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/bin/sh
# Start justin-finder, for execution inside the justin container
#
# Copyright 2013-24 Andrew McNab for the University of Manchester
@@ -30,4 +31,8 @@ if [ $? != 0 ] ; then
fi

# Run daemon in the foreground
/usr/sbin/justin-finder --container
if [ -e /var/run/justin/overrides/justin-finder ] ; then
/var/run/justin/overrides/justin-finder --container
else
/usr/sbin/justin-finder --container
fi
6 changes: 5 additions & 1 deletion containers/start-justin-info-collector
Original file line number Diff line number Diff line change
@@ -31,4 +31,8 @@ if [ $? != 0 ] ; then
fi

# Run daemon in the foreground
/usr/sbin/justin-info-collector --container
if [ -x /var/run/justin/overrides/justin-info-collector ] ; then
/var/run/justin/overrides/justin-info-collector --container
else
/usr/sbin/justin-info-collector --container
fi
6 changes: 5 additions & 1 deletion containers/start-justin-job-factory
Original file line number Diff line number Diff line change
@@ -31,4 +31,8 @@ if [ $? != 0 ] ; then
fi

# Run daemon in the foreground
/usr/sbin/justin-job-factory --container
if [ -x /var/run/justin/overrides/justin-job-factory ] ; then
/var/run/justin/overrides/justin-job-factory --container
else
/usr/sbin/justin-job-factory --container
fi
6 changes: 5 additions & 1 deletion dashboard/justin-wsgi-dashboard
Original file line number Diff line number Diff line change
@@ -1898,7 +1898,7 @@ def showFile(environ, user, cgiValues):
output += ("<h1>File %s</h1>"
% html.escape(fileDID, quote=True))

query = ('SELECT file_id,state,processed_time '
query = ('SELECT file_id,state,processed_time,size_bytes '
'FROM files '
'WHERE workflow_id=%d AND stage_id=%d and file_did="%s"' %
(workflowID, stageID, fileDID))
@@ -1941,6 +1941,10 @@ def showFile(environ, user, cgiValues):
'<td><a href="/dashboard/?method=show-stage&workflow_id=%d&stage_id=%d">%d</a></td></tr>'
% (workflowID, stageID, stageID))

output += ('<tr><td>Size</td>'
'<td>%d bytes (%.2fGB)</td></tr>'
% (fileRow["size_bytes"], fileRow["size_bytes"] / 1000000000.0 ))

output += ('<tr><td>State</td>'
'<td>%s</td></tr>'
% fileRow["state"])
3 changes: 2 additions & 1 deletion database/justindb-create-tables.sql
Original file line number Diff line number Diff line change
@@ -108,7 +108,7 @@ CREATE TABLE IF NOT EXISTS `events` (
`rse_id` smallint(5) unsigned NOT NULL DEFAULT 0,
`user_id` smallint(5) unsigned NOT NULL DEFAULT 0,
`event_time` datetime NOT NULL DEFAULT '1970-01-01 00:00:00',
`milliseconds` mediumint(8) unsigned NOT NULL DEFAULT 0,
`seconds` mediumint(8) unsigned NOT NULL DEFAULT 0,
PRIMARY KEY (`event_id`),
INDEX `awt` (`event_type_id`,`rse_id`,`site_id`,`event_time`),
INDEX `workflow_id` (`workflow_id`,`stage_id`,`event_type_id`,`rse_id`),
@@ -127,6 +127,7 @@ CREATE TABLE IF NOT EXISTS `files` (
`state` enum('finding','unallocated','allocated',
'outputting','processed','notfound','failed',
'recorded', 'output') NOT NULL DEFAULT 'finding',
`size_bytes` bigint not null default 0,
`justin_job_id` int(10) unsigned NOT NULL DEFAULT 0,
`processed_time` datetime NOT NULL DEFAULT '1970-01-01 00:00:00',
`processed_hour` mediumint(8) unsigned NOT NULL DEFAULT 0,
7 changes: 5 additions & 2 deletions modules/__init__.py
Original file line number Diff line number Diff line change
@@ -467,7 +467,8 @@ def logEvent(eventTypeID = event_UNDEFINED,
siteID = 0,
siteName = None,
rseID = 0,
rseName = None):
rseName = None,
milliseconds = 0):

if siteName:
siteExpr = ('(SELECT site_id FROM sites WHERE sites.site_name="%s")'
@@ -490,14 +491,16 @@ def logEvent(eventTypeID = event_UNDEFINED,
'justin_job_id=%d,'
'site_id=%s,'
'rse_id=%s,'
'milliseconds=%d,'
'event_time=NOW()' %
(eventTypeID,
workflowID,
stageID,
fileID,
justinJobID,
siteExpr,
rseExpr))
rseExpr,
milliseconds))

cur.execute(query)
return None
25 changes: 18 additions & 7 deletions services/justin-wsgi-allocator
Original file line number Diff line number Diff line change
@@ -1695,22 +1695,33 @@ def confirmResultsMethod(startResponse, jsonDict):
for fileRow in fileRows:

try:
justin.insertUpdate('UPDATE files SET state="%s" WHERE file_id=%d' %
('finding' if (fileRow['stage_id'] > 0) else 'output',
fileRow['file_id']))

try:
rseName = jsonDict['output_dids'][fileRow['file_did']]
rseName = jsonDict['output_dids'][fileRow['file_did']]['rse_name']
sizeBytes = jsonDict['output_dids'][fileRow['file_did']]['size_bytes']
seconds = jsonDict['output_dids'][fileRow['file_did']]['seconds']
except:
rseName = None
sizeBytes = 0
seconds = 0
try:
# THIS IS ONLY NEEDED WHILE WE HAVE OLD STYLE WRAPPER JOBS AROUND
rseName = jsonDict['output_dids'][fileRow['file_did']]
except:
rseName = None

justin.insertUpdate('UPDATE files SET state="%s",size_bytes=%d '
'WHERE file_id=%d' %
('finding' if (fileRow['stage_id'] > 0) else 'output',
sizeBytes,
fileRow['file_id']))

justin.logEvent(eventTypeID = justin.event_FILE_CREATED,
workflowID = workflowID,
stageID = stageID,
fileID = fileRow['file_id'],
justinJobID = justinJobID,
siteID = siteID,
rseName = rseName
rseName = rseName,
seconds = seconds
)
except Exception as e:
return httpError(startResponse,

0 comments on commit 53e8240

Please sign in to comment.