Skip to content

Commit

Permalink
Updated data adapters live to wait for the orderbook and strategy to …
Browse files Browse the repository at this point in the history
…process historical and gave strategy ability to wait for positions/account data to load.
  • Loading branch information
anthonyb8 committed Feb 14, 2025
1 parent e1a0c02 commit 4527e36
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 24 deletions.
44 changes: 41 additions & 3 deletions midastrader/core/adapters/base_strategy.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import queue
import threading
import pandas as pd
import importlib.util
from typing import Type
Expand Down Expand Up @@ -45,6 +46,7 @@ def __init__(self, symbols_map: SymbolMap, bus: MessageBus):
self.order_book = OrderBook.get_instance()
self.portfolio_server = PortfolioServer.get_instance()
self.historical_data = None
self.threads = []

# Subscribe to orderbook updates
self.orderbook_queue = self.bus.subscribe(EventType.ORDER_BOOK)
Expand All @@ -58,17 +60,49 @@ def process(self) -> None:
event_type (EventType): The type of the event (e.g., `MARKET_DATA`).
event (MarketEvent): The market event containing data to process.
"""
self.logger.info("Strategy running ...")
self.is_running.set()
try:
self.threads.append(
threading.Thread(target=self.process_orderbook, daemon=True)
)
self.threads.append(
threading.Thread(target=self.process_initial_data, daemon=True)
)

for thread in self.threads:
thread.start()

self.logger.info("Strategy running ...")
self.is_running.set()

for thread in self.threads:
thread.join()

finally:
self.cleanup()

def process_orderbook(self) -> None:
"""
Handles incoming events and processes them according to the strategy logic.
Args:
subject (Subject): The subject that triggered the event.
event_type (EventType): The type of the event (e.g., `MARKET_DATA`).
event (MarketEvent): The market event containing data to process.
"""
while not self.shutdown_event.is_set():
try:
event = self.orderbook_queue.get(timeout=0.01)
self.handle_event(event)
except queue.Empty:
continue

self.cleanup()
def process_initial_data(self) -> None:
while not self.shutdown_event.is_set():
if self.bus.get_flag(EventType.INITIAL_DATA):
self.handle_initial_data()
break

self.logger.info("Strategy process initial data thread ending.")

def cleanup(self):
while True:
Expand All @@ -85,6 +119,10 @@ def cleanup(self):
def handle_event(self, event: MarketEvent) -> None:
pass

# @abstractmethod
def handle_initial_data(self) -> None:
pass

@abstractmethod
def get_strategy_data(self) -> pd.DataFrame:
"""
Expand Down
19 changes: 19 additions & 0 deletions midastrader/core/adapters/portfolio/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import queue
import time
import threading
from typing import Dict
from threading import Lock
Expand Down Expand Up @@ -143,6 +144,10 @@ def process(self):
threading.Thread(target=self.process_account, daemon=True)
)

self.threads.append(
threading.Thread(target=self.initial_data, daemon=True)
)

for thread in self.threads:
thread.start()

Expand All @@ -159,6 +164,20 @@ def cleanup(self) -> None:
self.logger.info("Shutting down PortfolioserverManager...")
self.is_shutdown.set()

def initial_data(self) -> None:
initial_data = False

while not initial_data:
initial_data = all(
[
self.server.position_manager.initial_data,
self.server.account_manager.initial_data,
]
)
time.sleep(0.1)

self.bus.publish(EventType.INITIAL_DATA, True)

