forked from k8thekat/GatekeeperV2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
652 lines (546 loc) · 30.2 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
'''
Copyright (C) 2021-2022 Katelynn Cadwallader.
This file is part of Gatekeeper, the AMP Minecraft Discord Bot.
Gatekeeper is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3, or (at your option)
any later version.
Gatekeeper is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
License for more details.
You should have received a copy of the GNU General Public License
along with Gatekeeper; see the file COPYING. If not, write to the Free
Software Foundation, 51 Franklin Street - Fifth Floor, Boston, MA
02110-1301, USA.
'''
from datetime import datetime
import logging
import json
import requests
import pathlib
import aiohttp
import sys
import re
from typing import Union
import discord
from discord import app_commands
from discord.ext import commands
import asyncio
import DB
import AMP_Handler
#GLOBAL VARS# DO NOT EDIT THESE! ONLY READ THEM
# Module-level handler objects, fetched once at import time.
# `__AMP_Handler` is read by the autocomplete functions in this file to list AMP Instance names.
__AMP_Handler = AMP_Handler.getAMPHandler()
# `__DB_Handler` is not referenced elsewhere in the visible part of this file.
# NOTE(review): presumably kept for parity with the AMP handler - confirm before removing.
__DB_Handler = DB.getDBHandler()
async def async_rolecheck(context: Union[commands.Context, discord.Interaction, discord.member.Member], perm_node: str = None):
    """Decides whether the invoking user is allowed to run a command.\n
    The checks run in this order:\n
    1. Guild Administrators always pass; this keeps the bot usable before a Moderator role is set.\n
    2. If the DB setting `Permissions` is `1` (Custom) the result of `botPerms.perm_node_check()` decides.\n
    3. Otherwise (Default) the user's top role must sit at or above the role stored in the DB setting `Moderator_role_id` inside the guild role list.\n
    `context` may be a `commands.Context`, a `discord.Interaction` or a `discord.Member`.\n
    `perm_node` is the permission node to test; when omitted it is built from the command name (spaces become dots).\n
    Returns `True` when the user is allowed, else `False`."""
    DBHandler = DB.getDBHandler()
    DBConfig = DBHandler.DBConfig
    _mod_role = DBConfig.GetSetting('Moderator_role_id')
    logger = logging.getLogger(__name__)

    #Resolve the node once up front; a bare `discord.Member` has no `.command` attribute so it must be read defensively.
    if perm_node is None:
        perm_node = str(getattr(context, 'command', None)).replace(" ", ".")
    logger.dev(f'Permission Context command node {perm_node}')

    #Work out who is asking.
    if isinstance(context, discord.Member):
        author = context
    #This is used for `discord.ext.commands.Context` objects
    elif hasattr(context, 'author'):
        author = context.author
    #This is used for `discord.Interaction` objects
    elif hasattr(context, 'user'):
        author = context.user
    else:
        #This is for on_message commands
        author = context.message.author

    #Outside of a guild we get a plain User without roles or guild permissions; nothing can be verified so deny.
    if not hasattr(author, 'guild_permissions'):
        logger.error(f'Unable to check permissions for {author}; they are not a guild Member.')
        return False
    top_role_id = author.top_role.id

    #This fast tracks role checks for Admins, which also allows the bot to still work without a Staff Role set in the DB
    if author.guild_permissions.administrator:
        logger.command(f'*Admin* Permission Check Okay on {author}')
        return True

    #This handles Custom Permissions for people with the flag set.
    if DBConfig.GetSetting('Permissions') == 1: #0 is Default, 1 is Custom
        #The result of the call decides; comparing the method object itself to False would always grant access.
        if not get_botPerms().perm_node_check(perm_node, context):
            logger.command(f'*Custom* Permission Check Failed on {author} missing {perm_node}')
            return False
        logger.command(f'*Custom* Permission Check Okay on {author}')
        return True

    #This is the final check before we attempt to use the "DEFAULT" permissions setup.
    if _mod_role is None:
        await _send_permission_notice(context, f'Please have an Adminstrator run `/bot moderator (role)` or consider setting up Custom Permissons.')
        logger.error(f'DBConfig Moderator role has not been set yet!')
        return False

    #Guild Roles is a heirachy tree;
    #So here I am comparing if the author/user's top role is greater than or equal to the `_mod_role` in terms of index values from the list.
    staff_index, author_index = None, None
    for index, role in enumerate(context.guild.roles):
        if role.id == top_role_id:
            author_index = index
        #Compare as strings so an ID stored as text in the DB still matches the integer role ID.
        if str(role.id) == str(_mod_role):
            staff_index = index

    #If the Moderator role no longer exists we must not fall back to index 0 (`@everyone`), that would let everybody through.
    if staff_index is None:
        logger.error(f'DBConfig Moderator role {_mod_role} was not found in the guild roles!')
    elif author_index is not None and author_index >= staff_index:
        logger.command(f'*Default* Permission Check Okay on {author}')
        return True

    logger.command(f'*Default* Permission Check Failed on {author}')
    await _send_permission_notice(context, 'You do not have permission to use that command...')
    return False


