Skip to content

Commit

Permalink
Merge pull request #309 from brianhlin/stable.glue-validation-fixes
Browse files Browse the repository at this point in the history
Glue validation fixes (stable)
  • Loading branch information
brianhlin authored Mar 5, 2020
2 parents 6f5e1b0 + e55023c commit 1032f7a
Showing 1 changed file with 119 additions and 54 deletions.
173 changes: 119 additions & 54 deletions contrib/bdii/htcondor-ce-provider
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
#!/usr/bin/env python

"""
GIP provider for htcondor-CE.
"""

# we cannot change the name of the script.
# pylint: disable=invalid-name


from __future__ import print_function
import sys
from datetime import datetime
Expand All @@ -8,7 +16,8 @@ from collections import defaultdict
import signal
import htcondor

service_ldif = """dn: {bind_dn}

SERVICE_LDIF = """dn: {bind_dn}
GLUE2ServiceID: {central_manager}
objectClass: GLUE2Entity
objectClass: GLUE2Service
Expand All @@ -21,7 +30,8 @@ GLUE2ServiceComplexity: endpointType={num_endpoints}, share={num_shares}, resour
GLUE2ServiceAdminDomainForeignKey: {site_name}
"""

manager_ldif = """dn: GLUE2ManagerID={central_manager}_Manager,{bind_dn}

MANAGER_LDIF = """dn: GLUE2ManagerID={central_manager}_Manager,{bind_dn}
objectClass: GLUE2Entity
objectClass: GLUE2Manager
objectClass: GLUE2ComputingManager
Expand All @@ -33,7 +43,8 @@ GLUE2ManagerServiceForeignKey: {central_manager}
GLUE2ComputingManagerComputingServiceForeignKey: {central_manager}
"""

resource_ldif = """dn: GLUE2ResourceID={central_manager}_{resource},{bind_dn}

RESOURCE_LDIF = """dn: GLUE2ResourceID={central_manager}_{resource},{bind_dn}
objectClass: GLUE2Entity
objectClass: GLUE2Resource
objectClass: GLUE2ExecutionEnvironment
Expand All @@ -53,7 +64,8 @@ GLUE2ResourceManagerForeignKey: {central_manager}_Manager
GLUE2ExecutionEnvironmentComputingManagerForeignKey: {central_manager}_Manager
"""

endpoint_ldif = """dn: GLUE2EndpointID={name}_HTCondorCE,{bind_dn}

ENDPOINT_LDIF = """dn: GLUE2EndpointID={name}_HTCondorCE,{bind_dn}
objectClass: GLUE2Entity
objectClass: GLUE2Endpoint
objectClass: GLUE2ComputingEndpoint
Expand All @@ -74,7 +86,9 @@ GLUE2EndpointDowntimeInfo: See the GOC DB for downtimes: https://goc.egi.eu/
GLUE2EndpointServiceForeignKey: {central_manager}
GLUE2ComputingEndpointComputingServiceForeignKey: {central_manager}
"""
share_ldif = """dn: GLUE2ShareID={shareid},{bind_dn}


SHARE_LDIF = """dn: GLUE2ShareID={shareid},{bind_dn}
objectClass: GLUE2Entity
objectClass: GLUE2Share
objectClass: GLUE2ComputingShare
Expand All @@ -90,27 +104,50 @@ GLUE2ShareEndpointForeignKey: {endpointid}
{resource_keys}
"""

policy_ldif = """dn: GLUE2PolicyID={policyid},GLUE2ShareID={shareid},{bind_dn}

# LDIF template for the two per-VO policy entries published below a share:
# a GLUE2MappingPolicy that points at the share ({shareid}) and a
# GLUE2AccessPolicy that points at the endpoint ({endpointid}).
# Placeholders: policyid, shareid, bind_dn, vo, endpointid.
# The empty line between the two entries is required: LDIF records are
# separated by a blank line, otherwise the second "dn:" line is read as an
# attribute of the first record.
POLICY_LDIF = """dn: GLUE2PolicyID={policyid},GLUE2ShareID={shareid},{bind_dn}
objectClass: GLUE2Entity
objectClass: GLUE2Policy
objectClass: GLUE2MappingPolicy
GLUE2PolicyID: {policyid}
GLUE2PolicyScheme: org.glite.standard
GLUE2PolicyRule: vo:{vo}
GLUE2MappingPolicyShareForeignKey: {shareid}

dn: GLUE2PolicyID={policyid}_access,GLUE2ShareID={shareid},{bind_dn}
objectClass: GLUE2Entity
objectClass: GLUE2Policy
objectClass: GLUE2AccessPolicy
GLUE2PolicyID: {policyid}_access
GLUE2PolicyScheme: org.glite.standard
GLUE2PolicyRule: vo:{vo}
GLUE2AccessPolicyEndpointForeignKey: {endpointid}
"""


