Skip to content

Commit

Permalink
Merge pull request #747 from BoostryJP/feature/reduce-memory-usage-on…
Browse files Browse the repository at this point in the history
…-batch

Extract events from async contract instance to reduce memory usage
  • Loading branch information
YoshihitoAso authored Jan 20, 2025
2 parents 05b64ed + 9c3efc0 commit 424795b
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 22 deletions.
17 changes: 16 additions & 1 deletion app/utils/contract_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import Session
from web3.contract import AsyncContract, Contract
from web3.contract.async_contract import AsyncContractEvents
from web3.exceptions import (
ABIEventNotFound,
ABIFunctionNotFound,
Expand Down Expand Up @@ -290,6 +291,20 @@ def get_event_logs(
return result


class AsyncContractEventsView:
def __init__(self, address: str, contract_events: AsyncContractEvents) -> None:
self._address = address
self._events = contract_events

@property
def address(self) -> str:
return self._address

@property
def events(self) -> AsyncContractEvents:
return self._events


class AsyncContractUtils:
factory_map: dict[str, Type[AsyncContract]] = {}

Expand Down Expand Up @@ -585,7 +600,7 @@ async def get_block_by_transaction_hash(tx_hash: str):

@staticmethod
async def get_event_logs(
contract: AsyncContract,
contract: AsyncContract | AsyncContractEventsView,
event: str,
block_from: int = None,
block_to: int = None,
Expand Down
13 changes: 7 additions & 6 deletions batch/indexer_issue_redeem.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from sqlalchemy import and_, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from web3.contract import AsyncContract

from app.database import BatchAsyncSessionLocal
from app.exceptions import ServiceUnavailableError
Expand All @@ -38,7 +37,7 @@
IDXIssueRedeemEventType,
Token,
)
from app.utils.contract_utils import AsyncContractUtils
from app.utils.contract_utils import AsyncContractEventsView, AsyncContractUtils
from app.utils.web3_utils import AsyncWeb3Wrapper
from batch.utils import batch_log
from config import INDEXER_BLOCK_LOT_MAX_SIZE, INDEXER_SYNC_INTERVAL
Expand All @@ -51,7 +50,7 @@

class Processor:
def __init__(self):
self.token_list: dict[str, AsyncContract] = {}
self.token_list: dict[str, AsyncContractEventsView] = {}

async def sync_new_logs(self):
db_session = BatchAsyncSessionLocal()
Expand Down Expand Up @@ -145,7 +144,9 @@ async def __get_token_list(self, db_session: AsyncSession):
token_contract = web3.eth.contract(
address=load_required_token.token_address, abi=load_required_token.abi
)
self.token_list[load_required_token.token_address] = token_contract
self.token_list[load_required_token.token_address] = (
AsyncContractEventsView(token_contract.address, token_contract.events)
)

@staticmethod
async def __get_idx_issue_redeem_block_number(db_session: AsyncSession):
Expand Down Expand Up @@ -207,7 +208,7 @@ async def __sync_issue(
else:
await self.__insert_index(
db_session=db_session,
event_type=IDXIssueRedeemEventType.ISSUE.value,
event_type=IDXIssueRedeemEventType.ISSUE,
transaction_hash=transaction_hash,
token_address=to_checksum_address(token.address),
locked_address=args["lockAddress"],
Expand Down Expand Up @@ -248,7 +249,7 @@ async def __sync_redeem(
else:
await self.__insert_index(
db_session=db_session,
event_type=IDXIssueRedeemEventType.REDEEM.value,
event_type=IDXIssueRedeemEventType.REDEEM,
transaction_hash=transaction_hash,
token_address=to_checksum_address(token.address),
locked_address=args["lockAddress"],
Expand Down
11 changes: 6 additions & 5 deletions batch/indexer_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from sqlalchemy import and_, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from web3.contract import AsyncContract

from app.database import BatchAsyncSessionLocal
from app.exceptions import ServiceUnavailableError
Expand All @@ -41,7 +40,7 @@
IDXTransferSourceEventType,
Token,
)
from app.utils.contract_utils import AsyncContractUtils
from app.utils.contract_utils import AsyncContractEventsView, AsyncContractUtils
from app.utils.web3_utils import AsyncWeb3Wrapper
from batch.utils import batch_log
from config import INDEXER_BLOCK_LOT_MAX_SIZE, INDEXER_SYNC_INTERVAL, ZERO_ADDRESS
Expand All @@ -54,7 +53,7 @@

class Processor:
def __init__(self):
self.token_list: dict[str, AsyncContract] = {}
self.token_list: dict[str, AsyncContractEventsView] = {}

async def sync_new_logs(self):
db_session = BatchAsyncSessionLocal()
Expand Down Expand Up @@ -151,7 +150,9 @@ async def __get_token_list(self, db_session: AsyncSession):
token_contract = web3.eth.contract(
address=load_required_token.token_address, abi=load_required_token.abi
)
self.token_list[load_required_token.token_address] = token_contract
self.token_list[load_required_token.token_address] = (
AsyncContractEventsView(token_contract.address, token_contract.events)
)

@staticmethod
async def __get_idx_transfer_block_number(db_session: AsyncSession):
Expand Down Expand Up @@ -303,7 +304,7 @@ async def __sink_on_transfer(
transfer_record.from_address = from_address
transfer_record.to_address = to_address
transfer_record.amount = amount
transfer_record.source_event = source_event.value
transfer_record.source_event = source_event
transfer_record.data = data
transfer_record.message = message
transfer_record.block_timestamp = block_timestamp
Expand Down
8 changes: 5 additions & 3 deletions batch/indexer_transfer_approval.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
Token,
TokenType,
)
from app.utils.contract_utils import AsyncContractUtils
from app.utils.contract_utils import AsyncContractEventsView, AsyncContractUtils
from app.utils.web3_utils import AsyncWeb3Wrapper
from batch.utils import batch_log
from config import INDEXER_BLOCK_LOT_MAX_SIZE, INDEXER_SYNC_INTERVAL, ZERO_ADDRESS
Expand Down Expand Up @@ -71,7 +71,7 @@

class Processor:
def __init__(self):
self.token_list: dict[str, AsyncContract] = {}
self.token_list: dict[str, AsyncContractEventsView] = {}
self.exchange_list: list[AsyncContract] = []
self.token_type_map: dict[str, TokenType] = {}

Expand Down Expand Up @@ -165,7 +165,9 @@ async def __get_contract_list(self, db_session: AsyncSession):
token_contract = web3.eth.contract(
address=load_required_token.token_address, abi=load_required_token.abi
)
self.token_list[load_required_token.token_address] = token_contract
self.token_list[load_required_token.token_address] = (
AsyncContractEventsView(token_contract.address, token_contract.events)
)
self.token_type_map[load_required_token.token_address] = (
load_required_token.type
)
Expand Down
18 changes: 11 additions & 7 deletions batch/processor_create_utxo.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@
from sqlalchemy import and_, create_engine, select
from sqlalchemy.exc import DBAPIError, SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from web3.contract import AsyncContract

from app.database import BatchAsyncSessionLocal
from app.exceptions import ServiceUnavailableError
from app.model.blockchain import IbetShareContract, IbetStraightBondContract
from app.model.db import UTXO, Account, Token, TokenType, UTXOBlockNumber
from app.utils.contract_utils import AsyncContractUtils
from app.utils.contract_utils import AsyncContractEventsView, AsyncContractUtils
from app.utils.ledger_utils import request_ledger_creation
from app.utils.web3_utils import AsyncWeb3Wrapper
from batch.utils import batch_log
Expand All @@ -60,7 +59,7 @@

class Processor:
def __init__(self):
self.token_contract_list: list[AsyncContract] = []
self.token_contract_list: list[AsyncContractEventsView] = []
self.token_type_map: dict[str, TokenType] = {}

async def process(self):
Expand Down Expand Up @@ -151,7 +150,12 @@ async def __refresh_token_contract_list(self, db_session: AsyncSession):
token_contract = AsyncContractUtils.get_contract(
contract_name=_token.type, contract_address=_token.token_address
)
self.token_contract_list.append(token_contract)
self.token_contract_list.append(
AsyncContractEventsView(
token_contract.address,
token_contract.events,
)
)
self.token_type_map[_token.token_address] = _token.type

@staticmethod
Expand All @@ -177,7 +181,7 @@ async def __set_utxo_block_number(db_session: AsyncSession, block_number: int):
async def __process_transfer(
self,
db_session: AsyncSession,
token_contract: AsyncContract,
token_contract: AsyncContractEventsView,
block_from: int,
block_to: int,
):
Expand Down Expand Up @@ -333,7 +337,7 @@ async def __process_transfer(
async def __process_issue(
self,
db_session: AsyncSession,
token_contract: AsyncContract,
token_contract: AsyncContractEventsView,
block_from: int,
block_to: int,
):
Expand Down Expand Up @@ -389,7 +393,7 @@ async def __process_issue(
async def __process_redeem(
self,
db_session: AsyncSession,
token_contract: AsyncContract,
token_contract: AsyncContractEventsView,
block_from: int,
block_to: int,
):
Expand Down

0 comments on commit 424795b

Please sign in to comment.