-
Notifications
You must be signed in to change notification settings - Fork 851
/
Copy pathmain.py
158 lines (142 loc) · 5.51 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import json
import os
import sys
from loguru import logger
import warnings
import asyncio
import aiohttp
import itertools
from src import BiliUser
# Module logger; every record carries the same bound "user" field.
log = logger.bind(user="B站粉丝牌助手")
__VERSION__ = "0.3.8"

# Suppress this one specific warning message; all other warnings still surface.
warnings.filterwarnings(
    "ignore",
    message="The localize method is no longer necessary, as this time zone supports the fold attribute",
)

# Run relative to the directory containing this script so that "users.yaml"
# is found regardless of the caller's working directory.  (The previous
# ``.split(__file__)[0]`` could truncate the path when a directory name
# happened to contain the script name.)
os.chdir(os.path.dirname(os.path.abspath(__file__)))

# Load the settings either from the USERS environment variable (JSON) or from
# users.yaml, validate them, and build the ``config`` dict that is handed to
# every BiliUser.  Any failure is logged and terminates the process.
try:
    _users_env = os.environ.get("USERS")
    if _users_env:
        users = json.loads(_users_env)
    else:
        import yaml

        with open("users.yaml", "r", encoding="utf-8") as f:
            # NOTE(review): FullLoader is kept for compatibility with existing
            # config files; users.yaml is assumed to be operator-controlled.
            users = yaml.load(f, Loader=yaml.FullLoader)

    # Explicit checks instead of ``assert`` so validation still runs under
    # ``python -O``.  Order and messages match the original assertions.
    if users["ASYNC"] not in [0, 1]:
        raise ValueError("ASYNC参数错误")
    if not (users["LIKE_CD"] >= 0):
        raise ValueError("LIKE_CD参数错误")
    # SHARE_CD is intentionally neither validated nor forwarded.
    if not (users["DANMAKU_CD"] >= 0):
        raise ValueError("DANMAKU_CD参数错误")
    if not (users["WATCHINGLIVE"] >= 0):
        raise ValueError("WATCHINGLIVE参数错误")
    if users["WEARMEDAL"] not in [0, 1]:
        raise ValueError("WEARMEDAL参数错误")

    config = {
        "ASYNC": users["ASYNC"],
        "LIKE_CD": users["LIKE_CD"],
        "DANMAKU_CD": users["DANMAKU_CD"],
        "WATCHINGLIVE": users["WATCHINGLIVE"],
        "WEARMEDAL": users["WEARMEDAL"],
        # Optional keys: SIGNINGROUP falls back to 2, PROXY to None.
        "SIGNINGROUP": users.get("SIGNINGROUP", 2),
        "PROXY": users.get("PROXY"),
    }
except Exception as e:
    log.error(f"读取配置文件失败,请检查配置文件格式是否正确: {e}")
    # sys.exit is always available; the bare ``exit`` helper is only
    # installed by the ``site`` module.
    sys.exit(1)
@log.catch
async def main():
    """Run one complete pass of the helper.

    Steps, in order:
      1. Query the version endpoint and record update/notice messages
         (failures here are logged and do not abort the run).
      2. Build a ``BiliUser`` for every entry in ``users["USERS"]`` that has
         a truthy ``access_key`` and await all ``init()`` coroutines, then
         all ``start()`` coroutines.
      3. Always collect each user's ``sendmsg()`` result and append the
         flattened messages to the report.
      4. Log the report, push it via ``SENDKEY`` and/or ``MOREPUSH`` when
         those keys are configured.

    Exceptions escaping this coroutine are handled by the ``@log.catch``
    decorator.
    """
    messages = []
    # ``async with`` guarantees the session is closed even if a later step
    # raises; previously an exception skipped ``session.close()``.
    async with aiohttp.ClientSession(trust_env=True) as session:
        try:
            log.warning("当前版本为: " + __VERSION__)
            resp = await (
                await session.get(
                    "http://version.fansmedalhelper.1961584514352337.cn-hangzhou.fc.devsapp.net/"
                )
            ).json()
            if resp["version"] != __VERSION__:
                log.warning("新版本为: " + resp["version"] + ",请更新")
                log.warning("更新内容: " + resp["changelog"])
                messages.append(f"当前版本: {__VERSION__} ,最新版本: {resp['version']}")
                messages.append(f"更新内容: {resp['changelog']} ")
            # NOTE(review): treated as independent of the version comparison.
            if resp["notice"]:
                log.warning("公告: " + resp["notice"])
                messages.append(f"公告: {resp['notice']}")
        except Exception as ex:
            # The version check is best-effort: report the failure and go on.
            messages.append(f"检查版本失败,{ex}")
            log.warning(f"检查版本失败,{ex}")

        init_tasks = []
        start_tasks = []
        report_tasks = []
        for user in users["USERS"]:
            if user["access_key"]:
                bili_user = BiliUser(
                    user["access_key"],
                    user.get("white_uid", ""),
                    user.get("banned_uid", ""),
                    config,
                )
                init_tasks.append(bili_user.init())
                start_tasks.append(bili_user.start())
                report_tasks.append(bili_user.sendmsg())

        try:
            # All users are initialised first, then all are started.
            await asyncio.gather(*init_tasks)
            await asyncio.gather(*start_tasks)
        except Exception as e:
            log.exception(e)
            messages.append(f"任务执行失败: {e}")
        finally:
            # Per-user reports are gathered whether or not the tasks failed;
            # each result is an iterable of messages and they are flattened.
            messages = messages + list(
                itertools.chain.from_iterable(await asyncio.gather(*report_tasks))
            )

        for message in messages:
            log.info(message)

        if users.get("SENDKEY", ""):
            await push_message(session, users["SENDKEY"], " \n".join(messages))

    if users.get("MOREPUSH", ""):
        from onepush import notify

        notifier = users["MOREPUSH"]["notifier"]
        params = users["MOREPUSH"]["params"]
        await notify(
            notifier,
            title="【B站粉丝牌助手推送】",
            content=" \n".join(messages),
            **params,
            proxy=config.get("PROXY"),
        )
        log.info(f"{notifier} 已推送")
