-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspreadsurfer.py
72 lines (60 loc) · 2.36 KB
/
spreadsurfer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import sys
import threading
import traceback
from spreadsurfer import *
import asyncio
import signal
from loguru import logger
from spreadsurfer.bookkeeper import Bookkeeper
from spreadsurfer.price_engine import PriceEngine
from spreadsurfer.connector_binance_wss import BinanceWebsocketConnector
# sys.tracebacklimit = 3
logger.remove()
# logger.add(sys.stdout, level=35)
logger.add(sys.stdout)
logger.level("magenta", color='<magenta>', no=15)
logger.level("data", color='<light-blue>', no=35)
logger.level("ml", color='<light-cyan>', no=37)
logger.level("bookkeeper", color='<light-green><bold>', no=37)
logger.add("console.log", rotation="500 MB")
def thread_dump(signum, stack):
stacktrace = ""
for _thread in threading.enumerate():
stacktrace += str(_thread)
stacktrace += "".join(traceback.format_stack(sys._current_frames()[_thread.ident]))
stacktrace += "\n"
logger.info('\n-- dump stacktrace start -- \n{}\n-- dump stacktrace end -- ', stacktrace)
@logger.catch
async def main():
signal.signal(signal.SIGUSR1, thread_dump)
wave_events_queue = asyncio.Queue(maxsize=1)
orders_queue = asyncio.Queue(maxsize=1)
datacollect_queue = asyncio.Queue(maxsize=1)
exchange = connect_exchange()
# exchange2 = connect_exchange()
try:
bookkeeper = Bookkeeper()
data_collector = DataCollector(datacollect_queue)
binance_wss_connector = BinanceWebsocketConnector()
balance_watcher = BalanceWatcher(exchange, binance_wss_connector)
order_book_watcher = OrderBookWatcher(exchange)
price_engine = PriceEngine(data_collector, order_book_watcher)
coroutines = [
TimeTracker(bookkeeper, binance_wss_connector),
balance_watcher,
OrderWatcher(exchange, bookkeeper),
order_book_watcher,
TradeWatcher(exchange, wave_events_queue),
WaveHandler(order_book_watcher, wave_events_queue, orders_queue, datacollect_queue),
OrderMaker(exchange, orders_queue, balance_watcher, bookkeeper, price_engine, binance_wss_connector),
data_collector
]
tasks = [asyncio.create_task(x.start()) for x in coroutines]
await asyncio.gather(*tasks)
finally:
await exchange.close()
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
pass