Skip to content

Commit

Permalink
add collector service tests
Browse files — browse the repository at this point in the history
  • Loading branch information
dragoon committed Nov 18, 2023
1 parent 82b44fd commit 27e42c0
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 35 deletions.
5 changes: 4 additions & 1 deletion datacollector/config/data_collector_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,8 @@ def get_data_collector_service(asset_symbol: str, dt: datetime | None) -> DataCo
data_service = DataProcessService(data_repo, dt_service)
book_manager = BookManager(asset_symbol)

collector_service = DataCollectorService(data_service, symbol=asset_symbol, book_manager=book_manager)
collector_service = DataCollectorService(data_service,
dt_service=dt_service,
symbol=asset_symbol,
book_manager=book_manager)
return collector_service
65 changes: 31 additions & 34 deletions datacollector/services/collector_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from binance import DepthCacheManager, AsyncClient, Client

from datacollector.services.data_process_service import DataProcessService
from services.datetime_service import DateTimeService


class BookManager:
Expand All @@ -15,32 +16,34 @@ class BookManager:
def __init__(self, asset_symbol):
self.asset_symbol = asset_symbol

async def connect(self):
self.client = await AsyncClient.create()
self.dcm = DepthCacheManager(self.client, symbol=f'{self.asset_symbol.upper()}USDT',
limit=5000, # initial number of order from the order book
refresh_interval=0, # disable cache refresh
ws_interval=100 # update interval on websocket, ms
)

@asynccontextmanager
async def get_dcm_socket(self):
async with self.dcm as dcm_socket:
yield dcm_socket

async def close(self):
if self.client:
await self.client.close_connection()
async def get_data(self):
    """Async generator that streams order-book snapshots for the asset.

    Lazily creates the Binance async client and a DepthCacheManager for the
    ``<SYMBOL>USDT`` pair, then yields depth-cache updates forever.  The
    client connection is closed when the generator is finalized, whether it
    exits normally or via an exception.

    Yields:
        Depth-cache snapshots as produced by ``dcm_socket.recv()``.
    """
    # Pre-initialize so the finally block is safe even when
    # AsyncClient.create() itself raises (otherwise `self.client`
    # would not exist and the cleanup would mask the real error
    # with an AttributeError).
    self.client = None
    try:
        self.client = await AsyncClient.create()
        self.dcm = DepthCacheManager(self.client,
                                     symbol=f'{self.asset_symbol.upper()}USDT',
                                     limit=5000,          # initial number of orders from the book
                                     refresh_interval=0,  # disable cache refresh
                                     ws_interval=100)     # websocket update interval, ms
        async with self.dcm as dcm_socket:
            while True:
                data = await dcm_socket.recv()
                yield data
    finally:
        if self.client:
            await self.client.close_connection()


class DataCollectorService:
logger: logging.Logger
dt_service: DateTimeService
asset_symbol: str
max_retries: int
retry_delay: int
last_stored_minute: int = None

def __init__(self, data_service: DataProcessService, book_manager, symbol, max_retries=5, retry_delay=1):
def __init__(self, data_service: DataProcessService, dt_service: DateTimeService, book_manager: BookManager,
symbol: str, max_retries=5, retry_delay=1):
self.data_service = data_service
self.dt_service = dt_service
self.asset_symbol = symbol
self.max_retries = max_retries
self.retry_delay = retry_delay
Expand All @@ -51,12 +54,13 @@ async def collect_data(self):
retry_count = 0
while True:
try:
await self.book_manager.connect()
self.logger.info(f"Starting order book collection for {self.asset_symbol}-USDT")

async with self.book_manager.get_dcm_socket() as dcm_socket:
await self._process_depth_cache(dcm_socket)
async for data in self.book_manager.get_data():
await self._process_depth_cache(data)
retry_count = 0
# in production the data will always continue
break