def run(*args, **kwargs):
    """Execute ``main()`` once on a fresh event loop (scheduler job entry point).

    Positional and keyword arguments are accepted and ignored so the function
    can be registered as a scheduler job with any call signature.
    """
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        # A new loop is created on every invocation; close it so repeated
        # scheduled runs do not accumulate open loops and their resources.
        loop.close()
    log.info("任务结束,等待下一次执行。")
async def push_message(session, sendkey, message):
    """POST *message* to the sctapi.ftqq.com push endpoint for *sendkey*.

    Args:
        session: HTTP client session whose ``post`` is used for the request
            (an ``aiohttp.ClientSession`` in this file).
        sendkey: Key embedded in the endpoint URL.
        message: Text sent as the ``desp`` form field.

    Returns:
        None. The outcome is only logged.
    """
    url = f"https://sctapi.ftqq.com/{sendkey}.send"
    data = {"title": "【B站粉丝牌助手推送】", "desp": message}
    # Use the response as a context manager so the connection is released
    # back to the session instead of being left open.
    async with session.post(url, data=data) as resp:
        if resp.status >= 400:
            # Do not claim success when the endpoint rejected the request.
            log.warning(f"Server酱推送失败, HTTP {resp.status}")
            return
    log.info("Server酱已推送")
# Script entry point. Three modes, checked in this order:
#   1. ``CRON`` present in the settings -> run on that crontab schedule.
#   2. ``--auto`` on the command line   -> run now, then every 24 hours.
#   3. otherwise                         -> run once and exit.
if __name__ == "__main__":
    cron = users.get("CRON", None)
    if cron:
        from apscheduler.schedulers.blocking import BlockingScheduler
        from apscheduler.triggers.cron import CronTrigger

        log.info(f"使用内置定时器 {cron},开启定时任务,等待时间到达后执行。")
        scheduler = BlockingScheduler()
        scheduler.add_job(run, CronTrigger.from_crontab(cron), misfire_grace_time=3600)
        scheduler.start()
    elif "--auto" in sys.argv:
        from apscheduler.schedulers.blocking import BlockingScheduler
        from apscheduler.triggers.interval import IntervalTrigger
        import datetime

        log.info("使用自动守护模式,每隔 24 小时运行一次。")
        scheduler = BlockingScheduler(timezone="Asia/Shanghai")
        scheduler.add_job(
            run,
            IntervalTrigger(hours=24),
            # Pass a timezone-aware "now". A naive datetime would be
            # interpreted in the scheduler's Asia/Shanghai zone, which on a
            # host in another timezone places the first run hours in the past
            # (beyond the misfire grace period) so it would be skipped.
            next_run_time=datetime.datetime.now(datetime.timezone.utc),
            misfire_grace_time=3600,
        )
        scheduler.start()
    else:
        log.info("未配置定时器,开启单次任务。")
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(main())
        finally:
            # Release the loop's resources before the process exits.
            loop.close()
        log.info("任务结束")