-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathipfscontroller.py
executable file
·269 lines (240 loc) · 9.83 KB
/
ipfscontroller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import asyncio
import json
import logging
import os

import aiopg
from redis.asyncio import StrictRedis

from ipfsworkerlib import say
# Configure the root logger once at import time so that DEBUG records and
# above are emitted for the whole process.
logging.basicConfig(level=logging.DEBUG)
async def check_files(pool, cid_column, alloc_column, table):
    """Yield the rows of ``table`` that still need more allocations.

    A row is selected when its ``cid_column`` is set, its ``state`` is not
    ``'delete'`` and one of the following holds:

    * ``alloc_column`` is null;
    * ``replications`` is null and ``alloc_column`` holds fewer than 2 items;
    * ``replications`` is set and ``alloc_column`` holds fewer items than it.

    Each yielded row is ``(cid, allocations, replications)`` as returned by
    the cursor.  The column and table names are interpolated verbatim into
    the statement, so they must be trusted identifiers.
    """
    # The statement is assembled line by line; joining with "\n" (with an
    # empty first and last item) gives the exact multi-line text to execute.
    statement_lines = [
        "",
        f"SELECT {cid_column}, {alloc_column}, replications from {table}",
        f"where {cid_column} is not null and state != 'delete' and (",
        f"{alloc_column} is null",
        "or",
        "(",
        f"replications is null and jsonb_array_length({alloc_column}) < 2",
        ")",
        "or",
        "(",
        f"replications is not null and jsonb_array_length({alloc_column}) < replications",
        ")",
        ")",
        "",
    ]
    statement = "\n".join(statement_lines)
    async with pool.acquire() as db_conn, db_conn.cursor() as cursor:
        await cursor.execute(statement)
        # Stream the rows instead of fetching them all at once.
        async for record in cursor:
            yield record
async def check_files_for_rotation(pool, cid_column, alloc_column,
                                   rotation_column, table):
    """Yield at most one row of ``table`` that has a pending rotation.

    Rows qualify when ``cid_column`` is set, ``state`` is not ``'delete'``
    and ``rotation_column`` is not null.  They are ordered by the number of
    items in ``alloc_column`` and only the first one is returned
    (``limit 1``).

    The yielded row is ``(cid, allocations, rotation)`` as returned by the
    cursor.  The column and table names are interpolated verbatim into the
    statement, so they must be trusted identifiers.
    """
    # Joining with "\n" (empty first and last item) gives the exact
    # multi-line statement text to execute.
    statement_lines = [
        "",
        f"SELECT {cid_column}, {alloc_column}, {rotation_column} from {table}",
        f"where {cid_column} is not null and state != 'delete' and (",
        f"{rotation_column} is not null",
        ")",
        f"order by jsonb_array_length({alloc_column})",
        "limit 1",
        "",
    ]
    statement = "\n".join(statement_lines)
    async with pool.acquire() as db_conn, db_conn.cursor() as cursor:
        await cursor.execute(statement)
        async for record in cursor:
            yield record
async def get_members(connection):
    """Return the registered workers whose ``df`` key currently exists.

    Workers are read from the ``ipfsworker.workers`` set; one is kept only
    when ``ipfsworker.<worker>.df`` is not ``None``.  The result is a list in
    the iteration order of the set returned by Redis.
    """
    registered = await connection.smembers("ipfsworker.workers")
    present = []
    for worker in registered:
        df_value = await connection.get(f"ipfsworker.{worker}.df")
        if df_value is None:
            # No df value published: the worker is not considered a member.
            continue
        present.append(worker)
    return present