def process_orders(self) -> None:
"""
Continuously processes market data events in a loop.
Expand Down
10 changes: 10 additions & 0 deletions midastrader/core/adapters/portfolio/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def __init__(self):
self.logger = SystemLogger.get_logger()
self.positions: Dict[int, Position] = {}
self.pending_positions_update = set()
self.initial_data = False

@property
def get_positions(self) -> Dict[int, Position]:
Expand Down Expand Up @@ -167,6 +168,10 @@ def update_positions(self, instrument_id: int, position: Position) -> None:
# Notify listener and log
self.pending_positions_update.discard(instrument_id)

# Signal the orders have been updated atleast once
if not self.initial_data:
self.initial_data = True

def _output_positions(self) -> str:
"""
Generates a formatted string representation of all positions for logging.
Expand Down Expand Up @@ -201,6 +206,7 @@ def __init__(self):
"""
self.logger = SystemLogger.get_logger()
self.account: Account = Account(0, 0, 0, 0, 0, 0, 0, "", 0, 0, 0)
self.initial_data = False

@property
def get_capital(self) -> float:
Expand Down Expand Up @@ -232,3 +238,7 @@ def update_account_details(self, account_details: Account) -> None:
self.logger.debug(
f"\nACCOUNT UPDATED: \n{self.account.pretty_print(" ")}"
)

# Signal the orders have been updated atleast once
if not self.initial_data:
self.initial_data = True
6 changes: 5 additions & 1 deletion midastrader/data/adaptors/ib/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ def __init__(self, symbols_map: SymbolMap, bus: MessageBus, **kwargs):
super().__init__(symbols_map, bus)

self.logger = SystemLogger.get_logger()
self.app = DataApp(bus, kwargs["tick_interval"])
self.app = DataApp(bus)
self.data_type = LiveDataType[kwargs["data_type"].upper()]
self.host = kwargs["host"]
self.port = int(kwargs["port"])
self.clientId = kwargs["client_id"]
self.account = kwargs["account_id"]
self.tick_interval: int = kwargs["tick_interval"]
self.lock = threading.Lock()

self.validated_contracts = {}
Expand Down Expand Up @@ -242,6 +243,9 @@ def stream_quote_data(self, contract: Contract) -> None:
Args:
contract (Contract): The financial contract for which quote data is requested.
"""
# Needed if controlling the flow f tickes so as o not throttle system
self.app.set_tick_interval(self.tick_interval)

reqId = self._get_valid_id()
instrument_id = self.symbols_map.get_id(contract.symbol)

Expand Down
36 changes: 26 additions & 10 deletions midastrader/data/adaptors/ib/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime
from mbinary import OhlcvMsg
import threading
from typing import Union, Optional
from typing import Union
from decimal import Decimal
from ibapi.common import TickAttrib
from ibapi.client import EClient
Expand Down Expand Up @@ -35,7 +35,7 @@ class DataApp(EWrapper, EClient):
timer_thread (threading.Thread): Timer thread for periodic operations.
"""

def __init__(self, message_bus: MessageBus, tick_interval: Optional[int]):
def __init__(self, message_bus: MessageBus):
"""
Initializes a new instance of DataApp, setting up attributes for managing data interactions with IB API.
Expand All @@ -52,6 +52,11 @@ def __init__(self, message_bus: MessageBus, tick_interval: Optional[int]):
self.reqId_to_instrument = {}
self.tick_data = {}

# Tick Data specific
self.update_interval: int = 0
self.is_running: bool = False
self.timer_thread: Union[None, threading.Thread] = None

# Event Handling
self.connected_event = threading.Event()
self.valid_id_event = threading.Event()
Expand All @@ -62,13 +67,23 @@ def __init__(self, message_bus: MessageBus, tick_interval: Optional[int]):

# Tick interval updater FOR TICK DATA
# Seconds interval for pushing the event
if tick_interval:
self.update_interval = tick_interval
self.is_running = True
self.timer_thread = threading.Thread(
target=self._run_timer, daemon=True
)
self.timer_thread.start()
# self.logger.info(tick_interval)
# if tick_interval:
# self.update_interval = tick_interval
# self.is_running = True
# self.timer_thread = threading.Thread(
# target=self._run_timer, daemon=True
# )
# self.timer_thread.start()