async def _send_permission_notice(context, message: str) -> None:
    """Sends `message` back to the user as an ephemeral reply when the `context` type supports it."""
    if isinstance(context, discord.Interaction):
        #Autocomplete interactions cannot be answered with a message and an interaction may only be responded to once.
        if context.type is not discord.InteractionType.autocomplete and not context.response.is_done():
            await context.response.send_message(message, ephemeral=True)
    elif isinstance(context, discord.Member):
        #A Member can only be reached via DM and `Member.send()` has no `ephemeral` option; stay silent.
        return
    else:
        await context.send(message, ephemeral=True)
def role_check():
    """Command decorator for anything that needs a Staff/Mod level permission Role.\n
    Wraps `async_rolecheck` in a `commands.check`, so guild Administrators pass as well."""
    staff_gate = commands.check(async_rolecheck)
    return staff_gate
def author_check(user_id:int=None):
    """Command decorator that only lets the user with the matching `user_id` through.\n
    Anyone else receives an ephemeral denial message and the check fails."""
    async def predicate(context:commands.Context):
        #Guard clause: reject first, the happy path falls through.
        if context.author.id != user_id:
            await context.send('You do not have permission to use that command...', ephemeral=True)
            return False
        return True

    return commands.check(predicate)
def guild_check(guild_id:int=None):
    """Use this before any commands to limit it to a certain guild usage.\n
    The check passes only when the command is invoked inside the guild with the matching `guild_id`.\n
    Anything else (including Direct Messages) receives an ephemeral denial message and the check fails."""
    async def predicate(context:commands.Context):
        #`context.guild` is None when the command is used in a Direct Message; treat that as "wrong guild" instead of crashing.
        if context.guild is not None and context.guild.id == guild_id:
            return True
        await context.send('You do not have permission to use that command...', ephemeral=True)
        return False

    return commands.check(predicate)
async def autocomplete_servers(interaction:discord.Interaction, current:str) -> list[app_commands.Choice[str]]:
    """Autocomplete for AMP Instance Names\n
    Users that pass the `Staff` role check also get the Instance ID appended to each entry.\n
    Matching is a case-insensitive "contains" on the Instance name; at most 25 choices are returned."""
    choice_list = __AMP_Handler.get_AMP_instance_names()
    show_ids = await async_rolecheck(interaction, perm_node= 'Staff') == True

    matches = []
    for key, value in choice_list.items():
        if current.lower() not in value.lower():
            continue
        #Only the label differs between staff and regular users; the value is always the Instance ID.
        if show_ids:
            label = f"{value} | ID: {key}"
        else:
            label = f"{value}"
        matches.append(app_commands.Choice(name=label, value= key))
    return matches[:25]
async def autocomplete_servers_public(interaction:discord.Interaction, current:str) -> list[app_commands.Choice[str]]:
    """Autocomplete for AMP Instance Names, requesting the names with `public= True`.\n
    Matching is a case-insensitive "contains" on the Instance name; at most 25 choices are returned."""
    choice_list = __AMP_Handler.get_AMP_instance_names(public= True)

    matches = []
    for key, value in choice_list.items():
        if current.lower() in value.lower():
            matches.append(app_commands.Choice(name=f"{value}", value= key))
    return matches[:25]
