-
Notifications
You must be signed in to change notification settings - Fork 82
/
Copy pathutil.py
159 lines (142 loc) · 6.15 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import subprocess
import sys
import itertools
from typing import List, Dict
from colorama import Fore, Back, Style
import re
import time
# Compiled pattern matching ANSI terminal escape sequences (ESC followed by
# either a single 7-bit C1 "Fe" byte or a full CSI control sequence).
# MatrixRunner strips these matches from table cells so that column widths
# are computed from the visible text only.
# Source: https://stackoverflow.com/a/14693789/15675011
ansi_escape = re.compile(r'''
\x1B # ESC
(?: # 7-bit C1 Fe (except CSI)
[@-Z\\-_]
| # or [ for CSI, followed by a control sequence
\[
[0-?]* # Parameter bytes
[ -/]* # Intermediate bytes
[@-~] # Final byte
)
''', re.VERBOSE)
class MatrixRunner:
    """Run a callback once for every combination of a configuration matrix.

    ``matrix`` maps each option name to the sequence of values it may take.
    The cartesian product of those values is the candidate work list; a
    combination is dropped when it matches any entry of ``exclude`` or when
    it is not selected by the ``--slice=KEY:VALUE`` arguments found in
    ``sys.argv``.
    """

    def __init__(self, matrix, exclude):
        """Store the matrix and pre-compute the filtered work list.

        Args:
            matrix: Mapping of option name -> sequence of possible values.
            exclude: Iterable of dicts; a combination whose values equal
                every entry of one of these dicts is skipped.
        """
        self.matrix = matrix
        self.exclude = exclude
        self.include = self.parse_includes()
        self.keys = list(matrix.keys())
        self.values = list(matrix.values())
        self.results = {}  # insertion-ordered
        self.failed = False
        self.work = self.get_work()
        self.last_matrix_config = None
        self.current_matrix_config = None

    def parse_includes(self) -> Dict[str, List[str]]:
        """Collect ``--slice=KEY:VALUE`` arguments from ``sys.argv``.

        Returns:
            Mapping of key -> list of accepted values, in command-line order.

        Raises:
            ValueError: If a ``--slice=`` argument has no ``:`` separator.
        """
        prefix = "--slice="
        includes: Dict[str, List[str]] = {}
        for arg in sys.argv:
            if not arg.startswith(prefix):
                continue
            # Split on the first colon only, so values may contain colons.
            key, sep, value = arg[len(prefix):].partition(":")
            if not sep:
                raise ValueError(
                    f"malformed slice argument {arg!r}; expected --slice=KEY:VALUE"
                )
            includes.setdefault(key, []).append(value)
        return includes

    def _log_output(self, stdout_text, stderr_text):
        """Log captured stdout and stderr under their respective headings."""
        self.log("stdout:")
        self.log(stdout_text, end="")
        self.log("stderr:")
        self.log(stderr_text, end="")

    def run_command(self, *args: str, always_output=False, output_matcher=None) -> bool:
        """Run an external command and report whether it passed.

        Args:
            *args: The program and its arguments, one string each.
            always_output: When true, log stdout and stderr even on success.
            output_matcher: Optional callable given the decoded stdout of a
                successful command; a falsy return marks the run as failed.

        Returns:
            True if the command exited with status 0 and ``output_matcher``
            (if given) accepted its stdout. Otherwise ``self.failed`` is set
            and False is returned.
        """
        self.log(f"{Fore.CYAN}{Style.BRIGHT}Running Command \"{' '.join(args)}\"{Style.RESET_ALL}")
        start_time = time.time()
        completed = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        runtime = time.time() - start_time
        self.log(Style.RESET_ALL, end="")  # makefile in parallel sometimes messes up colors
        # Decode leniently: tool output is not guaranteed to be valid UTF-8
        # and a decoding error must not abort the whole matrix run.
        stdout_text = completed.stdout.decode("utf-8", errors="replace")
        stderr_text = completed.stderr.decode("utf-8", errors="replace")
        if completed.returncode != 0:
            self.log(f"{Fore.RED}{Style.BRIGHT}Command failed{Style.RESET_ALL} {Fore.MAGENTA}(time: {runtime:.2f}s){Style.RESET_ALL}")
            self._log_output(stdout_text, stderr_text)
            self.failed = True
            return False
        self.log(f"{Fore.GREEN}{Style.BRIGHT}Command succeeded{Style.RESET_ALL} {Fore.MAGENTA}(time: {runtime:.2f}s){Style.RESET_ALL}")
        if always_output:
            self._log_output(stdout_text, stderr_text)
        elif completed.stderr:
            self.log("stderr:")
            self.log(stderr_text, end="")
        if output_matcher is not None and not output_matcher(stdout_text):
            self.failed = True
            return False
        return True

    def set_fail(self):
        """Mark the overall run as failed."""
        self.failed = True

    def current_config(self):
        """Return the matrix configuration currently being run, if any."""
        return self.current_matrix_config

    def last_config(self):
        """Return the matrix configuration that ran before the current one."""
        return self.last_matrix_config

    def log(self, *args, **kwargs):
        """Print immediately; output is flushed unless ``flush`` is overridden."""
        kwargs.setdefault("flush", True)
        print(*args, **kwargs)

    def do_exclude(self, matrix_config, exclude):
        """Return True if ``matrix_config`` equals ``exclude`` on all its keys.

        An empty ``exclude`` dict therefore matches every configuration.
        """
        return all(matrix_config[key] == value for key, value in exclude.items())

    def do_include(self, matrix_config, include):
        """Return True if ``matrix_config`` is selected by ``include``.

        An empty ``include`` selects everything. Otherwise, for every key in
        ``include`` the configuration's value must be one of the listed
        values. Slice values parsed from the command line are strings, so
        the ``str()`` form of the matrix value is accepted as well; this lets
        non-string matrix values (ints, bools, ...) be sliced.
        """
        if not include:
            return True
        return all(
            matrix_config[key] in allowed or str(matrix_config[key]) in allowed
            for key, allowed in include.items()
        )

    def assignment_to_matrix_config(self, assignment):
        """Pair the matrix keys with ``assignment`` to form a config dict."""
        return dict(zip(self.matrix.keys(), assignment))

    def get_work(self):
        """Return every value combination that survives exclude/include."""
        work = []
        for assignment in itertools.product(*self.matrix.values()):
            config = self.assignment_to_matrix_config(assignment)
            if any(self.do_exclude(config, ex) for ex in self.exclude):
                continue
            if not self.do_include(config, self.include):
                continue
            work.append(assignment)
        return work

    def run(self, fn):
        """Call ``fn(self)`` for each work item, then print a summary.

        The return value of ``fn`` is recorded per configuration and used to
        colour the results table. Exits the process with status 1 if
        ``self.failed`` was set during the run.
        """
        total = len(self.work)
        rule = '=' * 10
        for i, assignment in enumerate(self.work):
            matrix_config = self.assignment_to_matrix_config(assignment)
            # Results are keyed by the index of each chosen value within its
            # option list, which print_results maps back to the value.
            config_tuple = tuple(
                options.index(choice)
                for options, choice in zip(self.values, assignment)
            )
            config_str = ', '.join(map(str, matrix_config.values()))
            if config_str == "":
                self.log(f"{Fore.BLUE}{Style.BRIGHT}{rule} [{i + 1}/{total}] Running with blank config {rule}{Style.RESET_ALL}")
            else:
                self.log(f"{Fore.BLUE}{Style.BRIGHT}{rule} [{i + 1}/{total}] Running with config {config_str} {rule}{Style.RESET_ALL}")
            self.last_matrix_config = self.current_matrix_config
            self.current_matrix_config = matrix_config
            self.results[config_tuple] = fn(self)
        self.print_results()
        if self.failed:
            self.log("🔴 Some checks failed")
            sys.exit(1)
        else:
            self.log("🟢 All checks passed")

    def adj_width(self, text):
        """Return how many characters of ``text`` are ANSI escape sequences."""
        return len(text) - len(ansi_escape.sub("", text))

    def print_table(self, table):
        """Log ``table`` (a list of rows of strings) with aligned columns.

        Column widths are based on the visible text, ignoring ANSI escape
        sequences. The first row determines the number of columns. An empty
        table prints nothing.
        """
        if not table:
            return
        column_widths = [1] * len(table[0])
        for row in table:
            for i, cell in enumerate(row):
                column_widths[i] = max(column_widths[i], len(ansi_escape.sub("", cell)))
        for row in table:
            # Escape sequences count towards the format width but take no
            # space on screen, so pad each cell by their length.
            cells = (
                "| {cell:{width}} ".format(cell=cell, width=column_widths[j] + self.adj_width(cell))
                for j, cell in enumerate(row)
            )
            self.log("".join(cells) + "|")

    def print_results(self):
        """Log a table of all configurations run, green on truthy results."""
        self.log("Results:")
        table = [self.keys]
        for result, passed in self.results.items():
            colour = Fore.GREEN if passed else Fore.RED
            table.append([
                f"{colour}{Style.BRIGHT}{self.values[i][v]}{Style.RESET_ALL}"
                for i, v in enumerate(result)
            ])
        self.print_table(table)