diff --git a/dashboard/baserpc.py b/dashboard/baserpc.py index 95f1f7c3..ad2e0d5b 100644 --- a/dashboard/baserpc.py +++ b/dashboard/baserpc.py @@ -18,6 +18,7 @@ from .rpc.permissions import DashboardRPC_Permissions from .rpc.utils import rpccheck from .rpc.webhooks import DashboardRPC_Webhooks +from .rpc.thirdparties import DashboardRPC_ThirdParties HUMANIZED_PERMISSIONS = { "view": "View server on dashboard", @@ -51,6 +52,8 @@ def __init__(self, cog: commands.Cog): self.extensions.append(DashboardRPC_Permissions(self.cog)) self.extensions.append(DashboardRPC_AliasCC(self.cog)) self.extensions.append(DashboardRPC_Webhooks(self.cog)) + self.third_parties_handler = DashboardRPC_ThirdParties(self.cog) + self.extensions.append(self.third_parties_handler) # To make sure that both RPC server and client are on the same "version" self.version = random.randint(1, 10000) @@ -171,7 +174,7 @@ async def get_variables(self): try: botavatar = str(self.bot.user.avatar_url_as(static_format="png")) except AttributeError: - botavatar = str(self.bot.user.avatar) + botavatar = str(self.bot.user.display_avatar) returning = { "bot": { @@ -199,15 +202,12 @@ async def get_variables(self): "uptime": uptime_str, }, }, + "third_parties": await self.third_parties_handler.get_third_parties(), } if self.owner is None: app_info = await self.bot.application_info() - if app_info.team: - self.owner = str(app_info.team.name) - else: - self.owner = str(app_info.owner) - + self.owner = str(app_info.team.name) if app_info.team else str(app_info.owner) returning["bot"]["owner"] = self.owner return returning @@ -220,42 +220,32 @@ async def get_commands(self): returning = [] downloader = self.bot.get_cog("Downloader") for name, cog in self.bot.cogs.copy().items(): - stripped = [] - - for c in cog.__cog_commands__: - if not c.parent: - stripped.append(c) - + stripped = [c for c in cog.__cog_commands__ if not c.parent] cmds = await self.build_cmd_list(stripped, do_escape=False) - if not cmds: - continue + # if not cmds: + # continue author = "Unknown" repo = "Unknown" # Taken from Trusty's downloader fuckery, # https://gist.github.com/TrustyJAID/784c8c32dd45b1cc8155ed42c0c56591 - if name not in self.cog_info_cache: - if downloader: - module = downloader.cog_name_from_instance(cog) - installed, cog_info = await downloader.is_installed(module) - if installed: - author = humanize_list(cog_info.author) if cog_info.author else "Unknown" - try: - repo = ( - cog_info.repo.clean_url if cog_info.repo.clean_url else "Unknown" - ) - except AttributeError: - repo = "Unknown (Removed from Downloader)" - elif cog.__module__.startswith("redbot."): - author = "Cog Creators" - repo = "https://github.com/Cog-Creators/Red-DiscordBot" - self.cog_info_cache[name] = {} - self.cog_info_cache[name]["author"] = author - self.cog_info_cache[name]["repo"] = repo - else: + if name in self.cog_info_cache: author = self.cog_info_cache[name]["author"] repo = self.cog_info_cache[name]["repo"] + elif downloader: + module = downloader.cog_name_from_instance(cog) + installed, cog_info = await downloader.is_installed(module) + if installed: + author = humanize_list(cog_info.author) if cog_info.author else "Unknown" + try: + repo = cog_info.repo.clean_url or "Unknown" + except AttributeError: + repo = "Unknown (Removed from Downloader)" + elif cog.__module__.startswith("redbot."): + author = "Cog Creators" + repo = "https://github.com/Cog-Creators/Red-DiscordBot" + self.cog_info_cache[name] = {"author": author, "repo": repo} returning.append( { "name": escape(name or ""), @@ -265,8 +255,7 @@ async def get_commands(self): "repo": repo, } ) - returning = sorted(returning, key=lambda k: k["name"]) - return returning + return sorted(returning, key=lambda k: k["name"]) @rpccheck() async def get_users_servers(self, userid: int, page: int): @@ -308,6 +297,7 @@ async def get_users_servers(self, userid: int, page: int): )(), "go": False, } + if is_owner: guilds.append(sgd) continue @@ -347,13 +337,9 @@ async def get_server(self, userid: int, serverid: int): user = guild.get_member(userid) baseuser = self.bot.get_user(userid) - is_owner = False - if await self.bot.is_owner(baseuser): - is_owner = True - - if not user: - if not baseuser and not is_owner: - return {"status": 0} + is_owner = bool(await self.bot.is_owner(baseuser)) + if not user and not baseuser and not is_owner: + return {"status": 0} if is_owner: humanized = ["Everything (Bot Owner)"] @@ -401,23 +387,17 @@ async def get_server(self, userid: int, serverid: int): else: vl = "Unknown" - if not self.cog.configcache.get(serverid, {"roles": []})["roles"]: - warn = True - else: - warn = False - + warn = not self.cog.configcache.get(serverid, {"roles": []})["roles"] adminroles = [] ar = await self.bot._config.guild(guild).admin_role() for rid in ar: - r = guild.get_role(rid) - if r: + if r := guild.get_role(rid): adminroles.append((rid, r.name)) modroles = [] mr = await self.bot._config.guild(guild).mod_role() for rid in mr: - r = guild.get_role(rid) - if r: + if r := guild.get_role(rid): modroles.append((rid, r.name)) all_roles = [(r.id, r.name) for r in guild.roles] diff --git a/dashboard/rpc/thirdparties.py b/dashboard/rpc/thirdparties.py new file mode 100644 index 00000000..a7be4d0a --- /dev/null +++ b/dashboard/rpc/thirdparties.py @@ -0,0 +1,164 @@ +from redbot.core.bot import Red +from redbot.core.commands import commands + +import discord +import inspect +import typing + +from .utils import rpccheck + + +def dashboard_page(name: typing.Optional[str] = None, methods: typing.List[str] = ["GET"], context_ids: typing.List[str] = None, required_kwargs: typing.List[str] = None, permissions_required: typing.List[str] = ["view"], hidden: typing.Optional[bool] = None): + if context_ids is None: + context_ids = [] + if required_kwargs is None: + required_kwargs = [] + + def decorator(func: typing.Callable): + if name is not None and not isinstance(name, str): + raise TypeError("Name of a page must be a string.") + if name is not None: + discord.app_commands.commands.validate_name(name) + if not inspect.iscoroutinefunction(func): + raise TypeError("Func must be a coroutine.") + params = {"name": name, "methods": methods, "context_ids": context_ids, "required_kwargs": required_kwargs, "permissions_required": permissions_required, "hidden": hidden, "real_cog_name": None} + for key, value in inspect.signature(func).parameters.items(): + if value.name == "self" or value.kind in [inspect._ParameterKind.POSITIONAL_ONLY, inspect._ParameterKind.VAR_KEYWORD]: + continue + if value.default is not inspect._empty: + continue + if key in ["user_id", "guild_id", "member_id", "role_id", "channel_id"] and key not in params["context_ids"]: + params["context_ids"].append(key) + elif f"{key}_id" in ["user_id", "guild_id", "member_id", "role_id", "channel_id"] and f"{key}_id" not in params["context_ids"]: + params["context_ids"].append(f"{key}_id") + elif key not in ["method", "lang_code"]: + params["required_kwargs"].append(key) + # A guild must be chose for these kwargs. + for key in ["member_id", "role_id", "channel_id"]: + if key in params["context_ids"] and "guild_id" not in params["context_ids"]: + params["context_ids"].append("guild_id") + # No guild available without user connection. + if ( + "guild_id" in params["context_ids"] + and "user_id" not in params["context_ids"] + ): + params["context_ids"].append("user_id") + if params["hidden"] is None: + params["hidden"] = params["required_kwargs"] or [x for x in params["context_ids"] if x not in ["user_id", "guild_id"]] + func.__dashboard_params__ = params.copy() + return func + + return decorator + + +class DashboardRPC_ThirdParties: + def __init__(self, cog: commands.Cog): + self.bot: Red = cog.bot + self.cog: commands.Cog = cog + + self.third_parties: typing.Dict[str, typing.Dict[str, typing.Tuple[typing.Callable, typing.Dict[str, bool]]]] = {} + self.third_parties_cogs: typing.Dict[str, commands.Cog] = {} + + self.bot.register_rpc_handler(self.data_receive) + self.bot.add_listener(self.on_cog_add) + self.bot.add_listener(self.on_cog_remove) + self.bot.dispatch("dashboard_cog_add", self.cog) + + def unload(self): + self.bot.unregister_rpc_handler(self.data_receive) + self.bot.remove_listener(self.on_cog_add) + self.bot.remove_listener(self.on_cog_remove) + + @commands.Cog.listener() + async def on_cog_add(self, cog: commands.Cog): + ev = "on_dashboard_cog_add" + funcs = [listener[1] for listener in cog.get_listeners() if listener[0] == ev] + for func in funcs: + self.bot._schedule_event(func, ev, self.cog) # like in `bot.dispatch` + + @commands.Cog.listener() + async def on_cog_remove(self, cog: commands.Cog): + if cog not in self.third_parties_cogs.values(): + return + self.remove_third_party(cog) + + def add_third_party(self, cog: commands.Cog, overwrite: bool = False): + cog_name = cog.qualified_name.lower() + if cog_name in self.third_parties and not overwrite: + raise RuntimeError(f"The cog {cog_name} is already an existing third party.") + _pages = {} + for attr in dir(cog): + if hasattr((func := getattr(cog, attr)), "__dashboard_params__"): + page = func.__dashboard_params__["name"] + if page in _pages: + raise RuntimeError(f"The page {page} is already an existing page for this third party.") + func.__dashboard_params__["real_cog_name"] = cog.qualified_name + _pages[page] = (func, func.__dashboard_params__) + if not _pages: + raise RuntimeError("No page found.") + self.third_parties[cog_name] = _pages + self.third_parties_cogs[cog_name] = cog + + def remove_third_party(self, cog: commands.Cog): + cog_name = cog.qualified_name.lower() + try: + del self.third_parties_cogs[cog_name] + except KeyError: + pass + return self.third_parties.pop(cog_name, None) + + @rpccheck() + async def get_third_parties(self): + return {key: {k: v[1] for k, v in value.items()} for key, value in self.third_parties.items()} + + @rpccheck() + async def data_receive(self, method: str, cog_name: str, page: str, context_ids: typing.Optional[typing.Dict[str, int]] = None, kwargs: typing.Dict[str, typing.Any] = None, lang_code: typing.Optional[str] = None) -> typing.Dict[str, typing.Any]: + if context_ids is None: + context_ids = {} + if kwargs is None: + kwargs = {} + cog_name = cog_name.lower() + if not cog_name or cog_name not in self.third_parties or cog_name not in self.third_parties_cogs: + return {"status": 1, "message": "Third party not found.", "error_message": "404: Looks like that third party doesn't exist... Strange..."} + if self.bot.get_cog(self.third_parties_cogs[cog_name].qualified_name) is None: + return {"status": 1, "message": "Third party not loaded.", "error_message": "404: Looks like that third party doesn't exist... Strange..."} + page = page.lower() if page is not None else page + if page not in self.third_parties[cog_name]: + return {"status": 1, "message": "Page not found.", "error_message": "404: Looks like that page doesn't exist... Strange..."} + kwargs["method"] = method + if "user_id" in self.third_parties[cog_name][page][1]["context_ids"]: + if (user := self.bot.get_user(context_ids["user_id"])) is None: + return {"status": 1, "message": "Page not found.", "error_message": "404: Looks like that I do not share any server with you..."} + kwargs["user_id"] = context_ids["user_id"] + kwargs["user"] = user + if "guild_id" in self.third_parties[cog_name][page][1]["context_ids"] and "user_id" in self.third_parties[cog_name][page][1]["context_ids"]: + if (guild := self.bot.get_guild(context_ids["guild_id"])) is None: + return {"status": 1, "message": "Page not found.", "error_message": "404: Looks like that I'm not in this server..."} + if (m := guild.get_member(context_ids["user_id"])) is None: + return {"status": 1, "message": "Page not found.", "error_message": "403: Looks like that you're not in this server..."} + if m.id != guild.owner.id: + perms = self.cog.rpc.get_perms(guildid=guild.id, m=m) + if perms is None: + return {"status": 1, "message": "Page not found.", "error_message": "403: Looks like that you haven't permissions in this server..."} + for permission in self.third_parties[cog_name][page][1]["permissions_required"]: + if permission not in perms: + return {"status": 1, "message": "Page not found.", "error_message": "403: Looks like that you haven't permissions in this server..."} + kwargs["guild_id"] = context_ids["guild_id"] + kwargs["guild"] = guild + if "member_id" in self.third_parties[cog_name][page][1]["context_ids"]: + if (member := guild.get_member(context_ids["member_id"])) is None: + return {"status": 1, "message": "Page not found.", "error_message": "404: Looks like that this member is not found in this guild..."} + kwargs["member_id"] = context_ids["member_id"] + kwargs["member"] = member + if "role_id" in self.third_parties[cog_name][page][1]["context_ids"]: + if (role := guild.get_role(context_ids["role_id"])) is None: + return {"status": 1, "message": "Page not found.", "error_message": "404: Looks like that this role is not found in this guild..."} + kwargs["role_id"] = context_ids["role_id"] + kwargs["role"] = role + if "channel_id" in self.third_parties[cog_name][page][1]["context_ids"]: + if (channel := guild.get_channel(context_ids["channel_id"])) is None: + return {"status": 1, "message": "Page not found.", "error_message": "404: Looks like that this channel is not found in this guild..."} + kwargs["channel_id"] = context_ids["channel_id"] + kwargs["channel"] = channel + kwargs["lang_code"] = lang_code or "en-EN" + return await self.third_parties[cog_name][page][0](**kwargs)