-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathArbDataPuller.py
223 lines (201 loc) · 9.73 KB
/
ArbDataPuller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# =============================================================================
# IMPORTS
# =============================================================================
import os, sys, json
import boto3
from dotenv import load_dotenv
import pandas as pd
import concurrent.futures
from itertools import repeat
# =============================================================================
# FILE IMPORTS
# =============================================================================
load_dotenv()
sys.path.append(os.path.abspath("./utils"))
from utils.jprint import jprint
from utils.time_helpers import (
determine_cur_utc_timestamp,
determine_today_str_timestamp,
determine_next_midnight,
determine_if_new_day,
sleep_to_desired_interval,
)
from classes.GetBidAsks import GetBidAsks
from classes.DiscordAlert import DiscordAlert
from classes.EodDiff import EodDiff
from classes.SaveRawData import SaveRawData
from classes.FrozenOrderbook import FrozenOrderbook
from utils.discord_hook import ping_private_discord
# =============================================================================
# CONFIG
# =============================================================================
# AWS credentials are looked up in the process environment; either value is
# None when the corresponding variable is not set.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY")

# Module-level S3 client, pinned to the eu-central-1 region.
s3 = boto3.client(
    service_name="s3",
    region_name="eu-central-1",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)
# =============================================================================
# CLASS
# =============================================================================
class ArbDataPuller:
    """Periodically pull bid/ask data for one market from several exchanges.

    Every tick the puller fetches one bid/ask record per configured exchange
    (using a thread pool), appends it to a per-exchange DataFrame kept in
    ``self.df_obj``, and passes the data on to the FrozenOrderbook and
    DiscordAlert helpers. When ``determine_if_new_day`` reports a new day,
    the SaveRawData and EodDiff helpers are invoked and the per-day state
    (``today``, ``midnight``, ``df_obj``) is reset.

    Note that constructing an instance prompts on stdin for the polling
    interval, and that the per-day state only exists after
    ``reset_for_new_day()`` has run (``main()`` does this first).
    """

    # Days for which the end-of-day save/summary step is skipped.
    # NOTE(review): the reason these dates are excluded is not visible in
    # this file -- confirm whether the list is still needed.
    _EOD_SKIP_DATES = (
        "2023-01-27",
        "2023-01-28",
        "2023-01-29",
    )

    def __init__(self, market: str, exchanges_obj: dict):
        """Validate the inputs and wire up the helper objects.

        Args:
            market: Standardized market name, e.g. ``BTC-USD``.
            exchanges_obj: Mapping whose keys are exchange names. The
                ``__main__`` examples map each exchange to its own symbol
                for the market; the values are only forwarded to GetBidAsks.

        Raises:
            ValueError: If the market format is invalid, the interval typed
                by the user is not an integer or is below 5 seconds, or an
                exchange name contains a hyphen.
        """
        self.market = self.check_market(market)
        self.exchanges_obj = exchanges_obj
        self.interval = self.ask_user_for_interval()
        self.exchanges = self.make_list_of_exchanges(exchanges_obj)
        self.diff_pairs = self.create_unique_exchange_pairs()
        self.S3_BASE_PATHS = self.determine_general_s3_filepaths()
        self.s3 = s3
        # Each helper receives this instance so it can read the shared state.
        self.GetBidAsks = GetBidAsks(self)
        self.FrozenOrderbook = FrozenOrderbook(self)
        self.Discord = DiscordAlert(self)
        self.SaveRawData = SaveRawData(self)
        self.EodDiff = EodDiff(self)

    # =============================================================================
    # Get market data for exchanges, iterate infinitely
    # =============================================================================
    def main(self) -> None:
        """Run the polling loop; this method never returns normally."""
        print("MAKE SURE THRESHS ARE APPROPRIATE!")
        self.reset_for_new_day()
        sleep_to_desired_interval(self.interval)
        while True:
            if determine_if_new_day(self.midnight):
                self.handle_midnight_event()
            self.get_bid_ask_and_process_df_and_test_diff()
            sleep_to_desired_interval(self.interval)

    # =============================================================================
    # It's midnight! Save important data and reset for next day
    # =============================================================================
    def handle_midnight_event(self) -> None:
        """Persist the finished day's data, then reset the per-day state."""
        if self.today not in self._EOD_SKIP_DATES:
            self.SaveRawData.save_raw_bid_ask_data_to_s3()
            self.EodDiff.determine_eod_diff_n_create_summary(self.df_obj, self.today)
        # Must come last: the calls above still need the old today / df_obj.
        self.reset_for_new_day()

    # =============================================================================
    # Get bid ask data and update dataframe obj for all exchanges
    # =============================================================================
    def get_bid_ask_and_process_df_and_test_diff(self) -> None:
        """Run one polling tick: fetch, store, check orderbooks, alert, print."""
        bid_asks = self.get_bid_ask_from_exchanges()
        self.update_df_obj_with_new_bid_ask_data(bid_asks)
        self.FrozenOrderbook.check_all_orderbooks_if_frozen()
        self.Discord.determine_exchange_diff_and_alert_discord(bid_asks)
        print("=========================================\n")
        jprint(self.df_obj)

    # =============================================================================
    # Get current bid ask data from exchange using THREADING
    # =============================================================================
    def get_bid_ask_from_exchanges(self) -> dict:
        """Fetch the current bid/ask from every exchange in parallel.

        Each worker call receives one ``(key, value)`` item of
        ``exchanges_obj`` together with a timestamp shared by all workers,
        and must return an ``(exchange, bid_ask)`` pair.

        Returns:
            Mapping of exchange name to the bid/ask record returned for it.
        """
        now = determine_cur_utc_timestamp()
        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = executor.map(
                self.GetBidAsks.get_bid_ask_from_specific_exchange,
                self.exchanges_obj.items(),
                repeat(now),
            )
            return {exchange: bid_ask for exchange, bid_ask in results}

    # =============================================================================
    # If no data frame exists, create dataframe
    # =============================================================================
    def update_df_obj_with_new_bid_ask_data(self, bid_asks: dict) -> None:
        """Add each exchange's new bid/ask record to ``self.df_obj`` in place.

        Args:
            bid_asks: Mapping of exchange name to its latest bid/ask record.
        """
        for exchange, bid_ask in bid_asks.items():
            if exchange in self.df_obj:
                df = self.append_existing_df_with_bid_ask(
                    bid_ask, self.df_obj[exchange]
                )
            else:
                df = self.create_new_df_with_bid_ask(bid_ask)
            self.df_obj[exchange] = df

    # =============================================================================
    # Start of new day, create a new df
    # =============================================================================
    def create_new_df_with_bid_ask(self, bid_ask) -> pd.DataFrame:
        """Return a one-row DataFrame built from a single bid/ask record."""
        return pd.DataFrame([bid_ask])

    # =============================================================================
    # Append to existing dataframe
    # =============================================================================
    def append_existing_df_with_bid_ask(self, bid_ask, df) -> pd.DataFrame:
        """Return a new DataFrame: ``df`` plus one row for ``bid_ask``.

        The index is renumbered from 0; ``df`` itself is not modified.
        """
        return pd.concat([df, pd.DataFrame([bid_ask])], ignore_index=True)

    # =============================================================================
    #
    # HELPERS
    #
    # =============================================================================
    # =============================================================================
    # Create all unique exchange pairs for diff calculation later
    # =============================================================================
    def create_unique_exchange_pairs(self) -> list:
        """Return every unordered exchange pair as a ``"FIRST-SECOND"`` label.

        Pairs follow the order of ``self.exchanges``; fewer than two
        exchanges yields an empty list.
        """
        return [
            f"{ex}-{ex2}"
            for i, ex in enumerate(self.exchanges)
            for ex2 in self.exchanges[i + 1 :]
        ]

    # =============================================================================
    # Get exchanges, make sure they're not hyphenated
    # =============================================================================
    def make_list_of_exchanges(self, exchanges_obj: dict) -> list:
        """Return the exchange names (the keys of ``exchanges_obj``).

        Names may not contain ``-`` because the pair labels built by
        ``create_unique_exchange_pairs`` use ``-`` as their separator.

        Raises:
            ValueError: If any exchange name contains a hyphen.
        """
        exchanges = list(exchanges_obj)
        for ex in exchanges:
            if "-" in ex:
                msg = f"Naming convention: Rename exchange. {ex} cannot contain a - (hyphon) in its name."
                raise ValueError(msg)
        return exchanges

    # =============================================================================
    # Get all relevant filepaths to fetch and save data to
    # =============================================================================
    def determine_general_s3_filepaths(self) -> dict:
        """Map each exchange to ``<exchange>/<market>/<exchange>-<market>``."""
        return {
            exchange: f"{exchange}/{self.market}/{exchange}-{self.market}"
            for exchange in self.exchanges_obj
        }

    # =============================================================================
    # Reset all values that start anew with a new day
    # =============================================================================
    def reset_for_new_day(self) -> None:
        """Refresh ``today`` and ``midnight`` and drop all collected DataFrames."""
        self.today = determine_today_str_timestamp()
        self.midnight = determine_next_midnight()
        self.df_obj = {}

    # =============================================================================
    # Ask user for interval on how often to fetch bid/ask
    # =============================================================================
    def ask_user_for_interval(self) -> int:
        """Prompt on stdin for the polling interval in seconds.

        Raises:
            ValueError: If the input is not an integer or is below 5.
        """
        inp = int(input("Specify the desired interval in seconds: ").strip())
        if inp < 5:
            raise ValueError("Interval is too small. Execution cancelled.")
        return inp

    # =============================================================================
    # Make sure standardized market input has a valid format
    # =============================================================================
    def check_market(self, market: str) -> str:
        """Return ``market`` unchanged if it contains ``-`` and is upper-case.

        Raises:
            ValueError: If the market has no hyphen or is not upper-case.
        """
        if "-" not in market or not market.isupper():
            raise ValueError("Invalid market format. Should be like so: `BTC-USD`.")
        return market
