forked from FreezeRasis/Ricebot-Nonebot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsydxRecent.py
380 lines (329 loc) · 14.8 KB
/
sydxRecent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
import datetime
import json
import os
import sqlite3
import requests
from nonebot import on_command, CommandSession
import openpyxl
from openpyxl.comments import Comment
# Workbook holding each song's original name (first column) followed by its
# alternative names; it is loaded once when the module is imported.
book = openpyxl.load_workbook('song_alter.xlsx')
# The active worksheet is the one every alias lookup and insertion works on.
sheet = book.active
def song_find(key):
    """Resolve an original or alternative song name to the original name.

    Scans the alias sheet row by row, left to right, and stops looking at a
    row as soon as an empty cell is reached.

    Args:
        key: The name to look for; it must equal a cell value exactly.

    Returns:
        The value stored in the first column of the first row containing
        ``key``, or ``False`` when no cell matches. Callers that forward the
        result to a search must handle ``False`` themselves.
    """
    for row_index, row in enumerate(sheet.iter_rows(), start=1):
        for cell in row:
            if cell.value == key:
                # The original name lives in the first column of the row.
                return sheet.cell(row_index, 1).value
            if cell.value is None:
                # First empty cell: nothing further to check in this row.
                break
    return False
def alter_add(new, exist, qqid):
    """Register ``new`` as an alternative name of an existing song.

    Args:
        new: The alternative name to add.
        exist: Any name (original or alternative) the song already has.
        qqid: QQ id of the uploader; stored as a comment on the new cell.

    Returns:
        A user-facing message: success, alias already present, or song not
        found.
    """
    not_found = '歌曲未找到,请确定格式为 /add 旧名称|||新名称'
    original = song_find(exist)
    if original is False:
        # Without this guard the False sentinel would be compared against
        # cell values below, and a cell holding 0/False would match.
        return not_found

    for row in sheet.iter_rows():
        for cell in row:
            if cell.value == original:
                # Walk the row and use the first free cell to the right of an
                # occupied one; stop early if the alias is already recorded.
                for anchor in row:
                    slot = sheet.cell(anchor.row, anchor.column + 1)
                    if slot.value is None:
                        slot.value = new
                        # Comment text must be a string; qqid may be an int.
                        note = Comment(str(qqid), 'Ricebot')
                        note.width = 100
                        note.height = 10
                        slot.comment = note
                        # NOTE(review): this saves to 'song_alter_test.xlsx'
                        # while the workbook is loaded from 'song_alter.xlsx',
                        # so additions are not read back after a restart -
                        # confirm whether that is intended.
                        book.save('song_alter_test.xlsx')
                        return '已添加,请输入/search 歌曲名 查看'
                    if slot.value == new:
                        return "该别名已存在库中"
            elif cell.value is None:
                break
    return not_found
# Path of the SQLite database file passed to sqlite3.connect() throughout this module.
database = 'myDB.sqlite3'
# Compute how long ago a play happened and return it as a relative time
# (N seconds / minutes / hours / days ago).
def timeDelta(string):
    """Format the age of a timestamp as a short relative-time string.

    Args:
        string: Timestamp in ``"%Y-%m-%d %H:%M:%S"`` format, compared against
            the local ``datetime.now()``.

    Returns:
        The rounded age followed by "天前", "小时前", "分钟前" or "秒前",
        using the largest unit the age reaches.

    Raises:
        ValueError: If ``string`` does not match the expected format.
    """
    past = datetime.datetime.strptime(string, "%Y-%m-%d %H:%M:%S")
    # A timestamp slightly in the future (e.g. clock skew) is reported as
    # "0秒前" instead of a negative age.
    elapsed = max((datetime.datetime.now() - past).total_seconds(), 0)
    # ">=" so that exactly one minute/hour/day is shown in that unit.
    for unit_seconds, suffix in ((86400, "天前"), (3600, "小时前"), (60, "分钟前")):
        if elapsed >= unit_seconds:
            return str(round(elapsed / unit_seconds)) + suffix
    return str(round(elapsed)) + "秒前"
# Look up the token stored for a QQ account; returns the token if present,
# otherwise None.
def query_token(qq):
    """Return the stored token for ``qq``, or ``None`` when there is none.

    Args:
        qq: QQ id used as the lookup key in the ``info`` table.

    Returns:
        The fourth column of the matching ``info`` row, or ``None`` when no
        row matches or the row has fewer than four columns.
    """
    conn = sqlite3.connect(database)
    try:
        row = conn.execute('SELECT * FROM info WHERE qq = (?)', (qq,)).fetchone()
    finally:
        # Close the connection even when the query itself fails.
        conn.close()
    # NOTE(review): the token is read by position (index 3) from SELECT *;
    # this depends on the column order of `info` - confirm against the schema.
    if row is None or len(row) < 4:
        return None
    return row[3]
