From ae9b0bf0aaa60e2757e2a830d8d08bf70b0c7d79 Mon Sep 17 00:00:00 2001 From: Furior Date: Sun, 2 Feb 2025 03:18:00 +0700 Subject: [PATCH] whitelisted ckeys, probably stampella --- app/database/models.py | 7 +++- app/routes/v1/wl/whitelist.py | 75 ++++++++++++++++++++++++++--------- 2 files changed, 62 insertions(+), 20 deletions(-) diff --git a/app/database/models.py b/app/database/models.py index 5ea7d83..9db8d63 100644 --- a/app/database/models.py +++ b/app/database/models.py @@ -11,6 +11,7 @@ class Player(SQLModel, table=True): + __tablename__ = "player" id: int = Field(default=None, primary_key=True) # Actually is a pretty big int. Is way **too** big for a lot of software to handle discord_id: str = Field(max_length=32, unique=True, index=True) @@ -19,6 +20,7 @@ class Player(SQLModel, table=True): class CkeyLinkToken(SQLModel, table=True): + __tablename__ = "ckey_link_token" id: int = Field(default=None, primary_key=True) ckey: str = Field(max_length=32, unique=True, index=True) token: str = Field( @@ -39,21 +41,24 @@ class WhitelistBase(SQLModel): class Whitelist(WhitelistBase, table=True): - pass + __tablename__ = "whitelist" class WhitelistBan(WhitelistBase, table=True): + __tablename__ = "whitelist_ban" expiration_time: datetime = Field( default_factory=lambda: datetime.now() + DEFAULT_WHITELIST_BAN_EXPIRATION_TIME) reason: str | None = Field(max_length=1024) class Auth(SQLModel, table=True): + __tablename__ = "auth" id: int = Field(default=None, primary_key=True) token_hash: str = Field(max_length=64, unique=True, index=True) class Donation(SQLModel, table=True): + __tablename__ = "donation" id: int = Field(default=None, primary_key=True) player_id: int = Field(foreign_key="player.id", index=True) amount: int = Field() diff --git a/app/routes/v1/wl/whitelist.py b/app/routes/v1/wl/whitelist.py index b832f1a..36403b0 100644 --- a/app/routes/v1/wl/whitelist.py +++ b/app/routes/v1/wl/whitelist.py @@ -9,7 +9,7 @@ from app.database.models import Player, Whitelist, WhitelistBan from app.deps import BEARER_DEP_RESPONSES, SessionDep, verify_bearer -from app.schemas.generic import PaginatedResponse +from app.schemas.generic import T, PaginatedResponse from app.schemas.whitelist import (NewWhitelistBanBase, NewWhitelistBanCkey, NewWhitelistBanDiscord, NewWhitelistBanInternal, NewWhitelistBase, NewWhitelistCkey, NewWhitelistDiscord, @@ -65,24 +65,11 @@ async def create_whitelist_helper( logger.info("Whitelist created: %s", wl.model_dump_json()) return wl - -@router.get("s/", # /whitelists - status_code=status.HTTP_200_OK, - responses={ - status.HTTP_200_OK: {"description": "List of matching whitelists"}, - status.HTTP_400_BAD_REQUEST: {"description": "Invalid filter combination"}, - }) -async def get_whitelists(session: SessionDep, - request: Request, - ckey: str | None = None, - discord_id: str | None = None, - wl_type: str | None = None, - active_only: bool = True, - page: int = 1, - page_size: int = 50) -> PaginatedResponse[Whitelist]: - selection = select(Whitelist).join( - Player, Player.id == Whitelist.player_id) # type: ignore - +def filter_whitelists(selection: SelectOfScalar[Whitelist], + ckey: str | None = None, + discord_id: str | None = None, + wl_type: str | None = None, + active_only: bool = True) -> SelectOfScalar[Whitelist]: if active_only: selection = select_only_active_whitelists(selection) if ckey is not None: @@ -91,7 +78,13 @@ async def get_whitelists(session: SessionDep, selection = selection.where(Player.discord_id == discord_id) if wl_type is not None: selection = selection.where(Whitelist.wl_type == wl_type) + return selection +def paginate_selection(session: SessionDep, + selection: SelectOfScalar[T], + request: Request, + page: int, + page_size: int) -> PaginatedResponse[T]: total = session.exec(selection.with_only_columns(func.count())).first() # type: ignore # pylint: disable=not-callable selection = selection.offset((page-1)*page_size).limit(page_size) items = session.exec(selection).all() @@ -104,6 +97,50 @@ async def get_whitelists(session: SessionDep, current_url=request.url, ) +@router.get("s/", # /whitelists + status_code=status.HTTP_200_OK, + responses={ + status.HTTP_200_OK: {"description": "List of matching whitelists"}, + status.HTTP_400_BAD_REQUEST: {"description": "Invalid filter combination"}, + }) +async def get_whitelists(session: SessionDep, + request: Request, + ckey: str | None = None, + discord_id: str | None = None, + wl_type: str | None = None, + active_only: bool = True, + page: int = 1, + page_size: int = 50) -> PaginatedResponse[Whitelist]: + selection = select(Whitelist).join( + Player, Player.id == Whitelist.player_id) # type: ignore + + selection = filter_whitelists(selection, ckey, discord_id, wl_type, active_only) + + return paginate_selection(session, selection, request, page, page_size) + + +@router.get("/ckeys/", + status_code=status.HTTP_200_OK, + responses={ + status.HTTP_200_OK: {"description": "Whitelistd ckeys"}, + status.HTTP_400_BAD_REQUEST: {"description": "Invalid filter combination"}, + }) +async def get_whitelisted_ckeys(session: SessionDep, + request: Request, + wl_type: str | None = None, + active_only: bool = True, + page: int = 1, + page_size: int = 50) -> PaginatedResponse[str]: + selection = select(Player.ckey).join( + Whitelist, Player.id == Whitelist.player_id).distinct() # type: ignore + + selection = filter_whitelists(selection, + wl_type=wl_type, + active_only=active_only) + + return paginate_selection(session, selection, request, page, page_size) + + WHITELIST_POST_RESPONSES = { **BEARER_DEP_RESPONSES, status.HTTP_201_CREATED: {"description": "Whitelist created"},