Skip to content

Commit

Permalink
formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
Furrior committed Feb 10, 2025
1 parent 91232dc commit 8694575
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 49 deletions.
1 change: 1 addition & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def log_defaults(self) -> None:
logger.info("Checking nested model: %s", field_name)
current_value.log_defaults()


class General(CustomBaseModel):
project_name: str = "Space Station Central"
project_desc: str = "API для объеденения множества серверов SS13 и SS14 в одну систему. От него несет вульпой, но он работает."
Expand Down
1 change: 1 addition & 0 deletions app/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
def init() -> None:
init_db()


if __name__ == "__main__":
init()
1 change: 1 addition & 0 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from app.init import init
from app.routes.v1.main_router import v1_router


@asynccontextmanager
async def lifespan(_: FastAPI):
init()
Expand Down
23 changes: 13 additions & 10 deletions app/routes/v1/donate.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,28 @@
from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlmodel import select
from sqlmodel.sql.expression import SelectOfScalar
from app.core.config import CONFIG
from app.database.models import DEFAULT_DONATION_EXPIRATION_TIME, Donation, Player, Whitelist
from app.routes.v1.whitelist import create_whitelist
from app.database.models import Donation, Player
from app.schemas.donate import NewDonationDiscord
from app.deps import SessionDep, verify_bearer

from app.schemas.generic import PaginatedResponse, paginate_selection
from app.schemas.whitelist import NewWhitelistBase, NewWhitelistInternal

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/donate", tags=["Donate"])

def filter_donations(selection: SelectOfScalar[Donation], ckey: str | None = None, discord_id: str | None = None, active_only: bool = True) -> SelectOfScalar[Donation]:

def filter_donations(selection: SelectOfScalar[Donation],
ckey: str | None = None,
discord_id: str | None = None,
active_only: bool = True) -> SelectOfScalar[Donation]:
if ckey:
selection = selection.where(Player.ckey == ckey)
if discord_id:
selection = selection.where(Player.discord_id == discord_id)
if active_only:
selection = selection.where(Donation.valid).where(Donation.expiration_time > datetime.datetime.now())
selection = selection.where(Donation.valid).where(
Donation.expiration_time > datetime.datetime.now())
return selection


Expand All @@ -36,11 +38,12 @@ async def get_donations(session: SessionDep,
page_size: int = 50) -> PaginatedResponse[Donation]:
selection = select(Donation).join(
Player, Player.id == Donation.player_id)

selection = filter_donations(selection, ckey, discord_id, active_only)

return paginate_selection(session, selection, request, page, page_size)


@router.post("", status_code=status.HTTP_201_CREATED)
async def create_donation(session: SessionDep, donation: Donation) -> Donation:
session.add(donation)
Expand All @@ -56,6 +59,7 @@ async def create_donation(session: SessionDep, donation: Donation) -> Donation:
status.HTTP_404_NOT_FOUND: {"description": "Player not found"},
}


@router.post("/by-discord", status_code=status.HTTP_201_CREATED, dependencies=[Depends(verify_bearer)], responses=WHITELIST_POST_RESPONSES)
async def create_donation_by_discord(session: SessionDep, new_donation: NewDonationDiscord) -> Donation:
player = session.exec(
Expand All @@ -69,7 +73,6 @@ async def create_donation_by_discord(session: SessionDep, new_donation: NewDonat
donation = Donation(
player_id=player.id,
tier=new_donation.tier
)

return await create_donation(session, donation)
)

return await create_donation(session, donation)
96 changes: 62 additions & 34 deletions app/routes/v1/whitelist.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ def select_only_active_whitelists(selection: SelectOfScalar[Whitelist]):
Whitelist.expiration_time > datetime.datetime.now()
)


def select_only_active_whitelist_bans(selection: SelectOfScalar[WhitelistBan]):
return selection.where(
WhitelistBan.valid).where(
WhitelistBan.expiration_time > datetime.datetime.now()
)


WHITELIST_TYPES_T = NewWhitelistCkey | NewWhitelistDiscord | NewWhitelistInternal


async def create_whitelist_helper(
session: SessionDep,
new_wl: NewWhitelistBase,
Expand All @@ -43,20 +46,24 @@ async def create_whitelist_helper(
ignore_bans: bool = False
) -> Whitelist:
"""Core logic for creating whitelist entries"""
player: Player = session.exec(select(Player).where(player_resolver(new_wl))).first()
admin: Player = session.exec(select(Player).where(admin_resolver(new_wl))).first()
player: Player = session.exec(
select(Player).where(player_resolver(new_wl))).first()
admin: Player = session.exec(
select(Player).where(admin_resolver(new_wl))).first()

if not player or not admin:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Player or admin not found")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail="Player or admin not found")

if not ignore_bans and session.exec(
select_only_active_whitelist_bans(
select(WhitelistBan)
.where(WhitelistBan.player_id == player.id)
.where(WhitelistBan.wl_type == new_wl.wl_type)
)
).first():
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Player is banned from this type of whitelist.")
select_only_active_whitelist_bans(
select(WhitelistBan)
.where(WhitelistBan.player_id == player.id)
.where(WhitelistBan.wl_type == new_wl.wl_type)
)
).first():
raise HTTPException(status_code=status.HTTP_409_CONFLICT,
detail="Player is banned from this type of whitelist.")