class TimeoutError(Exception):
    """
    Exception used to signal that a time-limited operation did not finish.

    NOTE: on Python 3 this name shadows the built-in TimeoutError (an
    OSError subclass); this class derives directly from Exception.
    """


# pylint: disable=unused-argument
def handler(signum, frame):
    """
    Handler for timeout signal.

    Both arguments are accepted only because signal handlers are called
    with (signal number, current stack frame); neither is used.  The
    handler unconditionally raises TimeoutError so that the interrupted
    code can catch it.
    """
    raise TimeoutError("TimeoutError")


def main():
"""
Main provider routine.
"""

# Get hostname of the batch system central manager
central_manager = htcondor.param.get('COLLECTOR_HOST')
central_manager = htcondor.param.get('COLLECTOR_HOST').split(' ')[0]


# Get VO Names
vonames = htcondor.param.get('HTCONDORCE_VONames')
Expand All @@ -119,80 +156,97 @@ def main():
sys.exit(1)
vonames = vonames.split(',')


# Get Site Name
site_name = htcondor.param.get('GLUE2DomainID')
if not site_name:
sys.stderr.write("Error: GLUE2DomainID: not set\n")
sys.exit(1)


# Get the timeout value
time_out = htcondor.param.get('GLUE_PROVIDER_TIMEOUT')
if not time_out:
time_out = 10


# This is the bind DN for all entries
bind_dn = "GLUE2ServiceID=%s,GLUE2GroupID=resource,o=glue" % (central_manager)


# Query collector for the number of CPUs and batch system Collector ad
coll = htcondor.Collector()
total_cores = {}
topologies = {}
for ad in coll.query(htcondor.AdTypes.Startd, 'State=!="Owner"', ['Arch', 'OpSys', 'OpSysMajorVer', 'OpSysName',
'DetectedCpus', 'DetectedMemory', 'Machine']):
if not ad.get('Machine'):
for classad in coll.query(htcondor.AdTypes.Startd, 'State=!="Owner"',
['Arch', 'OpSys', 'OpSysMajorVer', 'OpSysName',
'DetectedCpus', 'DetectedMemory', 'Machine']):
if not classad.get('Machine'):
continue # skip malformed ads where we can't provide additional information

try:
if ad['Machine'] not in total_cores:
total_cores[ad['Machine']] = ad['DetectedCpus']

k = (ad['Arch'],
ad['OpSys'],
ad['OpSysName'],
ad['OpSysMajorVer'],
ad['DetectedCpus'],
ad['DetectedMemory'])
if k not in topologies:
topologies[k] = 1
if classad['Machine'] not in total_cores:
total_cores[classad['Machine']] = classad['DetectedCpus']

machine = (
classad['Arch'].lower(),
classad['OpSys'].lower(),
classad['OpSysName'],
classad['OpSysMajorVer'],
classad['DetectedCpus'],
classad['DetectedMemory']
)
if machine not in topologies:
topologies[machine] = 1
else:
topologies[k] += 1
topologies[machine] += 1
except KeyError, exc:
msg = "Malformed machine ad: Missing '{0}' attribute for {1}".format(exc, ad['Machine'])
msg = "Malformed machine ad: Missing '{0}' attribute for {1}"\
.format(exc, classad['Machine'])
sys.stderr.write(msg)

resources = []


