forked from VolumeFi/trading-bots
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcache_db.py
174 lines (160 loc) · 4.56 KB
/
cache_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import json
import logging
import threading
import time
from contextlib import contextmanager
from datetime import timedelta
from psycopg.types.json import Jsonb
from psycopg_pool.pool import ConnectionPool
# Process-wide connection pool. It stays None until init() creates it.
DB_POOL: "ConnectionPool | None" = None
# Per-thread flags. warm_cache_loop() sets REFRESH.use_cache = False on its own
# thread; try_cache() reads it with a default of True.
REFRESH = threading.local()
# Per-thread handle to the active connection: connect() binds the connection's
# commit/cursor/execute methods here for the duration of its with-block.
CONN = threading.local()
# Dexes for which init() seeds a default row in get_high_returns_warming_params.
WARM_DEXES = ("pancakeswap_new", "uniswap_v2", "uniswap_v3")
# Age after which a warming-parameter row counts as out of date; try_cache()
# also derives the max_age of cached entries from it.
WARM_INTERVAL = timedelta(hours=1)
@contextmanager
def connect():
    """Bind a pooled connection to the current thread for a ``with`` block.

    A connection is checked out of ``DB_POOL`` and its ``commit``, ``cursor``
    and ``execute`` bound methods are published on the thread-local ``CONN``,
    which is what ``try_cache`` and ``get_pairs`` use. Nothing is yielded.
    The three attributes are removed again when the block exits, whether it
    finishes normally or raises.
    """
    exposed = ("commit", "cursor", "execute")
    with DB_POOL.connection() as pooled:
        for method_name in exposed:
            setattr(CONN, method_name, getattr(pooled, method_name))
        try:
            yield
        finally:
            # Always unbind so a stale connection is never reachable via CONN.
            for method_name in exposed:
                delattr(CONN, method_name)
def init(conninfo="dbname=momentum_cache user=postgres"):
    """Create the global connection pool and ensure the cache schema exists.

    Args:
        conninfo: Connection string passed to ``ConnectionPool``. The default
            is the value that used to be hard-coded, so existing ``init()``
            callers behave exactly as before.

    Side effects:
        Rebinds the module-level ``DB_POOL``; creates the ``gecko``,
        ``required_pairs`` and ``get_high_returns_warming_params`` tables when
        they are missing; and seeds one default warming row per entry of
        ``WARM_DEXES`` (rows that already exist are left alone).
    """
    global DB_POOL
    DB_POOL = ConnectionPool(conninfo)
    with DB_POOL.connection() as conn:
        # Cached responses, keyed by request path + parameters.
        conn.execute(
            """\
            CREATE TABLE IF NOT EXISTS gecko (
                path TEXT,
                params JSONB,
                ts TIMESTAMP WITHOUT TIME ZONE NOT NULL,
                max_age INTERVAL NOT NULL,
                value JSONB NOT NULL,
                PRIMARY KEY (path, params)
            )"""
        )
        # Coin pairs looked up per dex by get_pairs().
        conn.execute(
            """\
            CREATE TABLE IF NOT EXISTS required_pairs (
                dex TEXT,
                from_coin TEXT NOT NULL,
                to_coin TEXT NOT NULL,
                PRIMARY KEY (dex, from_coin, to_coin)
            )"""
        )
        # Parameter sets that warm_cache_loop() re-runs, with their last run time.
        conn.execute(
            """\
            CREATE TABLE IF NOT EXISTS get_high_returns_warming_params (
                params JSONB PRIMARY KEY,
                ts TIMESTAMP WITHOUT TIME ZONE NOT NULL
            )"""
        )
        for dex in WARM_DEXES:
            # The timestamp is back-dated by WARM_INTERVAL so a freshly seeded
            # row already satisfies the warming loop's "ts < now() - interval"
            # test and is picked up on its first pass.
            conn.execute(
                """\
                INSERT INTO get_high_returns_warming_params
                VALUES (%s, now() - %s)
                ON CONFLICT DO NOTHING""",
                (
                    Jsonb(
                        {
                            "dex": dex,
                            "lag_return": 6,
                            "daily_volume": 0,
                            "vol_30": 0,
                            "market_cap": 0,
                        }
                    ),
                    WARM_INTERVAL,
                ),
            )
