import argparse
import json
import os
import re
from pathlib import Path
import pandas as pd
from zeno_client import ZenoClient, ZenoMetric
from lm_eval.utils import (
eval_logger,
get_latest_filename,
get_results_filenames,
get_sample_results_filenames,
)
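
# Assumes the zeno-client package is installed (e.g. `pip install zeno-client`)
# and that ZENO_API_KEY is set in the environment.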
def parse_args():
parser = argparse.ArgumentParser(
description="Upload your data to the Zeno AI evaluation platform to visualize results. This requires a ZENO_API_KEY in your environment variables. The eleuther harness must be run with log_samples=True and an output_path set for data to be written to disk."
)
parser.add_argument(
"--data_path",
required=True,
help="Where to find the results of the benchmarks that have been run. Uses the name of each subfolder as the model name.",
)
parser.add_argument(
"--project_name",
required=True,
help="The name of the generated Zeno project.",
)
return parser.parse_args()
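
# Example invocation (the paths and project name are illustrative):
#   python zeno_visualize.py --data_path output/ --project_name "Eval Results"
# where output/ holds one subfolder per model, produced by running the harness
# with log_samples=True and an output_path set.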
def main():
"""Upload the results of your benchmark tasks to the Zeno AI evaluation platform.
    This script expects your results to live in a data folder whose subfolders contain the results of individual models.
"""
args = parse_args()
client = ZenoClient(os.environ["ZENO_API_KEY"])
# Get all model subfolders from the parent data folder.
models = [
os.path.basename(os.path.normpath(f))
for f in os.scandir(Path(args.data_path))
if f.is_dir()
]
assert len(models) > 0, "No model directories found in the data_path."
# Get the tasks from the latest results file of the first model.
tasks = set(tasks_for_model(models[0], args.data_path))
    # Take the intersection of task names across the latest results files of
    # all models, so only tasks common to every model are compared.
for model in models:
old_tasks = tasks.copy()
task_count = len(tasks)
model_tasks = set(tasks_for_model(model, args.data_path))
        # set.intersection returns a new set; reassign so `tasks` is narrowed.
        tasks = tasks.intersection(model_tasks)
        if task_count != len(tasks):
            eval_logger.warning(
                f"All models must have the same tasks. {model} has tasks: {model_tasks}, "
                f"but previously recorded tasks were: {old_tasks}. Taking the intersection: {tasks}."
            )
assert (
len(tasks) > 0
), "Must provide at least one task in common amongst models to compare."
for task in tasks:
# Upload data for all models
for model_index, model in enumerate(models):
# Get latest results and sample results for a model
model_dir = Path(args.data_path, model)
model_files = [f.as_posix() for f in model_dir.iterdir() if f.is_file()]
model_results_filenames = get_results_filenames(model_files)
model_sample_filenames = get_sample_results_filenames(model_files)
latest_results = get_latest_filename(
[Path(f).name for f in model_results_filenames]
)
latest_sample_results = get_latest_filename(
[Path(f).name for f in model_sample_filenames if task in f]
)
            # Load the latest results file once; it contains both the run-level
            # config and the per-task configs used below.
            with open(
                Path(args.data_path, model, latest_results), encoding="utf-8"
            ) as results_file:
                results = json.load(results_file)
            # Sanitize model_args, which may contain filesystem-unfriendly characters.
            model_args = re.sub(
                r"[\"<>:/\|\\?\*\[\]]+", "__", results["config"]["model_args"]
            )
            eval_logger.info(model_args)
            # Read the per-sample results (JSON Lines: one JSON object per sample).
            data = []
            with open(
                Path(args.data_path, model, latest_sample_results),
                "r",
                encoding="utf-8",
            ) as file:
                for line in file:
                    data.append(json.loads(line.strip()))
            configs = results["configs"]
config = configs[task]
            if model_index == 0:  # Create the project and upload the dataset only once.
metrics = []
for metric in config["metric_list"]:
metrics.append(
ZenoMetric(
name=metric["metric"],
type="mean",
columns=[metric["metric"]],
)
)
project = client.create_project(
name=args.project_name + (f"_{task}" if len(tasks) > 1 else ""),
view="text-classification",
metrics=metrics,
)
project.upload_dataset(
generate_dataset(data, config),
id_column="id",
data_column="data",
label_column="labels",
)
project.upload_system(
generate_system_df(data, config),
name=model,
id_column="id",
output_column="output",
)
def tasks_for_model(model: str, data_path: str):
"""Get the tasks for a specific model.
Args:
model (str): The name of the model.
data_path (str): The path to the data.
Returns:
list: A list of tasks for the model.
"""
# get latest model results for a given name
model_dir = Path(data_path, model)
model_files = [f.as_posix() for f in model_dir.iterdir() if f.is_file()]
model_results_filenames = get_results_filenames(model_files)
latest_results = get_latest_filename(model_results_filenames)
    with open(latest_results, encoding="utf-8") as f:
        configs = json.load(f)["configs"]
    return list(configs.keys())
def generate_dataset(data, config):
"""Generate a Zeno dataset from evaluation data.
Args:
data: The data to generate a dataset for.
config: The configuration of the task.
Returns:
        pd.DataFrame: A DataFrame ready to be uploaded to Zeno.
"""
ids = [x["doc_id"] for x in data]
labels = [x["target"] for x in data]
instance = [""] * len(ids)
if config["output_type"] == "loglikelihood":
instance = [x["arguments"][0][0] for x in data]
labels = [x["arguments"][0][1] for x in data]
elif config["output_type"] == "multiple_choice":
instance = [
x["arguments"][0][0]
+ "\n\n"
+ "\n".join([f"- {y[1]}" for y in x["arguments"]])
for x in data
]
elif config["output_type"] == "loglikelihood_rolling":
instance = [x["arguments"][0][0] for x in data]
elif config["output_type"] == "generate_until":
instance = [x["arguments"][0][0] for x in data]
return pd.DataFrame(
{
"id": ids,
"data": instance,
"input_len": [len(x) for x in instance],
"labels": labels,
"output_type": config["output_type"],
}
)
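
# Illustration (hypothetical record): for a generate_until task, a sample like
#   {"doc_id": 0, "target": "4", "arguments": [["What is 2+2?", "..."]]}
# maps to the row
#   {"id": 0, "data": "What is 2+2?", "input_len": 12, "labels": "4",
#    "output_type": "generate_until"}.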
def generate_system_df(data, config):
"""Generate a dataframe for a specific system to be uploaded to Zeno.
Args:
data: The data to generate a dataframe from.
config: The configuration of the task.
Returns:
        pd.DataFrame: A DataFrame ready to be uploaded to Zeno as a system.
"""
ids = [x["doc_id"] for x in data]
system_dict = {"id": ids}
system_dict["output"] = [""] * len(ids)
if config["output_type"] == "loglikelihood":
system_dict["output"] = [
"correct" if x["filtered_resps"][0][1] is True else "incorrect"
for x in data
]
elif config["output_type"] == "multiple_choice":
system_dict["output"] = [
", ".join([str(y[0]) for y in x["filtered_resps"]]) for x in data
]
system_dict["num_answers"] = [len(x["filtered_resps"]) for x in data]
elif config["output_type"] == "loglikelihood_rolling":
system_dict["output"] = [str(x["filtered_resps"][0]) for x in data]
elif config["output_type"] == "generate_until":
system_dict["output"] = [str(x["filtered_resps"][0]) for x in data]
system_dict["output_length"] = [len(str(x["filtered_resps"][0])) for x in data]
metrics = {}
for metric in config["metric_list"]:
if "aggregation" in metric and metric["aggregation"] == "mean":
metrics[metric["metric"]] = [x[metric["metric"]] for x in data]
system_dict.update(metrics)
system_df = pd.DataFrame(system_dict)
return system_df
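
# Illustration (hypothetical, continuing the record above): with a metric list
# entry {"metric": "exact_match", "aggregation": "mean"} and the sample
# {"doc_id": 0, "filtered_resps": ["4"], "exact_match": 1.0}, the system row is
#   {"id": 0, "output": "4", "output_length": 1, "exact_match": 1.0}.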
if __name__ == "__main__":
main()