forked from teaxyz/chai
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfetcher.py
119 lines (90 loc) · 3.47 KB
/
fetcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import os
import tarfile
from dataclasses import dataclass
from datetime import datetime
from io import BytesIO
from shutil import rmtree
from typing import Any
from requests import get
from core.config import Config
from core.logger import Logger
@dataclass
class Data:
    """One fetched file destined for disk: destination path pieces plus payload.

    Consumed by Fetcher.write, which joins file_path and file_name under the
    dated output directory and writes content in binary mode.
    """

    # directory portion of the destination, relative to the dated output root
    file_path: str
    # basename of the file to create
    file_name: str
    # raw payload; json or bytes (written with "wb", so bytes-like in practice)
    content: Any
class Fetcher:
    """Base class for package-manager fetchers.

    Downloads a raw payload from the configured source URL and writes files
    under a dated directory (``data/<name>/YYYY-MM-DD``), maintaining a
    ``latest`` symlink that points at the most recent run.
    """

    def __init__(self, name: str, config: "Config"):
        self.name = name
        self.source = config.pm_config.source
        self.output = f"data/{name}"
        self.logger = Logger(f"{name}_fetcher")
        self.no_cache = config.exec_config.no_cache
        self.test = config.exec_config.test

    def write(self, files: "list[Data]") -> None:
        """generic write function for some collection of files"""
        # prep the file location: one directory per calendar day
        now = datetime.now().strftime("%Y-%m-%d")
        root_path = f"{self.output}/{now}"

        # write each payload; content can be anything - json, tarball, etc.
        for item in files:
            full_path = os.path.join(root_path, item.file_path)
            # make sure the destination directory exists
            os.makedirs(full_path, exist_ok=True)
            self.logger.debug(f"writing {full_path}")
            with open(os.path.join(full_path, item.file_name), "wb") as f:
                f.write(item.content)

        # update the latest symlink to point at today's directory
        self.update_symlink(now)

    def update_symlink(self, latest_path: str) -> None:
        """Point ``<output>/latest`` at the directory named by *latest_path*."""
        latest_symlink = f"{self.output}/latest"
        if os.path.islink(latest_symlink):
            self.logger.debug(f"removing existing symlink {latest_symlink}")
            os.remove(latest_symlink)
        self.logger.debug(f"creating symlink {latest_symlink} -> {latest_path}")
        os.symlink(latest_path, latest_symlink)

    def fetch(self) -> bytes:
        """Download the configured source and return the raw response body.

        Raises:
            requests.HTTPError: on any non-2xx status (logged before re-raising).
        """
        # BUG FIX: the original guarded this with `if self.fetch:`, which tests
        # the bound method itself and is therefore always true — removed.
        response = get(self.source)
        try:
            response.raise_for_status()
        except Exception as e:
            self.logger.error(f"error fetching {self.source}: {e}")
            # bare raise preserves the original traceback
            raise
        return response.content

    def cleanup(self) -> None:
        """Remove cached output when no_cache is set, then ensure the dir exists."""
        if self.no_cache:
            rmtree(self.output, ignore_errors=True)
        os.makedirs(self.output, exist_ok=True)
class TarballFetcher(Fetcher):
    """Fetcher for sources that publish a gzipped tarball.

    Note: the no-op ``__init__`` override of the original (identical to the
    inherited one) has been removed.
    """

    def fetch(self) -> "list[Data]":
        """Download the tarball and return one Data entry per regular file.

        Each tar member's name is split into a directory part (file_path) and
        a basename (file_name); content holds the member's raw bytes.
        """
        content = super().fetch()
        # tarfile needs a seekable file object; a fresh BytesIO starts at 0,
        # so no seek is required (the original's seek(0) was redundant)
        buffer = BytesIO(content)

        files = []
        with tarfile.open(fileobj=buffer, mode="r:gz") as tar:
            for member in tar.getmembers():
                if not member.isfile():
                    continue
                extracted = tar.extractfile(member)
                # defensive: extractfile can return None for non-regular
                # members; isfile() should preclude this, but skip safely
                if extracted is None:
                    continue
                # read the bytes once — the original wrapped them in a second
                # BytesIO only to .read() it again, copying every member twice
                data = extracted.read()
                # tar member names use "/" regardless of platform
                file_path, _, file_name = member.name.rpartition("/")
                self.logger.debug(f"file_path/file_name: {file_path}/{file_name}")
                files.append(Data(file_path, file_name, data))
        return files
class JSONFetcher(Fetcher):
    """Fetcher stub for JSON sources (fetch not yet implemented)."""

    def __init__(self, name: str, source: "Config"):
        # BUG FIX (annotation only): the original annotated `source: str`,
        # but it is forwarded as the base __init__'s `config` argument, which
        # reads config.pm_config.source and config.exec_config.* — a plain
        # str would crash there. Parameter name kept for compatibility.
        super().__init__(name, source)

    def fetch(self):
        """TODO: implement JSON fetching; currently a no-op returning None."""
class YAMLFetcher(Fetcher):
    """Fetcher stub for YAML sources (fetch not yet implemented)."""

    def __init__(self, name: str, source: "Config"):
        # BUG FIX (annotation only): the original annotated `source: str`,
        # but it is forwarded as the base __init__'s `config` argument, which
        # reads config.pm_config.source and config.exec_config.* — a plain
        # str would crash there. Parameter name kept for compatibility.
        super().__init__(name, source)

    def fetch(self):
        """TODO: implement YAML fetching; currently a no-op returning None."""