async def get_candidates(connection, allocations, replications):
    """Pick the workers that should receive a new copy of a content.

    Workers already listed in ``allocations`` are never proposed.  Among the
    remaining members (see ``get_members``), those with a non-empty
    ``ipfsworker.<name>.error`` list or with ``ipfsworker.<name>.current``
    set are left out.  The rest is ranked by decreasing
    ``ipfsworker.<name>.df`` value and truncated to the number of copies
    still missing, i.e. ``replications - len(allocations)``.

    Args:
        connection: Redis client used for every lookup.
        allocations: names of the workers that already host the content.
        replications: total number of copies wanted.

    Returns:
        ``None`` when no member is left once ``allocations`` is removed,
        otherwise a (possibly empty) list of worker names.
    """
    members = set(await get_members(connection)) - set(allocations)
    if not members:
        return None

    # Workers with pending errors are reported and excluded.
    error_members = [
        member for member in members
        if await connection.llen(f"ipfsworker.{member}.error") != 0
    ]
    if error_members:
        say(f"Those node have to recover from errors: {', '.join(error_members)}"
            )
    failing = set(error_members)

    members_with_df = []
    for member in members:
        if member in failing:
            continue
        # exclude node already doing something
        if await connection.get(f"ipfsworker.{member}.current") is not None:
            continue
        df = await connection.get(f"ipfsworker.{member}.df")
        if df is None:
            # The df key can vanish between get_members() and this read (the
            # rest of the file treats a missing df as a missed heartbeat);
            # skip the worker instead of crashing on float(None).
            continue
        members_with_df.append((float(df), member))

    # Highest df first.  Member names are unique, so the (df, member) tuples
    # are all distinct and the ordering is fully determined.
    ranked = sorted(members_with_df, reverse=True)
    missing = max(0, replications - len(allocations))
    return [member for _, member in ranked[:missing]]
async def wait_for_sync(connection):
    """Wake the workers up and block until all of them report being idle.

    A message is published on ``ipfsworker.workers.wake``, then every member
    returned by ``get_members`` is polled with ``check_member`` until all of
    them pass.  Between two polls the function waits for a message on
    ``ipfsworker.controller.wake``; the wait starts at 5 seconds, doubles
    after each timeout (capped at 3600 seconds) and is reset to 5 seconds
    whenever a message arrives.

    Args:
        connection: Redis client used to publish and to inspect the workers.
    """
    # The wake-up subscription runs on its own client.  Both the client and
    # the pubsub object are context-managed so that their sockets are released
    # on every exit path: this function is called after each processed cid,
    # and an unclosed client per call would pile up connections.
    async with StrictRedis(host=os.environ["REDIS_HOST"], port=6379,
                           db=0) as client:
        async with client.pubsub() as p:
            await p.subscribe('ipfsworker.controller.wake')
            # Drain up to two initial messages before polling
            # (presumably the subscription confirmation -- TODO confirm).
            await p.get_message(timeout=1)  # header
            await p.get_message(timeout=1)  # header
            # NOTE(review): this is a snapshot taken once; a worker whose df
            # key disappears afterwards keeps failing check_member() until it
            # publishes df again -- confirm this is the intended behaviour.
            members = await get_members(connection)
            await connection.publish("ipfsworker.workers.wake", "dummy")
            say("Awakened workers, waiting for idle time")
            seconds = 5
            while not all(await asyncio.gather(
                    *[check_member(member, connection) for member in members])):
                say("Workers are still working."
                    f" Waiting for {seconds} seconds for someone to wake me")
                message = await p.get_message(timeout=seconds)
                if message is None:
                    # nothing unusual, just wait longer
                    seconds = min(seconds * 2, 3600)
                else:
                    say("Someone awakened me, let's check if I can stop waiting")
                    seconds = 5
            await p.unsubscribe()
async def check_member(member, connection) -> bool:
    """Tell whether ``member`` is alive and has nothing left to do.

    A worker whose ``ipfsworker.<member>.df`` key is missing is considered to
    have missed its heartbeat: it is removed from the ``ipfsworker.workers``
    set and ``False`` is returned.

    Otherwise the worker is idle when ``ipfsworker.<member>.current`` is
    unset and both the ``ipfsworker.<member>`` and
    ``ipfsworker.<member>.error`` lists are empty.

    Args:
        member: worker name.
        connection: Redis client used for the lookups.

    Returns:
        ``True`` when the worker is alive and idle, ``False`` otherwise
        (always a ``bool``; callers only rely on truthiness).
    """
    if await connection.get(f"ipfsworker.{member}.df") is None:
        say(f"Heartbeat missed for {member}, removing it from the members")
        await connection.srem("ipfsworker.workers", member)
        return False
    # The checks are read-only, so stop at the first one that fails instead
    # of always paying three round trips.
    if await connection.get(f"ipfsworker.{member}.current") is not None:
        return False
    if await connection.llen(f"ipfsworker.{member}") != 0:
        return False
    return await connection.llen(f"ipfsworker.{member}.error") == 0
