Skip to content

Commit

Permalink
OCT-1825: Initial passthrough FastAPI server (rebased) (#525)
Browse files Browse the repository at this point in the history
## Description
Adds FastAPI as the main application.
Mounts old flask app under /flask.
If route does not exist in FastAPI it's routed to old implementation under flask.
Rewrites socketio connection handling in FastAPI directly (using async).
Fixes the socketio manager to share sessions via redis.
  • Loading branch information
adam-gf authored Oct 21, 2024
1 parent 674ce8a commit c6c0039
Show file tree
Hide file tree
Showing 59 changed files with 3,477 additions and 36 deletions.
4 changes: 2 additions & 2 deletions backend/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
db,
migrate,
cors,
socketio,
cache,
init_web3,
api,
Expand Down Expand Up @@ -47,7 +46,8 @@ def register_extensions(app):
cors.init_app(app)
db.init_app(app)
migrate.init_app(app, db)
socketio.init_app(app)
# This is meant to be disabled because we migrate to FastAPI
# socketio.init_app(app)
cache.init_app(app)
init_scheduler(app)
init_logger(app)
Expand Down
1 change: 1 addition & 0 deletions backend/app/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,7 @@
TIMEOUT_LIST_NOT_MAINNET = {
"0xdf486eec7b89c390569194834a2f7a71da05ee13",
"0x689f1a51c177cce66e3afdca4b1ded7721f531f9",
"0x018d43ac91432d00c4ad1531c98b6ccd2b352538",
}

GUEST_LIST_STAMP_PROVIDERS = [
Expand Down
3 changes: 2 additions & 1 deletion backend/app/infrastructure/database/allocations.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,13 @@ def get_allocation_request_by_user_and_epoch(


def get_user_last_allocation_request(user_address: str) -> AllocationRequest | None:
return (
result = (
AllocationRequest.query.join(User, User.id == AllocationRequest.user_id)
.filter(User.address == user_address)
.order_by(AllocationRequest.nonce.desc())
.first()
)
return result


def get_user_allocation_epoch_count(user_address: str) -> int:
Expand Down
7 changes: 7 additions & 0 deletions backend/app/infrastructure/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ def handle_connect():
{"project": project.address, "donors": _serialize_donors(donors)},
)

for project in project_rewards:
donors = controller.get_all_donations_by_project(project.address)
emit(
"project_donors",
{"project": project.address, "donors": _serialize_donors(donors)},
)


@socketio.on("disconnect")
def handle_disconnect():
Expand Down
5 changes: 5 additions & 0 deletions backend/app/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ def config(app_level):
"apscheduler.executors.default": {
"level": "WARNING",
},
"uvicorn": { # Adding for the uvicorn logger (FastAPI)
"level": app_level,
"handlers": ["stdout", "stderr"],
"propagate": 0,
},
},
}

Expand Down
518 changes: 506 additions & 12 deletions backend/poetry.lock

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ pandas = "^2.2.0"
gmpy2 = "^2.1.5"
sentry-sdk = {extras = ["flask"], version = "^2.5.1"}
redis = "^5.0.7"
fastapi = "^0.112.0"
mypy = "^1.11.2"
isort = "^5.13.2"
pydantic-settings = "^2.4.0"
uvicorn = {extras = ["standard"], version = "^0.31.0"}
asyncpg = "^0.29.0"
uvloop = "^0.20.0"
python-socketio = "^5.11.4"

[tool.poetry.group.dev.dependencies]
pytest = "^7.3.1"
Expand All @@ -43,6 +51,10 @@ pyright = "^1.1.366"
pylookup = "^0.2.2"
importmagic = "^0.1.7"
epc = "^0.0.5"
isort = "^5.13.2"
mypy = "^1.11.2"
ruff = "^0.6.2"
aiosqlite = "^0.20.0"

[tool.poetry.group.prod]
optional = true
Expand Down
62 changes: 41 additions & 21 deletions backend/startup.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
# !!! IMPORTANT: DO NOT REARRANGE IMPORTS IN THIS FILE !!!
# The eventlet monkey patch needs to be applied before importing the Flask application for the following reasons:
# 1. Enabling Asynchronous I/O: The monkey patch is required to activate eventlet’s asynchronous and non-blocking I/O capabilities.
# Without this patch, the app's I/O requests might be blocked, which is not desirable for our API's performance.
# 2. Import Order Significance: The monkey patch must be applied before importing the Flask application to ensure that the app utilizes
# the asynchronous versions of standard library modules that have been patched by eventlet. If not done in this order, we might experience issues similar to
# what is reported in the following eventlet issue: https://github.com/eventlet/eventlet/issues/371
# This comment provides additional insight and helped resolve our specific problem: https://github.com/eventlet/eventlet/issues/371#issuecomment-779967181
# 3. Issue with dnspython: If dnspython is present in the environment, eventlet monkeypatches socket.getaddrinfo(),
# which breaks dns functionality. By setting the EVENTLET_NO_GREENDNS environment variable before importing eventlet,
# we prevent this monkeypatching

import os
from fastapi import Request
from fastapi.middleware.wsgi import WSGIMiddleware


os.environ["EVENTLET_NO_GREENDNS"] = "yes"
import eventlet # noqa
from starlette.middleware.base import BaseHTTPMiddleware

eventlet.monkey_patch()
from app import create_app as create_flask_app
from app.extensions import db as flask_db

if os.getenv("SENTRY_DSN"):
import sentry_sdk
Expand Down Expand Up @@ -51,16 +42,45 @@ def sentry_before_send(event, hint):
before_send=sentry_before_send,
)