def try_cache(path, params, f):
    """Return the cached value for ``(path, params)``, computing it on a miss.

    Must be called inside ``connect()``, which provides ``CONN.cursor`` and
    ``CONN.commit``.

    A stored row is served when it is at most five minutes old, or when the
    thread-local ``REFRESH.use_cache`` flag is true (its default) and the row
    is still within its own ``max_age``. Otherwise ``f()`` is called, its
    result is upserted with a ``max_age`` of ``3 * WARM_INTERVAL`` plus five
    minutes, the transaction is committed, and the fresh value is returned.

    Args:
        path: Text half of the cache key.
        params: JSON-serialisable half of the cache key.
        f: Zero-argument callable producing a JSON-serialisable value.

    Returns:
        The cached value, or the freshly computed one.
    """
    lookup_sql = """\
        SELECT value
        FROM gecko
        WHERE path = %s
        AND params = %s
        AND (
            now() - interval '5 minutes' <= ts -- Ignore use_cache if this entry is very young.
            OR (%s AND now() - max_age <= ts)
        )
        """
    upsert_sql = """\
        INSERT INTO gecko(path, params, ts, max_age, value)
        VALUES (%s, %s, now(), %s, %s)
        ON CONFLICT (path, params) DO UPDATE
        SET
            ts = EXCLUDED.ts,
            max_age = EXCLUDED.max_age,
            value = EXCLUDED.value
        """
    with CONN.cursor() as cursor:
        honour_max_age = getattr(REFRESH, "use_cache", True)
        cursor.execute(lookup_sql, (path, Jsonb(params), honour_max_age))
        hit = cursor.fetchone()
        if hit is not None:
            return hit[0]

        # Miss (or entry too old for this thread): recompute and store.
        fresh = f()
        entry_max_age = 3 * WARM_INTERVAL + timedelta(minutes=5)
        cursor.execute(
            upsert_sql,
            (path, Jsonb(params), entry_max_age, Jsonb(fresh)),
        )
        CONN.commit()
        return fresh
def get_pairs(dex):
    """Fetch every ``(from_coin, to_coin)`` row stored for ``dex``.

    Must be called inside ``connect()``, which provides ``CONN.cursor``.

    Args:
        dex: Value matched against the ``dex`` column of ``required_pairs``.

    Returns:
        The result of ``fetchall()`` for the matching rows.
    """
    query = """\
        SELECT from_coin, to_coin FROM required_pairs
        WHERE dex = %s
        """
    with CONN.cursor() as cursor:
        cursor.execute(query, (dex,))
        pairs = cursor.fetchall()
        return pairs
def warm_cache_loop():
    """Run forever, purging expired cache rows and re-running stale warming queries.

    Each pass:
      1. deletes ``gecko`` rows older than their own ``max_age``;
      2. selects the parameter sets in ``get_high_returns_warming_params``
         whose timestamp is older than ``WARM_INTERVAL``;
      3. for each of them, calls ``momentum_scanner_intraday.get_high_returns``
         with those parameters and stamps the row with the current time;
      4. sleeps 60 seconds.

    ``REFRESH.use_cache`` is set to False on the calling thread, so
    ``try_cache`` only serves entries younger than five minutes while this
    loop is driving the queries. Any exception raised during a pass is logged
    and the loop carries on after the sleep. This function never returns.
    """
    # Imported here rather than at module level; presumably to avoid a
    # circular import -- TODO confirm.
    import momentum_scanner_intraday

    REFRESH.use_cache = False
    while True:
        try:
            with DB_POOL.connection() as conn:
                # GC old data.
                conn.execute("""DELETE FROM gecko where ts < now() - max_age""")
                with conn.cursor() as cur:
                    cur.execute(
                        """SELECT params FROM get_high_returns_warming_params WHERE ts < now() - %s""",
                        (WARM_INTERVAL,),
                    )
                    out_of_date = cur.fetchall()
            # The housekeeping connection is released before the warming
            # queries run, so they do not hold two connections at once.

            # fetchall() returns a list (empty when nothing is stale), never
            # None, so test truthiness; an "is not None" check would make the
            # "Cache is warm" branch unreachable.
            if out_of_date:
                with connect():
                    for (kwargs,) in out_of_date:
                        logging.info(
                            "Running warming query with parameters %s",
                            json.dumps(kwargs),
                        )
                        momentum_scanner_intraday.get_high_returns(**kwargs)
                        # Record that this parameter set has just been warmed.
                        CONN.execute(
                            """\
                            INSERT INTO get_high_returns_warming_params
                            VALUES (%s, now())
                            ON CONFLICT (params)
                            DO UPDATE SET ts = EXCLUDED.ts
                            """,
                            (Jsonb(kwargs),),
                        )
            else:
                logging.info("Cache is warm")
        except Exception:
            # Top-level boundary of a daemon-style loop: log and keep going.
            logging.exception("Error while attempting cache warming query")
        time.sleep(60)