def get_diff_name(music_id, diff_number):
    """Translate a numeric difficulty into its short display name.

    Args:
        music_id: Song id; only used for difficulty 3, whose name is stored
            per song in ``diffinfo.infver``.
        diff_number: Difficulty index. 0, 1, 2 and 4 map to fixed names.

    Returns:
        "NOV", "ADV", "EXH" or "MXM" for 0, 1, 2, 4; the song's ``infver``
        value for 3; an empty string for anything else or when no name is
        stored for the song.
    """
    fixed_names = {0: "NOV", 1: "ADV", 2: "EXH", 4: "MXM"}
    if diff_number != 3:
        return fixed_names.get(diff_number, "")

    conn = sqlite3.connect(database)
    try:
        row = conn.execute('select infver from diffinfo where ID = (?)', (music_id,)).fetchone()
    finally:
        conn.close()
    # Callers concatenate the result into a message, so never hand back None.
    if row is None or not row[0]:
        return ""
    return row[0]
def get_music(string):
    """Find a song in ``diffinfo`` by name.

    An exact, case-insensitive name match is tried first; if that finds
    nothing, the newest song (highest id) whose name contains the text is
    used.

    Args:
        string: Name or name fragment; converted with ``str()`` before use.

    Returns:
        An ``(id, name)`` row, or ``None`` when nothing matches.
    """
    keyword = str(string)
    conn = sqlite3.connect(database)
    try:
        # Bound parameters instead of string formatting: the text comes from
        # chat messages and must not be able to alter the SQL.
        music = conn.execute(
            "select id,name from diffinfo where upper(name) = upper(?)",
            (keyword,),
        ).fetchone()
        if not music:
            music = conn.execute(
                'select id,name from diffinfo where name like ? order by id DESC',
                ('%' + keyword + '%',),
            ).fetchone()
    finally:
        conn.close()
    return music
def get_rank(input_str, diff, qq):
    """Build the reply describing the sender's ranking on one chart.

    Args:
        input_str: Song name or alias typed by the user.
        diff: Difficulty index sent to the API as ``musicGrade``.
        qq: Sender's QQ id as a string; used for the token lookup and the
            @-mention.

    Returns:
        The message text to send: the ranking summary, a "not found" /
        "not played" notice, or the result of ``get_new_token`` when the API
        answers with code 403.
    """
    token = query_token(qq)
    # song_find returns False for names missing from the alias sheet; fall
    # back to the raw text instead of searching for the string "False".
    query = song_find(input_str) or input_str
    music = get_music(query) if query else None
    if not music:
        return "没有找到这首歌"
    music_id = str(music[0])

    url = "https://iot.universal-space.cn/api/konami/music5/musicRank?musicId=" + music_id + "&musicGrade=" + str(diff)
    # A timeout keeps an unresponsive API from blocking the caller forever.
    playdata = requests.get(url, headers={'token': token}, timeout=10)
    json_str = playdata.json()
    if json_str["code"] == 403:
        return get_new_token(qq)

    score = (json_str.get("data") or {}).get("rankInfo")
    padded_id = music_id.zfill(4)
    img_file = "jk_" + padded_id + "_" + str(diff + 1) + ".png"
    print(img_file)
    if not score:
        return "你好像还没打过这首歌"
    # Use the jacket for this difficulty when it exists on disk, otherwise
    # the difficulty-1 jacket.
    if not os.path.exists(r'/usr/bot/httpserver/img/' + img_file):
        img_file = "jk_" + padded_id + "_" + str(1) + ".png"
    img_url = "http://127.0.0.1/img/" + img_file
    return (
        "[CQ:image,file=" + img_url + "]" + str(score["musicName"])
        + " [" + get_diff_name(music_id, diff) + "]\n"
        + str(score["artistName"]) + "\n分数:" + str(score["score"])
        + "\n排名:" + str(int(score["rank"])) + " [CQ:at,qq=" + qq + "]"
    )
