-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbot.py
164 lines (136 loc) · 5.65 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import asyncio
import datetime
import sys
import traceback
import asyncpg
import discord
from discord.ext import commands
import config
from cogs import meta, utils
# Extension modules handed to ``load_extension`` when the bot is constructed.
# Every entry lives in the ``cogs`` package, so only the short names are
# spelled out and the package prefix is added once.
initial_extensions = tuple(
    f'cogs.{cog_name}'
    for cog_name in (
        'admin',
        'lfg',
        'lobby',
        'meta',
        'misc',
        'mod',
        'reports',
        'roles',
        'streams',
        'tags',
        'tickets',
    )
)
class PWBot(commands.Bot):
    """Bot subclass that owns the PostgreSQL pool and falls back to tags.

    On construction it creates an asyncpg pool from ``config.postgresql`` and
    loads every module listed in ``initial_extensions``. Messages that use the
    ``?`` prefix but do not match a registered command are looked up as tags
    before a ``CommandNotFound`` error is dispatched.
    """

    def __init__(self):
        # Start from an empty intent set and assign the raw bitfield.
        # NOTE(review): 46855 is an opaque bitmask - confirm which gateway
        # intents it enables against the discord.py ``Intents`` flag values.
        intents = discord.Intents.none()
        intents.value = 46855

        allowed_mentions = discord.AllowedMentions(everyone=False, users=True, roles=False)

        super().__init__(
            command_prefix='?',
            fetch_offline_members=False,
            help_command=meta.PWBotHelp(command_attrs={
                'brief': 'Display all commands available',
                # Implicit concatenation instead of a backslash continuation:
                # a continuation inside the literal either drops the space
                # after the comma or embeds the next line's indentation.
                'help': (
                    'Display all commands available, '
                    'will display additional info if a command is specified'
                ),
            }),
            allowed_mentions=allowed_mentions,
            intents=intents,
        )

        self.client_id = config.client_id
        self.settings = utils.Settings()
        self.weather_key = config.weather_key

        # The pool is created synchronously, before the bot starts running.
        loop = asyncio.get_event_loop()
        try:
            self.pool = loop.run_until_complete(asyncpg.create_pool(config.postgresql))
        except Exception:
            print('Failed set up PostgreSQL pool, exiting')
            raise

        # A failing extension is reported but does not stop the others loading.
        for extension in initial_extensions:
            try:
                self.load_extension(extension)
            except commands.ExtensionError:
                print(f'Failed to load extension {extension}', file=sys.stderr)
                traceback.print_exc()

    async def on_ready(self):
        """Record the first ready time in ``self.uptime`` and log the bot user."""
        # Only set on the first call so later ready events keep the old value.
        if not hasattr(self, 'uptime'):
            self.uptime = datetime.datetime.utcnow()

        print(f'Ready: {self.user} (ID: {self.user.id})')

    async def on_command_error(self, ctx, error):
        """Log command errors and send the traceback of failed invocations.

        User input errors, unknown commands and check failures are only
        printed. For ``CommandInvokeError`` the underlying exception is
        printed to stderr and its traceback is sent to the invoking channel.
        """
        ignored_errors = (
            commands.UserInputError,
            commands.CommandNotFound,
            commands.CheckFailure,
        )
        if isinstance(error, ignored_errors):
            print(f'Ignoring error: {error}')
            return

        if not isinstance(error, commands.CommandInvokeError):
            return

        original = error.original
        if isinstance(original, discord.HTTPException):
            print(f'Unexpected HTTPException: {original}')

        tb = ''.join(traceback.format_exception(
            type(original), original, original.__traceback__
        ))

        print(f'Inside command {ctx.command.qualified_name}:', file=sys.stderr)
        traceback.print_tb(original.__traceback__)
        print(f'{original.__class__.__name__}: {original}', file=sys.stderr)

        # Discord rejects messages longer than 2000 characters. Keep the tail
        # of the traceback, which holds the exception and innermost frames.
        wrapper_length = len('```py\n\n```')
        limit = 2000 - wrapper_length
        if len(tb) > limit:
            tb = '...' + tb[-(limit - 3):]

        try:
            await ctx.send(f'```py\n{tb}\n```')
        except discord.HTTPException as send_error:
            # Reporting is best-effort; the handler itself must not raise.
            print(f'Failed to send traceback: {send_error}', file=sys.stderr)

    async def invoke(self, ctx):
        """Invoke the command in ``ctx``, or try to send a tag of that name."""
        # This is copied from commands.Bot.invoke, we still want to execute commands like usual
        if ctx.command is not None:
            self.dispatch('command', ctx)
            try:
                if await self.can_run(ctx, call_once=True):
                    await ctx.command.invoke(ctx)
                else:
                    raise commands.errors.CheckFailure(
                        'The global check once functions failed.'
                    )
            except commands.errors.CommandError as exc:
                await ctx.command.dispatch_error(ctx, exc)
            else:
                self.dispatch('command_completion', ctx)
        elif ctx.invoked_with:  # This is edited to first try to send a tag
            try:
                # NOTE(review): relies on send_tag raising CommandNotFound
                # when no tag matches - confirm against utils.Context.
                await ctx.send_tag(ctx.invoked_with.lower())
            except commands.errors.CommandNotFound:
                exc = commands.errors.CommandNotFound(
                    'Command or tag "{}" is not found'.format(ctx.invoked_with)
                )
                self.dispatch('command_error', ctx, exc)

    async def process_commands(self, message):
        """Build a ``utils.Context`` for ``message`` and invoke it.

        Messages written by bot accounts are skipped, as the library's own
        ``process_commands`` does.
        """
        if message.author.bot:
            return

        ctx = await self.get_context(message, cls=utils.Context)
        try:
            await self.invoke(ctx)
        finally:
            # In case we have any outstanding database connections
            await ctx.release()

    async def fetch_tag(self, name, *, conn=None):
        """Fetch a tag's content from the database.

        Uses ``conn`` when given, otherwise the bot's pool. Returns the
        content value of the tag called ``name``.
        """
        conn = conn or self.pool
        query = """SELECT tag_content.value
                   FROM tags
                   INNER JOIN tag_content ON tag_content.id = tags.content_id
                   WHERE tags.name=$1 LIMIT 1;
                """
        return await conn.fetchval(query, name)

    async def create_content(self, content, *, conn=None):
        """Create and insert content into the database, returns the created id
        so that it can be used to create a tag. This does not handle any errors
        that may occur while inserting.

        It is recommended that `conn` is through a transaction.
        """
        conn = conn or self.pool
        query = """INSERT INTO tag_content (value)
                   VALUES ($1)
                   RETURNING id;
                """
        return await conn.fetchval(query, content)

    async def create_tag(self, name, content_id, *, conn=None):
        """Create and insert a tag into the database, returns the created id.
        This doesn't handle any error that could happen while inserting.

        It is recommended that `conn` is through a transaction.
        """
        conn = conn or self.pool
        query = """INSERT INTO tags (name, content_id)
                   VALUES ($1, $2)
                   RETURNING id;
                """
        # fetchval returns the RETURNING column; execute would only return
        # the command status string.
        return await conn.fetchval(query, name, content_id)

    def run(self):
        """Start the bot with the token from ``config``."""
        super().run(config.token, reconnect=True)