Skip to content

Commit

Permalink
OCT-579: add-pagination-to-history
Browse files Browse the repository at this point in the history
  • Loading branch information
Piotr Żelazko committed Sep 5, 2023
1 parent f2a91f5 commit e2ea9bf
Show file tree
Hide file tree
Showing 15 changed files with 695 additions and 201 deletions.
77 changes: 36 additions & 41 deletions backend/app/controllers/history.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from dataclasses import dataclass
from enum import StrEnum
from typing import List
from typing import List, Tuple, Optional
from operator import attrgetter

from dataclass_wizard import JSONWizard
from app.core import history
from app.core.pagination import Paginator, Cursor


class OpType(StrEnum):
Expand All @@ -20,43 +22,36 @@ class HistoryEntry(JSONWizard):
timestamp: int # Should be in microseconds precision


def user_history(user_address: str) -> List[HistoryEntry]:
allocations = [
HistoryEntry(
type=OpType.ALLOCATION,
amount=r.amount,
timestamp=r.timestamp,
)
for r in history.get_allocations(user_address, 0)
]

locks = [
HistoryEntry(
type=OpType.LOCK,
amount=r.amount,
timestamp=r.timestamp,
)
for r in history.get_locks(user_address, 0)
]

unlocks = [
HistoryEntry(
type=OpType.UNLOCK,
amount=r.amount,
timestamp=r.timestamp,
)
for r in history.get_unlocks(user_address, 0)
]

withdrawals = [
HistoryEntry(
type=OpType.WITHDRAWAL,
amount=r.amount,
timestamp=r.timestamp,
)
for r in history.get_withdrawals(user_address, 0)
]

combined = allocations + locks + unlocks + withdrawals

return sorted(combined, key=lambda x: x.timestamp, reverse=True)
def user_history(
user_address: str, cursor: str = None, limit: int = 20
) -> Tuple[List[HistoryEntry], Optional[str]]:
limit = limit if limit < 100 else 100

(from_timestamp, offset_at_timestamp) = Cursor.decode(cursor)
query_limit = Paginator.query_limit(limit, offset_at_timestamp)

all = _collect_history_records(user_address, from_timestamp, query_limit)
return Paginator.extract_page(all, offset_at_timestamp, limit)


def _collect_history_records(
user_address, from_timestamp, query_limit
) -> List[HistoryEntry]:
events = []
for event_getter, event_type in [
(history.get_allocations, OpType.ALLOCATION),
(history.get_locks, OpType.LOCK),
(history.get_unlocks, OpType.UNLOCK),
(history.get_withdrawals, OpType.WITHDRAWAL),
]:
events += [
HistoryEntry(
type=event_type,
amount=e.amount,
timestamp=e.timestamp.timestamp_us(),
)
for e in event_getter(user_address, from_timestamp, query_limit)
]

sort_keys = attrgetter("timestamp", "type", "amount")
return sorted(events, key=sort_keys, reverse=True)
68 changes: 39 additions & 29 deletions backend/app/core/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
from typing import List

from app.database import allocations
from app.utils.timestamp import (
Timestamp,
from_datetime,
from_timestamp_s,
from_timestamp_us,
now,
)

from app.infrastructure.graphql.locks import get_locks_by_address
from app.infrastructure.graphql.unlocks import get_unlocks_by_address
from app.infrastructure.graphql.withdrawals import get_withdrawals_by_address_and_ts
from app.infrastructure.graphql import locks, unlocks, withdrawals


class OpType(StrEnum):
Expand All @@ -19,75 +24,80 @@ class OpType(StrEnum):
class LockItem:
type: OpType
amount: int
timestamp: int # Should be in microseconds
timestamp: Timestamp


@dataclass(frozen=True)
class AllocationItem:
address: str
epoch: int
amount: int
timestamp: int # Should be in microseconds
timestamp: Timestamp


@dataclass(frozen=True)
class WithdrawalItem:
amount: int
address: str
timestamp: int # Should be in microseconds
timestamp: Timestamp


def get_locks(user_address: str, from_timestamp_us: int) -> List[LockItem]:
def get_locks(
user_address: str, from_timestamp: Timestamp, limit: int
) -> List[LockItem]:
return [
LockItem(
type=OpType.LOCK,
amount=int(r["amount"]),
timestamp=_seconds_to_microseconds(r["timestamp"]),
timestamp=from_timestamp_s(r["timestamp"]),
)
for r in locks.get_user_locks_history(
user_address, int(from_timestamp.timestamp_s()), limit
)
for r in get_locks_by_address(user_address)
if int(r["timestamp"]) >= from_timestamp_us
]


def get_unlocks(user_address: str, from_timestamp_us: int) -> List[LockItem]:
def get_unlocks(
user_address: str, from_timestamp: Timestamp, limit: int
) -> List[LockItem]:
return [
LockItem(
type=OpType.UNLOCK,
amount=int(r["amount"]),
timestamp=_seconds_to_microseconds(r["timestamp"]),
timestamp=from_timestamp_s(r["timestamp"]),
)
for r in unlocks.get_user_unlocks_history(
user_address, int(from_timestamp.timestamp_s()), limit
)
for r in get_unlocks_by_address(user_address)
if int(r["timestamp"]) >= from_timestamp_us
]


