-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathworkflowlabelmaker.py
168 lines (123 loc) · 4.96 KB
/
workflowlabelmaker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/env python
"""
After a workflow enters *-archived status, we can make labels for them such
that comparisons with previous predictions can be made.
For now, the strategy is to get the list of workflows sharing the same prepID as
the workflow of interest, and make a label based on the following rule.
single workflow name ==> Good | 0
multiple workflow name:
has 'ACDC' in one of workflow names ==> Site Issue | 1
no 'ACDC' in any of workflow names ==> Workflow Issue | 2
no info available ==> Unknown | -1
Note: because ``get_json`` is used to get workflow prepid, environment variable
`X509_USER_PROXY` must point to a valid proxy.
"""
import json
import logging
import logging.config
import os
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from os.path import abspath, dirname, join
import yaml
from cmstoolbox.webtools import get_json
from workflowwrapper import Workflow, PrepID
from monitutils import (get_labeled_workflows, get_yamlconfig,
update_label_archive_db)
# Loggers used by this module: the named project logger and the root logger.
rootlogger = logging.getLogger()
logger = logging.getLogger("workflowmonitLogger")

# Config files are resolved relative to the directory holding this module.
_MODULE_DIR = dirname(abspath(__file__))
CONFIG_FILE_PATH = join(_MODULE_DIR, 'config/config.yml')
LOGGING_CONFIG = join(_MODULE_DIR, 'config/configLogging.yml')
class No502WarningFilter(logging.Filter):
    """Logging filter that rejects records whose message mentions 'STATUS: 502'."""

    def filter(self, record):
        """Decide whether ``record`` should be kept.

        :param logging.LogRecord record: the record under consideration
        :returns: False when the formatted message contains 'STATUS: 502',
                  True otherwise
        :rtype: bool
        """
        message = record.getMessage()
        return message.find('STATUS: 502') == -1
# Install the 502 filter on the root logger at import time, so records the
# root logger handles are dropped when their message contains 'STATUS: 502'.
# NOTE(review): logger-level filters are only consulted for records logged
# directly on that logger, not for records propagated up from child loggers --
# confirm this actually silences the intended messages (a handler-level filter
# may be needed otherwise).
rootlogger.addFilter(No502WarningFilter())
def get_action_history(wfname):
    """Look up the workflows that share a prepID with ``wfname``.

    The request parameters of the workflow are queried; the lookup only
    proceeds when both 'PrepID' and 'RequestStatus' are non-empty and the
    status contains 'completed' or 'archived'.

    :param str wfname: workflow name
    :returns: the ``workflows`` attribute of the matching ``PrepID`` object,
              or an empty list when the conditions above are not met
    :rtype: list
    """
    reqparams = Workflow(wfname).get_reqparams()
    prepid = str(reqparams.get('PrepID', ''))
    rqstatus = str(reqparams.get('RequestStatus', ''))

    # Both pieces of information are required to go any further.
    if not (prepid and rqstatus):
        return []

    # Only workflows that reached a completed/archived status are considered.
    if 'completed' not in rqstatus and 'archived' not in rqstatus:
        return []

    return PrepID(prepid).workflows
def action_histories(wfnames):
    """Fetch, concurrently, the action history of every workflow in ``wfnames``.

    Each workflow is handed to ``get_action_history`` on a thread pool (at most
    500 worker threads). A workflow whose lookup raises is reported on stdout
    and left out of the result.

    :param list wfnames: a list of workflow names
    :returns: a dictionary with workflow name as key, associated list of workflow
              names as value
    :rtype: dict
    """
    if not wfnames:
        return {}

    histories = {}
    pool_size = min(500, len(wfnames))
    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        # Map every submitted future back to the workflow it was created for.
        pending = {}
        for name in wfnames:
            pending[executor.submit(get_action_history, name)] = name

        for finished in as_completed(pending):
            wf = pending[finished]
            try:
                histories[wf] = finished.result()
            except Exception as e:
                print(f"Fail to get history for {wf}; Msg: {str(e)}")

    return histories
def create_label(wf, wfnames):
    """Infer the category (label) of the mother workflow ``wf`` from the list of
    workflow names (``wfnames``) associated with the same prepid.

    Rules:

    * empty/missing ``wfnames``                    -> -1 (unknown)
    * exactly one name, equal to ``wf``            ->  0 (good)
    * exactly one name, different from ``wf``      -> -1 (unknown)
    * several names, at least one contains 'ACDC'  ->  1 (site issue)
    * several names, none contains 'ACDC'          ->  2 (workflow issue)

    :param str wf: mother workflow
    :param list wfnames: list of workflow names
    :returns: integer code as label: 0: good, 1: site issue, 2: workflow issue, -1: unknown
    :rtype: int
    """
    if not wfnames:
        return -1

    if len(wfnames) == 1:
        return 0 if wfnames[0] == wf else -1

    # More than one workflow for this prepid: look for any ACDC resubmission.
    for name in wfnames:
        if 'ACDC' in name:
            return 1
    return 2
def label_workflows(wfnames):
    """Compute a label for each workflow in ``wfnames``.

    Histories are gathered with ``action_histories`` and each one is turned
    into a label by ``create_label``; only workflows present in the gathered
    histories appear in the result.

    :param list wfnames: workflow names
    :returns: labels keyed by workflow name
    :rtype: dict
    """
    if not wfnames:
        return {}

    histories = action_histories(wfnames)
    return {
        name: create_label(name, related)
        for name, related in histories.items()
    }
def updateLabelArchives(wfnames, configpath=CONFIG_FILE_PATH):
    """Label the workflows in ``wfnames`` that are not labelled yet and push
    the new labels to the db.

    :param list wfnames: list of workflow names
    :param str configpath: path of config yml contains db connection info
    """
    config = get_yamlconfig(configpath)
    already_labeled = get_labeled_workflows(config)

    # Keep only the workflows for which no label has been stored so far.
    pending = []
    for name in wfnames:
        if name not in already_labeled:
            pending.append(name)

    logger.info("Making labels for {} workflows...".format(len(pending)))

    # The db helper receives the labels as a list of (workflow, label) pairs.
    label_pairs = list(label_workflows(pending).items())
    update_label_archive_db(config, label_pairs)
def test():
    """Manual smoke test: label workflows from the prediction history and time it.

    Configures logging from ``LOGGING_CONFIG``, reads
    ``models/prediction_history.json`` (path relative to the current working
    directory), selects every workflow listed under "workflowData" that is not
    listed under "updatedWorkflows", labels them with ``label_workflows`` and
    prints the labels together with the elapsed time.
    """
    import time
    from pprint import pprint

    with open(LOGGING_CONFIG, 'r') as f:
        config = yaml.safe_load(f.read())
    logging.config.dictConfig(config)

    # get archived workflows from `/models/prediction_history.json`
    # (opened in a ``with`` block so the file handle is closed deterministically)
    with open('models/prediction_history.json') as dbfile:
        db = json.load(dbfile)
    wfupdated_ = db["updatedWorkflows"]
    wfall_ = list(db["workflowData"].keys())
    # Workflows no longer being updated are the ones treated as archived here.
    wfarchived_ = [w for w in wfall_ if w not in wfupdated_]

    timestart = time.time()
    labels = label_workflows(wfarchived_)
    pprint(labels)
    print("-----> took ", time.time() - timestart, 's')


# Run the smoke test when the module is executed as a script.
if __name__ == "__main__":
    test()