diff --git a/backend/.env.template b/backend/.env.template index c01c25192e..143458ce2c 100644 --- a/backend/.env.template +++ b/backend/.env.template @@ -25,3 +25,5 @@ GLM_SENDER_ADDRESS= GLM_SENDER_PRIVATE_KEY= GLM_SENDER_NONCE= MAINNET_PROPOSAL_CIDS= + +SABLIER_MAINNET_SUBGRAPH_URL= diff --git a/backend/app/engine/user/effective_deposit/__init__.py b/backend/app/engine/user/effective_deposit/__init__.py index 33abf1e3b0..e6b4b419c4 100644 --- a/backend/app/engine/user/effective_deposit/__init__.py +++ b/backend/app/engine/user/effective_deposit/__init__.py @@ -12,6 +12,17 @@ class EventType(StrEnum): UNLOCK = "Unlocked" +class SablierEventType(StrEnum): + CREATE = "Create" + WITHDRAW = "Withdraw" + CANCEL = "Cancel" + + +class DepositSource(StrEnum): + OCTANT = "Octant" + SABLIER = "Sablier" + + def _calculate_deposit_after_event( event_type: EventType, before: int, amount: int ) -> int: @@ -32,6 +43,8 @@ class DepositEvent: amount: int deposit_before: int deposit_after: int + source: DepositSource + mapped_event: Optional[SablierEventType] def __init__( self, @@ -40,6 +53,8 @@ def __init__( timestamp: int, amount: int, deposit_before: int, + source: DepositSource = DepositSource.OCTANT, + mapped_event: Optional[SablierEventType] = None, ): self.user = user self.type = type @@ -49,10 +64,17 @@ def __init__( self.deposit_after = _calculate_deposit_after_event( type, deposit_before, amount ) + self.source = source + self.mapped_event = mapped_event @staticmethod def from_dict(event: Dict): event_type = EventType(event["__typename"]) + source = DepositSource.OCTANT + mapped_event = None + if event.get("__source") == DepositSource.SABLIER: + mapped_event = event["type"] + source = DepositSource.SABLIER user = to_checksum_address(event["user"]) timestamp = int(event["timestamp"]) amount = int(event["amount"]) @@ -64,6 +86,8 @@ def from_dict(event: Dict): timestamp=timestamp, amount=amount, deposit_before=deposit_before, + source=source, + mapped_event=mapped_event, ) diff --git a/backend/app/extensions.py b/backend/app/extensions.py index 8a749b7c8a..0ac92f5d1e 100644 --- a/backend/app/extensions.py +++ b/backend/app/extensions.py @@ -14,7 +14,7 @@ from app.infrastructure.contracts.erc20 import ERC20 from app.infrastructure.contracts.projects import Projects from app.infrastructure.contracts.vault import Vault -from app.infrastructure import GQLConnectionFactory +from app.infrastructure import GQLConnectionFactory, SubgraphEndpoints # Flask extensions api = Api( @@ -39,7 +39,8 @@ vault = Vault(abi=abi.VAULT) # GQL extensions -gql_factory = GQLConnectionFactory() +gql_octant_factory = GQLConnectionFactory() +gql_sablier_factory = GQLConnectionFactory() def init_web3(app): @@ -55,7 +56,8 @@ def init_web3(app): def init_subgraph(app): - gql_factory.set_url(app.config) + gql_octant_factory.set_url(app.config, SubgraphEndpoints.OCTANT_SUBGRAPH) + gql_sablier_factory.set_url(app.config, SubgraphEndpoints.SABLIER_SUBGRAPH) def init_scheduler(app): diff --git a/backend/app/infrastructure/__init__.py b/backend/app/infrastructure/__init__.py index 509cdecc49..f2e0b9bb02 100644 --- a/backend/app/infrastructure/__init__.py +++ b/backend/app/infrastructure/__init__.py @@ -22,6 +22,11 @@ } +class SubgraphEndpoints: + OCTANT_SUBGRAPH = "SUBGRAPH_ENDPOINT" + SABLIER_SUBGRAPH = "SABLIER_MAINNET_SUBGRAPH_URL" + + class OctantResource(Resource): def __init__(self, *args, **kwargs): Resource.__init__(self, *args, *kwargs) @@ -108,8 +113,8 @@ class GQLConnectionFactory: def __init__(self): self._url = None - def set_url(self, config: Config): - self._url = config["SUBGRAPH_ENDPOINT"] + def set_url(self, config: Config, key: SubgraphEndpoints): + self._url = config[key] def build(self): if not self._url: diff --git a/backend/app/infrastructure/graphql/epochs.py b/backend/app/infrastructure/graphql/epochs.py index 74770981c5..a5454554de 100644 --- a/backend/app/infrastructure/graphql/epochs.py +++ b/backend/app/infrastructure/graphql/epochs.py @@ -1,7 +1,7 @@ from flask import current_app as app from gql import gql -from app.extensions import gql_factory +from app.extensions import gql_octant_factory from app import exceptions @@ -9,23 +9,25 @@ def get_epoch_by_number(epoch_number): query = gql( """ -query GetEpoch($epochNo: Int!) { - epoches(where: {epoch: $epochNo}) { - epoch - fromTs - toTs - duration - decisionWindow - } -} - """ + query GetEpoch($epochNo: Int!) { + epoches(where: {epoch: $epochNo}) { + epoch + fromTs + toTs + duration + decisionWindow + } + } + """ ) variables = {"epochNo": epoch_number} app.logger.debug( f"[Subgraph] Getting epoch properties for epoch number: {epoch_number}" ) - data = gql_factory.build().execute(query, variable_values=variables)["epoches"] + data = gql_octant_factory.build().execute(query, variable_values=variables)[ + "epoches" + ] if data: app.logger.debug(f"[Subgraph] Received epoch properties: {data[0]}") @@ -56,5 +58,5 @@ def get_epochs(): ) app.logger.debug("[Subgraph] Getting list of all epochs") - data = gql_factory.build().execute(query) + data = gql_octant_factory.build().execute(query) return data diff --git a/backend/app/infrastructure/graphql/info.py b/backend/app/infrastructure/graphql/info.py index 2ea3d39969..c9db09d5cf 100644 --- a/backend/app/infrastructure/graphql/info.py +++ b/backend/app/infrastructure/graphql/info.py @@ -1,6 +1,6 @@ from gql import gql -from app.extensions import gql_factory +from app.extensions import gql_octant_factory def get_indexed_block_num() -> int: @@ -15,7 +15,7 @@ def get_indexed_block_num() -> int: } """ ) - data = gql_factory.build().execute(query) + data = gql_octant_factory.build().execute(query) if data: return data["_meta"]["block"]["number"] else: diff --git a/backend/app/infrastructure/graphql/locks.py b/backend/app/infrastructure/graphql/locks.py index 12c4491467..f2c14efb8b 100644 --- a/backend/app/infrastructure/graphql/locks.py +++ b/backend/app/infrastructure/graphql/locks.py @@ -2,7 +2,7 @@ from flask import current_app as app from gql import gql -from app.extensions import gql_factory +from app.extensions import gql_octant_factory class LockEvent(TypedDict): @@ -43,9 +43,9 @@ def get_user_locks_history( } app.logger.debug(f"[Subgraph] Getting user {user_address} locks") - partial_result = gql_factory.build().execute(query, variable_values=variables)[ - "lockeds" - ] + partial_result = gql_octant_factory.build().execute( + query, variable_values=variables + )["lockeds"] result = [] @@ -90,7 +90,9 @@ def get_locks_by_timestamp_range(from_ts: int, to_ts: int) -> list[LockEvent]: "toTimestamp": to_ts, } app.logger.debug(f"[Subgraph] Getting locks in timestamp range {from_ts} - {to_ts}") - result = gql_factory.build().execute(query, variable_values=variables)["lockeds"] + result = gql_octant_factory.build().execute(query, variable_values=variables)[ + "lockeds" + ] app.logger.debug(f"[Subgraph] Received locks: {result}") return result @@ -124,7 +126,9 @@ def get_last_lock_before(user_address: str, before: int) -> LockEvent | None: app.logger.debug( f"[Subgraph] Getting user {user_address} last lock before {before}" ) - locks = gql_factory.build().execute(query, variable_values=variables)["lockeds"] + locks = gql_octant_factory.build().execute(query, variable_values=variables)[ + "lockeds" + ] app.logger.debug(f"[Subgraph] Received locks: {locks}") return locks[0] if locks else None @@ -160,7 +164,9 @@ def get_locks_by_address_and_timestamp_range( app.logger.debug( f"[Subgraph] Getting user {user_address} locks in timestamp range {from_ts} - {to_ts}" ) - result = gql_factory.build().execute(query, variable_values=variables)["lockeds"] + result = gql_octant_factory.build().execute(query, variable_values=variables)[ + "lockeds" + ] app.logger.debug(f"[Subgraph] Received locks: {result}") return result diff --git a/backend/app/infrastructure/graphql/merkle_roots.py b/backend/app/infrastructure/graphql/merkle_roots.py index 1c47aefbf5..d54d4f5b97 100644 --- a/backend/app/infrastructure/graphql/merkle_roots.py +++ b/backend/app/infrastructure/graphql/merkle_roots.py @@ -1,7 +1,7 @@ from flask import current_app as app from gql import gql -from app.extensions import gql_factory +from app.extensions import gql_octant_factory def get_all_vault_merkle_roots(): @@ -18,7 +18,7 @@ def get_all_vault_merkle_roots(): ) app.logger.debug("[Subgraph] Getting all vault merkle roots") - result = gql_factory.build().execute(query)["vaultMerkleRoots"] + result = gql_octant_factory.build().execute(query)["vaultMerkleRoots"] app.logger.debug(f"[Subgraph] Received merkle roots: {result}") return result diff --git a/backend/app/infrastructure/graphql/unlocks.py b/backend/app/infrastructure/graphql/unlocks.py index e51535cc1d..6b2cdc0813 100644 --- a/backend/app/infrastructure/graphql/unlocks.py +++ b/backend/app/infrastructure/graphql/unlocks.py @@ -1,8 +1,8 @@ -from typing import Literal, TypedDict +from typing import Literal, TypedDict, List from flask import current_app as app from gql import gql -from app.extensions import gql_factory +from app.extensions import gql_octant_factory class UnlockEvent(TypedDict): @@ -16,7 +16,7 @@ class UnlockEvent(TypedDict): def get_user_unlocks_history( user_address: str, from_timestamp: int, limit: int -) -> list[UnlockEvent]: +) -> List[UnlockEvent]: query = gql( """ query GetUnlocks($userAddress: Bytes!, $fromTimestamp: Int!, $limit: Int!) { @@ -44,9 +44,9 @@ def get_user_unlocks_history( } app.logger.debug(f"[Subgraph] Getting user {user_address} unlocks") - partial_result = gql_factory.build().execute(query, variable_values=variables)[ - "unlockeds" - ] + partial_result = gql_octant_factory.build().execute( + query, variable_values=variables + )["unlockeds"] result = [] @@ -94,7 +94,9 @@ def get_unlocks_by_timestamp_range(from_ts, to_ts) -> list[UnlockEvent]: app.logger.debug( f"[Subgraph] Getting unlocks in timestamp range {from_ts} - {to_ts}" ) - result = gql_factory.build().execute(query, variable_values=variables)["unlockeds"] + result = gql_octant_factory.build().execute(query, variable_values=variables)[ + "unlockeds" + ] app.logger.debug(f"[Subgraph] Received unlocks: {result}") return result @@ -130,7 +132,9 @@ def get_unlocks_by_address_and_timestamp_range( app.logger.debug( f"[Subgraph] Getting user {user_address} unlocks in timestamp range {from_ts} - {to_ts}" ) - result = gql_factory.build().execute(query, variable_values=variables)["unlockeds"] + result = gql_octant_factory.build().execute(query, variable_values=variables)[ + "unlockeds" + ] app.logger.debug(f"[Subgraph] Received unlocks: {result}") return result @@ -164,7 +168,9 @@ def get_last_unlock_before(user_address: str, before: int) -> UnlockEvent | None app.logger.debug( f"[Subgraph] Getting user {user_address} last unlock before {before}" ) - unlocks = gql_factory.build().execute(query, variable_values=variables)["unlockeds"] + unlocks = gql_octant_factory.build().execute(query, variable_values=variables)[ + "unlockeds" + ] app.logger.debug(f"[Subgraph] Received unlocks: {unlocks}") return unlocks[0] if unlocks else None diff --git a/backend/app/infrastructure/graphql/withdrawals.py b/backend/app/infrastructure/graphql/withdrawals.py index cf81133dba..de2f84b879 100644 --- a/backend/app/infrastructure/graphql/withdrawals.py +++ b/backend/app/infrastructure/graphql/withdrawals.py @@ -1,7 +1,7 @@ from flask import current_app as app from gql import gql -from app.extensions import gql_factory +from app.extensions import gql_octant_factory def get_user_withdrawals_history(user_address: str, from_timestamp: int, limit: int): @@ -32,9 +32,9 @@ def get_user_withdrawals_history(user_address: str, from_timestamp: int, limit: app.logger.debug( f"[Subgraph] Getting user {user_address} withdrawals before ts {from_timestamp}" ) - partial_result = gql_factory.build().execute(query, variable_values=variables)[ - "withdrawals" - ] + partial_result = gql_octant_factory.build().execute( + query, variable_values=variables + )["withdrawals"] result = [] @@ -81,7 +81,7 @@ def get_withdrawals_by_address_and_timestamp_range( f"[Subgraph] Getting user {user_address} withdrawals in timestamp range {from_timestamp} - {to_timestamp}" ) - result = gql_factory.build().execute(query, variable_values=variables)[ + result = gql_octant_factory.build().execute(query, variable_values=variables)[ "withdrawals" ] diff --git a/backend/app/infrastructure/routes/history.py b/backend/app/infrastructure/routes/history.py index 22fb0eb545..2c6fd6eb75 100644 --- a/backend/app/infrastructure/routes/history.py +++ b/backend/app/infrastructure/routes/history.py @@ -90,7 +90,10 @@ class History(OctantResource): @ns.param("cursor", description="History page cursor", _in="query") @ns.param("limit", description="History page size", _in="query") @ns.marshal_with(user_history_model) - @ns.response(200, "User history successfully retrieved") + @ns.response( + 200, + "User history from the Octant's and Sablier's Subgraphes successfully retrieved", + ) def get(self, user_address): page_cursor = request.args.get("cursor", type=str) page_limit = request.args.get("limit", type=int) diff --git a/backend/app/infrastructure/sablier/__init__.py b/backend/app/infrastructure/sablier/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/app/infrastructure/sablier/events.py b/backend/app/infrastructure/sablier/events.py new file mode 100644 index 0000000000..54d3affbd3 --- /dev/null +++ b/backend/app/infrastructure/sablier/events.py @@ -0,0 +1,143 @@ +from typing import TypedDict, List, Dict, Optional + +from flask import current_app as app +from gql import gql + +from app.extensions import gql_sablier_factory + + +class SablierAction(TypedDict): + category: str + addressA: str + addressB: str + amountA: int + amountB: int + timestamp: int + hash: str + + +class SablierStream(TypedDict): + actions: List[SablierAction] + intactAmount: int + + +def fetch_streams(query: str, variables: Dict) -> SablierStream: + """ + Fetch streams with retry logic for pagination. + """ + all_streams = [] + merged_actions = [] + final_intact_amount = 0 + has_more = True + limit = 1000 + skip = 0 + + while has_more: + variables.update({"limit": limit, "skip": skip}) + + app.logger.debug(f"[Sablier Subgraph] Querying streams with skip: {skip}") + result = gql_sablier_factory.build().execute( + gql(query), variable_values=variables + ) + streams = result.get("streams", []) + + app.logger.debug(f"[Sablier Subgraph] Received {len(streams)} streams.") + + all_streams.extend(streams) + + for stream in streams: + merged_actions.extend(stream.get("actions", [])) + final_intact_amount = stream.get("intactAmount", 0) + + if len(streams) < limit: + has_more = False + else: + skip += limit + + return SablierStream(actions=merged_actions, intactAmount=final_intact_amount) + + +def get_user_events_history(user_address: str) -> Optional[SablierStream]: + """ + Get all the locks and unlocks for a user. + """ + query = """ + query GetEvents($sender: String!, $recipient: String!, $tokenAddress: String!, $limit: Int!, $skip: Int!) { + streams( + where: { + sender: $sender + recipient: $recipient + asset_: {address: $tokenAddress} + transferable: false + } + first: $limit + skip: $skip + orderBy: timestamp + ) { + id + intactAmount + actions(where: {category_in: [Cancel, Withdraw, Create]}, orderBy: timestamp) { + category + addressA + addressB + amountA + amountB + timestamp + hash + } + } + } + """ + variables = { + "sender": _get_sender(), + "recipient": user_address, + "tokenAddress": _get_token_address(), + } + + return fetch_streams(query, variables) + + +def get_all_events_history() -> SablierStream: + """ + Get all the locks and unlocks in history. + """ + query = """ + query GetAllEvents($sender: String!, $tokenAddress: String!, $limit: Int!, $skip: Int!) { + streams( + where: { + sender: $sender + asset_: {address: $tokenAddress} + transferable: false + } + first: $limit + skip: $skip + orderBy: timestamp + ) { + id + intactAmount + actions(where: {category_in: [Cancel, Withdraw, Create]}, orderBy: timestamp) { + category + addressA + addressB + amountA + amountB + timestamp + hash + } + } + } + """ + variables = { + "sender": _get_sender(), + "tokenAddress": _get_token_address(), + } + + return fetch_streams(query, variables) + + +def _get_sender(): + return app.config["SABLIER_SENDER_ADDRESS"] + + +def _get_token_address(): + return app.config["GLM_TOKEN_ADDRESS"] diff --git a/backend/app/modules/common/sablier_events_mapper.py b/backend/app/modules/common/sablier_events_mapper.py new file mode 100644 index 0000000000..2c984947e9 --- /dev/null +++ b/backend/app/modules/common/sablier_events_mapper.py @@ -0,0 +1,169 @@ +from copy import deepcopy +from dataclasses import dataclass +from typing import List, TypedDict, Literal, Tuple + +from app.engine.user.effective_deposit import ( + SablierEventType, + DepositSource, + EventType, +) +from app.infrastructure.sablier.events import SablierStream, SablierAction + + +class SablierEvent(TypedDict): + __source: DepositSource + depositBefore: int + amount: int + timestamp: int + user: str + transactionHash: str + type: SablierEventType + + +class SablierEventLock(SablierEvent): + __typename: Literal["Locked"] + + +class SablierEventUnlock(SablierEvent): + __typename: Literal["Unlocked"] + + +@dataclass +class MappedEvents: + locks: List[SablierEvent] + unlocks: List[SablierEvent] + + +def process_to_locks_and_unlocks( + sablier_stream: SablierStream, + *, + from_timestamp: int = None, + to_timestamp: int = None, + inclusively: bool = False, +) -> MappedEvents: + """ + Returns TypedDict with locks and unlocks from Sablier stream. + """ + if len(sablier_stream["actions"]) == 0: + return MappedEvents(locks=[], unlocks=[]) + + event_items = _convert(sablier_stream["actions"]) + lock_items_with_filters = _apply_filters( + event_items, + from_timestamp=from_timestamp, + to_timestamp=to_timestamp, + inclusively=inclusively, + ) + + return MappedEvents( + locks=list( + filter( + lambda lock: lock["__typename"] == EventType.LOCK, + lock_items_with_filters, + ) + ), + unlocks=list( + filter( + lambda lock: lock["__typename"] == EventType.UNLOCK, + lock_items_with_filters, + ) + ), + ) + + +def _apply_filters( + event_items: List[SablierEvent], + *, + from_timestamp: int, + to_timestamp: int, + inclusively: bool, +) -> List[SablierEvent]: + copy_event_items = deepcopy(event_items) + + if inclusively is True: + to_timestamp += 1 + + for item in event_items: + if from_timestamp and item["timestamp"] < from_timestamp: + copy_event_items.remove(item) + if to_timestamp and item["timestamp"] > to_timestamp: + copy_event_items.remove(item) + + return copy_event_items + + +def _process_create( + action: SablierAction, starting_deposit: int +) -> Tuple[SablierEvent, int]: + amount = int(action["amountA"]) if action["amountA"] else 0 + deposit_before = starting_deposit + starting_deposit += amount + lock_item = SablierEventLock( + __source=DepositSource.SABLIER, + __typename=EventType.LOCK.value, + amount=amount, + timestamp=int(action["timestamp"]), + transactionHash=action["hash"], + depositBefore=deposit_before, + user=action["addressB"], + type=SablierEventType.CREATE, + ) + return lock_item, starting_deposit + + +def _process_withdraw( + action: SablierAction, starting_deposit: int +) -> Tuple[SablierEvent, int]: + amount = int(action["amountB"]) if action["amountB"] else 0 + deposit_before = starting_deposit + starting_deposit -= amount + lock_item = SablierEventUnlock( + __source=DepositSource.SABLIER, + __typename=EventType.UNLOCK.value, + amount=amount, + timestamp=int(action["timestamp"]), + transactionHash=action["hash"], + depositBefore=deposit_before, + user=action["addressB"], + type=SablierEventType.WITHDRAW, + ) + return lock_item, starting_deposit + + +def _process_cancel( + action: SablierAction, starting_deposit: int +) -> Tuple[SablierEvent, int]: + intact_amount = int(action["amountB"]) if action["amountB"] else 0 + cancelled_amount = int(action["amountA"]) - intact_amount + deposit_before = starting_deposit + starting_deposit = intact_amount + lock_item = SablierEventUnlock( + __source=DepositSource.SABLIER, + __typename=EventType.UNLOCK.value, + amount=cancelled_amount, + timestamp=int(action["timestamp"]), + transactionHash=action["hash"], + depositBefore=deposit_before, + user=action["addressB"], + type=SablierEventType.CANCEL, + ) + return lock_item, starting_deposit + + +def _convert(actions: List[SablierAction]) -> List[SablierEvent]: + lock_items = [] + action_strategy = { + SablierEventType.CREATE.value: _process_create, + SablierEventType.WITHDRAW.value: _process_withdraw, + SablierEventType.CANCEL.value: _process_cancel, + } + starting_deposit = 0 + + for action in actions: + category = action["category"] + if category in action_strategy: + process_func = action_strategy[category] + lock_item, starting_deposit = process_func(action, starting_deposit) + lock_items.append(lock_item) + + return lock_items diff --git a/backend/app/modules/common/time.py b/backend/app/modules/common/time.py index 3a9735b09b..a7738c7a8f 100644 --- a/backend/app/modules/common/time.py +++ b/backend/app/modules/common/time.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dataclasses import dataclass from datetime import datetime as DateTime, timezone @@ -30,7 +32,7 @@ def __str__(self): def __repr__(self): return f"Timestamp({str(self.timestamp_us())})" - def __eq__(self, o): + def __eq__(self, o: Timestamp): if isinstance(o, Timestamp): return self._timestamp_us == o._timestamp_us elif isinstance(o, int): @@ -38,7 +40,7 @@ def __eq__(self, o): else: return False - def __le__(self, o): + def __le__(self, o: Timestamp): if isinstance(o, Timestamp): return self._timestamp_us <= o._timestamp_us else: @@ -46,6 +48,14 @@ def __le__(self, o): f"'<=' not supported between instances of type '{type(self)}' and '{type(o)}'" ) + def __lt__(self, o: Timestamp): + if isinstance(o, Timestamp): + return self._timestamp_us < o._timestamp_us + else: + raise TypeError( + f"'<' not supported between instances of type '{type(self)}' and '{type(o)}'" + ) + def from_timestamp_s(timestamp_s: float) -> Timestamp: return Timestamp(int(timestamp_s * 10**6)) diff --git a/backend/app/modules/user/deposits/service/calculated.py b/backend/app/modules/user/deposits/service/calculated.py index 3fac44847a..1c75c73b86 100644 --- a/backend/app/modules/user/deposits/service/calculated.py +++ b/backend/app/modules/user/deposits/service/calculated.py @@ -3,7 +3,9 @@ from app.context.manager import Context from app.engine.user.effective_deposit import UserDeposit, DepositEvent from app.infrastructure.graphql import locks, unlocks +from app.infrastructure.sablier.events import get_user_events_history from app.modules.common.effective_deposits import calculate_effective_deposits +from app.modules.common.sablier_events_mapper import process_to_locks_and_unlocks from app.modules.common.time import Timestamp, from_timestamp_s from app.modules.history.dto import LockItem, OpType from app.pydantic import Model @@ -49,8 +51,26 @@ def get_user_effective_deposit(self, context: Context, user_address: str) -> int def get_locks( self, user_address: str, from_timestamp: Timestamp, limit: int - ) -> list[LockItem]: - return [ + ) -> List[LockItem]: + sablier_events = get_user_events_history(user_address) + mapped_events = process_to_locks_and_unlocks( + sablier_events, + to_timestamp=int(from_timestamp.timestamp_s()), + inclusively=True, + ) + locks_from_sablier = mapped_events.locks + + sablier_locks = [ + LockItem( + type=OpType.LOCK, + amount=int(r["amount"]), + timestamp=from_timestamp_s(r["timestamp"]), + transaction_hash=r["transactionHash"], + ) + for r in locks_from_sablier + ] + + octant_subgraph_locks = [ LockItem( type=OpType.LOCK, amount=int(r["amount"]), @@ -62,10 +82,27 @@ def get_locks( ) ] + return sablier_locks + octant_subgraph_locks + def get_unlocks( self, user_address: str, from_timestamp: Timestamp, limit: int - ) -> list[LockItem]: - return [ + ) -> List[LockItem]: + sablier_events = get_user_events_history(user_address) + mapped_events = process_to_locks_and_unlocks( + sablier_events, to_timestamp=int(from_timestamp.timestamp_s()) + ) + unlocks_from_sablier = mapped_events.unlocks + + sablier_unlocks = [ + LockItem( + type=OpType.UNLOCK, + amount=int(r["amount"]), + timestamp=from_timestamp_s(r["timestamp"]), + transaction_hash=r["transactionHash"], + ) + for r in unlocks_from_sablier + ] + octant_subgraph_unlocks = [ LockItem( type=OpType.UNLOCK, amount=int(r["amount"]), @@ -76,3 +113,4 @@ def get_unlocks( user_address, int(from_timestamp.timestamp_s()), limit ) ] + return sablier_unlocks + octant_subgraph_unlocks diff --git a/backend/app/modules/user/events_generator/service/db_and_graph.py b/backend/app/modules/user/events_generator/service/db_and_graph.py index 1f80123f33..39da0a4fca 100644 --- a/backend/app/modules/user/events_generator/service/db_and_graph.py +++ b/backend/app/modules/user/events_generator/service/db_and_graph.py @@ -16,6 +16,11 @@ get_unlocks_by_timestamp_range, ) from app.pydantic import Model +from app.infrastructure.sablier.events import ( + get_all_events_history, + get_user_events_history, +) +from app.modules.common.sablier_events_mapper import process_to_locks_and_unlocks class DbAndGraphEventsGenerator(Model): @@ -23,7 +28,7 @@ def get_user_events( self, context: Context, user_address: str ) -> List[DepositEvent]: """ - Get user lock and unlock events from the subgraph within the given timestamp range, sort them by timestamp, + Get user lock and unlock events from the subgraph & sablier within the given timestamp range, sort them by timestamp, Returns: A list of event dictionaries sorted by timestamp. @@ -45,6 +50,12 @@ def get_user_events( get_unlocks_by_address_and_timestamp_range(user_address, start, end) ) + sablier_events = get_user_events_history(user_address) + mapped_events = process_to_locks_and_unlocks( + sablier_events, from_timestamp=start, to_timestamp=end + ) + events += mapped_events.locks + mapped_events.unlocks + events = list(map(DepositEvent.from_dict, events)) sorted_events = sorted(events, key=attrgetter("timestamp")) @@ -69,8 +80,15 @@ def get_all_users_events(self, context: Context) -> Dict[str, List[DepositEvent] end = context.epoch_details.end_sec epoch_start_events = self._get_epoch_start_deposits(epoch_num, start) - epoch_events = get_locks_by_timestamp_range(start, end) + sablier_events = get_all_events_history() + mapped_events = process_to_locks_and_unlocks( + sablier_events, from_timestamp=start, to_timestamp=end + ) + + epoch_events = mapped_events.locks + mapped_events.unlocks + epoch_events += get_locks_by_timestamp_range(start, end) epoch_events += get_unlocks_by_timestamp_range(start, end) + epoch_events = list(map(DepositEvent.from_dict, epoch_events)) sorted_events = sorted(epoch_events, key=attrgetter("user", "timestamp")) diff --git a/backend/app/settings.py b/backend/app/settings.py index 75c37cb388..a8b165f953 100644 --- a/backend/app/settings.py +++ b/backend/app/settings.py @@ -71,6 +71,13 @@ class Config(object): "MAINNET_PROPOSAL_CIDS", DEFAULT_MAINNET_PROJECT_CIDS ) + # Sablier + SABLIER_MAINNET_SUBGRAPH_URL = os.getenv("SABLIER_MAINNET_SUBGRAPH_URL") + SABLIER_SENDER_ADDRESS = os.getenv("SABLIER_SENDER_ADDRESS", "") + GLM_TOKEN_ADDRESS = os.getenv( + "GLM_TOKEN_ADDRESS", "0x7DD9c5Cba05E151C895FDe1CF355C9A1D5DA6429" + ) + class ProdConfig(Config): """Production configuration.""" diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index 285bd962ab..63ceed9eea 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -22,8 +22,17 @@ from app import create_app from app.engine.user.effective_deposit import DepositEvent, EventType, UserDeposit from app.exceptions import ExternalApiException -from app.extensions import db, deposits, glm, gql_factory, w3, vault, epochs -from app.infrastructure import Client as GQLClient +from app.extensions import ( + db, + deposits, + glm, + gql_octant_factory, + w3, + vault, + epochs, + gql_sablier_factory, +) +from app.infrastructure import Client as GQLClient, SubgraphEndpoints from app.infrastructure import database from app.infrastructure.contracts.epochs import Epochs from app.infrastructure.contracts.erc20 import ERC20 @@ -71,7 +80,7 @@ LOW_UQ_SCORE, ) from tests.helpers.context import get_context -from tests.helpers.gql_client import MockGQLClient +from tests.helpers.gql_client import MockGQLClient, MockSablierGQLClient from tests.helpers.mocked_epoch_details import EPOCH_EVENTS from tests.helpers.octant_rewards import octant_rewards from tests.helpers.pending_snapshot import create_pending_snapshot @@ -1523,6 +1532,12 @@ def _split_deposit_events(deposit_events): return locks_events, unlocks_events +def mock_sablier_graphql(mocker): + mock_client = MockSablierGQLClient() + mocker.patch.object(gql_sablier_factory, "build") + gql_sablier_factory.build.return_value = mock_client + + def mock_graphql( mocker, deposit_events=None, @@ -1539,8 +1554,8 @@ def mock_graphql( withdrawals=withdrawals_events, merkle_roots=merkle_roots_events, ) - mocker.patch.object(gql_factory, "build") - gql_factory.build.return_value = mock_client + mocker.patch.object(gql_octant_factory, "build") + gql_octant_factory.build.return_value = mock_client @pytest.fixture(scope="function") @@ -1550,7 +1565,10 @@ def mock_failing_gql( monkeypatch, ): # this URL is not called in this test, but it needs to be a proper URL - gql_factory.set_url({"SUBGRAPH_ENDPOINT": "http://domain.example:12345"}) + gql_octant_factory.set_url( + {"SUBGRAPH_ENDPOINT": "http://domain.example:12345"}, + SubgraphEndpoints.OCTANT_SUBGRAPH, + ) mocker.patch.object(GQLClient, "execute_sync") GQLClient.execute_sync.side_effect = TransportQueryError( diff --git a/backend/tests/helpers/gql_client.py b/backend/tests/helpers/gql_client.py index fcaad7ef75..dcdccc01e4 100644 --- a/backend/tests/helpers/gql_client.py +++ b/backend/tests/helpers/gql_client.py @@ -175,3 +175,58 @@ def _bytes_to_lowercase(self): self.withdrawals = list(map(MockGQLClient._user_to_lower, self.withdrawals)) self.lockeds = list(map(MockGQLClient._user_to_lower, self.lockeds)) self.unlockeds = list(map(MockGQLClient._user_to_lower, self.unlockeds)) + + +class MockSablierGQLClient: + def __init__(self, *args, **kwargs): + pass + + def execute(self, query: DocumentNode, variable_values=None): + result = { + "streams": [ + { + "actions": [ + { + "addressA": "0x76273dcc41356e5f0c49bb68e525175dc7e83417", + "addressB": "0xc8ef823f4f154415bc4931071f53c61b4f979152", + "amountA": "10000000000000000000", + "amountB": None, + "category": "Create", + "hash": "0xe4395aa03aaf8bb3d2d8009106cc2a5049f9afde8a5b19bb70d3e19660eae43b", + "timestamp": "1726833047", + }, + { + "addressA": "0xc8ef823f4f154415bc4931071f53c61b4f979152", + "addressB": "0xc8ef823f4f154415bc4931071f53c61b4f979152", + "amountA": None, + "amountB": "355443302891933020", + "category": "Withdraw", + "hash": "0x685ec53bdcbaca88d87438b33f6c82b3720937126db1d3982cfd62a9bf71b138", + "timestamp": "1729075199", + }, + { + "addressA": "0x76273dcc41356e5f0c49bb68e525175dc7e83417", + "addressB": "0xc8ef823f4f154415bc4931071f53c61b4f979152", + "amountA": "9644339802130898030", + "amountB": "216894977168950", + "category": "Cancel", + "hash": "0x244c9de88860320b89575a0d8f62f9eb5c7ba4597947ac63f94b6ef0db354b83", + "timestamp": "1729076267", + }, + { + "addressA": "0xc8ef823f4f154415bc4931071f53c61b4f979152", + "addressB": "0xc8ef823f4f154415bc4931071f53c61b4f979152", + "amountA": None, + "amountB": "216894977168950", + "category": "Withdraw", + "hash": "0xcdf10032cf3bc74a255510e632d4e7fe876503bc2ec04c8d79dce714492ad11d", + "timestamp": "1729077035", + }, + ], + "id": "0x3962f6585946823440d274ad7c719b02b49de51e-1-1147", + "intactAmount": "0", + "transferable": False, + } + ] + } + return result diff --git a/backend/tests/infrastracture/graph/test_gql_retry_backoff.py b/backend/tests/infrastracture/graph/test_gql_retry_backoff.py index f949720c83..9024c3e6c4 100644 --- a/backend/tests/infrastracture/graph/test_gql_retry_backoff.py +++ b/backend/tests/infrastracture/graph/test_gql_retry_backoff.py @@ -3,7 +3,7 @@ from gql import gql from gql.transport.exceptions import TransportQueryError -from app.extensions import gql_factory +from app.extensions import gql_octant_factory from app.infrastructure import Client as GQLClient @@ -30,7 +30,7 @@ def test_with_failure(mock_failing_gql): with pytest.raises( TransportQueryError, match="the chain was reorganized while executing the query" ): - gql_factory.build().execute(query) + gql_octant_factory.build().execute(query) assert ( GQLClient.execute_sync.call_count > 2 diff --git a/backend/tests/modules/common/test_sablier_events_mapper.py b/backend/tests/modules/common/test_sablier_events_mapper.py new file mode 100644 index 0000000000..a099e14452 --- /dev/null +++ b/backend/tests/modules/common/test_sablier_events_mapper.py @@ -0,0 +1,133 @@ +from app.modules.common.sablier_events_mapper import process_to_locks_and_unlocks +from app.engine.user.effective_deposit import SablierEventType +from typing import Dict + + +def create_action( + category: str, timestamp: int, amountA: int = 0, amountB: int = 0 +) -> Dict: + return { + "category": category, + "addressA": "0xSender", + "addressB": "0xReceiver", + "amountA": amountA, + "amountB": amountB, + "timestamp": timestamp, + "hash": f"hash_{timestamp}", + } + + +def test_empty_actions(): + sablier_stream = {"actions": [], "intactAmount": 0} + result = process_to_locks_and_unlocks(sablier_stream) + assert len(result.locks) == 0 + assert len(result.unlocks) == 0 + + +def test_create_action(): + action = create_action(SablierEventType.CREATE, timestamp=100, amountA=100) + sablier_stream = {"actions": [action], "intactAmount": 0} + result = process_to_locks_and_unlocks(sablier_stream) + + assert len(result.locks) == 1 + assert len(result.unlocks) == 0 + lock = result.locks[0] + assert lock["amount"] == 100 + assert lock["__typename"] == "Locked" + assert lock["depositBefore"] == 0 + assert lock["__source"] == "Sablier" + + +def test_withdraw_action(): + create_action_item = create_action( + SablierEventType.CREATE, timestamp=100, amountA=200 + ) + withdraw_action_item = create_action( + SablierEventType.WITHDRAW, timestamp=200, amountB=50 + ) + sablier_stream = { + "actions": [create_action_item, withdraw_action_item], + "intactAmount": 0, + } + result = process_to_locks_and_unlocks(sablier_stream) + + assert len(result.locks) == 1 + assert len(result.unlocks) == 1 + + lock = result.locks[0] + assert lock["amount"] == 200 + assert lock["depositBefore"] == 0 + assert lock["__source"] == "Sablier" + + unlock = result.unlocks[0] + assert unlock["amount"] == 50 + assert unlock["__typename"] == "Unlocked" + assert unlock["depositBefore"] == 200 + assert unlock["__source"] == "Sablier" + + +def test_cancel_action(): + create_action_item = create_action( + SablierEventType.CREATE, timestamp=100, amountA=150 + ) + cancel_action_item = create_action( + SablierEventType.CANCEL, timestamp=300, amountA=150, amountB=50 + ) + sablier_stream = { + "actions": [create_action_item, cancel_action_item], + "intactAmount": 0, + } + result = process_to_locks_and_unlocks(sablier_stream) + + assert len(result.locks) == 1 + assert len(result.unlocks) == 1 + + lock = result.locks[0] + assert lock["amount"] == 150 + assert lock["depositBefore"] == 0 + assert lock["__source"] == "Sablier" + + unlock = result.unlocks[0] + assert unlock["amount"] == 100 + assert unlock["__typename"] == "Unlocked" + assert unlock["depositBefore"] == 150 + assert unlock["__source"] == "Sablier" + + +def test_mixed_actions(): + actions = [ + create_action(SablierEventType.CREATE, timestamp=100, amountA=100), + create_action(SablierEventType.WITHDRAW, timestamp=150, amountB=50), + create_action(SablierEventType.CREATE, timestamp=200, amountA=200), + create_action(SablierEventType.CANCEL, timestamp=250, amountA=150, amountB=50), + ] + sablier_stream = {"actions": actions, "intactAmount": 0} + + result = process_to_locks_and_unlocks(sablier_stream) + + assert len(result.locks) == 2 + assert len(result.unlocks) == 2 + + lock1 = result.locks[0] + assert lock1["amount"] == 100 + assert lock1["__typename"] == "Locked" + assert lock1["depositBefore"] == 0 + assert lock1["__source"] == "Sablier" + + unlock1 = result.unlocks[0] + assert unlock1["amount"] == 50 + assert unlock1["__typename"] == "Unlocked" + assert unlock1["depositBefore"] == 100 + assert unlock1["__source"] == "Sablier" + + lock2 = result.locks[1] + assert lock2["amount"] == 200 + assert lock2["__typename"] == "Locked" + assert lock2["depositBefore"] == 50 + assert lock2["__source"] == "Sablier" + + unlock2 = result.unlocks[1] + assert unlock2["amount"] == 100 + assert unlock2["__typename"] == "Unlocked" + assert unlock2["depositBefore"] == 250 + assert unlock2["__source"] == "Sablier" diff --git a/backend/tests/modules/user/deposits/test_calculated_user_deposits.py b/backend/tests/modules/user/deposits/test_calculated_user_deposits.py index cc36b3e694..315a9a6159 100644 --- a/backend/tests/modules/user/deposits/test_calculated_user_deposits.py +++ b/backend/tests/modules/user/deposits/test_calculated_user_deposits.py @@ -2,7 +2,7 @@ from app.modules.common.time import from_timestamp_s from app.modules.history.dto import LockItem, OpType from app.modules.user.deposits.service.calculated import CalculatedUserDeposits -from tests.conftest import USER1_ADDRESS, mock_graphql +from tests.conftest import USER1_ADDRESS, mock_graphql, mock_sablier_graphql from tests.helpers.context import get_context @@ -62,6 +62,7 @@ def test_get_locks_by_timestamp(app, mocker, alice, mock_events_generator): }, ], ) + mock_sablier_graphql(mocker) timestamp_before = from_timestamp_s(1710719999) timestamp_after = from_timestamp_s(1710720001) @@ -100,6 +101,7 @@ def test_get_unlocks_by_timestamp(app, mocker, alice, mock_events_generator): }, ], ) + mock_sablier_graphql(mocker) timestamp_before = from_timestamp_s(1710719999) timestamp_after = from_timestamp_s(1710720001) diff --git a/backend/tests/modules/user/events_generator/test_epoch_events_generator.py b/backend/tests/modules/user/events_generator/test_epoch_events_generator.py index 9d2029b186..e4711c123e 100644 --- a/backend/tests/modules/user/events_generator/test_epoch_events_generator.py +++ b/backend/tests/modules/user/events_generator/test_epoch_events_generator.py @@ -6,7 +6,7 @@ from app.modules.user.events_generator.service.db_and_graph import ( DbAndGraphEventsGenerator, ) -from tests.conftest import mock_graphql +from tests.conftest import mock_graphql, mock_sablier_graphql from tests.helpers import create_deposit_events, generate_epoch_events from tests.helpers.constants import ( ALICE_ADDRESS, @@ -49,6 +49,7 @@ def events(dave): def test_returns_locks_and_unlocks_for_first_epoch(mocker, events): mock_graphql(mocker, events, EPOCHS) + mock_sablier_graphql(mocker) context = get_context() expected = { ALICE_ADDRESS: [ @@ -117,6 +118,7 @@ def test_returns_locks_and_unlocks_for_second_epoch( mocker, dave, events, mock_pending_epoch_snapshot_db ): mock_graphql(mocker, events, EPOCHS) + mock_sablier_graphql(mocker) context = get_context(2, start=2000) expected = { ALICE_ADDRESS: [ @@ -193,6 +195,7 @@ def test_returns_events_with_one_element_if_deposit_is_gt_0(mocker, dave, events database.deposits.add(2, user, 300, 300) db.session.commit() mock_graphql(mocker, events, EPOCHS) + mock_sablier_graphql(mocker) context = get_context(3, start=3000) generator = DbAndGraphEventsGenerator() @@ -221,6 +224,8 @@ def test_returns_empty_list_if_there_is_one_event_with_deposit_eq_0( user = database.user.add_user(dave) database.deposits.add(3, user, 0, 0) mock_graphql(mocker, events, EPOCHS) + mock_sablier_graphql(mocker) + mock_sablier_graphql(mocker) context = get_context(4, start=4000) generator = DbAndGraphEventsGenerator() @@ -230,6 +235,7 @@ def test_returns_empty_list_if_there_is_one_event_with_deposit_eq_0( def test_returned_events_are_sorted_by_timestamp(mocker, events): mock_graphql(mocker, events, EPOCHS) + mock_sablier_graphql(mocker) context = get_context() generator = DbAndGraphEventsGenerator() diff --git a/localenv/apitest.yaml b/localenv/apitest.yaml index a1444a0ca6..21babe477c 100644 --- a/localenv/apitest.yaml +++ b/localenv/apitest.yaml @@ -33,6 +33,8 @@ services: DELEGATION_SALT: "salt" DELEGATION_SALT_PRIMARY: "salt_primary" + SABLIER_MAINNET_SUBGRAPH_URL: "${SABLIER_MAINNET_SUBGRAPH_URL}" + depends_on: - anvil - graph-node diff --git a/localenv/localenv.yaml b/localenv/localenv.yaml index 080f48b5f2..8a62a85935 100644 --- a/localenv/localenv.yaml +++ b/localenv/localenv.yaml @@ -48,6 +48,8 @@ services: DELEGATION_SALT: "salt" DELEGATION_SALT_PRIMARY: "salt_primary" + SABLIER_MAINNET_SUBGRAPH_URL: "${SABLIER_MAINNET_SUBGRAPH_URL}" + depends_on: - backend-postgres - anvil