def get_rank_for_pb(music_id, diff, qq):
    """Fetch the rank reported by the musicRank API for one chart.

    Args:
        music_id: Song id sent as ``musicId``.
        diff: Difficulty index sent as ``musicGrade``.
        qq: QQ id whose stored token authenticates the request.

    Returns:
        The ``rank`` field of the response's ``data.rankInfo`` object.
    """
    token = query_token(qq)
    url = "https://iot.universal-space.cn/api/konami/music5/musicRank?musicId=" + str(music_id) + "&musicGrade=" + str(diff)
    # A timeout keeps an unresponsive API from blocking the caller forever.
    playdata = requests.get(url, headers={'token': token}, timeout=10)
    return playdata.json()["data"]["rankInfo"]["rank"]
@on_command('.nov', aliases=('.nov', '.n'), only_to_me=False)
async def get_recent_score(session: CommandSession):
    """Reply with the sender's ranking for the named song at difficulty 0."""
    sender = str(session.ctx['user_id'])
    song_query = session.current_arg_text.strip()
    reply = get_rank(song_query, 0, sender)
    await session.send(reply)
@on_command('.adv', aliases=('.adv', '.a'), only_to_me=False)
async def get_recent_score(session: CommandSession):
    """Reply with the sender's ranking for the named song at difficulty 1."""
    sender = str(session.ctx['user_id'])
    song_query = session.current_arg_text.strip()
    reply = get_rank(song_query, 1, sender)
    await session.send(reply)
@on_command('.exh', aliases=('.exh', '.e'), only_to_me=False)
async def get_recent_score(session: CommandSession):
    """Reply with the sender's ranking for the named song at difficulty 2."""
    sender = str(session.ctx['user_id'])
    song_query = session.current_arg_text.strip()
    reply = get_rank(song_query, 2, sender)
    await session.send(reply)
@on_command('.my4', aliases=('.i', '.g', '.h', '.v', '.m'), only_to_me=False)
async def get_recent_score(session: CommandSession):
    """Reply with the sender's ranking on a song's fourth difficulty.

    The song's ``infver`` value decides which difficulty index is queried:
    4 when it is "MXM", otherwise 3. Songs without an ``infver`` value get a
    notice instead.
    """
    qq = str(session.ctx['user_id'])
    input_str = session.current_arg_text.strip()

    conn = sqlite3.connect(database)
    try:
        # Bound parameter: the text comes straight from a chat message.
        # NOTE(review): this looks the song up by substring on the raw text,
        # while get_rank resolves it via song_find/get_music; the two can
        # pick different songs - confirm whether that is acceptable.
        music = conn.execute(
            'select id,infver,name from diffinfo where name like ? order by id DESC',
            ('%' + input_str + '%',),
        ).fetchone()
    finally:
        conn.close()

    if music is None:
        await session.send("没有找到这首歌")
        return
    infver = music[1]
    if not infver:
        await session.send("这首歌好像没有第四难度")
    elif infver == "MXM":
        await session.send(get_rank(input_str, 4, qq))
    else:
        await session.send(get_rank(input_str, 3, qq))
@on_command('.find', aliases=('.find', '.f'), only_to_me=False)
async def get_recent_score(session: CommandSession):
    """Show a song's name and jacket image.

    Without an argument a random row of ``diffinfo`` is picked; with an
    argument the song is resolved through the alias sheet and ``get_music``.
    """
    qq = str(session.ctx['user_id'])
    input_str = session.current_arg_text.strip()

    if not input_str:
        conn = sqlite3.connect(database)
        try:
            music = conn.execute('SELECT * FROM diffinfo ORDER BY RANDOM() limit 1').fetchone()
        finally:
            conn.close()
        music_id, music_name = music[0], music[2]
        intro = "[CQ:at,qq=" + qq + "]随机抽到的歌曲是:"
    else:
        # Resolve the song once. song_find returns False for names missing
        # from the alias sheet, in which case the raw text is searched.
        music = get_music(song_find(input_str) or input_str)
        if not music:
            await session.send("没有找到这首歌")
            return
        print(str(music))
        music_id, music_name = music[0], music[1]
        intro = "您要找的歌曲是不是:"

    # Prefer the "_3" jacket when it exists on disk, otherwise use "_1".
    padded_id = str(music_id).zfill(4)
    img_file = "jk_" + padded_id + "_3.png"
    if not os.path.exists(r'/usr/bot/httpserver/img/' + img_file):
        img_file = "jk_" + padded_id + "_" + str(1) + ".png"
    img_url = "http://127.0.0.1/img/" + img_file
    await session.send(intro + music_name + "[CQ:image,file=" + str(img_url) + "]")
