-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathrank.py
295 lines (280 loc) · 11.4 KB
/
rank.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
import os
import base64
from hoshino import Service
from hoshino.priv import *
from hoshino import aiorequests
from os import path
import json
from nonebot import scheduler
from PIL import Image
sv_help = '''
[日/台/陆rank] rank推荐表
'''.strip()
sv = Service("pcr-rank", help_=sv_help, bundle="pcr查询")
server_addr = "https://pcresource.coldthunder11.com/rank/"
resize_pic = False
config = None
def imgtobase64(path:str):
with open (path,'rb') as f:
fb = f.read()
img_base64 = str(base64.b64encode(fb),encoding='utf-8')
f.close()
return img_base64
async def load_config():
global config
global server_addr
config_path = path.join(path.dirname(__file__),"config.json")
with open(config_path,"r",encoding="utf8")as fp:
config = json.load(fp)
server_addr = config['upstream']
resize_pic = config['resize_pic']
if not path.exists(path.join(path.abspath(path.dirname(__file__)),"cache")):
os.mkdir(path.join(path.abspath(path.dirname(__file__)),"cache"))
if not path.exists(path.join(path.abspath(path.dirname(__file__)),"cache","pic")):
os.mkdir(path.join(path.abspath(path.dirname(__file__)),"cache","pic"))
await update_cache()
def save_config():
config_path = path.join(path.dirname(__file__),"config.json")
with open(config_path,'r+',encoding='utf8')as fp:
fp.seek(0)
fp.truncate()
str = json.dumps(config,indent=4,ensure_ascii=False)
fp.write(str)
async def download_rank_pic(url):
sv.logger.info(f"正在下载{url}")
resp = await aiorequests.head(url)
content_length = int(resp.headers["Content-Length"])
sv.logger.info(f"块大小{str(content_length)}")
#分割200kb下载
block_size = 1024*200
range_list = []
current_start_bytes = 0
while True:
if current_start_bytes + block_size >= content_length:
range_list.append(f"{str(current_start_bytes)}-{str(content_length)}")
break
range_list.append(f"{str(current_start_bytes)}-{str(current_start_bytes + block_size)}")
current_start_bytes += block_size + 1
pic_bytes_list = []
for block in range_list:
sv.logger.info(f"正在下载块{block}")
headers = {"Range":f"bytes={block}"}
resp = await aiorequests.get(url,headers = headers)
res_content = await resp.content
pic_bytes_list.append(res_content)
return b"".join(pic_bytes_list)
async def update_rank_pic_cache(force_update:bool):
config_names = ["cn","tw","jp"]
for conf_name in config_names:
config_path = path.join(path.dirname(__file__),"cache",f"{conf_name}.json")
with open(config_path,"r",encoding="utf8")as fp:
rank_config = json.load(fp)
for img_name in rank_config["files"]:
if not force_update:
if path.exists(path.join(path.abspath(path.dirname(__file__)),"cache","pic",f"{conf_name}_{img_name}")):
continue
rank_img_url = f"{server_addr}{config['source'][conf_name]['channel']}/{config['source'][conf_name]['route']}/{img_name}"
img_content = await download_rank_pic(rank_img_url)
with open(path.join(path.abspath(path.dirname(__file__)),"cache","pic",f"{conf_name}_{img_name}"),"ab")as fp:
fp.seek(0)
fp.truncate()
fp.write(img_content)
async def update_cache(force_update:bool=False):
sv.logger.info("正在更新Rank表缓存")
config_names = ["cn","tw","jp"]
for conf_name in config_names:
resp = await aiorequests.get(f"{server_addr}{config['source'][conf_name]['channel']}/{config['source'][conf_name]['route']}/config.json")
res = await resp.text
cache_path = path.join(path.abspath(path.dirname(__file__)),"cache",f"{conf_name}.json")
with open(cache_path,"a",encoding="utf8")as fp:
fp.seek(0)
fp.truncate()
fp.write(res)
await update_rank_pic_cache(force_update)
sv.logger.info("Rank表缓存更新完毕")
@sv.on_rex(r"^(\*?([日台国陆b])服?([前中后]*)卫?)?rank(表|推荐|指南)?$")
async def rank_sheet(bot, ev):
if config == None:
await load_config()
match = ev["match"]
is_jp = match.group(2) == "日"
is_tw = match.group(2) == "台"
is_cn = match.group(2) and match.group(2) in "国陆b"
if not is_jp and not is_tw and not is_cn:
await bot.send(ev, "\n请问您要查询哪个服务器的rank表?\n*日rank表\n*台rank表\n*陆rank表", at_sender=True)
return
msg = []
msg.append("\n")
if is_jp:
rank_config_path = path.join(path.abspath(path.dirname(__file__)),"cache","jp.json")
rank_config = None
with open(rank_config_path,"r",encoding="utf8")as fp:
rank_config = json.load(fp)
rank_imgs = []
for img_name in rank_config["files"]:
rank_imgs.append(f'{path.join(path.dirname(__file__),"cache","pic",f"jp_{img_name}")}')
msg.append(rank_config["notice"])
pos = match.group(3)
if not pos or "前" in pos:
msg.append(f"[CQ:image,file=base64://{imgtobase64(rank_imgs[0])}]")
if not pos or "中" in pos:
msg.append(f"[CQ:image,file=base64://{imgtobase64(rank_imgs[1])}]")
if not pos or "后" in pos:
msg.append(f"[CQ:image,file=base64://{imgtobase64(rank_imgs[2])}]")
await bot.send(ev, "".join(msg), at_sender=True)
elif is_tw:
rank_config_path = path.join(path.abspath(path.dirname(__file__)),"cache","tw.json")
rank_config = None
with open(rank_config_path,"r",encoding="utf8")as fp:
rank_config = json.load(fp)
rank_imgs = []
for img_name in rank_config["files"]:
rank_imgs.append(f'{path.join(path.dirname(__file__),"cache","pic",f"tw_{img_name}")}')
msg.append(rank_config["notice"])
for rank_img in rank_imgs:
b64 = imgtobase64(rank_img)
msg.append(f"[CQ:image,file=base64://{b64}]")
await bot.send(ev, "".join(msg), at_sender=True)
elif is_cn:
rank_config_path = path.join(path.abspath(path.dirname(__file__)),"cache","cn.json")
rank_config = None
with open(rank_config_path,"r",encoding="utf8")as fp:
rank_config = json.load(fp)
rank_imgs = []
for img_name in rank_config["files"]:
rank_imgs.append(f'{path.join(path.dirname(__file__),"cache","pic",f"cn_{img_name}")}')
msg.append(rank_config["notice"])
for rank_img in rank_imgs:
b64 = imgtobase64(rank_img)
msg.append(f"[CQ:image,file=base64://{b64}]")
await bot.send(ev, "".join(msg), at_sender=True)
@sv.on_fullmatch("查看当前rank更新源")
async def show_current_rank_source(bot, ev):
if config == None:
await load_config()
if not check_priv(ev, SUPERUSER):
await bot.send(ev, "仅有SUPERUSER可以使用本功能")
msg = []
msg.append("\n")
msg.append("国服:\n")
msg.append(config["source"]["cn"]["name"])
msg.append(" ")
if config["source"]["cn"]["channel"] == "stable":
msg.append("稳定源")
elif config["source"]["cn"]["channel"] == "auto_update":
msg.append("自动更新源")
else:
msg.append(config["source"]["cn"]["channel"])
msg.append("\n台服:\n")
msg.append(config["source"]["tw"]["name"])
msg.append(" ")
if config["source"]["tw"]["channel"] == "stable":
msg.append("稳定源")
elif config["source"]["tw"]["channel"] == "auto_update":
msg.append("自动更新源")
else:
msg.append(config["source"]["tw"]["channel"])
msg.append("\n日服:\n")
msg.append(config["source"]["jp"]["name"])
msg.append(" ")
if config["source"]["jp"]["channel"] == "stable":
msg.append("稳定源")
elif config["source"]["jp"]["channel"] == "auto_update":
msg.append("自动更新源")
else:
msg.append(config["source"]["jp"]["channel"])
await bot.send(ev, "".join(msg), at_sender=True)
@sv.on_fullmatch("查看全部rank更新源")
async def show_all_rank_source(bot, ev):
if config == None:
await load_config()
if not check_priv(ev, SUPERUSER):
await bot.send(ev, "仅有SUPERUSER可以使用本功能")
resp = await aiorequests.get(server_addr+"route.json")
res = await resp.json()
msg = []
msg.append("\n")
msg.append("稳定源:\n国服:\n")
for uo in res["ranks"]["channels"]["stable"]["cn"]:
msg.append(uo["name"])
msg.append(" ")
msg.append("\n台服:\n")
for uo in res["ranks"]["channels"]["stable"]["tw"]:
msg.append(uo["name"])
msg.append(" ")
msg.append("\n日服:\n")
for uo in res["ranks"]["channels"]["stable"]["jp"]:
msg.append(uo["name"])
msg.append(" ")
msg.append("\n自动更新源:\n国服:\n")
for uo in res["ranks"]["channels"]["auto_update"]["cn"]:
msg.append(uo["name"])
msg.append(" ")
msg.append("\n台服:\n")
for uo in res["ranks"]["channels"]["auto_update"]["tw"]:
msg.append(uo["name"])
msg.append(" ")
msg.append("\n日服:\n")
for uo in res["ranks"]["channels"]["auto_update"]["jp"]:
msg.append(uo["name"])
msg.append(" ")
msg.append("\n如需修改更新源,请使用命令[设置rank更新源 国/台/日 稳定/自动更新 源名称]")
await bot.send(ev, "".join(msg), at_sender=True)
@sv.on_rex(r'^设置rank更新源 (.{0,5}) (.{0,10}) (.{0,20})$')
async def change_rank_source(bot, ev):
if config == None:
await load_config()
if not check_priv(ev, SUPERUSER):
await bot.send(ev, "仅有SUPERUSER可以使用本功能")
robj = ev['match']
server = robj.group(1)
channel = robj.group(2)
name = robj.group(3)
if server == "国":
server = "cn"
elif server == "台":
server = "tw"
elif server == "日":
server = "jp"
else :
await bot.send(ev, "请选择正确的服务器(国/台/日)", at_sender=True)
return
if channel == "稳定":
channel = "stable"
elif channel == "自动更新":
channel = "auto_update"
else :
await bot.send(ev, "请选择正确的频道(稳定/自动更新)", at_sender=True)
return
resp = await aiorequests.get(server_addr+"route.json")
res = await resp.json()
has_name = False
source_jo = None
for uo in res["ranks"]["channels"][channel][server]:
if uo["name"].upper() == name.upper():
has_name = True
source_jo = uo
break
if not has_name:
await bot.send(ev, "请输入正确的源名称", at_sender=True)
return
config["source"][server]["name"] = source_jo["name"]
config["source"][server]["channel"] = channel
config["source"][server]["route"] = source_jo["route"]
save_config()
await update_cache(True)
await bot.send(ev, "更新源设置成功", at_sender=True)
@sv.on_fullmatch("更新rank表缓存")
async def update_rank_cache(bot, ev):
if config == None:
await load_config()
if not check_priv(ev, SUPERUSER):
await bot.send(ev, "仅有SUPERUSER可以使用本功能")
return
await update_cache()
await bot.send(ev, "更新成功")
@scheduler.scheduled_job('cron', hour='17', minute='06')
async def schedule_update_rank_cache():
if config == None:
await load_config()
await update_cache()