async def push_work(connection, pool):
    """Run the controller loop forever.

    Each round:

    1. every (cid column, allocation column, table) triple is scanned with
       ``step``; after each processed cid the controller waits for the
       workers with ``wait_for_sync``;
    2. when no cid was processed, a message is published on
       ``ipfsworker.controller.idle`` and the same tables are scanned for
       pending rotations with ``rotation_step``;
    3. when still nothing was processed, the controller waits for a message
       on ``ipfsworker.controller.wake`` for ``waiting_time`` seconds, then
       doubles that delay (capped at 3600 seconds).  Any processed cid
       resets the delay to 60 seconds.

    Args:
        connection: Redis client.
        pool: database connection pool handed over to the step functions.
    """
    replication_targets = [
        ("cid", "allocations", "file"),
        ("thumbnail_cid", "thumbnail_allocations", "photovideo"),
        ("web_cid", "web_allocations", "photovideo"),
        ("sub_cid", "sub_allocations", "film"),
        ("sub_cid", "sub_allocations", "serie"),
    ]
    rotation_targets = [
        ("cid", "allocations", "rotation", "file"),
        ("thumbnail_cid", "thumbnail_allocations",
         "thumbnail_rotation", "photovideo"),
        ("web_cid", "web_allocations", "web_rotation", "photovideo"),
        ("sub_cid", "sub_allocations", "sub_rotation", "film"),
        ("sub_cid", "sub_allocations", "sub_rotation", "serie"),
    ]

    waiting_time = 60
    await wait_for_sync(connection)
    while True:
        cids = []
        for cid_column, alloc_column, table in replication_targets:
            say(f"Playing with table {table}")
            async for cid, candidates in step(connection, pool, cid_column,
                                              alloc_column, table):
                say(f"Done with cid {cid}")
                cids.append(cid)
                await wait_for_sync(connection)
                say(f"{' and '.join(candidates)} should now have {cid}")

        if not cids:
            say("Wake up processes that waited to see whether the normal work ended"
                )
            await connection.publish("ipfsworker.controller.idle", "dummy")
            say("Nothing done in the normal case, deal with some rotations")
            for cid_column, alloc_column, rotation_column, table in rotation_targets:
                say(f"Playing with table {table} for rotation")
                async for cid, candidates in rotation_step(
                        connection, pool, cid_column, alloc_column,
                        rotation_column, table):
                    say(f"Done with cid {cid} for rotation")
                    cids.append(cid)
                    await wait_for_sync(connection)
                    say(f"{' and '.join(candidates)} have {cid} for rotation")

        if cids:
            # Something was done: start the next round right away and reset
            # the idle back-off.
            waiting_time = 60
            continue

        say(f"Since nothing was done, waiting {waiting_time}s before starting again"
            )
        # The pubsub object holds a connection of its own; the context manager
        # releases it on exit so that idle rounds do not accumulate
        # connections.
        async with connection.pubsub() as subscriber:
            await subscriber.subscribe('ipfsworker.controller.wake')
            # Drain up to two initial messages before the real wait
            # (presumably the subscription confirmation -- TODO confirm).
            await subscriber.get_message(timeout=1)  # header
            await subscriber.get_message(timeout=1)  # header
            # Sleep until someone wakes the controller or the delay expires.
            await subscriber.get_message(timeout=waiting_time)
            await subscriber.unsubscribe("ipfsworker.controller.wake")
        waiting_time = min(waiting_time * 2, 3600)
async def step(connection, pool, cid_column, alloc_column, table):
    """Queue the missing copies of every under-allocated content of ``table``.

    For each row returned by ``check_files`` the function asks
    ``get_candidates`` for workers, pushes the cid on each candidate's
    ``ipfsworker.<candidate>`` list and appends the candidate to the row's
    ``alloc_column``.  Rows for which no candidate is found are skipped.

    Args:
        connection: Redis client.
        pool: database connection pool.
        cid_column: name of the column holding the cid.
        alloc_column: name of the column holding the allocation list.
        table: name of the table to scan.

    Yields:
        ``(cid, candidates)`` once the cid has been queued on every
        candidate.
    """
    async for cid, allocations, replications in check_files(
            pool, cid_column, alloc_column, table):
        allocations = allocations or []
        # Same default as the one used by the selection query.
        replications = replications or 2
        say(f"Replicating {replications} times {cid},"
            f" already in {', '.join(allocations) if allocations else 'nowhere...'}"
            )
        candidates = await get_candidates(connection, allocations,
                                          replications)
        if not candidates:
            say(f"No alive node to host {cid}")
            continue
        say(f"Found candidates {', '.join(candidates)}")
        for candidate in candidates:
            say(f"Pushing {cid} to {candidate}")
            await connection.lpush(
                f"ipfsworker.{candidate}",
                cid,
            )
            say(f"Updating alloc for {cid} to {candidate}")
            async with pool.acquire() as conn:
                async with conn.cursor() as cur:
                    # Identifiers come from the caller and are interpolated;
                    # the values (worker name, cid) are bound as parameters so
                    # that quotes in them cannot break the statement or the
                    # JSON document.
                    await cur.execute(
                        f"""
                        update {table} set {alloc_column} = (
                            case
                                when {alloc_column} is null then '[]'
                                else {alloc_column}
                            end
                        ) || %s::jsonb where {cid_column} = %s
                        """,
                        (json.dumps([candidate]), cid),
                    )
            say(f"Done with {cid} to {candidate}")
        yield cid, candidates