class discordBot():
    """Small convenience wrapper around common Discord actions (roles, messages, reactions, history).\n
    Every method logs that it was called and then forwards to the matching discord object method."""
    def __init__(self, client:discord.Client):
        self.botLogger = logging.getLogger(__name__)
        self._client = client
        self.botLogger.debug(f'Utils Discord Loaded')

    async def userAddRole(self, user:discord.Member, role:discord.Role, reason:str=None):
        """Adds a Role to a User.\n
        Requires a `<user>` and `<role>` discord object.\n
        Supports `reason`(Optional)"""
        self.botLogger.dev('Add Users Discord Role Called...')
        #`reason` must be passed by keyword; positional arguments are all treated as roles.
        await user.add_roles(role, reason=reason)

    async def userRemoveRole(self, user:discord.Member, role:discord.Role, reason:str=None):
        """Removes a Role from the User.\n
        Requires a `<user>` and `<role>` discord object.\n
        Supports `reason`(Optional)"""
        self.botLogger.dev('Remove Users Discord Role Called...')
        #`reason` must be passed by keyword; positional arguments are all treated as roles.
        await user.remove_roles(role, reason=reason)

    async def delMessage(self, message:discord.Message, delay:float=None):
        """Deletes the message.\n
        Your own messages could be deleted without any proper permissions. However to delete other people's messages, you need the `manage_messages` permission.\n
        Supports `delay[float]`(Optional)"""
        self.botLogger.dev('Delete Discord Message Called...')
        await message.delete(delay=delay)

    async def channelHistory(self, channel:discord.TextChannel, limit:int=10, before:datetime=None, after:datetime=None, around:datetime=None, oldest_first:bool=False):
        """This will be used to access channel history up to 100. Simple scraper with datetime support.\n
        Returns a `list` of the messages found; `limit` is capped at 100."""
        limit = min(limit, 100)
        #`history()` takes keyword-only arguments and is consumed with `async for`.
        history = channel.history(limit=limit, before=before, after=after, around=around, oldest_first=oldest_first)
        return [message async for message in history]

    async def editMessage(self ,message:discord.Message , content:str=None, embed:discord.Embed=None, embeds:list[discord.Embed]=None, delete_after:float=None):
        """Edits the message.\n
        The content must be able to be transformed into a string via `str(content)`.\n
        Only the fields that are actually provided (not `None`) are sent, everything else on the message is left untouched.\n
        Supports `delete_after[float]`(Optional)"""
        self.botLogger.dev('Edit Discord Message Called...')
        #Passing `embed` and `embeds` together is rejected by discord.py, so unset fields must be left out entirely.
        fields = {'content': content, 'embed': embed, 'embeds': embeds, 'delete_after': delete_after}
        provided = {key: value for key, value in fields.items() if value is not None}
        await message.edit(**provided)

    async def sendMessage(self, parameter:object, content:str,*, tts:bool=False,embed=None, file:discord.File=None, files:list=None, delete_after:float=None, nonce= None, allowed_mentions=None, reference:object=None):
        """Sends a message to the destination with the content given.\n
        `parameter` is the destination; any object with a compatible `send()` method.\n
        The content must be a type that can convert to a string through `str(content)`. If the content is set to `None`, then the embed parameter must be provided.\n
        To upload a single file, the `file` parameter should be used with a single File object. To upload multiple files, the `files` parameter should be used with a `list` of `File` objects. Specifying both parameters will lead to an exception.\n
        `NOTE:` Using `file` - await channel.send(file=discord.File('my_file.png')) or
            with open('my_file.png', 'rb') as fp:
                await channel.send(file=discord.File(fp, 'new_filename.png'))
        `NOTE:` Using `files` - my_files = [discord.File('result.zip'), discord.File('teaser_graph.png')] await channel.send(files=my_files)"""
        self.botLogger.dev('Member Send Message Called...')
        await parameter.send(content, tts=tts, embed=embed, file=file, files=files, delete_after=delete_after, nonce=nonce, allowed_mentions=allowed_mentions, reference=reference)

    async def messageAddReaction(self, message:discord.Message, reaction_id:str):
        """Adds a reaction to the message.\n
        `reaction_id` can be a custom emoji ID (all digits), a custom emoji name, a full custom emoji string or a plain unicode emoji.\n
        The name and ID of a custom emoji can be found with the client by prefixing ':custom_emoji:' with a backslash. \n
        For example, sending the message '\\:python3:' with the client will result in '<:python3:232720527448342530>'.
        `NOTE` Can only use Emoji's the bot has access too"""
        self.botLogger.dev('Message Add Reaction Called...')
        if reaction_id.isnumeric():
            emoji = self._client.get_emoji(int(reaction_id))
            if emoji is None:
                #Nothing usable to react with; `add_reaction(None)` would only raise.
                self.botLogger.error(f'Unable to find an Emoji with the ID {reaction_id}')
                return None
            return await message.add_reaction(emoji)

        if not reaction_id.startswith('<') and not reaction_id.endswith('>'):
            #Try a custom emoji name first; if there is none this is most likely a unicode emoji, which is sent as is.
            emoji = next((entry for entry in self._client.emojis if entry.name == reaction_id), None)
            if emoji is None:
                emoji = reaction_id
            return await message.add_reaction(emoji)

        #Already in the `<:name:id>` format.
        return await message.add_reaction(reaction_id)