def set_tick_interval(self, tick_interval: int) -> None:
self.update_interval = tick_interval
self.is_running = True
self.timer_thread = threading.Thread(
target=self._run_timer,
daemon=True,
)
self.timer_thread.start()

def _run_timer(self) -> None:
"""
Expand All @@ -83,7 +98,8 @@ def stop(self) -> None:
Gracefully stops the timer thread and other resources.
"""
self.is_running = False
self.timer_thread.join()
if self.timer_thread:
self.timer_thread.join()
self.logger.info("Shutting down the DataApp.")

def error(
Expand Down
10 changes: 4 additions & 6 deletions midastrader/data/engine.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import threading
from enum import Enum
from typing import Dict
Expand Down Expand Up @@ -100,13 +101,10 @@ def start_live(self):
"""Start adapters in seperate threads."""
# Start historical data and load all into the buffer and processes buffer
historical = self.adapters["historical"]
hist_thread = threading.Thread(target=historical.process, daemon=True)
hist_thread.start()
historical.is_shutdown.wait()
hist_thread.join()
historical.process()

while not self.message_bus.topics[EventType.DATA].empty():
continue
while not self.message_bus.is_queue_empty(EventType.ORDER_BOOK):
time.sleep(0.1) # Add a small sleep to avoid busy-waiting

self.logger.info("Historical data fully processed ...")

Expand Down
4 changes: 1 addition & 3 deletions midastrader/execution/adaptors/ib/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,7 @@ def handle_order(self, event: OrderEvent) -> None:
orderId = self._get_valid_id()
ib_order = order.ib_order()
symbol = self.symbols_map.get_symbol_by_id(order.instrument_id)
self.logger.info(
f"Order Id - {orderId},Ib order {ib_order}, symbol: {symbol.midas_ticker}"
)

if symbol:
self.app.placeOrder(
orderId=orderId,
Expand Down
14 changes: 14 additions & 0 deletions midastrader/message_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class EventType(Enum):
RISK_UPDATE = auto()

# Flags
INITIAL_DATA = auto()
UPDATE_EQUITY = auto()
UPDATE_SYSTEM = auto()
ROLLED_OVER = auto()
Expand Down Expand Up @@ -65,6 +66,7 @@ def __init__(self):
EventType.ACCOUNT_UPDATE_LOG: queue.Queue(),
EventType.EQUITY_UPDATE: queue.Queue(),
EventType.TRADE_UPDATE: queue.Queue(),
EventType.INITIAL_DATA: False,
EventType.ORDER_BOOK_UPDATED: False,
EventType.OB_PROCESSED: False,
EventType.EOD_PROCESSED: False,
Expand Down Expand Up @@ -122,3 +124,15 @@ def get_flag(self, topic: EventType) -> object:
):
raise ValueError(f"Topic '{topic}' is not a flag-based topic.")
return self.topics[topic]

def is_queue_empty(self, topic: EventType) -> bool:
with self.lock:
if topic not in self.topics:
raise ValueError(f"Topic '{topic} is not defined.")

if not isinstance(self.topics[topic], queue.Queue):
raise ValueError(
f"Topic '{topic}' is not a queue-based topic."
)

return self.topics[topic].empty()
3 changes: 3 additions & 0 deletions tests/unit/core/performance/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ def primer(self):
def prepare(self):
pass

def handle_initial_data(self):
pass

def handle_event(self, event: MarketEvent) -> None:
pass

Expand Down
3 changes: 2 additions & 1 deletion tests/unit/data/ib/test_data_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
class TestDataApp(unittest.TestCase):
def setUp(self):
self.bus = MessageBus()
self.data_app = DataApp(self.bus, tick_interval=5)
self.tick_interval = 5
self.data_app = DataApp(self.bus)

def test_200_error_valid(self):
# Simulate an error code for contract not found
Expand Down

0 comments on commit 4527e36

Please sign in to comment.