From 27e42c0c5dbecbf04cc11c7da3e340a8cd76494d Mon Sep 17 00:00:00 2001
From: Roman Prokofyev
Date: Sat, 18 Nov 2023 21:51:37 +0100
Subject: [PATCH] add collector service tests

---
Reviewer notes. This section sits after the three-dash separator and before
the first "diff --git" line, so it is commentary only and is not part of the
commit message or of any hunk.

About this file:
* The patch had been flattened onto four physical lines. Line breaks were
  restored so that every hunk matches the line counts in its @@ header and
  the per-file totals in the diffstat below.
* Indentation, the position of blank context lines, and the spacing before
  inline comments cannot be recovered from the flattened text. They are a
  best-effort reconstruction; check them against the repository (or
  regenerate the patch with git format-patch) before applying.
* The author address appears to have been stripped from the "From:" header.

About the change itself (follow-ups, not applied here so the recorded commit
stays intact):
* collector_service.py: "last_stored_minute: int = None" is annotated as int
  but defaults to None; "int | None" would match the value.
* collector_service.py: the new import uses "services.datetime_service" while
  the import next to it uses the "datacollector.services" path; confirm both
  resolve in the same environment.
* BookManager.get_data(): the finally clause reads self.client, and the
  client is created inside the try. Whether self.client exists when
  AsyncClient.create() fails depends on class attributes that are outside
  these hunks; confirm.
* collect_data(): the added "break" leaves the retry loop once get_data() is
  exhausted. test_collect_data_normal relies on this, since it feeds a finite
  generator and awaits collect_data() to completion.
* These hunks remove the only visible use of @asynccontextmanager; its import
  is not shown here and may now be unused.

 datacollector/config/data_collector_config.py |  5 +-
 datacollector/services/collector_service.py   | 65 +++++++++----------
 .../services/collector_service_test.py        | 50 ++++++++++++++
 3 files changed, 85 insertions(+), 35 deletions(-)
 create mode 100644 tests/datacollector/services/collector_service_test.py

diff --git a/datacollector/config/data_collector_config.py b/datacollector/config/data_collector_config.py
index dbe74af..abcf55e 100644
--- a/datacollector/config/data_collector_config.py
+++ b/datacollector/config/data_collector_config.py
@@ -18,5 +18,8 @@ def get_data_collector_service(asset_symbol: str, dt: datetime | None) -> DataCo
     data_service = DataProcessService(data_repo, dt_service)
 
     book_manager = BookManager(asset_symbol)
-    collector_service = DataCollectorService(data_service, symbol=asset_symbol, book_manager=book_manager)
+    collector_service = DataCollectorService(data_service,
+                                             dt_service=dt_service,
+                                             symbol=asset_symbol,
+                                             book_manager=book_manager)
     return collector_service
diff --git a/datacollector/services/collector_service.py b/datacollector/services/collector_service.py
index d155aca..14d23c4 100644
--- a/datacollector/services/collector_service.py
+++ b/datacollector/services/collector_service.py
@@ -5,6 +5,7 @@ from binance import DepthCacheManager, AsyncClient, Client
 
 
 from datacollector.services.data_process_service import DataProcessService
+from services.datetime_service import DateTimeService
 
 
 class BookManager:
@@ -15,32 +16,34 @@ class BookManager:
     def __init__(self, asset_symbol):
         self.asset_symbol = asset_symbol
 
-    async def connect(self):
-        self.client = await AsyncClient.create()
-        self.dcm = DepthCacheManager(self.client, symbol=f'{self.asset_symbol.upper()}USDT',
-                                     limit=5000,  # initial number of order from the order book
-                                     refresh_interval=0,  # disable cache refresh
-                                     ws_interval=100  # update interval on websocket, ms
-                                     )
-
-    @asynccontextmanager
-    async def get_dcm_socket(self):
-        async with self.dcm as dcm_socket:
-            yield dcm_socket
-
-    async def close(self):
-        if self.client:
-            await self.client.close_connection()
+    async def get_data(self):
+        try:
+            self.client = await AsyncClient.create()
+            self.dcm = DepthCacheManager(self.client, symbol=f'{self.asset_symbol.upper()}USDT',
+                                         limit=5000,
+                                         refresh_interval=0,
+                                         ws_interval=100)
+            async with self.dcm as dcm_socket:
+                while True:
+                    data = await dcm_socket.recv()
+                    yield data
+        finally:
+            if self.client:
+                await self.client.close_connection()
 
 
 class DataCollectorService:
     logger: logging.Logger
+    dt_service: DateTimeService
     asset_symbol: str
     max_retries: int
     retry_delay: int
+    last_stored_minute: int = None
 