# Request a new token (by asking the service to send a fresh SMS captcha).
def get_new_token(qq):
    """Ask the service to send a new captcha to the phone bound to ``qq``.

    Args:
        qq: QQ id used to look up the bound phone number in ``info``.

    Returns:
        A user-facing message: instructions to confirm the captcha, a prompt
        to bind a phone first, or a notice that the request failed.
    """
    bind_first = "请先绑定手机。指令:\n.b 手机号"
    conn = sqlite3.connect(database)
    try:
        row = conn.execute('SELECT phone FROM info WHERE qq = (?)', (qq,)).fetchone()
    except sqlite3.Error:
        # Keep the previous best-effort behaviour: a failed lookup is
        # answered with the bind prompt rather than an exception.
        row = None
    finally:
        conn.close()
    if row is None or not row[0]:
        # No row, or a row without a phone number (previously "None" was
        # appended to the URL in that case).
        return bind_first

    url = 'https://iot.universal-space.cn/api/sms/captcha/get/'
    try:
        requests.post(url + str(row[0]), timeout=10)
    except requests.RequestException:
        # A network failure is not a missing binding; say so explicitly.
        return "验证码请求失败,请稍后再试"
    return "获取失败,已重新申请验证码\n指令:.c 验证码"
# Show the sender's most recent play records (3 by default, at most 6).
@on_command('.recent', aliases=('.recent', '.r'), only_to_me=False)
async def get_recent_score(session: CommandSession):
    """Reply with a list of the sender's latest play records.

    The optional argument is the number of records; anything that is not a
    positive integer falls back to 3 and values above 6 are capped at 6.
    """
    qq = str(session.ctx['user_id'])
    token = query_token(qq)
    try:
        num = int(session.current_arg_text.strip())
    except ValueError:
        num = 3
    if num <= 0:
        # Zero and negative counts make no sense as a page size.
        num = 3
    elif num > 6:
        num = 6

    # Fetch the score list as JSON.
    # NOTE(review): the orderBy value ends with a space here, unlike the
    # '.last' request - confirm whether the API tolerates or needs it.
    url = 'https://iot.universal-space.cn/api/mns/mnsGame/recordList?productId=3084&pageNo=1&pageSize=' + str(
        num) + '&orderBy' \
              '=gameDate '
    playdata = requests.get(url, headers={'token': token}, timeout=10)
    json_str = playdata.json()
    if json_str["code"] == 403:
        await session.send(get_new_token(qq))
        return

    # Only format what the API actually returned; the player may have fewer
    # records than requested.
    records = (json_str.get("data") or [])[:num]
    lines = ["[CQ:at,qq=" + qq + "]" + "最近的" + str(len(records)) + "条游戏记录:"]
    for record in records:
        lines.append(
            str(record["musicName"]) + "["
            + get_diff_name(record["musicId"], record["musicGrade"])
            + "]\n"
            + str(record["score"]) + " "
            + record["clearTypeName"] + " "
            + timeDelta(record["gameDate"])
        )
    await session.send("\n".join(lines))
# Show the n-th most recent play record, counting back from now.
@on_command('.last', aliases=('.last', '.l'), only_to_me=False)
async def get_recent_score(session: CommandSession):
    """Reply with the details of the sender's n-th latest play.

    The optional argument is n; missing or invalid input defaults to 1.
    """
    # Read the token from the database.
    qq = str(session.ctx['user_id'])
    token = query_token(qq)
    # Parse the argument; the raw text is never placed in the URL, so it
    # cannot smuggle extra query parameters.
    try:
        num = int(session.current_arg_text.strip())
    except ValueError:
        num = 1
    if num < 1:
        num = 1

    # Fetch the score as JSON (one record per page, page n).
    url = 'https://iot.universal-space.cn/api/mns/mnsGame/recordList?productId=3084&pageNo=' + str(num) + \
          '&pageSize=1&orderBy=gameDate'
    playdata = requests.get(url, headers={'token': token}, timeout=10)
    json_str = playdata.json()
    if json_str["code"] == 403:
        await session.send(get_new_token(qq))
        return
    if json_str.get("pageSize") == 0 or not json_str.get("data"):
        await session.send("数字过大, 你可能还没有玩那么多首歌")
        return

    # Build and send the final message, including the jacket image.
    score = json_str["data"][0]
    padded_id = str(score["musicId"]).zfill(4)
    img_file = "jk_" + padded_id + "_" + str(score["musicGrade"] + 1) + ".png"
    print(img_file)
    # Fall back to the difficulty-1 jacket when this one is not on disk.
    if not os.path.exists(r'/usr/bot/httpserver/img/' + img_file):
        img_file = "jk_" + padded_id + "_" + str(1) + ".png"
    img_url = "http://127.0.0.1/img/" + img_file
    print(img_url)

    # Mark the play as a personal best when it equals the highest score.
    if score["highestScore"] == score["score"]:
        is_pb = "PB #" + str(int(get_rank_for_pb(score["musicId"], score["musicGrade"], qq)))
    else:
        is_pb = ""

    a = (
        "[CQ:image,file=" + img_url + "]"
        + str(score["musicName"]) + "["
        + get_diff_name(score["musicId"], score["musicGrade"])
        + "]\n"
        + str(score["score"]) + " "
        + str(score["criticalCount"]) + "/"
        + str(score["nearCount"]) + "/"
        + str(score["errorCount"]) + " " + is_pb + "\n"
        + score["clearTypeName"] + " "
        + timeDelta(score["gameDate"])
        + " [CQ:at,qq=" + qq + "]"
    )
    print('\n\n' + a + '\n\n')
    await session.send(a)
