forked from findmover/wxread
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpush.py
70 lines (62 loc) · 2.51 KB
/
push.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# push.py
import os
import requests
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PushNotification:
def __init__(self):
self.pushplus_url = "https://www.pushplus.plus/send"
self.telegram_base_url = "https://api.telegram.org/bot{}/sendMessage"
self.headers = {
'User-Agent': 'Apifox/1.0.0 (https://apifox.com)'
}
def push_pushplus(self, content, token):
"""
Send notification via PushPlus
"""
try:
params = {
"token": token,
"content": content
}
response = requests.get(self.pushplus_url, headers=self.headers, params=params)
response.raise_for_status()
logger.info("PushPlus Response: %s", response.text)
return True
except Exception as e:
logger.error("PushPlus通知发送失败: %s", str(e))
return False
def push_telegram(self, content, bot_token, chat_id):
"""
Telegram通知
"""
try:
url = self.telegram_base_url.format(bot_token)
params = {
"chat_id": chat_id,
"text": content
}
response = requests.post(url, json=params)
response.raise_for_status()
logger.info("Telegram Response: %s", response.text)
return True
except Exception as e:
logger.error("Telegram通知发送失败: %s", str(e))
return False
def push(content, method, pushplus_token=None, telegram_bot_token=None, telegram_chat_id=None):
"""
统一推送接口
"""
notifier = PushNotification()
if method == "pushplus":
if not pushplus_token:
pushplus_token = os.getenv("PUSHPLUS_TOKEN", "YOUR_PUSHPLUS_TOKEN") # 替换为你的PushPlus token
return notifier.push_pushplus(content, pushplus_token)
elif method == "telegram":
if not all([telegram_bot_token, telegram_chat_id]):
telegram_bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "YOUR_BOT_TOKEN") # 替换为你的Telegram bot token
telegram_chat_id = os.getenv("TELEGRAM_CHAT_ID", "YOUR_CHAT_ID") # 替换为你的Telegram chat ID
return notifier.push_telegram(content, telegram_bot_token, telegram_chat_id)
else:
raise ValueError("无效的通知渠道. 请选择 'pushplus' 或者 'telegram'")