class botUtils():
    """Gatekeeper Utility Class\n
    Holds the shared DB/AMP handlers and a collection of parsing helpers that turn
    user supplied text into Discord roles, channels, members and AMP Instances."""
    def __init__ (self, client:discord.Client= None):
        self._client = client
        self.logger = logging.getLogger(__name__)
        self.logger.debug('Utils Bot Loaded')

        self.DBHandler = DB.getDBHandler()
        self.DB = self.DBHandler.DB #Main Database object
        self.DBConfig = self.DBHandler.DBConfig

        self.AMPHandler = AMP_Handler.getAMPHandler()
        self.AMPInstances = self.AMPHandler.AMP_Instances
        #Avatar urls that already answered with a 200; used by `validate_avatar` to skip repeat requests.
        self.AMPServer_Avatar_urls = []

    def str_to_bool(self, parameter:str):
        """Bool Converter\n
        Returns `True` only for the text `true` (any casing), everything else is `False`."""
        return parameter.lower() == 'true'

    def message_formatter(self, message:str):
        """Formats the message for Discord \n
        `Bold = \\x01, \\x02` \n
        `Italic = \\x03, \\x04` \n
        `Underline = \\x05, \\x06` \n"""
        #Each pair of control characters (open, close) maps onto the same Discord markdown token.
        markdown_map = (
            ('\x01', '**'), ('\x02', '**'), #Bold
            ('\x03', '*'), ('\x04', '*'), #Italic
            ('\x05', '__'), ('\x06', '__'), #Underline
        )
        for control_char, markdown in markdown_map:
            message = message.replace(control_char, markdown)
        return message

    def whitelist_reply_handler(self, message:str, context:commands.Context, server:AMP_Handler.AMP.AMPInstance=None) -> str:
        """Handles the reply message for the whitelist event\n
        Supports the following: \n
        `<user>` - Uses the Message Author's Name/IGN \n
        `<server>` - Uses the AMP Server Name \n
        `<guild>` - Uses the Guild Name \n"""
        #The context/server attributes are only touched when their placeholder is actually present.
        if '<user>' in message:
            message = message.replace('<user>', context.author.name)
        if '<guild>' in message:
            message = message.replace('<guild>', context.guild.name)
        if '<server>' in message and server is not None:
            #Prefer the DisplayName when one is set.
            server_name = server.DisplayName if server.DisplayName is not None else server.FriendlyName
            message = message.replace('<server>', server_name)
        return message

    async def validate_avatar(self, db_server:AMP_Handler.AMP.AMPInstance) -> Union[str, None]:
        """This checks the DB Server objects Avatar_url and returns the proper object type. \n
        Must be either `webp`, `jpeg`, `jpg`, `png`, or `gif` if it's animated.\n
        Returns the url when it is a web url that answers with status `200`, else returns `None`."""
        avatar_url = db_server.Avatar_url
        if avatar_url is None:
            return None

        #This handles web URL avatar icon urls.
        if not avatar_url.startswith(("https://", "http://")):
            return None

        #Already validated earlier, no need to hit the web again.
        if avatar_url in self.AMPServer_Avatar_urls:
            return avatar_url

        #NOTE(review): presumably a small delay to avoid hammering the host when many servers are checked in a row - confirm.
        await asyncio.sleep(.5)
        #Validating if the URL actually works/exists via response.status codes.
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(avatar_url) as response:
                    if response.status == 200:
                        self.AMPServer_Avatar_urls.append(avatar_url)
                        return avatar_url
                    self.logger.error(f'We are getting Error Code {response.status}, not sure whats going on...')
                    return None
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            #A dead host or bad url should only mean "no avatar", not a failed command.
            self.logger.error(f'Unable to validate the Avatar url {avatar_url} - {e}')
            return None

    def name_to_uuid_MC(self, name) -> Union[None, str]:
        """Converts an IGN to a UUID/Name Table \n
        `name` may be a single IGN or a list of IGNs.\n
        `returns 'uuid'` else returns `None`, multiple results return `None`"""
        url = 'https://api.mojang.com/profiles/minecraft'
        header = {'Content-Type': 'application/json'}
        #The endpoint expects a JSON list of names, so a single name is wrapped.
        names = [name] if isinstance(name, str) else name
        try:
            post_req = requests.post(url, headers=header, data=json.dumps(names), timeout=10)
            minecraft_user = post_req.json()
        except (requests.RequestException, ValueError) as e:
            #ValueError covers a reply that is not valid JSON.
            self.logger.error(f'Unable to look up the Minecraft UUID for {name} - {e}')
            return None

        #Anything other than a list with exactly one entry (error object, no result, multiple results) is a miss.
        if not isinstance(minecraft_user, list) or len(minecraft_user) != 1:
            return None
        return minecraft_user[0]['id'] #returns [{'id': 'uuid', 'name': 'name'}]

    def name_to_steam_id(self, steamname:str) -> Union[None, str]:
        """Converts a Steam Name to a Steam ID returns `STEAM_0:0:2806383`\n
        Returns `None` when the lookup fails or the page has no title to read the ID from."""
        #Really basic HTML text scan to find the Title; which has the steam ID in it. Thank you STEAMIDFINDER! <3
        #<title> Steam ID STEAM_0:0:2806383 via Steam ID Finder</title>
        try:
            r = requests.get(f'https://www.steamidfinder.com/lookup/{steamname}', timeout=10)
        except requests.RequestException as e:
            self.logger.error(f'Unable to reach Steam ID Finder for {steamname} - {e}')
            return None

        self.logger.dev(f'Status Code {r.status_code}')
        if r.status_code == 404:
            return None

        title_open = re.search('(<title>)', r.text)
        title_close = re.search('(</title>)', r.text)
        if title_open is None or title_close is None:
            self.logger.error(f'Unable to find the Steam ID for {steamname}, the page has no title.')
            return None

        title_start = title_open.end()
        title_end = title_close.start()
        #turns into " STEAM_0:0:2806383 "
        #This should work regardless of the Steam ID length; since we came from the end of the second title backwards.
        steam_id = r.text[title_start+9:title_end-20].strip()
        self.logger.dev(f'Found Steam ID {steam_id}')
        return steam_id

    def role_parse(self, parameter:str, context:commands.Context, guild_id:int) -> Union[discord.Role, None]:
        """This is the bot utils Role Parse Function\n
        It handles finding the specificed Discord `<role>` in multiple different formats.\n
        They can contain single quotes, double quotes and underscores. (" ",' ',_)\n
        returns `<role>` object if True, else returns `None`
        **Note** Use context.guild.id"""
        self.logger.dev('Role Parse Called...')
        guild = self._client.get_guild(guild_id)
        if guild is None:
            self.logger.error(f'Unable to find the Discord Guild {guild_id}')
            return None

        #Role ID catch
        if parameter.isdecimal():
            role = guild.get_role(int(parameter))
            self.logger.debug(f'Found the Discord Role {role}')
            return role

        #This allows a user to pass in a role in quotes double or single
        cleaned = parameter.replace('"', '').replace("'", '').lower()
        #A role can be matched by its exact name or with underscores standing in for spaces.
        candidates = {cleaned, cleaned.replace('_', ' ')}

        #If a user provides a role name; this will check if it exists and return the role
        for role in guild.roles:
            if role.name.lower() in candidates:
                self.logger.debug(f'Found the Discord Role {role}')
                return role
        return None

    def channel_parse(self, parameter:Union[str, int], context:commands.Context=None, guild_id:int=None) -> Union[discord.TextChannel, None]:
        """This is the bot utils Channel Parse Function\n
        It handles finding the specificed Discord `<channel>` in multiple different formats, either numeric or alphanumeric.\n
        A channel name may carry a `Category -> ` prefix, only the part after `->` is used.\n
        returns `<channel>` object if True, else returns `None`
        **Note** Use context.guild.id"""
        self.logger.dev('Channel Parse Called...')
        #IDs may arrive as text; channel lookups by ID need an int.
        channel_id = None
        if isinstance(parameter, int):
            channel_id = parameter
        elif parameter.isdecimal():
            channel_id = int(parameter)

        #Without a guild we can only look a channel up by its ID.
        if guild_id is None:
            channel = self._client.get_channel(channel_id) if channel_id is not None else None
            self.logger.debug(f'Found the Discord Channel {channel}')
            return channel

        guild = self._client.get_guild(guild_id)
        if guild is None:
            self.logger.error(f'Unable to find the Discord Guild {guild_id}')
            return None

        if isinstance(parameter, int):
            channel = guild.get_channel(parameter)
            self.logger.debug(f'Found the Discord Channel {channel}')
            return channel

        category_clear = parameter.find('->')
        if category_clear != -1:
            parameter = parameter[(category_clear + 2):].strip()

        for channel in guild.channels:
            if channel.name == parameter:
                self.logger.debug(f'Found the Discord Channel {channel}')
                return channel

        #No channel carries that name; a numeric string may still be a channel ID.
        if channel_id is not None:
            channel = guild.get_channel(channel_id)
            if channel is not None:
                self.logger.debug(f'Found the Discord Channel {channel}')
                return channel

        self.logger.error('Unable to Find the Discord Channel')
        return None

    def _found_member(self, member):
        """Logs the result of a member lookup (also when nothing was found) and hands the member back."""
        if member is None:
            self.logger.dev('Unable to find the Discord Member')
        else:
            self.logger.dev(f'Found the Discord Member {member.display_name}')
        return member

    def user_parse(self, parameter:str, context:commands.Context=None, guild_id:int=None) -> Union[discord.Member, None]:
        """This is the bot utils User Parse Function\n
        It handles finding the specificed Discord `<user>` in multiple different formats, either numeric or alphanumeric.\n
        It also supports '@', '#0000' and partial display name searching for user indentification (eg. k8thekat#1357)\n
        returns `<user>` object if True, else returns `None`
        **Note** Use context.guild.id"""
        self.logger.dev('User Parse Called...')
        parameter = str(parameter)

        #Without a guild_ID its harder to parse members; only an ID lookup is possible.
        if guild_id is None:
            if not parameter.isdecimal():
                self.logger.error(f'Unable to find the Discord Member {parameter} without a Guild ID')
                return None
            return self._found_member(self._client.get_user(int(parameter)))

        guild = self._client.get_guild(guild_id)
        if guild is None:
            self.logger.error(f'Unable to find the Discord Guild {guild_id}')
            return None

        #Discord ID catch
        if parameter.isdecimal():
            return self._found_member(guild.get_member(int(parameter)))

        #Using @ at user and stripping; both the `<@id>` and the older `<@!id>` mention formats.
        mention = re.fullmatch(r'<@!?(\d+)>', parameter)
        if mention is not None:
            return self._found_member(guild.get_member(int(mention.group(1))))

        #Profile Name Catch
        if '#' in parameter:
            return self._found_member(guild.get_member_named(parameter))

        #DiscordName/IGN Catch(DB Get user can look this up)
        cur_member = guild.get_member_named(parameter)
        if cur_member is not None:
            return self._found_member(cur_member)

        #Display Name Lookup; a partial match only counts when it is unambiguous.
        needle = parameter.lower()
        for member in guild.members:
            if needle in member.display_name.lower():
                if cur_member is not None:
                    self.logger.error(f'**ERROR** Found multiple Discord Members: {parameter}, Returning None')
                    return None
                self.logger.dev(f'Found the Discord Member {member.display_name}')
                cur_member = member
        return cur_member

    def serverparse(self, instanceID:str= None, context:commands.Context=None, guild_id:int=None) -> Union[AMP_Handler.AMP.AMPInstance, None]:
        """This is the botUtils Server Parse function.
        **Note** Use context.guild.id \n
        Returns `AMPInstance[server] <object>`, or `None` when no Instance has that ID."""
        self.logger.dev('Bot Utility Server Parse')
        cur_server = self.AMPHandler.AMP_Instances.get(instanceID)
        if cur_server is not None:
            self.logger.dev(f'Selected Server is {cur_server} - InstanceID: {instanceID}')
        return cur_server #AMP instance object

    def sub_command_handler(self, command:str, sub_command):
        """This will get the `Parent` command and then add a `Sub` command to said `Parent` command.\n
        A sub command that is already registered is silently skipped, any other failure is logged."""
        parent_command = self._client.get_command(command)
        try:
            parent_command.add_command(sub_command)
            self.logger.dev(f'Added {command} Parent Command: {parent_command}')
        except discord.app_commands.errors.CommandAlreadyRegistered:
            return
        except Exception as e:
            #Best effort on purpose; this also covers a missing parent command (`None`).
            self.logger.error(f'We encountered an error in `sub_command_handler` - {e}')

    def sub_group_command_handler(self, group:str, command):
        """Gets the `Command Group` and adds the `command` to said `Group`\n
        Nothing happens when `group` is not a `HybridGroup`; an already registered command is silently skipped."""
        parent_group = self._client.get_command(group)
        if not isinstance(parent_group, discord.ext.commands.hybrid.HybridGroup):
            return
        try:
            parent_group.add_command(command)
            self.logger.dev(f'Added {group} to Parent Command Group: {parent_group}')
        except discord.app_commands.errors.CommandAlreadyRegistered:
            return
        except Exception as e:
            self.logger.error(f'We encountered an error in `sub_group_command_handler` - {e}')

    def _remove_commands(self, parent_group:str, command:str):
        """This will remove a command from a group"""
        #Should call some form of sync command after; but I do not want to auto sync. Regardless the command tree will be cleaned up.
        group = self._client.get_command(parent_group)
        #the Group command could not exists on first startup; as the client has not been sync'd.
        if group is not None:
            group.remove_command(command)
            self.logger.dev(f'Removed {command} from {parent_group}')

    async def _serverCheck(self, context:commands.Context, server, online_only:bool=True) -> Union[AMP_Handler.AMP.AMPInstance, bool]:
        """Verifies if the AMP Server exists and if its Instance is running and its ADS is Running\n
        With `online_only=False` the result of the lookup is returned as is (may be `None`).\n
        Otherwise returns the AMP Instance when it is online, else tells the user and returns `False`."""
        amp_server = self.serverparse(server, context, context.guild.id)
        if online_only == False:
            return amp_server

        #An unknown server cannot be online; answer the user instead of failing on a `None` object.
        if amp_server is None:
            await context.send(f'Well this is awkward, I was unable to find the server **{server}**.', ephemeral=True, delete_after= self._client.Message_Timeout)
            return False

        if amp_server.Running and amp_server._ADScheck():
            return amp_server

        await context.send(f'Well this is awkward, it appears the **{amp_server.FriendlyName if amp_server.FriendlyName != None else amp_server.InstanceName}** is `Offline`.', ephemeral=True, delete_after= self._client.Message_Timeout)
        return False