@on_command('.bind', aliases=('.bind', '.b'), only_to_me=False)
async def get_captcha(session: CommandSession):
    """Bind a phone number to the sender and request an SMS captcha.

    The number is stored in the ``info`` table (inserted for a new QQ id,
    updated otherwise) and then appended to the captcha endpoint URL.
    """
    qq = str(session.ctx['user_id'])
    url = 'https://iot.universal-space.cn/api/sms/captcha/get/'
    phone = session.current_arg_text.strip()
    # The value ends up in SQL and in a URL path, so accept digits only.
    if not phone or any(ch not in "0123456789" for ch in phone):
        await session.send("请输入正确的手机号。指令:\n.b 手机号")
        return

    # Store the phone number in the database.
    conn = sqlite3.connect(database)
    try:
        row = conn.execute('SELECT phone FROM info WHERE qq = (?)', (qq,)).fetchone()
        # Decide by row existence rather than by catching an exception, and
        # bind the values instead of formatting them into the SQL text.
        if row is None:
            conn.execute('INSERT INTO info (qq,phone) VALUES (?,?)', (qq, phone))
        else:
            conn.execute('UPDATE info SET phone = ? WHERE qq = ?', (phone, qq))
        conn.commit()
    finally:
        conn.close()

    requests.post(url + phone, timeout=10)
    await session.send("已请求验证码,请注意认证指令已经变为:\n.c 验证码")
@on_command('.captcha', aliases=('.captcha', '.c'), only_to_me=False)
async def auth_captcha(session: CommandSession):
    """Exchange an SMS captcha for a token and store it for the sender.

    Looks up the phone bound to the sender, posts phone and captcha to the
    login endpoint and, on success, saves the returned token in ``info``.
    """
    qq = str(session.ctx['user_id'])
    url = 'https://iot.universal-space.cn/api/unis/Myself/loginUser'
    captcha = str(session.current_arg_text.strip())

    conn = sqlite3.connect(database)
    try:
        row = conn.execute('SELECT phone FROM info WHERE qq = (?)', (qq,)).fetchone()
        phone = str(row[0]) if row and row[0] else ""
        if not phone:
            # No row or no phone stored: previously this raised and an empty
            # message was sent.
            a = "请先发送验证码。命令:\n.b 手机号"
        else:
            # `params` URL-encodes the user-supplied captcha; for plain
            # values the resulting URL is the same ?mobile=...&captcha=...
            response = requests.post(url, params={"mobile": phone, "captcha": captcha}, timeout=10)
            s = json.loads(response.text)
            # Log only the status code; the full response contains the token.
            print(s.get("retCode"))
            data = s.get("data")
            token = data.get("token") if isinstance(data, dict) else None
            if s.get("retCode") in [404, 103] or not token:
                a = "绑定失败,可能是验证码错误"
            else:
                conn.execute('UPDATE info SET token = ? WHERE qq = ?', (token, qq))
                conn.commit()
                a = "绑定成功"
    finally:
        conn.close()
    await session.send(a)
# Add an alternative name for a song.
@on_command('add')
async def add(session: CommandSession):
    """Handle ``/add old name|||new alias``.

    Splits the argument at the first ``|||`` and passes both parts, together
    with the sender's id, to ``alter_add``; replies with its message.
    """
    user_id = session.ctx['user_id']
    old, separator, new = session.current_arg_text.strip().partition('|||')
    # Whitespace around the separator must not become part of a name.
    old, new = old.strip(), new.strip()
    if not separator or not old or not new:
        await session.send('请正确输入命令,用法: /add 旧名字|||新别名')
        return
    await session.send(alter_add(new, old, user_id))