diff --git a/src/python/CRABClient/ClientMapping.py b/src/python/CRABClient/ClientMapping.py
index 9c9c3fe9..08291fed 100644
--- a/src/python/CRABClient/ClientMapping.py
+++ b/src/python/CRABClient/ClientMapping.py
@@ -79,6 +79,7 @@
     {'default': False, 'config': ['JobType.disableAutomaticOutputCollection'], 'type': 'BooleanType', 'required': False},
     {'default': None, 'config': ['JobType.copyCatTaskname'], 'type': 'StringType', 'required': False},
     {'default': 'prod', 'config': ['JobType.copyCatInstance'], 'type': 'StringType', 'required': False},
+    {'default': None, 'config': ['JobType.copyCatWorkdir'], 'type': 'StringType', 'required': False},
     {'default': [], 'config': ['JobType.inputFiles'], 'type': 'ListType', 'required': False}
     ]
 }
@@ -152,6 +153,8 @@
     'tasks' : {'acceptsArguments': False, 'requiresREST': True, 'requiresRucio': False, 'requiresDirOption': False, 'useCache': False, 'requiresProxyVOOptions': False, 'requiresLocalCache': False},
     'uploadlog' : {'acceptsArguments': False, 'requiresREST': True, 'requiresRucio': False, 'requiresDirOption': True, 'useCache': True, 'requiresProxyVOOptions': False, 'requiresLocalCache': False},
     'preparelocal' : {'acceptsArguments': False, 'requiresREST': True, 'requiresRucio': False, 'requiresDirOption': True, 'useCache': True, 'requiresProxyVOOptions': False, 'requiresLocalCache': True},
+    'recover' : {'acceptsArguments': False, 'requiresREST': True, 'requiresRucio': False, 'requiresDirOption': True, 'useCache': False, 'requiresProxyVOOptions': False, 'requiresLocalCache': False},
+    'getsandbox' : {'acceptsArguments': False, 'requiresREST': True, 'requiresRucio': False, 'requiresDirOption': True, 'useCache': True, 'requiresProxyVOOptions': False, 'requiresLocalCache': True},
 }
diff --git a/src/python/CRABClient/ClientUtilities.py b/src/python/CRABClient/ClientUtilities.py
index 5b133ded..80c61d62 100644
--- a/src/python/CRABClient/ClientUtilities.py
+++ b/src/python/CRABClient/ClientUtilities.py
@@ -356,7 +356,7 @@ def getRequestName(requestName=None):
     prefix = 'crab_'
     postfix = str(datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))
-    if requestName is None or not isinstance(requestName, str) or len(requestName) == 0:
+    if requestName is None or requestName == "None" or not isinstance(requestName, str) or len(requestName) == 0:
         return prefix + postfix
     elif '/' in requestName:
         msg = "%sError%s: The '/' character is not accepted in the requestName parameter." % (colors.RED, colors.NORMAL)
diff --git a/src/python/CRABClient/Commands/SubCommand.py b/src/python/CRABClient/Commands/SubCommand.py
index 7f87f7b5..4d0c97f5 100644
--- a/src/python/CRABClient/Commands/SubCommand.py
+++ b/src/python/CRABClient/Commands/SubCommand.py
@@ -270,6 +270,7 @@ def __init__(self, logger, cmdargs=None, disable_interspersed_args=False):
         # Parse the command options/arguments.
         cmdargs = cmdargs or []
+        self.cmdargs = cmdargs
         (self.options, self.args) = self.parser.parse_args(cmdargs)
         self.transferringIds = None
diff --git a/src/python/CRABClient/Commands/getsandbox.py b/src/python/CRABClient/Commands/getsandbox.py
new file mode 100644
index 00000000..8416ac69
--- /dev/null
+++ b/src/python/CRABClient/Commands/getsandbox.py
@@ -0,0 +1,95 @@
+import os
+
+from CRABClient.Commands.SubCommand import SubCommand
+
+from CRABClient.UserUtilities import curlGetFileFromURL, getColumn
+
+from ServerUtilities import downloadFromS3, getProxiedWebDir
+
+
+class getsandbox(SubCommand):
+    """
+    given a projdir, downloads the user sandbox and debug files locally, into <projdir>/taskconfig.
+    It tries S3 first, otherwise it falls back to the schedd WEBDIR.
+    """
+
+    name = "getsandbox"
+
+    def __call__(self):
+
+        # init. debug. print useful info
+        self.logger.debug("requestarea: %s", self.requestarea)
+        self.logger.debug("cachedinfo: %s", self.cachedinfo)
+
+        # get the information necessary for the next steps:
+        # all of the columns from the database for this task
+        self.taskname = self.cachedinfo['RequestName']
+        self.crabDBInfo, _, _ = self.crabserver.get(api='task', data={'subresource':'search', 'workflow':self.taskname})
+        self.logger.debug("Got information from server oracle database: %s", self.crabDBInfo)
+
+        # arguments used by the following functions
+        self.downloadDir = os.path.join(self.requestarea, "taskconfig")
+
+        # download files: user sandbox, debug sandbox
+        filelist = []
+        # usersandbox = self.downloadUserSandbox()
+        usersandbox = self.downloadSandbox(
+            remotefile=getColumn(self.crabDBInfo, 'tm_user_sandbox'),
+            localfile='sandbox.tar.gz')
+        filelist.append(usersandbox)
+        # debugfiles = self.downloadDebug()
+        debugfiles = self.downloadSandbox(
+            remotefile=getColumn(self.crabDBInfo, 'tm_debug_files'),
+            localfile='debug_files.tar.gz')
+        filelist.append(debugfiles)
+
+        returnDict = {"commandStatus": "FAILED"}
+        if filelist:
+            returnDict = {"commandStatus": "SUCCESS", "sandbox_paths": filelist}
+
+        return returnDict
+
+    def downloadSandbox(self, remotefile, localfile):
+        """
+        Copy remotefile from S3 to localfile on the local disk.
+
+        If remotefile is not on S3, then as a fallback we look for the corresponding
+        localfile in the schedd webdir.
+        """
+        username = getColumn(self.crabDBInfo, 'tm_username')
+        sandboxFilename = remotefile
+
+        self.logger.debug("will download sandbox from s3: %s", sandboxFilename)
+
+        if not os.path.isdir(self.downloadDir):
+            os.mkdir(self.downloadDir)
+        localSandboxPath = os.path.join(self.downloadDir, localfile)
+
+        try:
+            downloadFromS3(crabserver=self.crabserver,
+                           filepath=localSandboxPath,
+                           objecttype='sandbox', logger=self.logger,
+                           tarballname=sandboxFilename,
+                           username=username
+                           )
+        except Exception as e:
+            self.logger.info("Sandbox download failed with %s", e)
+            self.logger.info("We will look for the sandbox on the webdir of the schedd")
+
+            webdir = getProxiedWebDir(crabserver=self.crabserver, task=self.taskname,
+                                      logFunction=self.logger.debug)
+            if not webdir:
+                webdir = getColumn(self.crabDBInfo, 'tm_user_webdir')
+            self.logger.debug("Downloading %s from %s", localfile, webdir)
+            httpCode = curlGetFileFromURL(webdir + '/' + localfile,
+                                          localSandboxPath, self.proxyfilename,
+                                          logger=self.logger)
+            if httpCode != 200:
+                self.logger.error("Failed to download %s from %s", localfile, webdir)
+                raise Exception("We could not locate the sandbox in the webdir either.")
+            # we would rather use
+            # raise Exception("We could not locate the sandbox in the webdir either.") from e
+            # but that is not py2 compatible...
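+        # at this point localSandboxPath exists on disk, fetched either from S3 or from the schedd webdir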
+
+        return localSandboxPath
+
diff --git a/src/python/CRABClient/Commands/recover.py b/src/python/CRABClient/Commands/recover.py
new file mode 100644
index 00000000..ebeb1ec3
--- /dev/null
+++ b/src/python/CRABClient/Commands/recover.py
@@ -0,0 +1,689 @@
+import re
+import os
+import tarfile
+import datetime
+
+from CRABClient.Commands.SubCommand import SubCommand
+
+# step: remake
+from CRABClient.Commands.remake import remake
+from CRABClient.ClientUtilities import colors
+from CRABClient.ClientExceptions import MissingOptionException, ConfigurationException
+
+# step kill
+from CRABClient.Commands.kill import kill
+from CRABClient.UserUtilities import getUsername
+
+# step report
+from CRABClient.Commands.report import report
+from CRABClient.JobType.BasicJobType import BasicJobType
+
+# step status
+from CRABClient.Commands.status import status
+from CRABClient.ClientUtilities import LOGLEVEL_MUTE
+
+# step getsandbox
+from CRABClient.Commands.getsandbox import getsandbox
+
+# step submit
+from CRABClient.Commands.submit import submit
+from CRABClient.UserUtilities import getColumn
+from CRABClient.ClientUtilities import colors
+from ServerUtilities import SERVICE_INSTANCES
+
+SPLITTING_RECOVER_LUMIBASED = set(("LumiBased", "Automatic", "EventAwareLumiBased"))
+SPLITTING_RECOVER_FILEBASED = set(("FileBased",))
+
+class recover(SubCommand):
+    """
+    given a taskname, create a new task that processes only what the original task
+    did not process yet
+    """
+
+    name = "recover"
+    shortnames = ["rec"]
+
+    def __call__(self):
+
+        retval = self.stepInit()
+        if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
+
+        if not self.cmdconf["requiresDirOption"]:
+            self.failingTaskName = self.options.cmptask
+            retval = self.stepRemake()
+            if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
+            self.crabProjDir = retval["workDir"]
+        else:
+            self.logger.debug("no need to run crab remake - self.cachedinfo %s", self.cachedinfo)
+            self.failingTaskName = self.cachedinfo['RequestName']
+            self.restHostCommonname = findServerInstance(self.serverurl, self.instance)
+            self.logger.debug("no need to run crab remake - self.serverurl %s", self.serverurl)
+            self.logger.debug("no need to run crab remake - self.instance %s", self.instance)
+            self.logger.debug("no need to run crab remake - self.restHostCommonname %s", self.restHostCommonname)
+            self.crabProjDir = self.requestarea
+
+        retval = self.stepValidate()
+        if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
+
+        retval = self.stepStatus()
+        if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
+
+        retval = self.stepKill()
+        if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
+        retval = self.stepCheckKill()
+        if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
+
+        retval = self.stepGetsandbox()
+        if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
+        retval = self.stepExtractSandbox(retval["sandbox_paths"])
+        if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
+
+        if self.failingTaskInfo["splitalgo"] in SPLITTING_RECOVER_LUMIBASED:
+            retval = self.stepReport()
+            if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
+
+            if "recoverLumimaskPath" not in retval:
+                return retval
+
+            retval = self.stepSubmitLumiBased(retval["recoverLumimaskPath"])
+            if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
+
+        elif self.failingTaskInfo["splitalgo"] in SPLITTING_RECOVER_FILEBASED:
+            retval = self.stepSubmitFileBased()
+            if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
+
+        # no need for an "else" here, the splitting algo has already been checked in
+        # stepValidate
+
+        self.logger.debug("recover - retval %s", retval)
+        self.logger.info("crab recover - submitted recovery task %s", retval["uniquerequestname"])
+        return retval
+
+    def stepExit(self, retval):
+        """
+        Callback to be executed after every step.
+        Handy if you want to add some logging before crab recover exits,
+        whatever step the recovery fails at.
+
+        Intended to be used as:
+
+        > retval = self.stepYYY()
+        > if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
+        """
+        return retval
+
+    def stepInit(self):
+        """
+        whatever needs to be done before starting with the actual recover process
+
+        - [x] (debug) log the value of some internal variables
+
+        side effects: none
+        """
+        # self.options and self.args are automatically filled by the __init__()
+        # that recover inherits from SubCommand.
+
+        self.logger.debug("stepInit() - self.cmdconf %s", self.cmdconf)
+        self.logger.debug("stepInit() - self.cmdargs %s", self.cmdargs)
+        self.logger.debug("stepInit() - self.options %s", self.options)
+        self.logger.debug("stepInit() - self.args %s", self.args)
+
+        self.failingTaskStatus = None
+        self.failingTaskInfo = {}
+        self.failedJobs = []
+
+        return {"commandStatus": "SUCCESS", "init": None}
+
+    def stepRemake(self):
+        """
+        run crab remake, then download the task info from the task DB.
+        Use it to perform basic validation of the task that the user wants to recover.
+        we support:
+        - analysis tasks, not PrivateMC
+        - tasks that have been submitted less than 30d ago
+          - because crab report needs info from the schedd
+        - splitting algorithms based on lumisections and files.
+
+        side effects:
+        - if needed, creates a new directory locally with the requestcache for the
+          original failing task
+
+        TODO an alternative would be to call the other commands through crabCommand,
+        as done with "multicrab".
+        https://twiki.cern.ch/twiki/bin/view/CMSPublic/CRABClientLibraryAPI#Multicrab_using_the_crabCommand
+        """
+
+        # step1: remake
+        cmdargs = []
+        cmdargs.append("--task")
+        cmdargs.append(self.failingTaskName)
+        if "instance" in self.options.__dict__.keys():
+            cmdargs.append("--instance")
+            cmdargs.append(self.options.__dict__["instance"])
+        if "proxy" in self.options.__dict__.keys():
+            cmdargs.append("--proxy")
+            cmdargs.append(self.options.__dict__["proxy"])
+        self.logger.debug("stepRemake() - remake, cmdargs: %s", cmdargs)
+        remakeCmd = remake(logger=self.logger, cmdargs=cmdargs)
+        with SubcommandExecution(self.logger, "remake") as _:
+            retval = remakeCmd.remakecache(self.failingTaskName)
+        self.logger.debug("stepRemake() - remake, retval: %s", retval)
+        self.logger.debug("stepRemake() - remake, after, self.configuration: %s", self.configuration)
+        return retval
+
+    def stepValidate(self):
+        """
+        Perform basic validation of the failing task: it must not be older than 30 days,
+        it must be an Analysis task, and it must use a supported splitting algorithm.
+        """
+
+        ## validate
+        ## - we can not recover a task that is older than 30d, because we need
+        ##   files from the schedd about the status of each job
+        ## - we want to recover only "analysis" tasks
+        self.failingCrabDBInfo, _, _ = self.crabserver.get(api='task', data={'subresource':'search', 'workflow':self.failingTaskName})
+        self.logger.debug("stepValidate() - Got information from server oracle database: %s", self.failingCrabDBInfo)
+        startTimeDb = getColumn(self.failingCrabDBInfo, 'tm_start_time')
+        # 2023-10-24 10:56:26.573303
+        # datetime.fromisoformat is not available in py2, we need to use strptime
+        # startTime = datetime.datetime.fromisoformat(startTimeDb)
+        # https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes
+        startTime = datetime.datetime.strptime(startTimeDb, "%Y-%m-%d %H:%M:%S.%f")
+        self.logger.debug("Failing task start time %s %s %s", startTimeDb, startTime, type(startTime))
+
+        if not startTime >= (datetime.datetime.now() - datetime.timedelta(days=30)):
+            msg = "The failing task was submitted more than 30d ago. We can not recover it."
+ return {"commandStatus": "FAILED", "step": "RemakeAndValidate" , "msg": msg } + + failingJobType = getColumn(self.failingCrabDBInfo, 'tm_job_type') + if not failingJobType == "Analysis": + msg = 'crab recover supports only tasks with JobType.pluginName=="Analysis", you have {}'.format(failingJobType) + return {"commandStatus": "FAILED", "step": "RemakeAndValidate" , "msg": msg } + + splitalgo = getColumn(self.failingCrabDBInfo, 'tm_split_algo') + if not splitalgo in SPLITTING_RECOVER_LUMIBASED.union(SPLITTING_RECOVER_FILEBASED): + msg = 'crab recover supports only tasks with LumiBased and FileBased splitting, you have {}'.format(splitalgo) + return {"commandStatus": "FAILED", "step": "RemakeAndValidate" , "msg": msg } + + self.failingTaskInfo["splitalgo"] = splitalgo + self.failingTaskInfo["publication"] = True if getColumn(self.failingCrabDBInfo, 'tm_publication') == "T" else False + self.failingTaskInfo["username"] = getColumn(self.failingCrabDBInfo, 'tm_username') + + self.logger.debug("stepRemakeAndValidate() - failingtaskinfo - %s", self.failingTaskInfo) + + return {"commandStatus": "SUCCESS", "validate": None } + + def stepStatus(self): + """ + designed for: + + - [x] filebased splitting + - [x] step check kill + + side effects: none + """ + + cmdargs = [] + cmdargs.append("-d") + cmdargs.append(str(self.crabProjDir)) + if "instance" in self.options.__dict__.keys(): + cmdargs.append("--instance") + cmdargs.append(self.options.__dict__["instance"]) + if "proxy" in self.options.__dict__.keys(): + cmdargs.append("--proxy") + cmdargs.append(self.options.__dict__["proxy"]) + self.logger.debug("stepStatus() - status, cmdargs: %s", cmdargs) + statusCmd = status(logger=self.logger, cmdargs=cmdargs) + self.logger.debug("stepStatus() - handlers %s", self.logger.handlers) + + ## old + # handlerLevels = [] + # for h in self.logger.handlers: + # handlerLevels.append(h.level) + # h.setLevel(LOGLEVEL_MUTE) + # retval = statusCmd() + # self.failingTaskStatus = retval + # for idx, h in enumerate(self.logger.handlers): + # h.setLevel(handlerLevels[idx]) + # self.logger.debug("stepStatus() - handlers %s", self.logger.handlers) + # self.logger.debug("stepStatus() - handlers %s", handlerLevels) + + ## new + with SubcommandExecution(self.logger, "status") as _: + retval = statusCmd() + self.failingTaskStatus = retval + + self.logger.debug("stepStatus() - status, retval: %s", retval) + + ## useful for filebased splitting + # convert + # 'jobList': [['failed', '1'], ['failed', '2'], ['finished', '3'], ['failed', '4'], ['finished', '5']] + # to + # [1, 2, 4] + self.failedJobs = [job[1] for job in retval["jobList"] if job[0] == "failed"] + self.logger.debug("stepStatus() - status, failedJobs: %s", self.failedJobs) + + return retval + + def stepKill(self): + """ + side effects: + - kills the original failing task + """ + ## step2: kill + + # if the task is already killed or about to be killed, do not kill again + if self.failingTaskStatus["dbStatus"] == "KILLED" or \ + (self.failingTaskStatus["dbStatus"] in ("NEW", "QUEUED") and self.failingTaskStatus["command"] == "KILL"): + returnDict = {'kill' : 'already killed', 'commandStatus': 'SUCCESS'} + self.logger.info("step kill - task already killed") + return returnDict + + # avoid that crab operators kill users tasks by mistake. + # if the user who is running crab recover differs from the one who submitted the original task, + # then kill the task only if the option "--forcekill" is used. 
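+        # e.g. (illustrative) an operator would have to run explicitly:
+        #     crab recover --task=<failing taskname> --forcekill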
+        username = getUsername(self.proxyfilename, logger=self.logger)
+        if self.failingTaskInfo["username"] != username and not self.options.__dict__["forceKill"]:
+            returnDict = {'kill' : 'do not kill task submitted by another user', 'commandStatus': 'FAILED'}
+            self.logger.info("step kill - task submitted by another user, will not kill it")
+            return returnDict
+
+        cmdargs = []
+        cmdargs.append("-d")
+        cmdargs.append(str(self.crabProjDir))
+        cmdargs.append("--killwarning")
+        cmdargs.append("Task killed by crab recover on '{}', by '{}'".format(datetime.datetime.now(), username))
+        if "instance" in self.options.__dict__.keys():
+            cmdargs.append("--instance")
+            cmdargs.append(self.options.__dict__["instance"])
+        if "proxy" in self.options.__dict__.keys():
+            cmdargs.append("--proxy")
+            cmdargs.append(self.options.__dict__["proxy"])
+        self.logger.debug("stepKill() - cmdargs: %s", cmdargs)
+        killCmd = kill(logger=self.logger, cmdargs=cmdargs)
+        with SubcommandExecution(self.logger, "kill") as _:
+            retval = killCmd()
+
+        self.logger.debug("stepKill() - retval: %s", retval)
+        self.logger.debug("stepKill() - after, self.configuration: %s", self.configuration)
+
+        return retval
+
+    def stepCheckKill(self):
+        """
+        make sure that no more files will be produced by the original failing task
+
+        - make sure that the failing task is killed, or about to be killed
+          - "about to be killed" means "(new|queued) on command kill"
+        - make sure that all jobs are either finished or failed
+          - job status comes from status_cache. job+stageout
+          - no info about publication
+        - TODO make sure that we can identify this also in the case of an autosplitting task
+
+        side effects: none
+
+        ---
+
+        jobsPerStatus can be:
+        - final: finished
+        - final: failed
+        - final: killed
+        - transient: idle
+        - transient: running
+        - transient: transferring
+        - transient: cooloff
+        - transient: held (TODO)
+          jobs should not go into held except via systemPeriodicHold,
+          and we are not sure if this will ever happen. In order to be safe and cautious,
+          we consider this status as transient and refuse task recovery.
+          The user will encounter a problem and contact us.
+
+        ## These are all possible statuses of a task in the TaskDB.
+        TASKDBSTATUSES_TMP = ['NEW', 'HOLDING', 'QUEUED', 'TAPERECALL', 'KILLRECALL']
+        TASKDBSTATUSES_FAILURES = ['SUBMITFAILED', 'KILLFAILED', 'RESUBMITFAILED', 'FAILED']
+        TASKDBSTATUSES_FINAL = ['UPLOADED', 'SUBMITTED', 'KILLED'] + TASKDBSTATUSES_FAILURES
+        TASKDBSTATUSES = TASKDBSTATUSES_TMP + TASKDBSTATUSES_FINAL
+        ## These are all possible statuses of a task as returned by the `status' API.
+        TASKSTATUSES = TASKDBSTATUSES + ['COMPLETED', 'UNKNOWN', 'InTransition']
+
+        transfer states:
+        TRANSFERDB_STATES = {0: "NEW",
+                             1: "ACQUIRED",
+                             2: "FAILED",
+                             3: "DONE",
+                             4: "RETRY",
+                             5: "SUBMITTED",
+                             6: "KILL",
+                             7: "KILLED"}
+        publication states:
+        PUBLICATIONDB_STATES = {0: "NEW",
+                                1: "ACQUIRED",
+                                2: "FAILED",
+                                3: "DONE",
+                                4: "RETRY",
+                                5: "NOT_REQUIRED"}
+        """
+
+        # make sure that the "task status" is a "static" one
+        self.logger.debug("stepCheckKill() - status %s", self.failingTaskStatus["status"])
+        self.logger.debug("stepCheckKill() - command %s", self.failingTaskStatus["command"])
+        self.logger.debug("stepCheckKill() - dagStatus %s", self.failingTaskStatus["dagStatus"])
+        self.logger.debug("stepCheckKill() - dbStatus %s", self.failingTaskStatus["dbStatus"])
+
+        # check the task status.
+        # it does not make sense to recover a task in COMPLETED
+        if not self.failingTaskStatus["status"] in ("SUBMITTED", "FAILED", "FAILED (KILLED)"):
+            msg = "In order to recover a task, the combined status of the task can not be {}".format(self.failingTaskStatus["status"])
+            return {"commandStatus": "FAILED", "step": "checkKill", "msg": msg}
+
+        # the status on the db should be submitted or killed, or about to be killed
+        if self.failingTaskStatus["dbStatus"] in ("NEW", "QUEUED"):
+            if not self.failingTaskStatus["command"] in ("KILL",):
+                msg = "In order to recover a task, when the status of the task in the oracle DB is {}, the task command can not be {}"\
+                    .format(self.failingTaskStatus["dbStatus"], self.failingTaskStatus["command"])
+                return {"commandStatus": "FAILED", "step": "checkKill", "msg": msg}
+        else:
+            if not self.failingTaskStatus["dbStatus"] in ("SUBMITTED", "KILLED"):
+                msg = "In order to recover a task, the status of the task in the oracle DB can not be {}"\
+                    .format(self.failingTaskStatus["dbStatus"])
+                return {"commandStatus": "FAILED", "step": "checkKill", "msg": msg}
+
+        # make sure that the jobs and publications are in a final state.
+        # - [x] make sure that there are no ongoing transfers:
+        #       transfers are accounted for in the job status
+        # considering as transient: "idle", "running", "transferring", "cooloff", "held"
+        terminalStates = set(("finished", "failed", "killed"))
+        # python2: need to convert .keys() into a set
+        if not set(self.failingTaskStatus["jobsPerStatus"].keys()).issubset(terminalStates):
+            msg = "In order to recover a task, all the jobs need to be in a terminal state ({}). You have {}"\
+                .format(terminalStates, self.failingTaskStatus["jobsPerStatus"].keys())
+            return {"commandStatus": "FAILED", "step": "checkKill", "msg": msg}
+
+        # - [x] make sure that there are no ongoing publications
+        self.logger.debug("stepCheckKill - publication %s", self.failingTaskStatus["publication"])
+        terminalStatesPub = set(("failed", "done", "not_required", "disabled"))
+        if not set(self.failingTaskStatus["publication"].keys()).issubset(terminalStatesPub):
+            msg = "In order to recover a task, publication for all the jobs needs to be in a terminal state ({}). You have {}"\
+                .format(terminalStatesPub, self.failingTaskStatus["publication"].keys())
+            return {"commandStatus": "FAILED", "step": "checkKill", "msg": msg}
+
+        # - [x] if all jobs failed, then exit: it is better to submit the task again than to use crab recover :)
+        #       check that "failed" is the only key of the jobsPerStatus dictionary
+        if set(self.failingTaskStatus["jobsPerStatus"].keys()) == set(("failed",)):
+            msg = "All the jobs of the original task failed. Better to submit it again from scratch than to recover it."
+            return {"commandStatus": "FAILED", "step": "checkKill", "msg": msg}
+
+        return {"commandStatus": "SUCCESS", "checkkill": "task can be recovered"}
+
+    def stepReport(self):
+        """
+        used to compute which lumisections have not been processed by the original task.
+        requires the TW to have saved the lumisection information when submitting the original task.
+        it does not support filebased splitting.
+
+        side effects:
+        - populates the directory "results" inside the workdir of the original failing task
+          with the output of crab report
+        """
+
+        failingTaskPublish = getColumn(self.failingCrabDBInfo, 'tm_publication')
+        self.logger.debug("stepReport() - tm_publication: %s %s", type(failingTaskPublish), failingTaskPublish)
+        # - if the user specified --strategy=notPublished but the original failing task
+        #   disabled publishing, then `crab report` fails and raises an exception.
+        #   so, we automatically switch to notFinished and print a warning
+        # - assuming "strategy" is always in self.options.__dict__.keys():
+        if failingTaskPublish != "T" and self.options.__dict__["strategy"] != "notFinished":
+            self.logger.warning("WARNING - crab report - The original task had publication disabled. recovery strategy changed to notFinished")
+            self.options.__dict__["strategy"] = "notFinished"
+
+        try:
+            os.remove(os.path.join(self.crabProjDir, "results", "notFinishedLumis.json"))
+            os.remove(os.path.join(self.crabProjDir, "results", "notPublishedLumis.json"))
+            self.logger.info("crab report - needed to delete existing files!")
+        except:
+            pass
+
+        cmdargs = []
+        cmdargs.append("-d")
+        cmdargs.append(str(self.crabProjDir))
+        # if "strategy" in self.options.__dict__.keys():
+        cmdargs.append("--recovery")
+        cmdargs.append(self.options.__dict__["strategy"])
+        if "instance" in self.options.__dict__.keys():
+            cmdargs.append("--instance")
+            cmdargs.append(self.options.__dict__["instance"])
+        if "proxy" in self.options.__dict__.keys():
+            cmdargs.append("--proxy")
+            cmdargs.append(self.options.__dict__["proxy"])
+
+        self.logger.debug("stepReport() - report, cmdargs: %s", cmdargs)
+        reportCmd = report(logger=self.logger, cmdargs=cmdargs)
+        with SubcommandExecution(self.logger, "report") as _:
+            # FIXME - stays noisy because of interference with getMutedStatusInfo()
+            retval = reportCmd()
+        self.logger.debug("stepReport() - report, after, self.configuration: %s", self.configuration)
+        self.logger.debug("stepReport() - report, retval: %s", retval)
+
+        recoverLumimaskPath = ""
+        if failingTaskPublish == "T" and self.options.__dict__["strategy"] == "notPublished":
+            recoverLumimaskPath = os.path.join(self.crabProjDir, "results", "notPublishedLumis.json")
+            # print a proper error message if the original task + recovery task(s) have processed everything.
+            publishedAllLumis = True
+            for dataset, lumis in retval["outputDatasetsLumis"].items():
+                notPublishedLumis = BasicJobType.subtractLumis(retval["lumisToProcess"], lumis)
+                self.logger.debug("stepReport() - report, subtract: %s %s",
+                                  dataset, notPublishedLumis)
+                if notPublishedLumis:
+                    publishedAllLumis = False
+            if publishedAllLumis:
+                self.logger.info("stepReport() - all lumis have been published in the output dataset. crab recover will exit")
+        else:
+            if failingTaskPublish == "T" and self.options.__dict__["strategy"] == "notFinished":
+                self.logger.warning("%sWarning%s: You are recovering a task with publication enabled using the notFinished strategy, this is likely to produce DUPLICATE LUMIS in the output dataset." % (colors.RED, colors.NORMAL))
+            # the only other option should be self.options.__dict__["strategy"] == "notFinished"
+            recoverLumimaskPath = os.path.join(self.crabProjDir, "results", "notFinishedLumis.json")
+            # print a proper error message if the original task + recovery task(s) have processed everything.
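+            # note: "notProcessedLumis" is part of the dictionary returned by the crab report call above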
+ if not retval["notProcessedLumis"]: + # we will likely never reach this if, because in this case the status on the schedd + # should be COMPLETED, which is not accepted by stepCheckKill + self.logger.info("stepReport() - all lumis have been processed by original task. crab recover will exit") + + self.logger.debug("crab report - recovery task will process lumis contained in file %s", recoverLumimaskPath) + + + if os.path.exists(recoverLumimaskPath): + returnDict = {'commandStatus' : 'SUCCESS', 'recoverLumimaskPath': recoverLumimaskPath} + else: + msg = 'the file {} does not exist. crab report could not produce it, the task can not be recovered'.format(recoverLumimaskPath) + returnDict = {'commandStatus' : 'FAILED', 'msg': msg} + + return returnDict + + def stepGetsandbox(self): + """ + side effects: + - download the user_ and debug_sandbox from s3 or from the schedd + """ + + cmdargs = [] + cmdargs.append("-d") + cmdargs.append(str(self.crabProjDir)) + if "instance" in self.options.__dict__.keys(): + cmdargs.append("--instance") + cmdargs.append(self.options.__dict__["instance"]) + if "proxy" in self.options.__dict__.keys(): + cmdargs.append("--proxy") + cmdargs.append(self.options.__dict__["proxy"]) + self.logger.debug("stepGetsandbox() - cmdargs: %s", cmdargs) + getsandboxCmd = getsandbox(logger=self.logger, cmdargs=cmdargs) + with SubcommandExecution(self.logger, "getsandbox") as _: + retval = getsandboxCmd() + self.logger.debug("stepGetsandbox() - retval: %s", retval) + return retval + + def stepExtractSandbox(self, sandbox_paths): + """ + This step prepares all the information needed for the submit step + + side effects: + - extracts the user_ and debug_sandbox, so that the files that they contain + can be used by crab submit at a later step + """ + debug_sandbox = tarfile.open(sandbox_paths[0]) + debug_sandbox.extractall(path=os.path.join(self.crabProjDir, "user_sandbox")) + debug_sandbox.close() + + debug_sandbox = tarfile.open(sandbox_paths[1]) + debug_sandbox.extractall(path=os.path.join(self.crabProjDir, "debug_sandbox")) + debug_sandbox.close() + + self.recoverconfig = os.path.join(self.crabProjDir, "debug_sandbox", + "debug" , "crabConfig.py") + + return {"commandStatus": "SUCCESS", } + + def stepSubmitLumiBased(self, notFinishedJsonPath): + """ + Submit a recovery task in the case that the original failing task + - is of type Analysis + - used LumiBased splitting algorithm + + side effect: + - submits a new task + """ + + cmdargs = [] + cmdargs.append("-c") + cmdargs.append(self.recoverconfig) + if "proxy" in self.options.__dict__.keys(): + cmdargs.append("--proxy") + cmdargs.append(self.options.__dict__["proxy"]) + if "destinstance" in self.options.__dict__.keys(): + cmdargs.append("--instance") + cmdargs.append(self.options.__dict__["destinstance"]) + + # override config arguments with + # https://twiki.cern.ch/twiki/bin/view/CMSPublic/CRAB3ConfigurationFile#Passing_CRAB_configuration_param + cmdargs.append("General.requestName=None") + cmdargs.append("General.workArea=.") + cmdargs.append("Data.lumiMask={}".format(notFinishedJsonPath)) + cmdargs.append("JobType.pluginName=Recover") + cmdargs.append("JobType.copyCatTaskname={}".format(self.failingTaskName)) + cmdargs.append("JobType.copyCatWorkdir={}".format(self.crabProjDir)) + cmdargs.append("JobType.copyCatInstance={}".format(self.restHostCommonname)) + scriptexe = getColumn(self.failingCrabDBInfo, 'tm_scriptexe') + if scriptexe: + cmdargs.append("JobType.scriptExe={}".format(os.path.join(self.crabProjDir, 
"debug_sandbox" , scriptexe))) + cmdargs.append("JobType.psetName={}".format(os.path.join(self.crabProjDir, "debug_sandbox" , "debug", "originalPSet.py"))) + + # when the user running crab recover does not match the user who originally submited the task, + # then it is likely to be a crab operator, and we should not send filen in the same + # base dir as the original failing task. + # we should use default tm_output_lfn when recovering task submitted by another user + username = getUsername(self.proxyfilename, logger=self.logger) + if self.failingTaskInfo["username"] != username: + cmdargs.append("Data.outLFNDirBase=/store/user/{}".format(username)) + + self.logger.warning("crab submit - recovery task will process lumis contained in file config.Data.lumiMask=%s", notFinishedJsonPath) + self.logger.debug("stepSubmit() - cmdargs %s", cmdargs) + submitCmd = submit(logger=self.logger, cmdargs=cmdargs) + + # with SubcommandExecution(self.logger, "submit") as _: + retval = submitCmd() + self.logger.debug("stepSubmit() - retval %s", retval) + return retval + + def stepSubmitFileBased(self): + """ + Submit a recovery task in the case that the original failing task + - is of type Analysis + - used FileBased splitting algorithm + + what's missing? + - [ ] if the input is from DBS, then write info to runs_and_lumis.tar.gz + """ + + # TODO + # I will need to implement this! + return {'commandStatus': 'FAILED', 'error': 'not implemented yet'} + + def setOptions(self): + """ + __setOptions__ + + """ + # step: remake + # --dir, --cmptask, --instance already added elsewhere: + + # step: recovery + self.parser.add_option("--strategy", + dest = "strategy", + default="notPublished", + help = "When using lumibased splitting, sets crab report --recovery=$option") + + self.parser.add_option("--destinstance", + dest = "destinstance", + default = None, + help = "(Experimental) The CRABServer instance where you want to submit the recovery task to. Should be used by crab operators only") + # if the user sets this option, then it gets tricky to setup a link between the original + # failing task and the recovery task. + # this option is to be considered experimental and useful for developers only + + # step: kill + self.parser.add_option("--forcekill", + action="store_true", dest="forceKill", default=False, + help="Allows to kill failing task submitted by another user. Effective only for crab operators") + + def validateOptions(self): + """ + __validateOptions__ + + """ + + # step: remake. + if self.options.cmptask is None and self.options.projdir is None: + msg = "%sError%s: Please specify a CRAB task project directory or the task name for which to remake a CRAB project directory." % (colors.RED, colors.NORMAL) + msg += " Use the --dir or the --task option." + ex = MissingOptionException(msg) + ex.missingOption = "cmptask" + raise ex + elif self.options.projdir: + self.cmdconf["requiresDirOption"] = True + elif self.options.cmptask: + regex = "^\d{6}_\d{6}_?([^\:]*)\:[a-zA-Z0-9-]+_(crab_)?.+" + if not re.match(regex, self.options.cmptask): + msg = "%sError%s: Task name does not match the regular expression '%s'." % (colors.RED, colors.NORMAL, regex) + raise ConfigurationException(msg) + + SubCommand.validateOptions(self) + +class SubcommandExecution: + """ + Context manager to silence logging when calling a subcommand. 
+ """ + + def __init__(self, logger, commandname): + self.handlerLevels = [] + self.logger = logger + self.commandname = commandname + + def __enter__(self): + self.logger.debug("%s - handlers1: %s", self.commandname, self.logger.handlers) + for h in self.logger.handlers: + self.handlerLevels.append(h.level) + h.setLevel(LOGLEVEL_MUTE) + + def __exit__(self, *exc): + for idx, h in enumerate(self.logger.handlers): + h.setLevel(self.handlerLevels[idx]) + self.logger.debug("%s - handlers2: %s", self.commandname, self.handlerLevels) + self.logger.debug("%s - handlers3: %s", self.commandname, self.logger.handlers) + +def findServerInstance(serverurl, dbinstance): + """ + given ServerUtilities.SERVICE_INSTANCES and (url,db instance) finds the "common" name + """ + result = None + for commonName, details in SERVICE_INSTANCES.items(): + if serverurl == details["restHost"] and dbinstance == details["dbInstance"]: + result = commonName + return result diff --git a/src/python/CRABClient/Commands/remake.py b/src/python/CRABClient/Commands/remake.py index 4f5fa408..57b4b72c 100644 --- a/src/python/CRABClient/Commands/remake.py +++ b/src/python/CRABClient/Commands/remake.py @@ -35,7 +35,7 @@ def remakecache(self,taskname): requestarea = taskname.split(":", 1)[1].split("_", 1)[1] cachepath = os.path.join(requestarea, '.requestcache') if os.path.exists(cachepath): - self.logger.info("%sError%s: %s not created, because it already exists." % (colors.RED, colors.NORMAL, cachepath)) + self.logger.info("%sWarning%s: %s not created, because it already exists." % (colors.RED, colors.NORMAL, cachepath)) elif not os.path.exists(requestarea): self.logger.info('Remaking %s folder.' % (requestarea)) try: diff --git a/src/python/CRABClient/Commands/report.py b/src/python/CRABClient/Commands/report.py index 18a50af7..1b62b166 100644 --- a/src/python/CRABClient/Commands/report.py +++ b/src/python/CRABClient/Commands/report.py @@ -292,7 +292,7 @@ def collectReportData(self): self.logger.debug("Result: %s" % dictresult) self.logger.info("Running crab status first to fetch necessary information.") # Get job statuses - statusDict = getMutedStatusInfo(self.logger) + statusDict = getMutedStatusInfo(logger=self.logger, proxy=self.proxyfilename) if not statusDict['jobList']: # No point in continuing if the job list is empty. @@ -362,7 +362,7 @@ def getLumisToProcess(self, userWebDirURL, jobs, workflow): fd.close() tarball.close() else: - self.logger.error("Failed to retrieve input dataset duplicate lumis.") + self.logger.error("Failed to retrieve run_and_lumis.tar.gz") return res diff --git a/src/python/CRABClient/Commands/submit.py b/src/python/CRABClient/Commands/submit.py index 66d7e615..cacc1ba2 100644 --- a/src/python/CRABClient/Commands/submit.py +++ b/src/python/CRABClient/Commands/submit.py @@ -119,7 +119,11 @@ def __call__(self): % (colors.RED, colors.NORMAL, non_edm_files) self.logger.warning(msg) + self.logger.debug("submit() - self.configuration {}".format(str(self.configuration))) + self.logger.debug("submit() - self.configreq {}".format(self.configreq)) + self.logger.debug("submit() - jobconfig {}".format(jobconfig)) self.configreq.update(jobconfig) + server = self.crabserver self.logger.info("Sending the request to the server at %s" % self.serverurl) @@ -245,7 +249,7 @@ def validateConfig(self): ## Load the external plugin or check that the crab plugin is valid. 
         external_plugin_name = getattr(self.configuration.JobType, 'externalPluginFile', None)
         crab_plugin_name = getattr(self.configuration.JobType, 'pluginName', None)
-        crab_job_types = {'ANALYSIS': None, 'PRIVATEMC': None, 'COPYCAT': None} #getJobTypes()
+        crab_job_types = {'ANALYSIS': None, 'PRIVATEMC': None, 'COPYCAT': None, 'RECOVER': None} #getJobTypes()
         if external_plugin_name:
             addPlugin(external_plugin_name) # Do we need to do this here?
         if crab_plugin_name:
@@ -253,8 +257,15 @@
             msg = "Invalid CRAB configuration: Parameter JobType.pluginName has an invalid value ('%s')." % (crab_plugin_name)
             msg += "\nAllowed values are: %s." % (", ".join(['%s' % job_type for job_type in crab_job_types.keys()]))
             return False, msg
-        msg = "Will use CRAB %s plugin" % ("Analysis" if crab_plugin_name.upper() == 'ANALYSIS' else "PrivateMC")
-        msg += " (i.e. will run %s job type)." % ("an analysis" if crab_plugin_name.upper() == 'ANALYSIS' else "a MC generation")
+        msg = "Will use CRAB plugin %s" % (crab_plugin_name.upper())
+        if crab_plugin_name.upper() == 'ANALYSIS':
+            msg += " (i.e. will run an analysis job type)."
+        elif crab_plugin_name.upper() == 'PRIVATEMC':
+            msg += " (i.e. will run a MC generation job type)."
+        elif crab_plugin_name.upper() == 'COPYCAT':
+            msg += " (i.e. will run a copy of a task; the exact job type will be defined later)."
+        elif crab_plugin_name.upper() == 'RECOVER':
+            msg += " (i.e. will run the recovery of a task; the exact job type will be defined later)."
         self.logger.debug(msg)
 
         ## Check that the requested memory does not exceed the allowed maximum.
diff --git a/src/python/CRABClient/JobType/Recover.py b/src/python/CRABClient/JobType/Recover.py
new file mode 100644
index 00000000..4c712c86
--- /dev/null
+++ b/src/python/CRABClient/JobType/Recover.py
@@ -0,0 +1,295 @@
+# this is an experimental new feature introduced by Marco and never fully tested/used
+# will worry about pylint if and when we decide to use it and look at details
+#pylint: skip-file
+"""
+Recover job type plug-in
+"""
+
+import os
+import re
+import math
+import shutil
+import string
+import tempfile
+from functools import reduce
+from ast import literal_eval
+import json
+import hashlib
+import tarfile
+import ast
+import json
+
+try:
+    from FWCore.PythonUtilities.LumiList import LumiList
+except Exception:  # pylint: disable=broad-except
+    # if the FWCore version is not py3 compatible, use our own
+    from CRABClient.LumiList import LumiList
+
+from ServerUtilities import BOOTSTRAP_CFGFILE_DUMP, getProxiedWebDir, NEW_USER_SANDBOX_EXCLUSIONS
+from ServerUtilities import SERVICE_INSTANCES
+
+import CRABClient.Emulator
+from CRABClient import __version__
+from CRABClient.UserUtilities import curlGetFileFromURL
+from CRABClient.ClientUtilities import colors, LOGGERS, getColumn, getJobTypes, DBSURLS
+from CRABClient.JobType.UserTarball import UserTarball
+from CRABClient.JobType.CMSSWConfig import CMSSWConfig
+# from CRABClient.JobType._AnalysisNoUpload import _AnalysisNoUpload
+from CRABClient.JobType.BasicJobType import BasicJobType
+from CRABClient.ClientMapping import getParamDefaultValue
+from CRABClient.JobType.LumiMask import getLumiList, getRunList
+from CRABClient.ClientUtilities import bootstrapDone, BOOTSTRAP_CFGFILE, BOOTSTRAP_CFGFILE_PKL
+from CRABClient.ClientExceptions import ClientException, EnvironmentException, ConfigurationException, CachefileNotFoundException
+from CRABClient.Commands.SubCommand import ConfigCommand
+from CRABClient.ClientMapping import parametersMapping, getParamDefaultValue
+from ServerUtilities import uploadToS3, downloadFromS3
+
+
+class Recover(BasicJobType):
+    """
+    Recover job type plug-in.
+
+    Access the configuration of the task that we are submitting with self.config
+    """
+
+    def initCRABRest(self):
+        """
+        - self.crabserver is the destination, where to submit the recovery task. already created for us.
+        - self.crabserverCopyOfTask is the source, where the failing task was submitted to
+        """
+        serverFactory = CRABClient.Emulator.getEmulator('rest')
+        serverhost = SERVICE_INSTANCES.get(self.config.JobType.copyCatInstance)
+        self.crabserverCopyOfTask = serverFactory(hostname=serverhost['restHost'], localcert=self.proxyfilename,
+                                                  localkey=self.proxyfilename, retry=2, logger=self.logger,
+                                                  verbose=False, userAgent='CRABClient')
+        self.crabserverCopyOfTask.setDbInstance(serverhost['dbInstance'])
+
+        # after having an instance of the server where the original task was submitted,
+        # we can now set General.instance to the destination server
+        self.config.General.instance = self.config.JobType.copyCatInstance
+
+    def getTaskDict(self):
+        # getting information about the task
+        inputlist = {'subresource':'search', 'workflow': self.config.JobType.copyCatTaskname}
+
+        dictret, _, _ = self.crabserverCopyOfTask.get(api='task', data=inputlist)
+
+        task = {}
+        self.logger.debug(dictret)
+        task['username'] = getColumn(dictret, 'tm_username')
+        task['jobarch'] = getColumn(dictret, 'tm_job_arch')
+        task['jobsw'] = getColumn(dictret, 'tm_job_sw')
+        task['inputdata'] = getColumn(dictret, 'tm_input_dataset')
+        # crabclient would send None to the server and confuse it
+        if not task['inputdata']:
+            task.pop('inputdata')
+
+        # it is a list in string format
+        task['edmoutfiles'] = ast.literal_eval(getColumn(dictret, 'tm_edm_outfiles'))
+        task['tfileoutfiles'] = ast.literal_eval(getColumn(dictret, 'tm_tfile_outfiles'))
+        task['addoutputfiles'] = ast.literal_eval(getColumn(dictret, 'tm_outfiles'))
+        task['userfiles'] = ast.literal_eval(getColumn(dictret, 'tm_user_files'))
+
+        # used to download the original task cache
+        task['cachefilename'] = getColumn(dictret, 'tm_user_sandbox')
+        task['debugfilename'] = getColumn(dictret, 'tm_debug_files')
+
+        task['primarydataset'] = getColumn(dictret, 'tm_primary_dataset')
+        task['jobtype'] = getColumn(dictret, 'tm_job_type')
+        tmp = ast.literal_eval(getColumn(dictret, 'tm_split_args'))
+        task['runs'] = tmp['runs']
+        task['lumis'] = tmp['lumis']
+        #import pdb; pdb.set_trace()
+        tmp = json.loads(getColumn(dictret, 'tm_user_config'))
+        if tmp['inputblocks']:
+            task['inputblocks'] = tmp['inputblocks']
+
+        if task['jobtype'] == 'PrivateMC':
+            task['generator'] = getColumn(dictret, 'tm_generator')
+
+        # if the original task has publication enabled, then the recovery task
+        # publishes in the same DBS dataset
+        failingTaskPublishName = getColumn(dictret, 'tm_publish_name')
+        # remove -0...0 from the publish name, see https://github.com/dmwm/CRABServer/issues/4947
+        failingTaskPublishName = failingTaskPublishName.replace("-00000000000000000000000000000000", "")
+        task['publishname2'] = failingTaskPublishName
+        # task['jobtype'] = getColumn(dictret, 'tm_publish_groupname')
+
+        # this needs to be passed in recover.py, needs the full path
+        # task["scriptexe"] = getColumn(self.failingCrabDBInfo, 'tm_scriptexe')
+
+        return task
+
+    def run(self, filecacheurl = None):
+        """
+        Override run() for JobType Recover.
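+        The list below sketches the task-info dict handled here (values are illustrative):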
+ + - 'addoutputfiles': [] -> removed + - 'tfileoutfiles': [] -> removed, ignoring for the time being + - 'edmoutfiles': ['output.root'], + + 'jobarch': 'el8_amd64_gcc11', 'jobsw': 'CMSSW_13_0_3', + 'inputdata': '/GenericTTbar/HC-CMSSW_9_2_6_91X_mcRun1_realistic_v2-v2/AODSIM', + 'cacheurl': 'https://cmsweb-test2.cern.ch/S3/crabcache_dev', + 'cachefilename': 'f1fed93419f0d25d8d7dd1b7331cff56f50376ebfe0c6c77daf9bfd6da6daade.tar.gz', + 'debugfilename': 'e435f28faecdb441796d2696b9b6f955108a0217ad602bca6114638272ab9a82.tar.gz', + 'jobtype': 'Analysis'} + """ + + self.initCRABRest() + jobInfoDict = self.getTaskDict() + + # reupload sandbox with new hash (from sandbox filename) + newCachefilename = "{}.tar.gz".format(hashlib.sha256(jobInfoDict['cachefilename'].encode('utf-8')).hexdigest()) + localPathCachefilename = os.path.join(self.config.JobType.copyCatWorkdir, "taskconfig", 'sandbox.tar.gz') + uploadToS3(crabserver=self.crabserver, objecttype='sandbox', filepath=localPathCachefilename, + tarballname=newCachefilename, logger=self.logger) + + newDebugfilename = "{}.tar.gz".format(hashlib.sha256(jobInfoDict['debugfilename'].encode('utf-8')).hexdigest()) + newDebugPath = os.path.join(self.workdir, newDebugfilename) + + copyOfTaskCrabConfig = os.path.join(self.config.JobType.copyCatWorkdir, "debug_sandbox", 'debug', 'crabConfig.py') + copyOfTaskPSet = os.path.join(self.config.JobType.copyCatWorkdir, "debug_sandbox", 'debug', 'originalPSet.py') + self.config.JobType.psetName = copyOfTaskPSet + debugFilesUploadResult = None + with UserTarball(name=newDebugPath, logger=self.logger, config=self.config, + crabserver=self.crabserver, s3tester=self.s3tester) as dtb: + + dtb.addMonFiles() + try: + debugFilesUploadResult = dtb.upload(filecacheurl = filecacheurl) + except Exception as e: + msg = ("Problem uploading debug_files.tar.gz.\nError message: %s.\n" + "More details can be found in %s" % (e, self.logger.logfile)) + LOGGERS['CRAB3'].exception(msg) #the traceback is only printed into the logfile + + + configreq = {'dryrun': 0} + for param in parametersMapping['on-server']: + default = parametersMapping['on-server'][param]['default'] + config_params = parametersMapping['on-server'][param]['config'] + for config_param in config_params: + attrs = config_param.split('.') + temp = self.config + for attr in attrs: + temp = getattr(temp, attr, None) + if temp is None: + break + if temp is not None: + configreq[param] = temp + break + elif default is not None: + configreq[param] = default + temp = default + else: + ## Parameter not strictly required. + pass + ## Translate boolean flags into integers. + if param in ['savelogsflag', 'publication', 'nonprodsw', 'useparent',\ + 'ignorelocality', 'saveoutput', 'oneEventMode', 'nonvaliddata', 'ignoreglobalblacklist',\ + 'partialdataset', 'requireaccelerator']: + configreq[param] = 1 if temp else 0 + ## Translate DBS URL aliases into DBS URLs. 
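+            # e.g. (illustrative) a DBS alias such as 'phys03' is replaced here with the full
+            # DBS URL taken from the DBSURLS mapping imported from ClientUtilities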
+ elif param in ['dbsurl', 'publishdbsurl']: + if param == 'dbsurl': + dbstype = 'reader' + elif param == 'publishdbsurl': + dbstype = 'writer' + allowed_dbsurls = DBSURLS[dbstype].values() + allowed_dbsurls_aliases = DBSURLS[dbstype].keys() + if configreq[param] in allowed_dbsurls_aliases: + configreq[param] = DBSURLS[dbstype][configreq[param]] + elif configreq[param].rstrip('/') in allowed_dbsurls: + configreq[param] = configreq[param].rstrip('/') + elif param == 'scriptexe' and 'scriptexe' in configreq: + configreq[param] = os.path.basename(configreq[param]) + elif param in ['acceleratorparams'] and param in configreq: + configreq[param] = json.dumps(configreq[param]) + + configreq.update(jobInfoDict) + + ## RECOVER - set lumimask from crab report, not from original task - START + ## copied from Analysis.py + lumi_mask_name = getattr(self.config.Data, 'lumiMask', None) + lumi_list = None + if lumi_mask_name: + self.logger.debug("Attaching lumi mask %s to the request" % (lumi_mask_name)) + try: + lumi_list = getLumiList(lumi_mask_name, logger = self.logger) + except ValueError as ex: + msg = "%sError%s:" % (colors.RED, colors.NORMAL) + msg += " Failed to load lumi mask %s : %s" % (lumi_mask_name, ex) + raise ConfigurationException(msg) + run_ranges = getattr(self.config.Data, 'runRange', None) + if run_ranges: + run_ranges_is_valid = re.match('^\d+((?!(-\d+-))(\,|\-)\d+)*$', run_ranges) + if run_ranges_is_valid: + run_list = getRunList(run_ranges) + if lumi_list: + lumi_list.selectRuns(run_list) + if not lumi_list: + msg = "Invalid CRAB configuration: The intersection between the lumi mask and the run range is null." + raise ConfigurationException(msg) + else: + if len(run_list) > 50000: + msg = "CRAB configuration parameter Data.runRange includes %s runs." % str(len(run_list)) + msg += " When Data.lumiMask is not specified, Data.runRange can not include more than 50000 runs." + raise ConfigurationException(msg) + lumi_list = LumiList(runs = run_list) + else: + msg = "Invalid CRAB configuration: Parameter Data.runRange should be a comma separated list of integers or (inclusive) ranges. 
Example: '12345,99900-99910'" + raise ConfigurationException(msg) + if lumi_list: + configreq['runs'] = lumi_list.getRuns() + ## For each run we encode the lumis as a string representing a list of integers: [[1,2],[5,5]] ==> '1,2,5,5' + lumi_mask = lumi_list.getCompactList() + configreq['lumis'] = [str(reduce(lambda x,y: x+y, lumi_mask[run]))[1:-1].replace(' ','') for run in configreq['runs']] + ## RECOVER - set lumimask from crab report, not from original task - END + + + # new filename + configreq['cachefilename'] = newCachefilename + configreq['debugfilename'] = newDebugfilename + configreq['debugfilename'] = "%s.tar.gz" % debugFilesUploadResult + configreq['cacheurl'] = filecacheurl + + # pop + configreq.pop('username', None) + configreq.pop('workflow', None) + configreq.pop('vogroup', None) + # outputlfndirbase + configreq.pop('lfn', None) + configreq.pop('asyncdest', None) + + # optional pop + if getattr(self.config.Data, 'splitting', None): + configreq.pop('splitalgo', None) + if getattr(self.config.Data, 'totalUnits', None): + configreq.pop('totalunits', None) + if getattr(self.config.Data, 'unitsPerJob', None): + configreq.pop('algoargs', None) + if getattr(self.config.JobType, 'maxJobRuntimeMin', None): + configreq.pop('maxjobruntime', None) + if getattr(self.config.Data, 'publication', None) != None: + configreq.pop('publication', None) + if getattr(self.config.General, 'transferLogs', None) != None: + configreq.pop('savelogsflag', None) + + return '', configreq + + + def validateConfig(self, config): + """ + """ + # skip it all for now + valid, reason = self.validateBasicConfig(config) + if not valid: + return valid, reason + + return True, "Valid configuration" + + def validateBasicConfig(self, config): + """ + + """ + return True, "Valid configuration" diff --git a/src/python/CRABClient/UserUtilities.py b/src/python/CRABClient/UserUtilities.py index aacd9da2..f6fd71dd 100644 --- a/src/python/CRABClient/UserUtilities.py +++ b/src/python/CRABClient/UserUtilities.py @@ -193,12 +193,16 @@ def setConsoleLogLevel(lvl): for h in logging.getLogger('CRAB3.all').handlers: h.setLevel(lvl) -def getMutedStatusInfo(logger): +def getMutedStatusInfo(logger=None, proxy=None): """ Mute the status console output before calling status and change it back to normal afterwards. """ mod = __import__('CRABClient.Commands.status', fromlist='status') - cmdobj = getattr(mod, 'status')(logger) + cmdargs = [] + if proxy: + cmdargs.append("--proxy") + cmdargs.append(proxy) + cmdobj = getattr(mod, 'status')(logger=logger, cmdargs=cmdargs) loglevel = getConsoleLogLevel() setConsoleLogLevel(LOGLEVEL_MUTE) statusDict = cmdobj.__call__()