except asyncio.TimeoutError as e:
self.logger.error(f"Network error: {e}. Reconnecting...")
Expand All @@ -73,18 +77,11 @@ async def collect_data(self):
wait = self.retry_delay * 2 ** min(retry_count, self.max_retries)
self.logger.info(f"Attempting to reconnect in {wait} seconds...")
await asyncio.sleep(wait)
continue # Continue the while loop to retry connection

finally:
await self.book_manager.close()

async def _process_depth_cache(self, dcm_socket):
last_stored_minute = None
while True:
ob = await dcm_socket.recv()
current_time = self.data_service.dt_service.get_datetime()
current_minute = current_time.minute
if current_minute != last_stored_minute:
data_entry = self.data_service.compute_data_entry(ob, current_time)
self.data_service.insert_one(data_entry)
last_stored_minute = current_minute
async def _process_depth_cache(self, ob):
    """Persist at most one data entry per wall-clock minute.

    :param ob: an order-book snapshot received from the book manager.
    """
    now = self.dt_service.get_datetime()
    # Skip snapshots arriving within the same minute as the last stored entry.
    if now.minute == self.last_stored_minute:
        return
    entry = self.data_service.compute_data_entry(ob, now)
    self.data_service.insert_one(entry)
    self.last_stored_minute = now.minute
50 changes: 50 additions & 0 deletions tests/datacollector/services/collector_service_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import unittest
from unittest.mock import Mock, patch, AsyncMock
import asyncio

from datacollector.services.data_process_service import DataProcessService
from datacollector.services.collector_service import DataCollectorService, BookManager
from tests.datacollector.util import string_to_date
from services.datetime_service import DateTimeService


async def async_generator(mock_data):
    """Expose an ordinary iterable as an async generator.

    Test stand-in for the streaming interface of ``BookManager.get_data``.
    """
    for element in mock_data:
        yield element


class TestDataCollectorService(unittest.IsolatedAsyncioTestCase):
    """Tests for DataCollectorService.collect_data: the normal streaming
    path, timeout-driven reconnects, and generic-exception retries."""

    def setUp(self):
        # Collaborators are mocked except the datetime service, which is
        # frozen so the per-minute deduplication logic behaves deterministically.
        self.data_service = Mock(spec=DataProcessService)
        self.book_manager = AsyncMock(spec=BookManager)
        self.dt_service = DateTimeService(dt=string_to_date("2023-12-12"))  # fix the time
        self.collector_service = DataCollectorService(self.data_service, self.dt_service, self.book_manager, "BTC")

    @patch('datacollector.services.collector_service.asyncio.sleep', new_callable=AsyncMock)
    async def test_collect_data_normal(self, mock_sleep):
        """Two snapshots within the same frozen minute store exactly one entry."""
        self.book_manager.get_data.side_effect = lambda: async_generator([{}, {}])

        # Test collect_data under normal conditions
        await self.collector_service.collect_data()
        # will be one because the minute didn't change
        self.assertEqual(1, self.data_service.insert_one.call_count)

    @patch('datacollector.services.collector_service.asyncio.sleep', new_callable=AsyncMock)
    async def test_collect_data_timeout_error(self, mock_sleep):
        """A timeout triggers a reconnect sleep, then retries exhaust on errors."""
        # First call raises TimeoutError, the second a generic Exception.
        self.book_manager.get_data.side_effect = [asyncio.TimeoutError("Timeout"), Exception("Error")]
        await self.collector_service.collect_data()
        # 1 timeout error + max_retries
        self.assertEqual(self.collector_service.max_retries + 1, mock_sleep.call_count)

    @patch('datacollector.services.collector_service.asyncio.sleep', new_callable=AsyncMock)
    async def test_collect_data_generic_exception(self, mock_sleep):
        # Test collect_data handling generic exception
        self.book_manager.get_data.side_effect = Exception("Error")
        await self.collector_service.collect_data()
        self.assertEqual(self.collector_service.max_retries, mock_sleep.call_count)

    # You can add more tests for different scenarios and exception handling


if __name__ == '__main__':
    unittest.main()

0 comments on commit 27e42c0

Please sign in to comment.