# Print the entry for the GLUE2 Resource
for tup in topologies:
resource = '{0}_{1}_{2}_{3}_{4}_{5}'.format(tup[0], tup[1], tup[2], tup[3], tup[4], tup[5])
instances = topologies[tup]
resources.append(resource)
print (resource_ldif.format(

print (RESOURCE_LDIF.format(
central_manager=central_manager,
resource=resource,
arch=tup[0],
os=tup[1],
name=tup[2],
version=tup[3],
memory=tup[4],
cpu=tup[5],
cpu=tup[4],
memory=tup[5],
bind_dn=bind_dn,
instances=instances,
))
coll_ad = coll.query(htcondor.AdTypes.Collector)[0] # the pool collector ad
version = coll_ad['CondorVersion'].split()[1]


# Print the entry for the GLUE2 Manager
print (manager_ldif.format(
print (MANAGER_LDIF.format(
central_manager=central_manager,
bind_dn=bind_dn,
version=version,
total_cores=sum(total_cores.values()),
))

ce_batch_schedd_ads = coll.query(htcondor.AdTypes.Schedd, 'HAS_HTCONDOR_CE =?= True', ['Machine'])
ce_batch_schedd_ads = coll.query(
htcondor.AdTypes.Schedd,
'HAS_HTCONDOR_CE =?= True',
['Machine']
)


# Print the entry for the GLUE2 Service
print (service_ldif.format(
print (SERVICE_LDIF.format(
central_manager=central_manager,
bind_dn=bind_dn,
num_endpoints=len(ce_batch_schedd_ads),
Expand All @@ -203,16 +257,21 @@ def main():
for ce_batch_schedd_ad in ce_batch_schedd_ads:

ce_host = ce_batch_schedd_ad['Machine']
ce_collector = htcondor.Collector(ce_host + ':9619') # find the CE using the default CE port
# find the CE using the default CE port
ce_collector = htcondor.Collector(ce_host + ':9619')
try:
ce_schedd_ad = ce_collector.query(htcondor.AdTypes.Schedd, 'Name =?= "{0}"'.format(ce_host))[0]
ce_schedd_ad = ce_collector.query(
htcondor.AdTypes.Schedd,
'Name =?= "{0}"'.format(ce_host)
)[0]

except (RuntimeError, IndexError):
sys.stderr.write("Unable to locate CE schedd on %s\n" % ce_host)
continue
except EnvironmentError:
sys.stderr.write("Failed communication with CE collector on %s\n" % ce_host)
continue

signal.signal(signal.SIGALRM, handler)
signal.alarm(time_out)
try:
Expand All @@ -224,10 +283,11 @@ def main():
except (KeyError, RuntimeError):
state = 'critical'
state_info = 'Authorization ping failed'
except TimeoutError, e:
sys.stderr.write("Ping to CE schedd on %s timed out after %i s.\n" % (ce_host, time_out) )
except TimeoutError as exc:
sys.stderr.write("Ping to CE schedd on %s timed out after %i s.\n"
% (ce_host, time_out))
continue

signal.signal(signal.SIGALRM, signal.SIG_IGN)

ce_schedd = htcondor.Schedd(ce_schedd_ad)
Expand All @@ -253,21 +313,23 @@ def main():
idle_vo_jobs[job['x509userproxyvoname']] += 1
elif job['JobStatus'] == 2:
running_vo_jobs[job['x509userproxyvoname']] += 1
except TimeoutError, e:
sys.stderr.write("CE schedd on %s timed out after %i s.\n" % ce_host, time_out )
except TimeoutError as exc:
sys.stderr.write("CE schedd on %s timed out after %i s.\n" % ce_host, time_out)
continue

signal.signal(signal.SIGALRM, signal.SIG_IGN)

cmd = ['/usr/bin/openssl', 'x509', '-noout', '-issuer', '-nameopt', 'RFC2253', '-in',
'/etc/grid-security/hostcert.pem']
cp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
issuer = cp.communicate()[0].replace('issuer=', '').strip()
cmd_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
issuer = cmd_proc.communicate()[0].replace('issuer=', '').strip()

name = ce_schedd_ad['Name']
start_time = datetime.fromtimestamp(int(ce_schedd_ad['DaemonStartTime'])).strftime('%Y-%m-%dT%H:%M:%SZ')
start_time = datetime.fromtimestamp(
int(ce_schedd_ad['DaemonStartTime'])
).strftime('%Y-%m-%dT%H:%M:%SZ')

print (endpoint_ldif.format(
print (ENDPOINT_LDIF.format(
name=name,
bind_dn=bind_dn,
version=version,
Expand All @@ -278,18 +340,20 @@ def main():
central_manager=central_manager,
))

for vo in vonames:
vo = vo.strip()
shareid = "%s_%s_share" % (ce_host, vo)
for voname in vonames:
voname = voname.strip()
shareid = "%s_%s_share" % (ce_host, voname)
endpointid = "%s_HTCondorCE" % (ce_host)
total_jobs = total_vo_jobs.get(vo, 0)
idle_jobs = idle_vo_jobs.get(vo, 0)
running_jobs = running_vo_jobs.get(vo, 0)
total_jobs = total_vo_jobs.get(voname, 0)
idle_jobs = idle_vo_jobs.get(voname, 0)
running_jobs = running_vo_jobs.get(voname, 0)
resource_keys = ""
for resource in resources:
resource_keys += 'GLUE2ComputingShareExecutionEnvironmentForeignKey: {0}_{1}\n'.format(central_manager, resource)
resource_keys += 'GLUE2ShareResourceForeignKey: {0}_{1}\n'.format(central_manager, resource)
print (share_ldif.format(
resource_keys += 'GLUE2ComputingShareExecutionEnvironmentForeignKey: {0}_{1}\n'\
.format(central_manager, resource)
resource_keys += 'GLUE2ShareResourceForeignKey: {0}_{1}\n'\
.format(central_manager, resource)
print (SHARE_LDIF.format(
shareid=shareid,
bind_dn=bind_dn,
total_vo_jobs=total_jobs,
Expand All @@ -300,12 +364,13 @@ def main():
resource_keys=resource_keys,
))

policyid = "%s_%s_policy" % (ce_host, vo)
print (policy_ldif.format(
policyid = "%s_%s_policy" % (ce_host, voname)
print (POLICY_LDIF.format(
policyid=policyid,
shareid=shareid,
bind_dn=bind_dn,
vo=vo,
vo=voname,
endpointid=endpointid
))

if __name__ == '__main__':
Expand Down

0 comments on commit 1032f7a

Please sign in to comment.