wl = Whitelist(
**{**new_wl.model_dump(), "player_id": player.id, "admin_id": admin.id},
Expand All @@ -69,6 +76,7 @@ async def create_whitelist_helper(
logger.info("Whitelist created: %s", wl.model_dump_json())
return wl


def filter_whitelists(selection: SelectOfScalar[Whitelist],
ckey: str | None = None,
discord_id: str | None = None,
Expand All @@ -84,6 +92,7 @@ def filter_whitelists(selection: SelectOfScalar[Whitelist],
selection = selection.where(Whitelist.wl_type == wl_type)
return selection


def filter_whitelist_bans(selection: SelectOfScalar[WhitelistBan],
ckey: str | None = None,
discord_id: str | None = None,
Expand All @@ -99,6 +108,7 @@ def filter_whitelist_bans(selection: SelectOfScalar[WhitelistBan],
selection = selection.where(WhitelistBan.wl_type == wl_type)
return selection


@router.get("s", # /whitelists
status_code=status.HTTP_200_OK,
responses={
Expand All @@ -116,7 +126,8 @@ async def get_whitelists(session: SessionDep,
selection = select(Whitelist).join(
Player, Player.id == Whitelist.player_id) # type: ignore

selection = filter_whitelists(selection, ckey, discord_id, wl_type, active_only)
selection = filter_whitelists(
selection, ckey, discord_id, wl_type, active_only)

return paginate_selection(session, selection, request, page, page_size)

Expand All @@ -128,16 +139,16 @@ async def get_whitelists(session: SessionDep,
status.HTTP_400_BAD_REQUEST: {"description": "Invalid filter combination"},
})
async def get_whitelisted_ckeys(session: SessionDep,
request: Request,
wl_type: str | None = None,
active_only: bool = True,
page: int = 1,
page_size: int = 50) -> PaginatedResponse[str]:
request: Request,
wl_type: str | None = None,
active_only: bool = True,
page: int = 1,
page_size: int = 50) -> PaginatedResponse[str]:
selection = select(Player.ckey).join(
Whitelist, Player.id == Whitelist.player_id).distinct() # type: ignore

selection = filter_whitelists(selection,
wl_type=wl_type,
wl_type=wl_type,
active_only=active_only)

return paginate_selection(session, selection, request, page, page_size)
Expand Down Expand Up @@ -189,19 +200,24 @@ async def create_whitelist_by_discord(session: SessionDep, new_wl: NewWhitelistD
status.HTTP_404_NOT_FOUND: {"description": "Player or admin not found"},
}


def create_ban_helper(session: SessionDep,
new_ban: NewWhitelistBanBase,
player_resolver: Callable[[WHITELIST_TYPES_T], bool],
admin_resolver: Callable[[WHITELIST_TYPES_T], bool],
invalidate_wls: bool = True
) -> WhitelistBan:
player: Player = session.exec(select(Player).where(player_resolver(new_ban))).first()
admin: Player = session.exec(select(Player).where(admin_resolver(new_ban))).first()
player: Player = session.exec(
select(Player).where(player_resolver(new_ban))).first()
admin: Player = session.exec(
select(Player).where(admin_resolver(new_ban))).first()

if not player or not admin:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Player or admin not found")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail="Player or admin not found")

ban = WhitelistBan(**new_ban.model_dump(), player_id=player.id, admin_id=admin.id)
ban = WhitelistBan(**new_ban.model_dump(),
player_id=player.id, admin_id=admin.id)
session.add(ban)

if invalidate_wls:
Expand All @@ -219,21 +235,24 @@ def create_ban_helper(session: SessionDep,
logger.info("Whitelist ban created: %s", ban.model_dump_json())
return ban


@ban_router.get("s", status_code=status.HTTP_200_OK)
async def get_whitelist_bans(session: SessionDep,
request: Request,
ckey: str | None = None,
discord_id: str | None = None,
wl_type: str | None = None,
active_only: bool = True,
page: int = 1,
page_size: int = 50) -> PaginatedResponse[WhitelistBan]:
request: Request,
ckey: str | None = None,
discord_id: str | None = None,
wl_type: str | None = None,
active_only: bool = True,
page: int = 1,
page_size: int = 50) -> PaginatedResponse[WhitelistBan]:
selection = select(WhitelistBan).join(
Player, Player.id == WhitelistBan.player_id) # type: ignore

selection = filter_whitelist_bans(selection, ckey, discord_id, wl_type, active_only)
selection = filter_whitelist_bans(
selection, ckey, discord_id, wl_type, active_only)

total = session.exec(selection.with_only_columns(func.count())).first() # type: ignore # pylint: disable=not-callable
# type: ignore # pylint: disable=not-callable
total = session.exec(selection.with_only_columns(func.count())).first()
selection = selection.offset((page-1)*page_size).limit(page_size)
items = session.exec(selection).all()

Expand All @@ -245,8 +264,11 @@ async def get_whitelist_bans(session: SessionDep,
current_url=request.url,
)


@ban_router.post("", status_code=status.HTTP_201_CREATED, responses=BAN_POST_RESPONSES, dependencies=[Depends(verify_bearer)])
async def create_whitelist_ban(session: SessionDep, new_ban: NewWhitelistBanInternal, invalidate_wls: bool = True) -> WhitelistBan:
async def create_whitelist_ban(session: SessionDep,
new_ban: NewWhitelistBanInternal,
invalidate_wls: bool = True) -> WhitelistBan:
return create_ban_helper(
session,
new_ban,
Expand All @@ -255,8 +277,11 @@ async def create_whitelist_ban(session: SessionDep, new_ban: NewWhitelistBanInte
invalidate_wls
)


@ban_router.post("/by-ckey", status_code=status.HTTP_201_CREATED, responses=BAN_POST_RESPONSES, dependencies=[Depends(verify_bearer)])
async def create_whitelist_ban_by_ckey(session: SessionDep, new_ban: NewWhitelistBanCkey, invalidate_wls: bool = True) -> WhitelistBan:
async def create_whitelist_ban_by_ckey(session: SessionDep,
new_ban: NewWhitelistBanCkey,
invalidate_wls: bool = True) -> WhitelistBan:
return create_ban_helper(
session,
new_ban,
Expand All @@ -265,8 +290,11 @@ async def create_whitelist_ban_by_ckey(session: SessionDep, new_ban: NewWhitelis
invalidate_wls
)


@ban_router.post("/by-discord", status_code=status.HTTP_201_CREATED, responses=BAN_POST_RESPONSES, dependencies=[Depends(verify_bearer)])
async def create_whitelist_ban_by_discord(session: SessionDep, new_ban: NewWhitelistBanDiscord, invalidate_wls: bool = True) -> WhitelistBan:
async def create_whitelist_ban_by_discord(session: SessionDep,
new_ban: NewWhitelistBanDiscord,
invalidate_wls: bool = True) -> WhitelistBan:
return create_ban_helper(
session,
new_ban,
Expand All @@ -275,4 +303,4 @@ async def create_whitelist_ban_by_discord(session: SessionDep, new_ban: NewWhite
invalidate_wls
)

router.include_router(ban_router)
router.include_router(ban_router)
4 changes: 3 additions & 1 deletion app/schemas/donate.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import datetime
from pydantic import BaseModel


class NewDonationBase(BaseModel):
tier: int


class NewDonationDiscord(NewDonationBase):
discord_id: str
discord_id: str
10 changes: 6 additions & 4 deletions app/schemas/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,25 @@ def calculate_adjacent_pages(self, current_url: URL):
if (self.page * self.page_size) < self.total:
self.next_page = str(
current_url.include_query_params(
page=self.page+1, page_size=self.page_size)
page=self.page+1)
)
if self.page > 1:
self.previous_page = str(current_url.include_query_params(
page=self.page-1, page_size=self.page_size
page=self.page-1
))

def __init__(self, *args, current_url: URL, **kwargs):
super().__init__(*args, **kwargs)
self.calculate_adjacent_pages(current_url)


def paginate_selection(session: SessionDep,
selection: SelectOfScalar[T],
request: Request,
page: int,
page_size: int) -> PaginatedResponse[T]:
total = session.exec(selection.with_only_columns(func.count())).first() # type: ignore # pylint: disable=not-callable
# type: ignore # pylint: disable=not-callable
total = session.exec(selection.with_only_columns(func.count())).first()
selection = selection.offset((page-1)*page_size).limit(page_size)
items = session.exec(selection).all()

Expand All @@ -49,4 +51,4 @@ def paginate_selection(session: SessionDep,
page=page,
page_size=page_size,
current_url=request.url,
)
)
1 change: 1 addition & 0 deletions app/schemas/whitelist.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
from pydantic import BaseModel


class NewWhitelistBase(BaseModel):
wl_type: str
duration_days: int
Expand Down

0 comments on commit 8694575

Please sign in to comment.