def get_allocations(user_address: str, from_timestamp_us: int) -> List[AllocationItem]:
def get_allocations(
user_address: str, from_timestamp: Timestamp, limit: int
) -> List[AllocationItem]:
return [
AllocationItem(
address=r.proposal_address,
epoch=r.epoch,
amount=int(r.amount),
timestamp=_datetime_to_microseconds(r.created_at),
timestamp=from_datetime(r.created_at),
)
for r in allocations.get_user_allocations_history(
user_address, from_timestamp.datetime(), limit
)
for r in allocations.get_all_by_user(user_address, with_deleted=True)
if _datetime_to_microseconds(r.created_at) >= from_timestamp_us
]


def get_withdrawals(user_address: str, from_timestamp_s: int) -> List[WithdrawalItem]:
def get_withdrawals(
user_address: str, from_timestamp: Timestamp, limit: int
) -> List[WithdrawalItem]:
return [
WithdrawalItem(
address=r["user"],
amount=int(r["amount"]),
timestamp=_seconds_to_microseconds(r["timestamp"]),
timestamp=from_timestamp_s(r["timestamp"]),
)
for r in withdrawals.get_user_withdrawals_history(
user_address, int(from_timestamp.timestamp_s()), limit
)
for r in get_withdrawals_by_address_and_ts(user_address, from_timestamp_s)
]


def _datetime_to_microseconds(date: datetime.datetime) -> int:
return int(date.timestamp() * 10**6)


def _seconds_to_microseconds(timestamp: int) -> int:
return timestamp * 10**6
61 changes: 61 additions & 0 deletions backend/app/core/pagination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import base64
from typing import List, Tuple, Optional

from app.utils.timestamp import now, from_timestamp_us, Timestamp


class Cursor:
@staticmethod
def encode(timestamp: int, offset: int) -> str:
return base64.urlsafe_b64encode(bytes(f"{timestamp}.{offset}", "ascii")).decode(
"ascii"
)

@staticmethod
def decode(cursor: Optional[str]) -> Tuple[Timestamp, int]:
if cursor is None:
return now(), 0
timestamp, offset = base64.urlsafe_b64decode(cursor).decode("ascii").split(".")
return from_timestamp_us(int(timestamp)), int(offset)


class Paginator:
@staticmethod
def query_limit(limit, offset):
# we must query for all elems at the current timestamp (also events from the prev page, thus we add offset)
# and one elem more to get calculate next page cursor
return limit + offset + 1

@staticmethod
def extract_page(
all: List, offset_at_timestamp: int, limit: int
) -> Tuple[List, Optional[str]]:
next_page_start_index = offset_at_timestamp + limit
current_page = all[offset_at_timestamp:next_page_start_index]

next_page_start_elem = (
all[next_page_start_index] if next_page_start_index < len(all) else None
)
next_page_cursor = None
if next_page_start_elem is not None:
next_offset = Paginator._get_offset(
current_page, next_page_start_elem.timestamp, offset_at_timestamp
)
next_page_cursor = Cursor.encode(
next_page_start_elem.timestamp, next_offset
)

return current_page, next_page_cursor

@staticmethod
def _get_offset(current_page_elems, next_elem_timestamp, prev_offset):
offset = 0
for elem in current_page_elems[::-1]:
if elem.timestamp == next_elem_timestamp:
offset += 1
else:
return offset

# If we reach this line,
# then all elems in the next page have same timestamp, so we have to add offset to the prev_offset
return offset + prev_offset
35 changes: 28 additions & 7 deletions backend/app/database/allocations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from eth_utils import to_checksum_address
from sqlalchemy.orm import Query
from sqlalchemy import func

from app.core.common import AccountFunds
from app.database.models import Allocation, User
Expand All @@ -20,18 +21,38 @@ def get_all_by_epoch(epoch: int, with_deleted=False) -> List[Allocation]:
return query.all()


def get_all_by_user(user_address: str, with_deleted=False) -> List[Allocation]:
def get_user_allocations_history(
user_address: str, from_datetime: datetime, limit: int
) -> List[Allocation]:
user: User = get_by_address(user_address)

if user is None:
return []

query: Query = Allocation.query.filter_by(user_id=user.id)
user_allocations: Query = (
Allocation.query.filter(
Allocation.user_id == user.id, Allocation.created_at <= from_datetime
)
.order_by(Allocation.created_at.desc())
.limit(limit)
.subquery()
)

if not with_deleted:
query = query.filter(Allocation.deleted_at.is_(None))
timestamp_at_limit_query = (
db.session.query(
func.min(user_allocations.c.created_at).label("limit_timestamp")
)
.group_by(user_allocations.c.user_id)
.subquery()
)

return query.all()
allocations = Allocation.query.filter(
Allocation.user_id == user.id,
Allocation.created_at <= from_datetime,
Allocation.created_at >= timestamp_at_limit_query.c.limit_timestamp,
).order_by(Allocation.created_at.desc())

return allocations.all()


def get_all_by_user_addr_and_epoch(
Expand Down Expand Up @@ -106,7 +127,7 @@ def get_alloc_sum_by_epoch(epoch: int) -> int:


def add_all(epoch: int, user_id: int, allocations):
now = datetime.now()
now = datetime.utcnow()

new_allocations = [
Allocation(
Expand All @@ -126,7 +147,7 @@ def soft_delete_all_by_epoch_and_user_id(epoch: int, user_id: int):
epoch=epoch, user_id=user_id, deleted_at=None
).all()

now = datetime.now()
now = datetime.utcnow()

for allocation in existing_allocations:
allocation.deleted_at = now
Expand Down
Loading

0 comments on commit e2ea9bf

Please sign in to comment.