-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathjd_farm_automation.py
168 lines (132 loc) · 4.55 KB
/
jd_farm_automation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""
new Env('农场自动种植兑换');
cron 5 */2 * * * jd_farm_automation.py
"""
import asyncio
import datetime
import json
import logging
import os
import aiohttp
from dotmap import DotMap
from qinglong import init_ql
try:
from notify import send
except:
send = lambda *args: ...
logger = logging.getLogger(__name__)
ql = {
"host": os.environ.get("host"),
"client_id": os.environ.get("client_id"),
"client_secret": os.environ.get("client_secret"),
"token": None,
}
level = "2"
headers = {
"Connection": "keep-alive",
"Accept": "*/*",
"Host": "api.m.jd.com",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_5_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.4(0x1800042c) NetType/4G Language/zh_CN miniProgram",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-cn",
}
async def get_cookies(qinglong):
envs = qinglong.get_env()
return list(filter(lambda x: "name" in x and x["name"] == "JD_COOKIE", envs))
async def api(cookie, fn, body):
# url = f"""https://api.m.jd.com/client.action?functionId=${fn}&body=${json.dumps(body)}&client=apple&clientVersion=10.0.4&osVersion=13.7&appid=wh5&loginType=2&loginWQBiz=interact"""
url = f"""https://api.m.jd.com/client.action?functionId={fn}&client=apple&clientVersion=10.0.4&osVersion=13.7&appid=wh5&loginType=2&loginWQBiz=interact"""
headers["Cookie"] = cookie
async with aiohttp.ClientSession(headers=headers) as session:
for _ in range(5):
try:
async with session.get(url, params={"body": json.dumps(body)}) as r:
json_body = await r.json(content_type=None)
return DotMap(json_body)
except Exception as e:
logger.error(e)
await asyncio.sleep(0.01)
def format_timestamp(t):
dt = datetime.datetime.fromtimestamp(t / 1000)
return dt.strftime("%Y-%m-%d %H:%M:%S")
async def main_(cookie_dict):
cookie = cookie_dict["value"]
name = cookie_dict["remarks"].split("@@")[0]
msg = f"【{name}】:"
info = await api(
cookie, "initForFarm", {"version": 11, "channel": 3, "babelChannel": 0}
)
if info.code != "0":
logger.info("可能没开通农场或者黑透了!!!")
if info.farmUserPro.treeState == 1:
return
if info.farmUserPro.treeState == 2:
logger.info(
f"{info.farmUserPro.name},种植时间:{format_timestamp(info.farmUserPro.createTime)}"
)
coupon = await api(
cookie, "gotCouponForFarm", {"version": 11, "channel": 3, "babelChannel": 0}
)
logger.info(coupon)
msg
info = await api(
cookie, "initForFarm", {"version": 11, "channel": 3, "babelChannel": 0}
)
if info.farmUserPro.treeState == 3:
hongBao = info.myHongBaoInfo.hongBao
msg += f"已兑换{hongBao.discount}红包,{format_timestamp(hongBao.endTime)}过期"
element = info.farmLevelWinGoods[level][0] or None
if not element:
logger.info("'种子已抢完,下次在来!!!\n'")
return
info = await api(
cookie,
"choiceGoodsForFarm",
{
"imageUrl": "",
"nickName": "",
"shareCode": "",
"goodsType": element.type,
"type": "0",
"version": 11,
"channel": 3,
"babelChannel": 0,
},
)
if info.code * 1 == 0:
msg += f"\n再次种植【${info.farmUserPro.name}】"
a = await api(
cookie,
"gotStageAwardForFarm",
{"type": "4", "version": 11, "channel": 3, "babelChannel": 0},
)
b = await api(
cookie,
"waterGoodForFarm",
{"type": "", "version": 11, "channel": 3, "babelChannel": 0},
)
c = await api(
cookie,
"gotStageAwardForFarm",
{"type": "1", "version": 11, "channel": 3, "babelChannel": 0},
)
return msg
async def main():
qinglong = init_ql()
cookies = await get_cookies(qinglong)
task_list = []
for cookie_dict in cookies:
task = asyncio.create_task(main_(cookie_dict))
task_list.append(task)
done, pending = await asyncio.wait(task_list, timeout=None)
# 得到执行结果
send_msg = []
for done_task in done:
res = done_task.result()
if res and res != "None":
print(f"{res}")
send_msg.append(res)
if send_msg:
send("农场种植成功", "\n".join(send_msg))
if __name__ == "__main__":
asyncio.run(main())