-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
258 lines (210 loc) · 7.85 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
"""
Copyright 2021-2025 AstreaTSS.
This file is part of PYTHIA.
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at https://mozilla.org/MPL/2.0/.
"""
import asyncio
import contextlib
import functools
import logging
import os
import subprocess
import sys
from collections import defaultdict
import interactions as ipy
import sentry_sdk
import typing_extensions as typing
from interactions.ext import hybrid_commands as hybrid
from interactions.ext import prefixed_commands as prefixed
from prisma import Prisma
from load_env import load_env
load_env()
import common.help_tools as help_tools
import common.models as models
import common.utils as utils
if typing.TYPE_CHECKING:
import discord_typings
logger = logging.getLogger("pythiabot")
logger.setLevel(logging.INFO)
handler = logging.FileHandler(
filename=os.environ["LOG_FILE_PATH"], encoding="utf-8", mode="a"
)
handler.setFormatter(
logging.Formatter("%(asctime)s:%(levelname)s:%(name)s: %(message)s")
)
logger.addHandler(handler)
logger.addHandler(logging.StreamHandler(sys.stdout))
def default_sentry_filter(
event: dict[str, typing.Any], hint: dict[str, typing.Any]
) -> typing.Optional[dict[str, typing.Any]]:
if "log_record" in hint:
record: logging.LogRecord = hint["log_record"]
if "interactions" in record.name or "pythiabot" in record.name:
# there are some logging messages that are not worth sending to sentry
if ": 403" in record.message:
return None
if ": 404" in record.message:
return None
if record.message.startswith("Ignoring exception in "):
return None
if record.message.startswith("Unsupported channel type for "):
# please shut up
return None
if "exc_info" in hint:
exc_type, exc_value, tb = hint["exc_info"]
if isinstance(exc_value, KeyboardInterrupt):
# We don't need to report a ctrl+c
return None
return event
class MyHookedTask(ipy.Task):
def on_error_sentry_hook(self: ipy.Task, error: Exception) -> None:
scope = sentry_sdk.Scope.get_current_scope()
if isinstance(self.callback, functools.partial):
scope.set_tag("task", self.callback.func.__name__)
else:
scope.set_tag("task", self.callback.__name__)
scope.set_tag("iteration", self.iteration)
sentry_sdk.capture_exception(error)
# im so sorry
if utils.SENTRY_ENABLED:
ipy.Task.on_error_sentry_hook = MyHookedTask.on_error_sentry_hook
sentry_sdk.init(dsn=os.environ["SENTRY_DSN"], before_send=default_sentry_filter)
class PYTHIA(utils.THIABase):
@ipy.listen("ready")
async def on_ready(self) -> None:
utcnow = ipy.Timestamp.utcnow()
time_format = f"<t:{int(utcnow.timestamp())}:f>"
connect_msg = (
f"Logged in at {time_format}!"
if self.init_load
else f"Reconnected at {time_format}!"
)
await self.owner.send(connect_msg)
self.init_load = False
activity = ipy.Activity(
name="Status",
type=ipy.ActivityType.CUSTOM,
state="Assisting servers | pythia.astrea.cc",
)
await self.change_presence(activity=activity)
@ipy.listen("resume")
async def on_resume_func(self) -> None:
activity = ipy.Activity(
name="Status",
type=ipy.ActivityType.CUSTOM,
state="Assisting servers | pythia.astrea.cc",
)
await self.change_presence(activity=activity)
# technically, this is in ipy itself now, but its easier for my purposes to do this
@ipy.listen("raw_application_command_permissions_update")
async def i_like_my_events_very_raw(
self, event: ipy.events.RawGatewayEvent
) -> None:
data: discord_typings.GuildApplicationCommandPermissionData = event.data # type: ignore
guild_id = int(data["guild_id"])
if not self.slash_perms_cache[guild_id]:
await help_tools.process_bulk_slash_perms(self, guild_id)
return
cmds = help_tools.get_commands_for_scope_by_ids(self, guild_id)
if cmd := cmds.get(int(data["id"])):
self.slash_perms_cache[guild_id][
int(data["id"])
] = help_tools.PermissionsResolver(
cmd.default_member_permissions, guild_id, data["permissions"] # type: ignore
)
@ipy.listen(is_default_listener=True)
async def on_error(self, event: ipy.events.Error) -> None:
await utils.error_handle(event.error, ctx=event.ctx)
@property
def guild_count(self) -> int:
return len(self.user._guild_ids or ())
def create_task(self, coro: typing.Coroutine) -> asyncio.Task:
# see the "important" note below for why we do this (to prevent early gc)
# https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
task = asyncio.create_task(coro)
self.background_tasks.add(task)
task.add_done_callback(self.background_tasks.discard)
return task
async def stop(self) -> None:
await self.db.disconnect()
await super().stop()
intents = ipy.Intents.new(
guilds=True,
guild_emojis_and_stickers=True,
messages=True,
reactions=True,
message_content=True,
guild_members=True,
)
mentions = ipy.AllowedMentions.all()
bot = PYTHIA(
activity=ipy.Activity(
name="Status", type=ipy.ActivityType.CUSTOM, state="Loading..."
),
status=ipy.Status.IDLE,
sync_interactions=False, # big bots really shouldn't have this on
sync_ext=False,
disable_dm_commands=True,
allowed_mentions=mentions,
intents=intents,
interaction_context=utils.THIAInteractionContext,
slash_context=utils.THIASlashContext,
modal_context=utils.THIAModalContext,
auto_defer=ipy.AutoDefer(enabled=True, time_until_defer=0),
message_cache=ipy.utils.TTLCache(30, 50, 150),
channel_cache=ipy.utils.TTLCache(600, 50, 250),
scheduled_events_cache=ipy.utils.NullCache(),
voice_state_cache=ipy.utils.NullCache(),
logger=logger,
)
bot.init_load = True
bot.slash_perms_cache = defaultdict(dict)
bot.mini_commands_per_scope = {}
bot.background_tasks = set()
bot.msg_enabled_bullets_guilds = set()
bot.color = ipy.Color(int(os.environ["BOT_COLOR"])) # #723fb0 or 7487408
prefixed.setup(bot, prefixed_context=utils.THIAPrefixedContext)
hybrid.setup(bot, hybrid_context=utils.THIAHybridContext)
async def start() -> None:
db = Prisma(
auto_register=True,
datasource={"url": os.environ["DB_URL"]},
http={"http2": True},
)
await db.connect()
bot.db = db
for model in await models.BulletConfig.prisma().find_many(
where={
"bullets_enabled": True,
"investigation_type": {"not": models.InvestigationType.COMMAND_ONLY},
}
):
bot.msg_enabled_bullets_guilds.add(model.guild_id)
ext_list = utils.get_all_extensions(os.environ["DIRECTORY_OF_FILE"])
for ext in ext_list:
if "voting" in ext and not utils.VOTING_ENABLED:
continue
try:
bot.load_extension(ext)
except ipy.errors.ExtensionLoadException:
raise
await bot.astart(os.environ["MAIN_TOKEN"])
if __name__ == "__main__":
run_method = asyncio.run
# use uvloop if possible
with contextlib.suppress(ImportError):
import uvloop # type: ignore
run_method = uvloop.run
if (
utils.DOCKER_ENABLED
and os.environ.get("DO_NOT_MIGRATE") not in utils.OS_TRUE_VALUES
):
import sys
subprocess.run(
[sys.executable, "-m", "prisma", "migrate", "deploy"],
check=True,
env={"DB_URL": os.environ["DB_URL"]},
)
run_method(start())