from app import create_app # noqa
from app.extensions import db # noqa

app = create_app()
flask_app = create_flask_app()


@app.teardown_request
@flask_app.teardown_request
def teardown_session(*args, **kwargs):
db.session.remove()
flask_db.session.remove()


# I'm importing it here to make sure that the flask initializes before the fastapi one
from v2.main import app as fastapi_app # noqa


# Middleware to check if the path exists in FastAPI
# If it does, proceed with the request
# If it doesn't, modify the request to forward to the Flask app
class PathCheckMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
path = request.url.path

for route in fastapi_app.routes:
if path == route.path:
# If path exists, proceed with the request
return await call_next(request)

# If path does not exist, modify the request to forward to the Flask app
if path.startswith("/flask"):
return await call_next(request)
request.scope["path"] = "/flask" + path # Adjust the path as needed
response = await call_next(request)
return response


# Setup the pass-through to Flask app
fastapi_app.add_middleware(PathCheckMiddleware)
fastapi_app.mount("/flask", WSGIMiddleware(flask_app))


if __name__ == "__main__":
eventlet.wsgi.server(eventlet.listen(("0.0.0.0", 5000)), app, log=app.logger)
import uvicorn

uvicorn.run(fastapi_app, host="0.0.0.0", port=5000)
Empty file added backend/v2/__init__.py
Empty file.
Empty file.
45 changes: 45 additions & 0 deletions backend/v2/allocations/dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from typing import Annotated

from fastapi import Depends
from v2.allocations.services import Allocator
from v2.allocations.validators import SignatureVerifier
from v2.core.dependencies import GetChainSettings, GetSession
from v2.epochs.dependencies import GetEpochsSubgraph, GetOpenAllocationWindowEpochNumber
from v2.matched_rewards.dependencies import GetMatchedRewardsEstimator
from v2.projects.dependencies import GetProjectsContracts
from v2.uniqueness_quotients.dependencies import GetUQScoreGetter


def get_signature_verifier(
session: GetSession,
epochs_subgraph: GetEpochsSubgraph,
projects_contracts: GetProjectsContracts,
settings: GetChainSettings,
) -> SignatureVerifier:
return SignatureVerifier(
session, epochs_subgraph, projects_contracts, settings.chain_id
)


GetSignatureVerifier = Annotated[SignatureVerifier, Depends(get_signature_verifier)]


async def get_allocator(
epoch_number: GetOpenAllocationWindowEpochNumber,
session: GetSession,
signature_verifier: GetSignatureVerifier,
uq_score_getter: GetUQScoreGetter,
projects_contracts: GetProjectsContracts,
matched_rewards_estimator: GetMatchedRewardsEstimator,
) -> Allocator:
return Allocator(
session,
signature_verifier,
uq_score_getter,
projects_contracts,
matched_rewards_estimator,
epoch_number,
)


GetAllocator = Annotated[Allocator, Depends(get_allocator)]
174 changes: 174 additions & 0 deletions backend/v2/allocations/repositories.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
from datetime import datetime
from decimal import Decimal