-    def __init__(self, data_service: DataProcessService, book_manager, symbol, max_retries=5, retry_delay=1):
+    def __init__(self, data_service: DataProcessService, dt_service: DateTimeService, book_manager: BookManager,
+                 symbol: str, max_retries=5, retry_delay=1):
         self.data_service = data_service
+        self.dt_service = dt_service
         self.asset_symbol = symbol
         self.max_retries = max_retries
         self.retry_delay = retry_delay
@@ -51,12 +54,13 @@ async def collect_data(self):
         retry_count = 0
         while True:
             try:
-                await self.book_manager.connect()
                 self.logger.info(f"Starting order book collection for {self.asset_symbol}-USDT")
 
-                async with self.book_manager.get_dcm_socket() as dcm_socket:
-                    await self._process_depth_cache(dcm_socket)
+                async for data in self.book_manager.get_data():
+                    await self._process_depth_cache(data)
                 retry_count = 0
+                # in production the data will always continue
+                break
 
             except asyncio.TimeoutError as e:
                 self.logger.error(f"Network error: {e}. Reconnecting...")
@@ -73,18 +77,11 @@ async def collect_data(self):
                 wait = self.retry_delay * 2 ** min(retry_count, self.max_retries)
                 self.logger.info(f"Attempting to reconnect in {wait} seconds...")
                 await asyncio.sleep(wait)
-                continue  # Continue the while loop to retry connection
-            finally:
-                await self.book_manager.close()
-
-    async def _process_depth_cache(self, dcm_socket):
-        last_stored_minute = None
-        while True:
-            ob = await dcm_socket.recv()
-            current_time = self.data_service.dt_service.get_datetime()
-            current_minute = current_time.minute
-            if current_minute != last_stored_minute:
-                data_entry = self.data_service.compute_data_entry(ob, current_time)
-                self.data_service.insert_one(data_entry)
-                last_stored_minute = current_minute
 
+    async def _process_depth_cache(self, ob):
+        current_time = self.dt_service.get_datetime()
+        current_minute = current_time.minute
+        if current_minute != self.last_stored_minute:
+            data_entry = self.data_service.compute_data_entry(ob, current_time)
+            self.data_service.insert_one(data_entry)
+            self.last_stored_minute = current_minute
diff --git a/tests/datacollector/services/collector_service_test.py b/tests/datacollector/services/collector_service_test.py
new file mode 100644
index 0000000..ae998de
--- /dev/null
+++ b/tests/datacollector/services/collector_service_test.py
@@ -0,0 +1,50 @@
+import unittest
+from unittest.mock import Mock, patch, AsyncMock
+import asyncio
+
+from datacollector.services.data_process_service import DataProcessService
+from datacollector.services.collector_service import DataCollectorService, BookManager
+from tests.datacollector.util import string_to_date
+from services.datetime_service import DateTimeService
+
+
+async def async_generator(mock_data):
+    for item in mock_data:
+        yield item
+
+
+class TestDataCollectorService(unittest.IsolatedAsyncioTestCase):
+    def setUp(self):
+        self.data_service = Mock(spec=DataProcessService)
+        self.book_manager = AsyncMock(spec=BookManager)
+        self.dt_service = DateTimeService(dt=string_to_date("2023-12-12"))  # fix the time
+        self.collector_service = DataCollectorService(self.data_service, self.dt_service, self.book_manager, "BTC")
+
+    @patch('datacollector.services.collector_service.asyncio.sleep', new_callable=AsyncMock)
+    async def test_collect_data_normal(self, mock_sleep):
+        self.book_manager.get_data.side_effect = lambda: async_generator([{}, {}])
+
+        # Test collect_data under normal conditions
+        await self.collector_service.collect_data()
+        # will be one because the minute didn't change
+        self.assertEqual(1, self.data_service.insert_one.call_count)
+
+    @patch('datacollector.services.collector_service.asyncio.sleep', new_callable=AsyncMock)
+    async def test_collect_data_timeout_error(self, mock_sleep):
+        self.book_manager.get_data.side_effect = [asyncio.TimeoutError("Timeout"), Exception("Error")]
+        await self.collector_service.collect_data()
+        # 1 timeout error + max_retries
+        self.assertEqual(self.collector_service.max_retries + 1, mock_sleep.call_count)
+
+    @patch('datacollector.services.collector_service.asyncio.sleep', new_callable=AsyncMock)
+    async def test_collect_data_generic_exception(self, mock_sleep):
+        # Test collect_data handling generic exception
+        self.book_manager.get_data.side_effect = Exception("Error")
+        await self.collector_service.collect_data()
+        self.assertEqual(self.collector_service.max_retries, mock_sleep.call_count)
+
+    # You can add more tests for different scenarios and exception handling
+
+
+if __name__ == '__main__':
+    unittest.main()