-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun.py
496 lines (439 loc) · 18.1 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
#!/bin/env python3
import argparse
import collections
import csv
import datetime
import logging
import os
import pandas
import pathlib
import pprint
import pyexch.pyexch
import libdate
import libgroup
# Hash to hold module level data
resources = {}
def get_args():
if 'args' not in resources:
constructor_args = {
'formatter_class': argparse.RawTextHelpFormatter,
'description': 'triage duty scheduler',
'epilog': '''
ENVIRONMENT VARIABLES:
TRIAGE_STAFF_FILE: Path to a file containing staff
File format is CSV with headers "name", "email", "type",
where "type" is one of 'staff', 'manager'.
TRIAGE_LOCATION_FILE: Path to a file containing a URL for an online meeting
Used in the "location" field in newly created exchange calendar events
TRIAGE_HOLIDAYS_FILE: Path to a file containing a list of holidays to be excluded from scheduling
File format is CSV with one header "date"
Dates should be in the form of YYYY-MM-DD
OAUTH_CONFIG_FILE: Path to the pyexch config file
(See: https://github.com/andylytical/pyexch)
OAUTH_TOKEN_FILE: Path to the pyexch token file
(See: https://github.com/andylytical/pyexch)
NETRC: Alternate path to a netrc file (default is ~/.netrc)
(See also: https://github.com/andylytical/pyexch)
PYEXCH_REGEX_JSON: Alternate regex for matching existing calendar events
(See also: https://github.com/andylytical/pyexch)
'''
}
parser = argparse.ArgumentParser( **constructor_args )
parser.add_argument( '-d', '--debug', action='store_true' )
parser.add_argument( '-n', '--dryrun', action='store_true',
help='Show what would be done but make no changes.',
)
parser.add_argument( '--location_file',
help='Override TRIAGE_LOCATION_FILE environment variable.',
)
parser.add_argument( '--start', help='Start date (default: today).' )
parser.add_argument( '--end', help='End date (default: start + 90 days).' )
# List Options
g_list = parser.add_argument_group( title='List Duty Teams' )
g_list.add_argument( '-l', '--list_teams', action='store_true',
help=(
'List triage teams (built from staff list) with indexes.'
'\nUse one of these indices with --start_at.'
),
)
# Triage Schedule
g_triage = parser.add_argument_group(
title='Triage Schedule',
description=(
'For all work days from START to END,'
'\ncreate a triage event on the calendar,'
'\nassigned to the next duty team.'
'\nUse --list_teams to see the duty teams.'
'\nUse --start_at to start scheduling at a specific duty team.'
),
)
g_triage.add_argument( '--mktriage', action='store_true',
help='Make triage schedule.',
)
g_triage.add_argument( '--triage_report', action='store_true',
help='Report on existing triage events between START and END.',
)
g_triage.add_argument( '--start_at',
type=int,
default=0,
help=(
'Integer index into triage teams.'
'\nStart scheduling with the specified triage team.'
'\nUse the --list_teams option to see the team list and indices.'
),
)
g_triage.add_argument( '--staff_file',
help='Override TRIAGE_STAFF_FILE environment variable.',
)
# Handoff Events
g_handoff = parser.add_argument_group(
title='Handoff Events',
description=(
'For all work days from START to END,'
'\nget existing Triage and Handoff events from the calendar,'
'\nmake new Handoff events where needed,'
'\nupdate existing Handoff events if the current members'
' do not match the associated Triage event members.'
),
)
g_handoff.add_argument( '--mkhandoff', action='store_true',
help='Make or update triage handoff events using data from existing triage events.'
)
args = parser.parse_args()
resources['args'] = args
# set sane default for start
if args.start:
# new_start = dateutil.parser.parse( args.start )
new_start = pandas.to_datetime( args.start )
args.start = new_start
else:
args.start = datetime.date.today()
# set sane default for end
if args.end:
# new_end = dateutil.parser.parse( args.end )
new_end = pandas.to_datetime( args.end )
args.end = new_end
else:
args.end = args.start + datetime.timedelta( days=90 )
return resources['args']
def get_regex_map():
return {
"TRIAGE":"^Triage: ",
"HANDOFF":"^Triage Hand-Off",
}
def get_pyexch():
if 'pyexch' not in resources:
regex_map = get_regex_map()
resources['pyexch'] = pyexch.pyexch.PyExch( regex_map = regex_map )
return resources['pyexch']
def get_staff_data():
''' Read in the multi-purpose TRIAGE_STAFF_FILE
Reads the CSV file and stores a dict
'''
key = 'staffdata'
if key not in resources:
filename = os.getenv( 'TRIAGE_STAFF_FILE', get_args().staff_file )
if not filename:
raise UserWarning( 'Missing staff file. Use --staff_file or TRIAGE_STAFF_FILE' )
df = pandas.read_csv( filename, sep=None, engine='python' )
resources[key] = { row.Name: row for row in df.itertuples() }
return resources[key]
def get_staff():
key = 'staff'
if key not in resources:
data = get_staff_data()
resources[key] = { k: v for k,v in data.items() if v.Type == 'staff' }
return resources[key]
def get_managers():
key = 'managers'
if key not in resources:
data = get_staff_data()
resources[key] = { k: v for k,v in data.items() if v.Type == 'manager' }
return resources[key]
def get_MODs( date ):
''' Given a date, return the Managers On Duty for that day
'''
key = 'mod'
if key not in resources:
# Create a mapping of int-day-of-week -> Manager(s)
daychars = ( 'M', 'T', 'W', 'R', 'F', )
resources[key] = [ [], [], [], [], [], ]
for name,mgr in get_managers().items():
for dow_char in mgr.DOW:
dow_int = daychars.index( dow_char )
resources[ key ][ dow_int ].append( mgr )
# do the lookup
return resources[ key ][ date.weekday() ]
def get_triage_location():
if 'triage_location' not in resources:
l_file = os.getenv( 'TRIAGE_LOCATION_FILE', get_args().location_file )
if not l_file:
raise UserWarning( 'Missing location file. Use --location_file or TRIAGE_LOCATION_FILE' )
p = pathlib.Path( l_file )
location = p.read_text()
if len(location) < 1:
raise UserWarning( f"Unable to read location from file '{l_file}'" )
resources['triage_location'] = location
return resources['triage_location']
def get_triage_categories():
if 'triage_categories' not in resources:
resources['triage_categories'] = [ 'TicketMaster' ]
return resources['triage_categories']
def get_existing_events():
''' Get existing events between "start" and "end"
start = datetime.date
end = datetime.date
'''
start = get_args().start
end = get_args().end
logging.debug( pprint.pformat( [ start, end ] ) )
px = get_pyexch()
# ensure end is 11:59:59 PM
existing_events = px.get_events_filtered(
start = datetime.datetime( start.year, start.month, start.day ),
end = datetime.datetime( end.year, end.month, end.day, hour=11, minute=59, second=59 ),
)
# logging.debug( f'Existing events: { [ (e.start, e.type, e.subject) for e in existing_events ] }' )
for e in existing_events:
logging.debug( f'{e.start} {e.type} {e.subject}' )
# create hash of event dates & types
current_events = {}
for e in existing_events:
dt = e.start.date()
if dt not in current_events:
current_events[dt] = {}
current_events[dt][e.type] = e
return current_events
def validate_user_input():
''' Check that all user input is present and valid
'''
location = get_triage_location() #will fail if location is not provided
logging.debug( f"Triage Location: '{location}'" )
get_staff() #will throw error in case of a problem
args = get_args()
if args.mktriage:
logging.debug( f"MKTRIAGE" )
if args.mkhandoff:
logging.debug( f"MKHANDOFF" )
def create_triage_meetings( mtg_data ):
''' Use mtg_data to create meetings iff they don't already exist
mtg_data = {
datetime: {
'emails': list of email addrs,
'members': Names of attendees,
}
)
'''
# get existing events
args = get_args()
existing_events = get_existing_events()
# try to create events for dates from csv
for dt, data in mtg_data.items():
try:
# dt is a datetime, use just the date component to match existing event
ev = existing_events[dt.date()]['TRIAGE']
except KeyError:
ev = None
create_or_update_triage_event(
date = dt,
emails = data['emails'],
members = data['members'],
existing_event = ev
)
def create_or_update_triage_event( date, emails, members, existing_event=None ):
''' date = datetime for new event
emails = list of email addresses
members = list of names (used in the event title)
existing_event = raw exchange event
'''
if existing_event:
logging.info( f'Found existing TRIAGE event for date "{date}"' )
#logging.debug( f'Existing Event: {existing_event}' )
else:
subj = f"Triage: {', '.join(members)}"
logging.info( f'Making new TRIAGE event for date "{date}"' )
args = get_args()
if args.dryrun:
logging.info( f'DRYRUN: Subj:"{subj}" Attendees:"{emails}"' )
else:
get_pyexch().new_all_day_event(
date = date,
subject = subj,
attendees = emails,
location = get_triage_location(),
categories = get_triage_categories(),
free = True,
)
def events_by_type( types ):
''' Map of event lists by type.
Return: dictionary with keys=DATE and values={ TYPE: event_list }
Effectively a filter on existing events.
'''
args = get_args()
# Get all existing TRIAGE & HANDOFF events from Exchange calendar
existing_events = get_existing_events()
# current_events[dt][e.type] = e
content = {}
for date, sub in existing_events.items():
content[date] = {}
for typ, val in sub.items():
if typ in types:
content[date][typ] = val
return content
def meeting_attendees( event ):
''' Get a list of emails for all meeting attendees from event,
where event is a "simple_event" from pyexch
'''
# Required_attendees=[ Attendee(), ...]
# where Attendee( mailbox=Mailbox(), ...)
# and where Mailbox( email_address='...', ...)
# thus, emails=[ a.mailbox.email_address for a in required_attendees ]
return [ a.mailbox.email_address for a in event.raw_event.required_attendees ]
def create_handoff_meetings():
args = get_args()
# Get all existing TRIAGE & HANDOFF events from Exchange calendar
existing_events = get_existing_events()
# For each day there is a TRIAGE event,
# get the required_attendees from both this and the next TRIAGE event
triage_dates = sorted( existing_events.keys() )
loop_end = len( triage_dates ) - 1
for i in range( loop_end ):
# get members from current triage event
curr_date = triage_dates[i]
curr_triage_event = existing_events[ curr_date ][ 'TRIAGE' ]
curr_members = meeting_attendees( curr_triage_event )
logging.debug( f'{curr_triage_event.start} {curr_triage_event.subject}, {curr_members}' )
# get members from next triage event
try:
next_date = triage_dates[i+1]
except KeyError:
logging.error( f'Next date not found, after curr_date: "{curr_date}"' )
raise
try:
next_triage_event = existing_events[ next_date ][ 'TRIAGE' ]
next_members = meeting_attendees( next_triage_event )
except KeyError:
logging.error( f'No event data found after date: "{curr_date}"' )
raise
except TypeError:
logging.error( f'Are there Required Attendees for the triage meeting on "{next_date}"?' )
raise
handoff_date = next_date
managers_on_duty = [ m.Email for m in get_MODs( handoff_date ) ]
logging.debug( f'MODs for {handoff_date}: {managers_on_duty}' )
handoff_members = curr_members + next_members + managers_on_duty
logging.debug( f'Calculated handoff data: {handoff_date} {handoff_members}' )
# get existing handoff event, if it exists
try:
handoff_event = existing_events[ handoff_date ][ 'HANDOFF' ]
except KeyError:
handoff_event = None
create_or_update_handoff_event( handoff_date, handoff_members, handoff_event )
def create_or_update_handoff_event( date, emails, existing_event=None ):
''' date = date of the event
emails = list of email addresses
existing_event = raw exchange event
'''
args = get_args()
if existing_event:
logging.info( f'Found existing HANDOFF event for date "{date}"' )
#logging.debug( f'Existing Event: {existing_event}' )
existing_members = sorted( meeting_attendees( existing_event ) )
new_members = sorted( emails )
if existing_members != new_members:
logging.debug( f'Member mismatch for HANDOFF date "{date}"' )
logging.debug( f'Existing: "{existing_members}"' )
logging.debug( f'New: "{new_members}"' )
msg = f'Updated member list for HANDOFF date "{date}"'
if args.dryrun:
logging.info( f'DRYRUN: {msg}' )
else:
px = get_pyexch()
px.update_event( existing_event.raw_event, attendees=new_members )
logging.info( msg )
else:
subj = 'Triage Hand-Off'
ev_start = datetime.datetime.combine( date, datetime.time( hour=8, minute=45 ) )
ev_end = datetime.datetime.combine( date, datetime.time( hour=9, minute=00 ) )
logging.info( f'Making new HANDOFF event for date "{date}"' )
if args.dryrun:
logging.info( f'DRYRUN: Start:"{ev_start}" End:"{ev_end}" Subj:"{subj}" Attendees:"{emails}"' )
else:
px = get_pyexch()
px.new_event(
start = ev_start,
end = ev_end,
subject = subj,
attendees = emails,
location = get_triage_location(),
categories = get_triage_categories()
)
def mk_triage_schedule():
''' Create a dict with
keys = date
values = { 'emails': emails, 'members': members }
'''
staff = get_staff()
triage_teams = get_triage_teams()
logging.debug( f'starting length triage_teams: {len(triage_teams)}' )
args = get_args()
workdays = list( libdate.get_workdays( args.start, args.end ) )
logging.debug( f'num workdays: {len(workdays)}' )
# ensure triage_teams is longer than workdays
scalar = int( len(workdays) / len(triage_teams) ) + 1
logging.debug( f'Scalar: {scalar}' )
triage_teams *= scalar
logging.debug( f'new length triage_teams: {len(triage_teams)}' )
# create the data
triage_raw_data = {}
for day in workdays:
# TODO - add checks here to skip if someone is on PTO or has a PM
members = triage_teams.popleft()
emails = [ staff[x].Email for x in members ]
triage_raw_data[day] = { 'emails': emails, 'members': members }
return triage_raw_data
def get_triage_teams():
staff = get_staff()
teamlist = libgroup.fair_pairs( list( staff.keys() ) )
teams = collections.deque( teamlist )
teams.rotate( -(get_args().start_at) )
return teams
def run():
validate_user_input()
args = get_args()
if args.list_teams:
for i,members in enumerate( get_triage_teams() ):
print( f'{i: >2d} {members}' )
return True
if args.triage_report:
events = events_by_type( types=('TRIAGE',) )
for date, sub in events.items():
print( f'{date}' )
for typ, ev in sub.items():
print( f'\t{typ}' )
members = meeting_attendees( ev )
print( f'\t\t{ev.start} {ev.type} {ev.subject} {members}' )
return True
if args.mktriage:
logging.info( f"make triage schedule" )
triage_raw_data = mk_triage_schedule()
logging.debug( f'Triage raw data: {triage_raw_data}' )
create_triage_meetings( triage_raw_data )
if args.mkhandoff:
create_handoff_meetings()
if __name__ == '__main__':
log_lvl = logging.INFO
args = get_args()
fmt = '%(levelname)s %(message)s'
if args.debug:
log_lvl = logging.DEBUG
fmt = '%(levelname)s [%(filename)s:%(funcName)s:%(lineno)s] %(message)s'
logging.basicConfig( level=log_lvl, format=fmt )
no_debug = [
'exchangelib',
'urllib3',
'requests_oauthlib',
]
for key in no_debug:
logging.getLogger(key).setLevel(logging.CRITICAL)
run()