from app.infrastructure.database.models import Allocation
from app.infrastructure.database.models import AllocationRequest as AllocationRequestDB
from app.infrastructure.database.models import UniquenessQuotient, User
from eth_utils import to_checksum_address
from sqlalchemy import Numeric, cast, func, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from sqlalchemy.sql.functions import coalesce
from v2.allocations.schemas import (
AllocationWithUserUQScore,
ProjectDonation,
UserAllocationRequest,
)
from v2.core.types import Address
from v2.users.repositories import get_user_by_address


async def sum_allocations_by_epoch(session: AsyncSession, epoch_number: int) -> int:
"""Get the sum of all allocations for a given epoch. We only consider the allocations that have not been deleted."""

result = await session.execute(
select(coalesce(func.sum(cast(Allocation.amount, Numeric)), 0))
.filter(Allocation.epoch == epoch_number)
.filter(Allocation.deleted_at.is_(None))
)
count = result.scalar()

if count is None:
return 0

return int(count)


async def get_allocations_with_user_uqs(
session: AsyncSession, epoch_number: int
) -> list[AllocationWithUserUQScore]:
"""Get all allocations for a given epoch, including the uniqueness quotients of the users."""

result = await session.execute(
select(
Allocation.project_address,
Allocation.amount,
User.address.label("user_address"),
UniquenessQuotient.score,
)
.join(User, Allocation.user_id == User.id)
.join(UniquenessQuotient, UniquenessQuotient.user_id == User.id)
.filter(Allocation.epoch == epoch_number)
.filter(Allocation.deleted_at.is_(None))
.filter(UniquenessQuotient.epoch == epoch_number)
)

rows = result.all()

return [
AllocationWithUserUQScore(
projectAddress=project_address,
amount=amount,
userAddress=user_address,
userUqScore=Decimal(uq_score),
)
for project_address, amount, user_address, uq_score in rows
]


async def soft_delete_user_allocations_by_epoch(
session: AsyncSession,
user_address: Address,
epoch_number: int,
) -> None:
"""Soft delete all user allocations for a given epoch."""

# Find all the allocations for the user and epoch that have not been deleted
user = await get_user_by_address(session, user_address)

if user is None:
return None

now = datetime.utcnow()

# Perform a batch update to soft delete the allocations
await session.execute(
update(Allocation)
.where(
Allocation.epoch == epoch_number,
Allocation.user_id == user.id,
Allocation.deleted_at.is_(None),
)
.values(deleted_at=now)
)


async def store_allocation_request(
session: AsyncSession,
user_address: Address,
epoch_number: int,
request: UserAllocationRequest,
leverage: float,
) -> None:
"""Store an allocation request in the database."""

user = await get_user_by_address(session, user_address)
if user is None:
return None

new_allocations = [
Allocation(
epoch=epoch_number,
user_id=user.id,
nonce=request.nonce,
project_address=to_checksum_address(a.project_address),
amount=str(a.amount),
)
for a in request.allocations
]

allocation_request = AllocationRequestDB(
user_id=user.id,
epoch=epoch_number,
nonce=request.nonce,
signature=request.signature,
is_manually_edited=request.is_manually_edited,
leverage=leverage,
)

session.add(allocation_request)
session.add_all(new_allocations)


async def get_last_allocation_request_nonce(
session: AsyncSession,
user_address: Address,
) -> int | None:
"""Get the last nonce of the allocation requests for a user."""

user = await get_user_by_address(session, user_address)
if user is None:
return None

return await session.scalar(
select(func.max(AllocationRequestDB.nonce)).filter(
AllocationRequestDB.user_id == user.id
)
)


async def get_donations_by_project(
session: AsyncSession,
project_address: str,
epoch_number: int,
) -> list[ProjectDonation]:
"""Get all donations for a project in a given epoch."""

result = await session.execute(
select(Allocation)
.options(joinedload(Allocation.user))
.filter(Allocation.project_address == project_address)
.filter(Allocation.epoch == epoch_number)
.filter(Allocation.deleted_at.is_(None))
)

allocations = result.scalars().all()

return [
ProjectDonation(
amount=a.amount,
donorAddress=a.user.address,
projectAddress=a.project_address,
)
for a in allocations
]
Loading

0 comments on commit c6c0039

Please sign in to comment.