async def rotation_step(connection, pool, cid_column, alloc_column,
                        rotation_column, table):
    """Queue the pending rotations of ``table`` on the workers that are idle.

    For each row returned by ``check_files_for_rotation`` and each worker
    listed in its ``rotation_column``: when ``check_member`` reports the
    worker as available, the cid is pushed on ``ipfsworker.<worker>`` and the
    worker is removed from ``rotation_column`` (the column is set to null
    when it was the only entry).  Unavailable workers are skipped and stay in
    the column.

    Args:
        connection: Redis client.
        pool: database connection pool.
        cid_column: name of the column holding the cid.
        alloc_column: name of the column holding the allocation list.
        rotation_column: name of the column listing the workers to rotate.
        table: name of the table to scan.

    Yields:
        ``(cid, workers)`` where ``workers`` are the workers the cid was
        actually queued on; nothing is yielded when every worker was skipped.

    Raises:
        AssertionError: when a worker listed for rotation is not part of the
            row's allocations.
    """
    async for cid, allocations, candidates in check_files_for_rotation(
            pool, cid_column, alloc_column, rotation_column, table):
        allocations = allocations or []
        say(f"Rotating {cid} in {rotation_column} for {candidates}")
        done_for = []
        for candidate in candidates:
            assert candidate in allocations, f"{candidate} is not in allocations ({allocations}): Mistake?"
            isavailable = await check_member(candidate, connection)
            if not isavailable:
                say(f"{candidate} is not available, skipping for now")
                continue
            say(f"Pushing {cid} to {candidate} for rotation")
            await connection.lpush(
                f"ipfsworker.{candidate}",
                cid,
            )
            say(f"Consuming the rotation for candidate {candidate} for {cid}")
            async with pool.acquire() as conn:
                async with conn.cursor() as cur:
                    # Identifiers are interpolated; the worker name and the
                    # cid are bound as parameters so that quotes in them
                    # cannot break the statement.
                    await cur.execute(
                        f"""
                        update {table} set {rotation_column} = (
                            case
                                when {rotation_column} = %s::jsonb
                                then null
                                else {rotation_column} - %s::text
                            end
                        )
                        where {cid_column} = %s
                        """,
                        (json.dumps([candidate]), candidate, cid),
                    )
            say(f"Updated the database for {cid} to {candidate} for rotation")
            done_for.append(candidate)
        if done_for:
            # Report only the workers that were served, so the caller does
            # not announce skipped workers as having the content.
            yield cid, done_for
async def main():
    """Create the Redis client and the database pool, then run ``push_work``.

    The Redis host is read from ``REDIS_HOST`` and the database host from
    ``SERVICEMESH_IP``; a missing variable raises ``KeyError``.
    """
    redis_options = {
        "decode_responses": True,
        "host": os.environ["REDIS_HOST"],
    }
    redis_client = StrictRedis(**redis_options)

    say("Connecting to postgres")
    db_host = os.environ["SERVICEMESH_IP"]
    db_pool = await aiopg.create_pool(
        f'dbname=docs user=postgres host={db_host}')
    say("Connected to postgres")

    controller = push_work(redis_client, db_pool)
    await asyncio.gather(controller)
if __name__ == '__main__':
    # asyncio.run() creates the event loop, runs main() to completion and
    # closes the loop afterwards; calling asyncio.get_event_loop() outside of
    # a running loop is deprecated.
    asyncio.run(main())