Skip to content

Commit

Permalink
Fix checkdataset 5263 (#5272)
Browse files Browse the repository at this point in the history
* fix broken printout. fix #5263

* print tape location

* properly deal with non-existing dataset

* add DBS info
  • Loading branch information
belforte authored Dec 4, 2023
1 parent c91e6bd commit abfb608
Showing 1 changed file with 81 additions and 8 deletions.
89 changes: 81 additions & 8 deletions src/python/CRABClient/Commands/checkdataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,19 @@

from __future__ import print_function, division

import sys
import json

from CRABClient.Commands.SubCommand import SubCommand
from CRABClient.ClientUtilities import execute_command, colors
from CRABClient.ClientUtilities import commandUsedInsideCrab
from CRABClient.ClientExceptions import MissingOptionException
from CRABClient.ClientExceptions import MissingOptionException, ConfigurationException
from CRABClient.RestInterfaces import getDbsREST

if sys.version_info >= (3, 0):
from urllib.parse import urlencode
if sys.version_info < (3, 0):
from urllib import urlencode

class checkdataset(SubCommand):
"""
Expand All @@ -25,15 +32,20 @@ def __init__(self, logger, cmdargs=None):
SubCommand.__init__(self, logger, cmdargs)
self.dataset = None
self.interactive = False

self.dbsInstance = None

def __call__(self):

self.dataset = self.options.dataset
self.dbsInstance = self.options.dbsInstance
self.interactive = commandUsedInsideCrab()

status, size = self.checkDatasetInDBS()
self.logger.info("dataset status in DBS is: %s", status)
self.logger.info("dataset size is: %s", size)

if not self.rucio:
self.logger.error("Rucio client not available with this CMSSW version")
self.logger.error("Rucio client not available with this CMSSW version. Can't check disk availability")
return {'commandStatus': 'FAILED'}

containerScope, containerName, blocks = self.getInputs()
Expand All @@ -45,13 +57,14 @@ def __call__(self):
print("Checking blocks availabiliyt on disk ...")
else:
self.logger.info("Check blocks availabiliyt on disk")
locationsMap = self.createLocationsMap(blocks)
tapeSites, locationsMap = self.createLocationsMap(blocks)
self.logger.info("dataset is on tape at: %s", tapeSites)
(nbFORnr, nbFORrse) = self.createBlockMaps(locationsMap=locationsMap, blackList=[])
self.printBlocksPerReplicaMap(nbFORnr)

self.logger.info("\n Block locations:")
for rse in nbFORrse.keys():
msg = " %20s hosts %3s blocks" % (rse, nbFORrse)
msg = " %20s hosts %3s blocks" % (rse, nbFORrse[rse])
if rse in blackListedSites:
msg += " *SITE BLACKLISTED IN CRAB*"
self.logger.info(msg)
Expand All @@ -74,6 +87,50 @@ def __call__(self):

return {'commandStatus': 'SUCCESS'}

def checkDatasetInDBS(self):
""" return status and size (including unit) of the dataset as strings """

status = 'NotFound'
size = 0

dbsReader, _ = getDbsREST(instance=self.dbsInstance, logger=self.logger,
cert=self.proxyfilename, key=self.proxyfilename)
datasetName = self.dataset.split(':')[-1]
isBlock = datasetName if '#' in datasetName else None
if not isBlock:
query = {'dataset': datasetName, 'dataset_access_type': '*', 'detail': True}
ds, rc, msg = dbsReader.get(uri="datasets", data=urlencode(query))
if not ds:
self.logger.error("ERROR: %s not found in DBS", datasetName)
return status, size
else:
query = {'block_name': datasetName}
bs, rc, msg = dbsReader.get(uri="blocks", data=urlencode(query))
if not bs:
self.logger.error("ERROR: %s not found in DBS", datasetName)
return status, size
self.logger.debug('exitcode= %s', rc)
if rc != 200: # this is HTTP code. 200=OK
self.logger.error("Error trying to talk with DBS:\n%s", msg)
return status, size
# OK, dataset/block found in DBS
if isBlock:
status = 'EXISTS' # only datasets and files have status in DBS
query = {'block_name': datasetName}
else:
status = ds[0]['dataset_access_type']
query = {'dataset': datasetName}
fs, rc, msg = dbsReader.get(uri='filesummaries', data=urlencode(query))
byteSize = fs[0]['file_size']
gBytes = byteSize / 1000. / 1000.
if gBytes > 10000:
size = "%1.f TB" % (gBytes / 1000.)
else:
size = "%s GB" % int(gBytes)

return status, size


def getInputs(self):
"""
get name of object to test and check what it is
Expand Down Expand Up @@ -103,8 +160,13 @@ def getInputs(self):
self.logger.info("Input is a DBS-block (Rucio-dataset) will check that one")
blocks = [name]
else:
dss = self.rucio.list_content(scope=scope, name=name)
blocks = [ds['name'] for ds in dss]
from rucio.common.exception import DataIdentifierNotFound
try:
dss = self.rucio.list_content(scope=scope, name=name)
blocks = [ds['name'] for ds in dss]
except DataIdentifierNotFound:
self.logger.error('Dataset not found in Rucio')
blocks = []
self.logger.info("dataset has %d blocks", len(blocks))

return containerScope, containerName, blocks
Expand Down Expand Up @@ -166,6 +228,7 @@ def createLocationsMap(self, blocks):
locationsMap content. Makes it easier to read, debug, improve
"""
locationsMap = {}
tapeSites = set()
nb = 0
for blockName in blocks:
nb += 1
Expand All @@ -175,6 +238,7 @@ def createLocationsMap(self, blocks):
response = self.rucio.list_dataset_replicas(scope='cms', name=blockName, deep=True)
for item in response:
if 'Tape' in item['rse']:
tapeSites.add(item['rse'])
continue # skip tape locations
if 'T3_CH_CERN_OpenData' in item['rse']:
continue # ignore OpenData until it is accessible by CRAB
Expand All @@ -183,7 +247,7 @@ def createLocationsMap(self, blocks):
locationsMap[blockName] = replicas
if self.interactive:
print("")
return locationsMap
return tapeSites, locationsMap

def printBlocksPerReplicaMap(self, nbFORnr):
"""
Expand Down Expand Up @@ -213,6 +277,10 @@ def setOptions(self):
dest='dataset',
default=None,
help='dataset of block ID or Rucio DID (scope:name)')
self.parser.add_option('--dbs-instance', dest='dbsInstance', default='prod/global',
help="DBS instance. e.g. prod/global (default) or prod/phys03 or full URL."
+ "\nUse at your own risk only if you really know what you are doing"
)

def validateOptions(self):
SubCommand.validateOptions(self)
Expand All @@ -224,3 +292,8 @@ def validateOptions(self):
ex = MissingOptionException(msg)
ex.missingOption = "dataset"
raise ex
dbsInstance = self.options.dbsInstance
if '/' not in dbsInstance or len(dbsInstance.split('/')) > 2 and not dbsInstance.startswith('https://'):
msg = "Bad DBS instance value %s. " % dbsInstance # pylint: disable=consider-using-f-string
msg += "Use either server/db format or full URL"
raise ConfigurationException(msg)

0 comments on commit abfb608

Please sign in to comment.