forked from 4KaNE/Discord-Lottery-Bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjson_handler.py
257 lines (220 loc) · 8.94 KB
/
json_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
"""This is a discord-lottery-Bot's module that handles json file"""
import json
import datetime
import time
from enum import IntEnum, auto
class Status(IntEnum):
    """Outcome codes returned by ``JsonHandler.set_ign``.

    Values are consecutive integers starting at 1.
    """

    # Not assigned anywhere in this module.
    NONE = 1
    # The Discord ID had no IGN yet and the new one was stored.
    INSERT = 2
    # The Discord ID already had a different IGN, which was replaced.
    UPDATE = 3
    # The Discord ID is already registered with exactly this IGN.
    NO_UPDATE = 4
    # The requested IGN is already registered to some Discord ID.
    IGN_KEY_ERROR = 5
    # Writing the JSON file failed.
    EXCEPTION = 6
class JsonHandler:
    """Read and write the lottery bot's JSON data file.

    The file is used with two top-level objects:

    * ``"IGN"``: maps a Discord ID (string key) to that user's IGN.
    * ``"Lottery_results"``: maps an IGN to
      ``{"YYYY/MM/DD": {"result": <value>, "time": "HH:MM:SS"}}``.

    Discord IDs are converted with ``str()`` before being used as keys,
    because keys loaded from JSON are always strings.
    """

    # strftime pattern used for the per-day keys under "Lottery_results".
    _DATE_KEY_FORMAT = '%Y/%m/%d'

    def __init__(self, logger, config):
        """Store the collaborators and cache the event period bounds.

        Args:
            logger: object providing ``info(msg)`` and ``error(msg)``.
            config: object providing ``kuji_first_day``, ``kuji_last_day``,
                ``kuji_days`` and ``rank_count``.
        """
        self.json_file = 'userData.json'
        self.logger = logger
        self.config = config
        self.first_day = self.config.kuji_first_day
        self.last_day = self.config.kuji_last_day

    def set_ign(self, discord_id, ign):
        """Register or change the IGN stored for a Discord ID.

        Args:
            discord_id: Discord ID of the user (converted to ``str``).
            ign: IGN to associate with that ID.

        Returns:
            Status: ``INSERT`` when a new ID was registered, ``UPDATE`` when
            an existing IGN was replaced, ``NO_UPDATE`` when the same IGN was
            already stored, ``IGN_KEY_ERROR`` when the IGN is already in use,
            or ``EXCEPTION`` when saving the file failed.

        Raises:
            FileNotFoundError, json.JSONDecodeError: if the data file cannot
                be read.
        """
        # A non-string key would never match keys loaded from JSON, and would
        # make json serialisation with sort_keys=True fail on mixed key types.
        discord_id = str(discord_id)
        json_data = self._open_json()
        ign_by_id = json_data["IGN"]

        # Decide what kind of change (if any) is being requested.
        registered = discord_id in ign_by_id
        former_ign = ign_by_id[discord_id] if registered else None
        if registered and former_ign == ign:
            status = Status.NO_UPDATE
        elif ign in ign_by_id.values():
            status = Status.IGN_KEY_ERROR
        else:
            status = Status.UPDATE if registered else Status.INSERT

        # When the IGN changes, move the results already recorded under the
        # old IGN to the new one.
        if status == Status.UPDATE:
            results = json_data["Lottery_results"]
            if former_ign in results:
                results[ign] = results.pop(former_ign)

        if status in (Status.INSERT, Status.UPDATE):
            ign_by_id[discord_id] = ign
            try:
                self._write_json(json_data)
            except (json.JSONDecodeError, FileNotFoundError) as error:
                self.logger.error(f'{type(error).__name__}: {error}')
                status = Status.EXCEPTION
        return status

    def add_result(self, discord_id, result):
        """Save a lottery result for the current day.

        Stored as ``"Lottery_results": {IGN: {date: {"result", "time"}}}``.
        An existing entry for the same IGN and date is overwritten.

        Args:
            discord_id: Discord ID of the user (converted to ``str``).
            result: lottery result to store.

        Returns:
            str: the save timestamp formatted as ``'%m/%d %H:%M:%S'``.

        Raises:
            KeyError: if no IGN is registered for ``discord_id``.
        """
        now = datetime.datetime.now()
        json_data = self._open_json()
        ign = json_data["IGN"][str(discord_id)]
        entry = {"result": result, "time": now.strftime('%H:%M:%S')}
        user_results = json_data["Lottery_results"].setdefault(ign, {})
        user_results[now.strftime(self._DATE_KEY_FORMAT)] = entry
        self._write_json(json_data)
        return now.strftime('%m/%d %H:%M:%S')

    def add_ranking(self, discord_id, result):
        """Save the top 20 data in the json file.

        Not implemented: this method currently does nothing.
        """
        pass

    def check_today_result(self, discord_id):
        """Look up whether the user already has a result for today.

        Args:
            discord_id: Discord ID of the user (converted to ``str``).

        Returns:
            tuple: ``(has_ign, today_result)``. ``has_ign`` is True when an
            IGN is registered for the ID; ``today_result`` is the stored
            result for today's date, or None when there is none.
        """
        today_key = datetime.datetime.now().strftime(self._DATE_KEY_FORMAT)
        # One read serves both the IGN lookup and the result lookup.
        json_data = self._open_json()
        ign = json_data["IGN"].get(str(discord_id))
        if ign is None:
            return False, None
        entry = json_data["Lottery_results"].get(ign, {}).get(today_key)
        today_result = entry["result"] if entry is not None else None
        return True, today_result

    def calc_previous_day_stats(self):
        """Calculate statistics for the previous day.

        Returns:
            dict: with the keys
                ``"pd_date"``: yesterday's date key (``YYYY/MM/DD``),
                ``"days_left"``: ``(last_day - today).days + 1``,
                ``"players"``: number of IGNs with a result yesterday,
                ``"ranks"``: list of ``[rank, discord_id, result]`` ordered
                by result (descending), then time, then IGN. Iteration stops
                once ``config.rank_count`` rows were added.
        """
        now = datetime.datetime.now()
        pd_key = (now - datetime.timedelta(days=1)).strftime(self._DATE_KEY_FORMAT)
        pd_stats_dict = {
            "pd_date": pd_key,
            # The subtraction requires config.kuji_last_day to be a date.
            "days_left": (self.last_day - now.date()).days + 1,
        }

        json_data = self._open_json()
        # Collect [ign, result, time] for everyone who drew yesterday.
        pd_result_list = [
            [ign, by_date[pd_key]["result"], by_date[pd_key]["time"]]
            for ign, by_date in json_data["Lottery_results"].items()
            if by_date.get(pd_key) is not None
        ]
        pd_stats_dict["players"] = len(pd_result_list)
        pd_stats_dict["ranks"] = []

        # Resolve IGN -> Discord ID from the data already loaded instead of
        # re-reading the file for every ranked row.
        id_by_ign = self._discord_ids_by_ign(json_data)
        ordered = sorted(pd_result_list, key=lambda x: (-x[1], x[2], x[0]))
        for count, v in enumerate(ordered, start=1):
            # Log each row so the sort order can be checked.
            self.logger.info(f"result={v[1]}、time={v[2]}、id={v[0]}")
            pd_stats_dict["ranks"].append([count, id_by_ign.get(v[0]), v[1]])
            if count >= self.config.rank_count:
                break
        return pd_stats_dict

    def period_stats(self):
        """Collect statistics over the days listed in ``config.kuji_days``.

        Returns:
            dict: with the keys
                ``"players"``: number of IGNs present in "Lottery_results"
                (regardless of date),
                ``"number_of_lotteries"``: number of results recorded on the
                configured days,
                ``"ranks"``: list of ``[rank, discord_id, result]`` ordered
                by result (descending), then day, then time, then IGN.
                Iteration stops once ``config.rank_count`` rows were added.
        """
        json_data = self._open_json()
        # Collect [ign, result, day, time] for every draw within the period.
        result_list = []
        for ign, by_date in json_data["Lottery_results"].items():
            for date_key in self.config.kuji_days:
                date_userdata = by_date.get(date_key.strftime(self._DATE_KEY_FORMAT))
                if date_userdata is not None:
                    result_list.append(
                        [ign, date_userdata["result"], date_key, date_userdata["time"]]
                    )

        period_stats_dict = {
            "players": len(json_data["Lottery_results"]),
            "number_of_lotteries": len(result_list),
            "ranks": [],
        }

        # Resolve IGN -> Discord ID from the data already loaded instead of
        # re-reading the file for every ranked row.
        id_by_ign = self._discord_ids_by_ign(json_data)
        ordered = sorted(result_list, key=lambda x: (-x[1], x[2], x[3], x[0]))
        for count, v in enumerate(ordered, start=1):
            # Log each row so the sort order can be checked.
            self.logger.info(f"result={v[1]}、day={v[2]}、time={v[3]}、id={v[0]}")
            period_stats_dict["ranks"].append([count, id_by_ign.get(v[0]), v[1]])
            if count >= self.config.rank_count:
                break
        self.logger.info(period_stats_dict["ranks"])
        return period_stats_dict

    def _check_ign(self, discord_id):
        """Return the IGN stored for ``discord_id``, or None if there is none.

        The ID is converted to ``str`` for both the membership test and the
        lookup, so integer IDs work as well.
        """
        return self._open_json()["IGN"].get(str(discord_id))

    def _check_discord_id(self, ign):
        """Return the first Discord ID whose stored IGN equals ``ign``.

        Returns None when no ID is registered with that IGN.
        """
        json_data = self._open_json()
        return next(
            (key for key, value in json_data["IGN"].items() if value == ign),
            None,
        )

    @staticmethod
    def _discord_ids_by_ign(json_data):
        """Build an IGN -> Discord ID mapping from already-loaded data.

        If an IGN appears more than once, the first Discord ID wins, which
        matches ``_check_discord_id``.
        """
        lookup = {}
        for discord_id, ign in json_data["IGN"].items():
            lookup.setdefault(ign, discord_id)
        return lookup

    def _write_json(self, json_data):
        """Serialise ``json_data`` and overwrite the data file with it."""
        # Serialise before opening the file: opening with 'w' truncates it,
        # so a serialisation error must not happen after that point.
        payload = json.dumps(json_data, ensure_ascii=False, indent=4,
                             sort_keys=True, separators=(',', ': '))
        # NOTE(review): no explicit encoding is given, so the platform
        # default is used for both reading and writing — confirm intended.
        with open(self.json_file, 'w') as json_file:
            json_file.write(payload)

    def _open_json(self):
        """Open the json file and return its content as a dict.

        Raises:
            FileNotFoundError: if the data file does not exist.
            json.JSONDecodeError: if the file is not valid JSON. The error
                is logged and re-raised.
        """
        with open(self.json_file, 'r') as json_file:
            try:
                return json.load(json_file)
            except json.JSONDecodeError as error:
                self.logger.error(f'JSONDecodeError: {error}')
                raise
if __name__ == '__main__':
    # Manual smoke test: print the period statistics from the data file.
    # JsonHandler requires a logger and a config object, so minimal stand-ins
    # are built here from the standard library.
    import logging
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)
    _today = datetime.date.today()
    # Placeholder values covering only the attributes JsonHandler reads;
    # replace with the real project configuration when available.
    _config = SimpleNamespace(
        kuji_first_day=_today,
        kuji_last_day=_today,
        kuji_days=[_today],
        rank_count=5,
    )
    JH = JsonHandler(logging.getLogger(__name__), _config)
    print(JH.period_stats())
    # NOTE(review): presumably keeps the console window open — confirm the
    # 3000-second duration is intended.
    time.sleep(3000)