-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathget_fixture_data.py
52 lines (42 loc) · 1.54 KB
/
get_fixture_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
"""
Generate fixture data for the tests.
The package and the development dependencies must be installed.
"""
import os
import json
from fedora_messaging import message
import requests
import click
from hotness_schema.tests import FIXTURES_DIR
@click.command()
@click.option("--timeout", default=300, help="Timeout for datagrepper (seconds)")
def get_fixtures(timeout):
    """Download recent messages from datagrepper and save them as fixtures.

    For every loaded message class that has a topic and whose schema name
    starts with "hotness", up to five recent messages are requested from
    datagrepper and their bodies are written to "<topic>.json" in the tests'
    fixtures directory. A topic that cannot be fetched or parsed is reported
    and skipped, so one failure does not abort the whole run.
    """
    message.load_message_classes()
    # NOTE(review): _class_to_schema_name is a private attribute of
    # fedora_messaging.message; it may change between releases.
    for message_class, name in message._class_to_schema_name.items():
        if not message_class.topic or not name.startswith("hotness"):
            print("Skipping {}".format(message_class))
            continue
        try:
            resp = requests.get(
                "https://apps.fedoraproject.org/datagrepper/raw",
                params={
                    "topic": message_class.topic,
                    "rows_per_page": 5,
                    # 604800 s == 7 days, assuming datagrepper reads "delta"
                    # in seconds -- TODO confirm against the datagrepper docs.
                    "delta": 604800,
                },
                timeout=timeout,
            )
        except requests.exceptions.Timeout:
            print("Datagrepper timed out, maybe there aren't any recent results")
            continue
        except requests.exceptions.RequestException as err:
            # Connection/DNS/TLS errors are per-request problems too: skip this
            # topic instead of aborting the remaining ones.
            print("Failed to communicate with datagrepper ({})".format(err))
            continue
        if resp.status_code != 200:
            print(
                "Failed to communicate with datagrepper ({})".format(resp.status_code)
            )
            continue
        # Parse before opening the file so that a malformed response never
        # truncates an existing fixture.
        try:
            messages = [msg["msg"] for msg in resp.json()["raw_messages"]]
        except (ValueError, KeyError) as err:
            print("Unexpected response from datagrepper ({!r})".format(err))
            continue
        path = os.path.join(FIXTURES_DIR, message_class.topic + ".json")
        with open(path, "w", encoding="utf-8") as fp:
            json.dump(messages, fp, sort_keys=True, indent=4)
# Run the click command only when executed as a script, so importing this
# module has no side effects.
if __name__ == "__main__":
    get_fixtures()