if __name__ == "__main__":
    # Command line: <script> <MARKET> '<JSON dict of exchanges>'
    # to activate EC2: ssh -i "ec2-arb-stats.pem" [email protected]
    # to active venv: source venv/bin/activate
    # BTC-USD '{"DYDX": "BTC-USD", "BINANCE_GLOBAL": "BTCBUSD"}'
    # ETH-USD '{"DYDX": "ETH-USD", "BINANCE_GLOBAL": "ETHBUSD"}'
    cli_args = sys.argv
    if not len(cli_args) >= 3:
        raise Exception(
            'Need to enter exchanges dict like so: \'{"BINANCE_GLOBAL": "BTC/USD", "DYDX": "BTC-USD"}\''
        )

    # First argument is the market, second is the exchanges dict as JSON.
    market, exchanges_obj = cli_args[1], json.loads(cli_args[2])
    puller = ArbDataPuller(market=market, exchanges_obj=exchanges_obj)

    try:
        ping_private_discord(f"Initiating arb-tracker for {market} on {exchanges_obj}")
        puller.main()
    finally:
        # Sent however the run ends (exception, Ctrl-C, ...), since main()
        # only returns by raising.
        ping_private_discord("ALARM: ARB_DATAPULLER EXECUTION WAS STOPPED")