#Module wide cache for the single shared botPerms() object; filled on first use by `get_botPerms()`.
bPerms = None
def get_botPerms():
    """Hands back the shared botPerms() object, building it first if it does not exist yet."""
    global bPerms
    if bPerms is not None:
        return bPerms
    bPerms = botPerms()
    return bPerms
class botPerms():
    """Custom permissions backed by the `bot_perms.json` file in the current working directory.\n
    The file holds a `Roles` list; each role has a `name`, a `discord_role_id`, a `prefix` and a list of `permissions` (permission nodes)."""
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.DBHandler = DB.getDBHandler()
        self.DB = self.DBHandler.DB
        #Modified time of the permissions file at the last successful load; used to reload on change.
        self._last_modified = 0
        self.permissions = None
        self.permission_roles = []

        self.validate_and_load()
        self.get_roles()
        #Only claim success when the file really was loaded.
        if self.permissions is not None:
            self.logger.info('**SUCCESS** Loading Bot Permissions')

    def validate_and_load(self):
        """Validates the contents of bot_perms.json.\n
        The file is only read again when it changed since the last successful load.\n
        A file that cannot be parsed sets `self.permissions` to `None`; a role with an empty name or a bad `discord_role_id` stops the program."""
        self.json_file = pathlib.Path.cwd().joinpath('bot_perms.json')
        try:
            modified = self.json_file.stat().st_mtime
        except OSError as e:
            #Missing/unreadable file; keep whatever was loaded before and let the caller deal with `None` permissions.
            self.logger.critical(f'Unable to find your permissions file {self.json_file} - {e}')
            return

        if modified <= self._last_modified:
            return

        try:
            #The `with` block makes sure the file handle is closed again.
            with open(self.json_file, 'r', encoding='utf-8') as perm_file:
                self.permissions = json.load(perm_file)
        except json.JSONDecodeError:
            #`_last_modified` is left alone on purpose so the next call tries again.
            self.permissions = None
            self.logger.critical('Unable to load your permissions file. Please check your formatting.')
            return
        self._last_modified = modified

        #Soft validation of the file to help with errors.
        #Verifies each role has a numeric discord_role_id or is equal to None and the name is not empty.
        for role in self.permissions['Roles']:
            if len(role['name']) == 0:
                self.logger.critical(f'You are missing a role name, please do not leave role names empty..')
                sys.exit(0)

            role_id = role['discord_role_id']
            if role_id == 'None':
                continue
            if type(role_id) != str:
                self.logger.critical(f'Your Discord Role ID for {role["name"]} does not appear to be string. Please check your bot_perms.json.')
                sys.exit(0)
            if not role_id.isnumeric():
                self.logger.critical(f'Your Discord Role ID for {role["name"]} does not appear to be all numbers. Please check your bot_perms.json.')
                sys.exit(0)

    def perm_node_check(self, command_perm_node:str, context:commands.Context) -> bool:
        """Checks a Users for a DB Role then checks for that Role inside of bot_perms.json, then checks that Role for the proper permission node.\n
        A role applies to the user when its name equals the users DB Role (any casing) or its `discord_role_id` is one of the users Discord roles.\n
        A Super node such as `server.*` grants every `server.` node unless the role also lists the node with a leading `-`.\n
        Returns `True` when the node is granted, else `False`."""
        #`commands.Context` carries the user as `.author`, `discord.Interaction` as `.user`.
        author = getattr(context, 'author', None) or getattr(context, 'user', None)
        if author is None:
            return False

        #Lets get our DB user and check if they exist.
        DB_user = self.DB.GetUser(str(author.id))
        if DB_user is None:
            return False

        #Lets also check for their DB Role
        user_role = DB_user.Role
        if user_role is None:
            return False

        #The file stores role IDs as strings, so compare against strings.
        user_discord_role_ids = {str(discord_role.id) for discord_role in getattr(author, 'roles', [])}

        #This is to check for Super perm nodes such as `server.*`
        command_super_node = command_perm_node.split(".")[0] + '.*'
        command_perm_node_false_check = '-' + command_perm_node

        #Reload first so a file that was fixed after a failed load gets picked up again.
        self.validate_and_load()
        if self.permissions is None:
            self.logger.error('**ATTENTION** Please verify your bot_perms file, it failed to load.')
            return False
        self.logger.info('Validated and Loaded Permissions File.')

        for role in self.permissions['Roles']:
            #Exact (case-insensitive) name match; a substring match would hand a `Mod` the permissions of `Moderator`.
            name_match = user_role.lower() == role['name'].lower()
            if not name_match and role['discord_role_id'] not in user_discord_role_ids:
                continue

            role_permissions = role['permissions']
            if command_super_node in role_permissions:
                if command_perm_node_false_check in role_permissions:
                    self.logger.dev(f'This perm node has been denied even though you have global permissions. {command_perm_node_false_check} {command_perm_node}')
                    return False
                self.logger.dev(f'Found Super perm node in Roles Permissions list. {command_super_node}')
                return True

            if command_perm_node in role_permissions:
                self.logger.dev(f'Found command perm node in Roles Permissions list. {command_perm_node}')
                return True

        #No role granted the node.
        return False

    def get_roles(self) -> list[str]:
        """Pre build my Permissions Role Name List\n
        Returns an empty list when the permissions file failed to load."""
        if self.permissions is None:
            self.permission_roles = []
        else:
            self.permission_roles = [role['name'] for role in self.permissions['Roles']]
        return self.permission_roles

    async def get_role_prefix(self, user_id:str=None, context:commands.Context=None) -> Union[str, None]:
        """Use to get a Users Role Prefix for displaying.\n
        The users Discord roles (taken from `context`) are checked first, then the DB Role of `user_id`.\n
        Returns the `prefix` of the matching role, else returns `None`."""
        if self.permissions is None:
            return None

        #This grabs all a Users discord roles and makes a list of their ids
        if context is not None:
            discord_roles = {str(discord_role.id) for discord_role in context.author.roles}
            #This works because you can only have one bot_perms role.
            for role in self.permissions['Roles']:
                if role['discord_role_id'] in discord_roles:
                    return role.get('prefix')

        db_user = self.DB.GetUser(user_id)
        if db_user is not None and db_user.Role is not None:
            rolename = db_user.Role
            if rolename in self.permission_roles:
                for role in self.permissions['Roles']:
                    if role['name'] == rolename:
                        return role.get('prefix')
        return None