diff --git a/midastrader/core/adapters/base_strategy.py b/midastrader/core/adapters/base_strategy.py index ba826e6..e7c2f86 100644 --- a/midastrader/core/adapters/base_strategy.py +++ b/midastrader/core/adapters/base_strategy.py @@ -1,6 +1,4 @@ import queue - -# import threading import pandas as pd import importlib.util from typing import Type @@ -47,7 +45,6 @@ def __init__(self, symbols_map: SymbolMap, bus: MessageBus): self.order_book = OrderBook.get_instance() self.portfolio_server = PortfolioServer.get_instance() self.historical_data = None - # self.running = threading.Event() # Subscribe to orderbook updates self.orderbook_queue = self.bus.subscribe(EventType.ORDER_BOOK) diff --git a/midastrader/core/adapters/order_book.py b/midastrader/core/adapters/order_book.py index 39234ed..fb6214e 100644 --- a/midastrader/core/adapters/order_book.py +++ b/midastrader/core/adapters/order_book.py @@ -9,8 +9,6 @@ from midastrader.structs.symbol import SymbolMap from midastrader.structs.events import MarketEvent, EODEvent from midastrader.message_bus import MessageBus, EventType - -# from midastrader.structs.events.base import SystemEvent from midastrader.core.adapters.base import CoreAdapter from midastrader.utils.logger import SystemLogger @@ -77,7 +75,6 @@ def retrieve_all(self) -> Dict[int, RecordMsg]: """ Retrieve market data for all instruments. """ - # with self._lock: # Ensure thread-safe read for a full copy return self._book.copy() # Methods reserved for OrderBookManager @@ -113,11 +110,9 @@ def __init__(self, symbols_map: SymbolMap, bus: MessageBus, mode: Mode): super().__init__(symbols_map, bus) self.mode = mode self.book = OrderBook.get_instance() - # self.running = threading.Event() # Subscribe to events self.data_queue = self.bus.subscribe(EventType.DATA) - # self.equity_update_flag = self.bus.subscribe(EventType.EQUITY_UPDATED) def process(self) -> None: """ @@ -136,8 +131,6 @@ def process(self) -> None: self.handle_record(item) elif isinstance(item, EODEvent): self.handle_eod(item) - # elif isinstance(item, RolloverEvent): - # self.handle_rollover() except queue.Empty: continue @@ -147,7 +140,7 @@ def cleanup(self) -> None: while True: try: item = self.data_queue.get(timeout=1) - if RecordMsg.is_record(item): # isinstance(item, RecordMsg): + if RecordMsg.is_record(item): self.handle_record(item) elif isinstance(item, EODEvent): self.handle_eod(item) @@ -159,8 +152,6 @@ def cleanup(self) -> None: def handle_eod(self, event: EODEvent) -> None: self.logger.debug(event) - # Publish that EOD processing is complete - # processed only in backtest situations self.bus.publish(EventType.EOD, True) while self.bus.get_flag(EventType.EOD): @@ -177,23 +168,12 @@ def handle_record(self, record: RecordMsg) -> None: self.book._update(record) # Put market event in the event queue - market_event = MarketEvent( - timestamp=record.ts_event, - data=record, - ) - - # self.logger.debug(market_event) # delete + market_event = MarketEvent(record.ts_event, record) # Check inital data loaded if not self.book.tickers_loaded: self.book._tickers_loaded = self.check_tickers_loaded() - # Backtest only - # if self.mode == Mode.BACKTEST: - - # Notify any observers about the market update - # self.bus.publish(EventType.ORDER_BOOK, market_event) - if self.mode == Mode.BACKTEST: self.await_equity_updated() self.await_market_data_processed(market_event) @@ -201,7 +181,6 @@ def handle_record(self, record: RecordMsg) -> None: self.bus.publish(EventType.ORDER_BOOK, market_event) def handle_rollover(self, record: RecordMsg) -> None: - # if record.rollover_flag == 1: id = record.hd.instrument_id symbol = self.symbols_map.get_symbol_by_id(id) @@ -219,76 +198,6 @@ def handle_rollover(self, record: RecordMsg) -> None: self.await_rollover_flag(rollover_event) - # self.book._update(record) - - # self.await_rollover_flag(rollover_event) - - # def handle_event(self, event: ) -> None: - # """ - # Handles market data events and updates the order book. - # - # Behavior: - # - Updates the order book with the new market data. - # - Logs the market event. - # - Checks if initial data for all tickers has been loaded. - # - Notifies observers of the updated market state. - # - # Args: - # subject (Subject): The subject that triggered the event. - # event_type (EventType): The type of event being handled (e.g., `MARKET_DATA`). - # record (RecordMsg): The market data record to process. - # - # """ - # if isinstance(event, EODEvent): - # self.logger.debug(event) - # # Publish that EOD processing is complete - # # processed only in backtest situations - # self.bus.publish(EventType.EOD, True) - # - # while self.bus.get_flag(EventType.EOD): - # continue - # - # self.bus.publish(EventType.EOD_PROCESSED, True) - # return - # - # # if self.mode == Mode.BACKTEST: - # # if event.rollover_flag == 1: - # # rollover_event = RolloverEvent( - # # event.hd.ts_event, - # # self.symbols_map.get_symbol_by_id(event.hd.instrument_id), - # # self.book.retrieve(event.hd.instrument_id), - # # event.close, # update - # # ) - # # - # # self.await_rollover_flag(rollover_event) - # - # # Update the order book with the new market data - # self.book._update(event) - # - # # Put market event in the event queue - # market_event = MarketEvent( - # timestamp=event.ts_event, - # data=event, - # ) - # - # self.logger.debug(market_event) - # - # # Check inital data loaded - # if not self.book.tickers_loaded: - # self.book._tickers_loaded = self.check_tickers_loaded() - # - # # Backtest only - # # if self.mode == Mode.BACKTEST: - # - # # Notify any observers about the market update - # # self.bus.publish(EventType.ORDER_BOOK, market_event) - # - # if self.mode == Mode.BACKTEST: - # self.await_equity_updated() - # self.await_market_data_processed(market_event) - # else: - # self.bus.publish(EventType.ORDER_BOOK, market_event) - def check_tickers_loaded(self) -> bool: """ Checks if market data for all tickers in the symbol map has been loaded. @@ -300,13 +209,6 @@ def check_tickers_loaded(self) -> bool: self.book._book.keys() ) - # def await_updates(self): - # """ - # Waits for the EOD_PROCESSED flag to be set. - # """ - # self.await_equity_updated() - # self.await_system_updated() - def await_rollover_flag( self, event: RolloverEvent, diff --git a/midastrader/core/adapters/order_manager.py b/midastrader/core/adapters/order_manager.py index d81a165..f001d46 100644 --- a/midastrader/core/adapters/order_manager.py +++ b/midastrader/core/adapters/order_manager.py @@ -32,7 +32,6 @@ def __init__(self, symbols_map: SymbolMap, bus: MessageBus): super().__init__(symbols_map, bus) self.order_book = OrderBook.get_instance() self.portfolio_server = PortfolioServer.get_instance() - # self.running = threading.Event() # Subcriptions self.signal_queue = self.bus.subscribe(EventType.SIGNAL) @@ -68,7 +67,7 @@ def process(self) -> None: self.cleanup() - def cleanup(self): + def cleanup(self) -> None: while True: try: event = self.signal_queue.get(timeout=1) @@ -152,29 +151,14 @@ def _handle_signal( if not order: self.logger.warning( - f"Skipping order {signal.signal_id}: error creating order." + f"Error creating order {signal.signal_id}." ) return - # - # if current_price is None: - # self.logger.warning( - # f"Skipping trade {trade.trade_id}: No current price for {symbol.instrument_id}" - # ) - # return - order_cost = symbol.cost( float(order.quantity), current_price.pretty_price, ) - # order_details = { - # "timestamp": timestamp, - # # "signal_id": signal.signal_id, - # # "action": signal.action, - # # "symbol": symbol, - # "order": order, - # } - # orders.append(order) # SELL/Cover are exits so available capital will be freed up @@ -187,27 +171,12 @@ def _handle_signal( ) if total_capital_required <= self.portfolio_server.capital: - # for order in orders: self._set_order(timestamp, orders) - - # order["timestamp"], - # order["signal_id"], - # order["action"], - # order["symbol"], - # order["order"], - # ) else: - self.logger.debug("Not enough capital to execute all orders") + self.logger.info("Not enough capital to execute all orders") self.bus.publish(EventType.UPDATE_SYSTEM, False) - def _set_order( - self, - timestamp: int, - # signal_id: int, - # action: Action, - # symbol: Symbol, - orders: List[BaseOrder], - ) -> None: + def _set_order(self, timestamp: int, orders: List[BaseOrder]) -> None: """ Queues an OrderEvent for execution based on the provided order details. @@ -223,14 +192,7 @@ def _set_order( RuntimeError: If creating the `OrderEvent` fails due to invalid input or unexpected errors. """ try: - order_event = OrderEvent( - timestamp=timestamp, - # signal_id=signal_id, - # action=action, - # symbol=symbol, - orders=orders, - ) - self.logger.info(order_event) + order_event = OrderEvent(timestamp, orders) self.bus.publish(EventType.ORDER, order_event) except (ValueError, TypeError) as e: raise RuntimeError(f"Failed to set OrderEvent due to input : {e}") diff --git a/midastrader/core/adapters/performance/base.py b/midastrader/core/adapters/performance/base.py index 35a2c26..51763b1 100644 --- a/midastrader/core/adapters/performance/base.py +++ b/midastrader/core/adapters/performance/base.py @@ -88,7 +88,6 @@ def __init__( self.output_dir = output_dir self._strategy: Optional[BaseStrategy] = None self.threads = [] - # self.running = threading.Event() # Subscribe to events self.account_queue = self.bus.subscribe(EventType.ACCOUNT_UPDATE_LOG) diff --git a/midastrader/core/adapters/performance/managers.py b/midastrader/core/adapters/performance/managers.py index 8dfe560..7eecec7 100644 --- a/midastrader/core/adapters/performance/managers.py +++ b/midastrader/core/adapters/performance/managers.py @@ -6,36 +6,15 @@ from quant_analytics.backtest.metrics import Metrics from midastrader.structs.trade import Trade +from midastrader.utils.unix import resample_timestamp +from midastrader.structs.account import EquityDetails, Account +from midastrader.structs.symbol import SymbolMap +from midastrader.utils.logger import SystemLogger from midastrader.structs.events import ( SignalEvent, TradeEvent, TradeCommissionEvent, ) -from midastrader.utils.unix import resample_timestamp -from midastrader.structs.account import EquityDetails, Account -from midastrader.structs.symbol import SymbolMap -from midastrader.utils.logger import SystemLogger - - -# def _convert_timestamp(df: pd.DataFrame, column: str = "timestamp") -> None: -# """ -# Converts a Unix timestamp column in a DataFrame to a localized and human-readable timestamp. -# -# The function converts Unix timestamps in the specified column to ISO 8601 format, adjusts the timezone -# to 'America/New_York', and removes the timezone information for consistency. -# -# Args: -# df (pd.DataFrame): The DataFrame containing the timestamp column to convert. -# column (str, optional): The name of the column with Unix timestamps. Defaults to "timestamp". -# -# Returns: -# None: The function modifies the DataFrame in place. -# """ -# df[column] = pd.to_datetime( -# df[column].map(lambda x: unix_to_iso(x, "EST")) -# ) -# df[column] = df[column].dt.tz_convert("America/New_York") -# df[column] = df[column].dt.tz_localize(None) class TradeManager: @@ -57,7 +36,6 @@ def __init__(self): """ self.logger = SystemLogger.get_logger() self.trades: Dict[str, Trade] = {} - # self.logger = logger def update_trades(self, event: TradeEvent) -> None: """ @@ -160,36 +138,7 @@ def _aggregate_trades(self) -> pd.DataFrame: ].sum(), ), ], - # "trade_cost": [ - # ( - # "entry_cost", - # lambda x: x[ - # (df["action"].isin(["LONG", "SHORT"])) - # & (~df["is_rollover"]) - # ].sum(), - # ), - # ( - # "exit_cost", - # lambda x: x[ - # df["action"].isin(["SELL", "COVER"]) - # ].sum(), - # ), - # ], - # "trade_cost": [ - # ( - # "entry_value", - # lambda x: x[ - # df["action"].isin(["LONG", "SHORT"]) - # ].sum(), - # ), - # ( - # "exit_value", - # lambda x: x[ - # df["action"].isin(["SELL", "COVER"]) - # ].sum(), - # ), - # ], - "fees": "sum", # Sum of all fees for each trade group + "fees": "sum", } ) @@ -219,28 +168,6 @@ def _aggregate_trades(self) -> pd.DataFrame: "entry_cost" ].replace(0, np.nan) - # # Calculate percentage gain/loss based on the entry value - # gain_loss = (aggregated["exit_value"] + aggregated["entry_value"]) * -1 - # aggregated["gain/loss"] = gain_loss - # - # # Calculate Profit and Loss (PnL) - # pnl = gain_loss + aggregated["fees"] - # aggregated["pnl"] = pnl - # aggregated["pnl_percentage"] = pnl / aggregated["entry_cost"] - - # aggregated["gain/loss"] = ( - # aggregated["exit_value"] + aggregated["entry_value"] - # ) * -1 - # - # # Calculate Profit and Loss (PnL) - # aggregated["pnl"] = ( - # aggregated["exit_value"] + aggregated["entry_value"] - # ) * -1 + aggregated["fees"] - # - # aggregated["pnl_percentage"] = ( - # aggregated["pnl"] / aggregated["entry_cost"] - # ) # * 100 - # Reset index to make 'trade_id' a column again aggregated.reset_index(inplace=True) @@ -839,7 +766,4 @@ def to_mbinary(self, symbols_map: SymbolMap) -> List[mbinary.Signals]: Returns: List[mbinary.Signals]: A list of signals converted into the `mbinary.Signals` format. """ - # for signal in self.signals: - # self.logger.info(signal.instructions[0].quantity) - # self.logger.info(signal.instructions[0].weight) return [signal.to_mbinary(symbols_map) for signal in self.signals] diff --git a/midastrader/core/adapters/portfolio/base.py b/midastrader/core/adapters/portfolio/base.py index 1abe076..8622469 100644 --- a/midastrader/core/adapters/portfolio/base.py +++ b/midastrader/core/adapters/portfolio/base.py @@ -124,7 +124,6 @@ def __init__(self, symbols_map: SymbolMap, bus: MessageBus): super().__init__(symbols_map, bus) self.server = PortfolioServer.get_instance() self.threads = [] - # self.running = threading.Event() # Subscribe to events self.order_queue = self.bus.subscribe(EventType.ORDER_UPDATE) diff --git a/midastrader/core/adapters/portfolio/managers.py b/midastrader/core/adapters/portfolio/managers.py index ade806b..6a01e47 100644 --- a/midastrader/core/adapters/portfolio/managers.py +++ b/midastrader/core/adapters/portfolio/managers.py @@ -83,9 +83,6 @@ def update_orders(self, order: ActiveOrder) -> None: f"\nORDERS UPDATED: \n{self._ouput_orders()}" ) - # self.notify(EventType.ORDER_UPDATE) # update database - # self.logger.debug(f"\nORDERS UPDATED: \n{self._ouput_orders()}") - def _ouput_orders(self) -> str: """ Generates a formatted string representation of all active orders for logging. @@ -169,7 +166,6 @@ def update_positions(self, instrument_id: int, position: Position) -> None: # Notify listener and log self.pending_positions_update.discard(instrument_id) - # self.logger.debug(f"\nPOSITIONS UPDATED: \n{self._output_positions()}") def _output_positions(self) -> str: """ diff --git a/midastrader/core/engine.py b/midastrader/core/engine.py index ad28ee8..b0f870f 100644 --- a/midastrader/core/engine.py +++ b/midastrader/core/engine.py @@ -12,8 +12,6 @@ PerformanceManager, ) -# from midastrader.core.adapters.risk import RiskHandler - class CoreEngine: def __init__( @@ -85,8 +83,6 @@ def set_risk_model(self): Attaches the risk model to the database observer to track risk updates. """ - return - # if self.config.risk_class: # self.risk_model = RiskHandler(self.config.risk_class) # @@ -95,6 +91,7 @@ def set_risk_model(self): # self.observer, # EventType.RISK_MODEL_UPDATE, # ) + return def set_strategy(self, strategy: BaseStrategy): """ @@ -104,17 +101,9 @@ def set_strategy(self, strategy: BaseStrategy): """ if not isinstance(strategy, BaseStrategy): raise TypeError("Strategy must be an instance of BaseStrategy") - # self._strategy = value self.adapters["strategy"] = strategy - # strategy( - # self.symbols_map, - # self.message_bus, - # ) - - self.adapters["performance_manager"].set_strategy( - self.adapters["strategy"] - ) + self.adapters["performance_manager"].set_strategy(strategy) def start(self): """Start adapters in seperate threads.""" @@ -122,7 +111,7 @@ def start(self): for adapter in self.adapters.values(): thread = threading.Thread(target=adapter.process, daemon=True) - self.threads.append(thread) # Keep track of threads + self.threads.append(thread) thread.start() adapter.is_running.wait() @@ -139,7 +128,7 @@ def _monitor_threads(self): thread.join() # Wait for each thread to finish self.logger.info("CoreEngine threads completed, shutting down ...") - self.completed.set() # Signal that the DataEngine is done + self.completed.set() def wait_until_complete(self): """ @@ -154,23 +143,13 @@ def stop(self): self.adapters["order_book"].shutdown_event.set() self.adapters["order_book"].is_shutdown.wait() - # Shutdown strategy - # self.adapters["strategy"].shutdown_event.set() - # self.adapters["strategy"].is_shutdown.wait() - # Shutdown OrderExecutionManager self.adapters["order_manager"].shutdown_event.set() self.adapters["order_manager"].is_shutdown.wait() - # self.logger.info("Shutting down CoreEngine main components ...") - # self.adapters["performance_manager"].save() - # self.adapters["portfolio_server"].shutdown_event.set() - # self.adapters["performance_manager"].shutdown_event.set() - def save(self): """Start adapters in separate threads.""" # Shutdown performance manaer - # self.adapters["performance_manager"].save() self.adapters["performance_manager"].shutdown_event.set() self.adapters["performance_manager"].is_shutdown.wait() @@ -178,7 +157,6 @@ def save(self): self.adapters["portfolio_server"].shutdown_event.set() self.adapters["portfolio_server"].is_shutdown.wait() + # Shutdown strategy self.adapters["strategy"].shutdown_event.set() self.adapters["strategy"].is_shutdown.wait() - - # self.logger.info("Shutting down CoreEngine components ...") diff --git a/midastrader/data/adaptors/historical/data_client.py b/midastrader/data/adaptors/historical/data_client.py index 1920ade..6c79ea9 100644 --- a/midastrader/data/adaptors/historical/data_client.py +++ b/midastrader/data/adaptors/historical/data_client.py @@ -47,6 +47,7 @@ def __init__(self, symbols_map: SymbolMap, bus: MessageBus, **kwargs): self.next_date = None self.current_date = None self.eod_triggered = False + self.eod_event = threading.Event() # Thread-safe synchronization def process(self): @@ -111,27 +112,6 @@ def get_data(self, parameters: Parameters) -> bool: self.data = data return True - # - # def next_record(self) -> RecordMsg: - # """ - # Retrieves the next record from the data buffer and adjusts its instrument ID. - # - # Returns: - # RecordMsg: The next record with updated instrument ID. - # """ - # record = self.data.replay() - # - # if record is None: - # return RecordMsg() - # - # # Adjust instrument id - # id = record.hd.instrument_id - # ticker = self.data.metadata.mappings.get_ticker(id) - # new_id = self.symbols_map.get_symbol(ticker).instrument_id - # record.instrument_id = new_id - # - # return record - def data_stream(self) -> bool: """ Simulates streaming of market data by processing the next record in the data buffer. @@ -155,7 +135,6 @@ def data_stream(self) -> bool: new_id = symbol.instrument_id record.instrument_id = new_id - # Check for end of trading da if self.mode == Mode.BACKTEST: self._check_eod(record) @@ -179,7 +158,7 @@ def _check_eod(self, record: RecordMsg) -> None: if not self.current_date or date > self.current_date: self.current_date = date self.eod_triggered = False - self.bus.publish(EventType.EOD_PROCESSED, False) # Reset flag + self.bus.publish(EventType.EOD_PROCESSED, False) symbol = self.symbols_map.map[record.instrument_id] diff --git a/midastrader/data/adaptors/ib/client.py b/midastrader/data/adaptors/ib/client.py index 9eed31b..6c4b7f1 100644 --- a/midastrader/data/adaptors/ib/client.py +++ b/midastrader/data/adaptors/ib/client.py @@ -59,7 +59,7 @@ def __init__(self, symbols_map: SymbolMap, bus: MessageBus, **kwargs): self.port = int(kwargs["port"]) self.clientId = kwargs["client_id"] self.account = kwargs["account_id"] - self.lock = threading.Lock() # create a lock + self.lock = threading.Lock() self.validated_contracts = {} diff --git a/midastrader/data/engine.py b/midastrader/data/engine.py index e4fd2a8..9ee8893 100644 --- a/midastrader/data/engine.py +++ b/midastrader/data/engine.py @@ -89,7 +89,7 @@ def start_backtest(self): for adapter in self.adapters.values(): # Start the threads for each vendor thread = threading.Thread(target=adapter.process, daemon=True) - self.threads.append(thread) # Keep track of threads + self.threads.append(thread) thread.start() adapter.is_running.wait() @@ -98,24 +98,22 @@ def start_backtest(self): def start_live(self): """Start adapters in seperate threads.""" + # Start historical data and load all into the buffer and processes buffer historical = self.adapters["historical"] hist_thread = threading.Thread(target=historical.process, daemon=True) - # self.threads.append(thread) hist_thread.start() historical.is_shutdown.wait() - hist_thread.join() # Hold until historical data loaded + hist_thread.join() while not self.message_bus.topics[EventType.DATA].empty(): continue self.logger.info("Historical data fully processed ...") - # self.logger.info("all data processed proceeding to others") for adapter in self.adapters.values(): if not isinstance(adapter, HistoricalAdaptor): - # Start the threads for each vendor thread = threading.Thread(target=adapter.process, daemon=True) - self.threads.append(thread) # Keep track of threads + self.threads.append(thread) thread.start() adapter.is_running.wait() @@ -136,12 +134,10 @@ def wait_until_complete(self): """ Wait for the engine to complete processing. """ - self.completed.wait() # Block until the completed event is set + self.completed.wait() def stop(self): """Start adapters in separate threads.""" for adapter in self.adapters.values(): adapter.shutdown_event.set() adapter.is_shutdown.wait() - - # self.logger.info("Shutting down DataEngine ...") diff --git a/midastrader/engine.py b/midastrader/engine.py index 8010bdb..19053d4 100644 --- a/midastrader/engine.py +++ b/midastrader/engine.py @@ -11,8 +11,6 @@ from midastrader.message_bus import MessageBus from midastrader.core import CoreEngine -# from midastrader.core.risk.risk_handler import RiskHandler - class EngineBuilder: """ @@ -261,8 +259,6 @@ def initialize(self): Raises: RuntimeError: If the system fails to load required components. """ - # self.logger.info(f"Initializing system with mode: {self.mode.value}") - # Risk Model if self.config.risk_class: self.core_engine.set_risk_model() @@ -274,11 +270,8 @@ def initialize(self): ) strategy = strategy_class(self.symbols_map, self.bus) - self.core_engine.set_strategy(strategy) - # self.logger.info("Trading system initialized successfully.") - def start(self): """ Start the main event loop of the trading system. @@ -317,10 +310,8 @@ def _backtest_loop(self): self.data_engine.stop() # Shut down engines in order - # self.logger.info("Saving performance results...") self.core_engine.stop() - # self.logger.info("Liquidating positions...") self.execution_engine.stop() self.core_engine.save() self.core_engine.wait_until_complete() @@ -342,25 +333,9 @@ def _live_loop(self): self.execution_engine.stop() self.core_engine.save() - # self.core_engine.wait_until_complete() self.logger.info("Live completed ...") - # self.broker_client.request_account_summary() - # time.sleep(5) # time for final account summary request-maybe shorten - # self.performance_manager.save() - - # def stop(self): - # """ - # Gracefully shut down the trading engine. - # - # Disconnects live data feeds and performs cleanup operations. - # """ - # self.logger.info("Shutting down the engine.") - # if self.mode == Mode.LIVE: - # self.live_data_client.disconnect() - # self.logger.info("Engine shutdown complete.") - def _signal_handler(self, signum, frame): """ Handle system signals (e.g., SIGINT) to stop the event loop. @@ -371,99 +346,3 @@ def _signal_handler(self, signum, frame): """ self.logger.info("Signal received, preparing to shut down.") self.running = False # Stop the event loop - - -# ========= Delete below ========== - -# def _run_backtest_event_loop(self): -# """Event loop for backtesting.""" -# # Load Initial account data -# self.broker_client.update_account() -# -# while self.hist_data_client.data_stream(): -# continue -# -# # Perform EOD operations for the last trading day -# self.broker_client.liquidate_positions() -# -# # Finalize and save to database -# self.performance_manager.save(self.mode, self.config.output_path) - - -# def connect_execution_engine(self): -# self.execution_engine.connect() - -# def connect_data_engine(self): -# pass - -# def connect_core_engine(self): -# pass - -# -# def setup_live_environment(self): -# """ -# Configure the live trading environment. -# -# Establishes connections to live data feeds, brokers, and validates trading contracts. -# -# Raises: -# RuntimeError: If contract validation fails or live data cannot be loaded. -# """ -# # Set up connections -# self.broker_client.connect() -# -# # Validate Contracts -# self.contract_handler = ContractManager(self.broker_client) -# for symbol in self.symbols_map.symbols: -# if not self.contract_handler.validate_contract(symbol.contract): -# raise RuntimeError(f"{symbol.broker_ticker} invalid contract.") -# -# # Laod Hist Data -# self._load_historical_data() -# -# # Load Live Data -# self.live_data_client.connect() -# self._load_live_data() -# -# def setup_backtest_environment(self): -# """ -# Configure the backtest environment. -# -# Loads historical data needed for simulation and backtesting. -# Raises: -# RuntimeError: If backtest data cannot be loaded. -# """ -# self._load_historical_data() -# -# def _load_live_data(self): -# """ -# Subscribe to live data feeds for the trading symbols. -# -# Raises: -# ValueError: If live data fails to load for any symbol. -# """ -# try: -# for symbol in self.symbols_map.symbols: -# self.live_data_client.get_data( -# data_type=self.parameters.data_type, -# contract=symbol.contract, -# ) -# except ValueError: -# raise ValueError(f"Error loading live data for {symbol.ticker}.") -# -# def _load_historical_data(self): -# """ -# Load historical data for backtesting. -# -# Raises: -# RuntimeError: If the backtest data fails to load. -# """ -# response = self.hist_data_client.get_data( -# self.parameters, -# self.config.data_file, -# ) -# -# if response: -# self.logger.info("Backtest data loaded.") -# else: -# raise RuntimeError("Backtest data did not load.") diff --git a/midastrader/execution/adaptors/base.py b/midastrader/execution/adaptors/base.py index a8dec27..10db913 100644 --- a/midastrader/execution/adaptors/base.py +++ b/midastrader/execution/adaptors/base.py @@ -17,8 +17,6 @@ def __init__(self, symbols_map: SymbolMap, bus: MessageBus): self.is_running = threading.Event() self.is_shutdown = threading.Event() - # self.order_queue = self.bus.subscribe(EventType.ORDER) - @abstractmethod def process(self) -> None: pass diff --git a/midastrader/execution/adaptors/dummy/broker_client.py b/midastrader/execution/adaptors/dummy/broker_client.py index a47caf1..f44d7ac 100644 --- a/midastrader/execution/adaptors/dummy/broker_client.py +++ b/midastrader/execution/adaptors/dummy/broker_client.py @@ -35,9 +35,7 @@ def __init__(self, symbols_map: SymbolMap, bus: MessageBus, capital: int): # Subscriptions self.order_queue = self.bus.subscribe(EventType.ORDER) - def process(self): - # try: - # Start sub-threads + def process(self) -> None: self.threads.append( threading.Thread(target=self.process_orders, daemon=True) ) @@ -59,20 +57,6 @@ def process(self): self.logger.info("Shutting down DummyBrokerAdaptor ...") self.is_shutdown.set() - # finally: - # # Always call cleanup on exit - # self.cleanup() - - # def _monitor_threads(self): - # """ - # Monitor all adapter threads and signal when all are done. - # """ - # for thread in self.threads: - # thread.join() # Wait for each thread to finish - - # self.logger.info("All adapter threads have completed.") - # self.completed.set() # Signal that the DataEngine is done - def process_orders(self) -> None: while not self.shutdown_event.is_set(): try: @@ -95,164 +79,3 @@ def cleanup(self) -> None: self.broker.shutdown_event.set() self.broker.is_shutdown.wait() - # self.is_shutdown.set() - - # self.broker.liquidate_positions() - # self.logger.info("Liquidated positions.") - - # def handle_order(self, event: OrderEvent) -> None: - # """ - # Processes and executes an order based on given details. - # - # Args: - # event (OrderEvent): The event containing order details for execution. - # """ - # self.logger.debug(event) - # - # timestamp = event.timestamp - # trade_id = event.trade_id - # leg_id = event.leg_id - # action = event.action - # contract = event.contract - # order = event.order - # - # self.broker.placeOrder( - # timestamp, - # trade_id, - # leg_id, - # action, - # contract, - # order, - # ) - - # def handle_execution(self, event: ExecutionEvent) -> None: - # """ - # Responds to execution events and updates system states such as positions and account details. - # - # Args: - # event (ExecutionEvent): The event detailing the trade execution. - # """ - # self.logger.debug(event) - # - # # Update trades look with current event - # contract = event.contract - # self.update_trades(contract) - # - # # If last trade placed ex. only trade or last leg of a trade update data - # trade_details = event.trade_details - # if trade_details == self.broker.last_trade: - # self.update_positions() - # self.update_account() - # self.update_equity_value() - # - # def handle_eod(self, event: EODEvent) -> None: - # """ - # Performs end-of-day updates, including marking positions to market values and checking margin requirements. - # - # Args: - # event (EODEvent): The end-of-day event. - # """ - # self.update_account() - - # def handle_event( - # self, - # subject: Subject, - # event_type: EventType, - # event, - # ) -> None: - # """ - # Handles events from the event queue and initiates appropriate processing. - # - # Args: - # subject (Subject): The subject sending the event. - # event_type (EventType): The type of event being processed. - # event: The event object containing relevant details. - # - # Raises: - # ValueError: If the event is not of the expected type for the given event_type. - # """ - # if event_type == EventType.ORDER_CREATED: - # if not isinstance(event, OrderEvent): - # raise ValueError( - # "'event' must be of type OrderEvent instance." - # ) - # self.handle_order(event) - # - # elif event_type == EventType.TRADE_EXECUTED: - # if not isinstance(event, ExecutionEvent): - # raise ValueError( - # "'event' must be of type ExecutionEvent instance." - # ) - # self.handle_execution(event) - # - # elif event_type == EventType.EOD_EVENT: - # if not isinstance(event, EODEvent): - # raise ValueError("'event' must be of type EODEvent.") - # - # self.handle_eod(event) - # - # elif event_type == EventType.ORDER_BOOK: - # if not isinstance(event, MarketEvent): - # raise ValueError("'event' must be of MarketEvent.") - # - # self.update_equity_value() - - # def update_positions(self) -> None: - # """ - # Fetches and updates the positions from the broker and notifies observers of position updates. - # - # This method retrieves the current positions held within the simulated broker and updates the portfolio records - # to maintain consistency with the simulated market conditions. - # """ - # positions = self.broker.return_positions() - # for contract, position_data in positions.items(): - # id = self.symbols_map.get_id(contract.symbol) - # self.notify(EventType.POSITION_UPDATE, id, position_data) - # - # def update_trades(self, contract: Contract = None) -> None: - # """ - # Updates trade details either for a specific contract or for all recent trades. - # - # Args: - # contract (Contract, optional): Specific contract for which trades need to be updated. If None, updates all recent trades. - # """ - # if contract: - # trade = self.broker.return_executed_trades(contract) - # trade_id = f"{trade.trade_id}{trade.leg_id}{trade.action}" - # self.notify(EventType.TRADE_UPDATE, trade_id, trade) - # else: - # last_trades = self.broker.return_executed_trades() - # for contract, trade in last_trades.items(): - # trade_id = f"{trade.trade_id}{trade.leg_id}{trade.action}" - # self.notify(EventType.TRADE_UPDATE, trade_id, trade) - # - # def update_account(self) -> None: - # """ - # Retrieves and updates the account details from the broker. - # - # This method synchronizes the account state with the broker's state and notifies observers of the update. - # """ - # account = self.broker.return_account() - # self.notify(EventType.ACCOUNT_UPDATE, account) - # - # def update_equity_value(self) -> None: - # """ - # Updates the equity value of the account based on the latest market valuations. - # - # This method ensures the current market value of holdings is accurately reflected. - # """ - # self.broker._update_account() - # equity = self.broker.return_equity_value() - # self.notify(EventType.EQUITY_VALUE_UPDATE, equity) - # - # def liquidate_positions(self) -> None: - # """ - # Handles the liquidation of all positions, typically used at the end of a trading period or in response to a margin call. - # - # This method ensures all positions are closed out and the account is updated. - # """ - # self.update_positions() - # self.update_account() - # self.update_equity_value() - # self.broker.liquidate_positions() - # self.update_trades() diff --git a/midastrader/execution/adaptors/dummy/dummy_broker.py b/midastrader/execution/adaptors/dummy/dummy_broker.py index 89030b2..dcbbdbe 100644 --- a/midastrader/execution/adaptors/dummy/dummy_broker.py +++ b/midastrader/execution/adaptors/dummy/dummy_broker.py @@ -66,9 +66,6 @@ def __init__( self.init_margin_required: Dict[int, float] = {} self.maintenance_margin_required: Dict[int, float] = {} self.liquidation_value: Dict[int, float] = {} - # self.unrealized_pnl: Dict[str, float] = {"account": 0} - # self.margin_required: Dict[str, float] = {"account": 0} - # self.liquidation_value: Dict[str, float] = {"account": 0} self.last_trades: Dict[int, Trade] = {} self.account = Account( timestamp=0, @@ -276,9 +273,8 @@ def _handle_trade(self, event: OrderEvent) -> None: """ orders = event.orders - # action = event.action timestamp = event.timestamp - # signal_id = event.signal_id + for order in orders: symbol = self.symbols_map.get_symbol_by_id(order.instrument_id) if symbol: @@ -304,7 +300,6 @@ def _handle_trade(self, event: OrderEvent) -> None: # Create Execution Events self._update_trades( timestamp, - # trade_id, order.signal_id, symbol, quantity, @@ -374,8 +369,6 @@ def _update_account(self) -> None: margin requirements, and net liquidation value. """ for instrument_id, position in self.positions.items(): - # symbol = self.symbols_map.get_symbol(contract.symbol) - # if symbol: mkt_data = self.order_book.retrieve(instrument_id) position.market_price = mkt_data.pretty_price impact = position.position_impact() @@ -410,7 +403,6 @@ def _update_trades( self, timestamp: int, signal_id: int, - # leg_id: int, symbol: Symbol, quantity: float, action: Action, @@ -456,7 +448,6 @@ def _update_trades( # Keep for liquidation if needed at the end self.last_trades[symbol.instrument_id] = trade - # trade_id_str = f"{trade_id}{leg_id}{action}" self.bus.publish( EventType.TRADE_UPDATE, TradeEvent(str(self.trade_id), trade), @@ -526,20 +517,11 @@ def liquidate_positions(self) -> None: is_rollover=False, ) - # self.last_trades[contract] = trade - # id = f"{trade.trade_id}{trade.leg_id}{trade.action}" self.bus.publish( EventType.TRADE_UPDATE, TradeEvent(str(self.trade_id), trade), ) - # # Output liquidation - # string = "Positions liquidate:" - # for contract, trade in self.last_trades.items(): - # string += f"\n {contract} : {trade}" - # - # self.logger.info(f"\n{string}") - def return_positions(self) -> None: """ Return the current positions held by the broker. @@ -586,5 +568,3 @@ def return_equity_value(self) -> None: EquityDetails: Details of the broker's equity value. """ self.bus.publish(EventType.EQUITY_UPDATE, self.account.equity_value()) - - # return self.account.equity_value() diff --git a/midastrader/execution/adaptors/ib/wrapper.py b/midastrader/execution/adaptors/ib/wrapper.py index cab4d7d..d4f2a4c 100644 --- a/midastrader/execution/adaptors/ib/wrapper.py +++ b/midastrader/execution/adaptors/ib/wrapper.py @@ -218,7 +218,6 @@ def process_account_updates(self) -> None: # Updating portfolio server outside the lock to avoid deadlocks self.bus.publish(EventType.ACCOUNT_UPDATE, account_info_copy) self.bus.publish(EventType.ACCOUNT_UPDATE_LOG, account_info_copy) - # self.logger.debug("Processed buffered account updates.") #### wrapper function for reqAccountUpdates. Get position information def updatePortfolio( diff --git a/midastrader/execution/engine.py b/midastrader/execution/engine.py index cbd8d56..101eea4 100644 --- a/midastrader/execution/engine.py +++ b/midastrader/execution/engine.py @@ -89,7 +89,7 @@ def start(self): for adapter in self.adapters: # Start the threads for each vendor thread = threading.Thread(target=adapter.process, daemon=True) - self.threads.append(thread) # Keep track of threads + self.threads.append(thread) thread.start() adapter.is_running.wait() @@ -103,18 +103,18 @@ def _monitor_threads(self): Monitor all adapter threads and signal when all are done. """ for thread in self.threads: - thread.join() # Wait for each thread to finish + thread.join() self.logger.info( "ExecutionEngine threads completed, shutting down ..." ) - self.completed.set() # Signal that the DataEngine is done + self.completed.set() def wait_until_complete(self): """ Wait for the engine to complete processing. """ - self.completed.wait() # Block until the completed event is set + self.completed.wait() def stop(self): """Start adapters in separate threads.""" @@ -122,5 +122,3 @@ def stop(self): for adapter in self.adapters: adapter.shutdown_event.set() adapter.is_shutdown.wait() - - # self.logger.info("Shutting down ExecutionEngine ...") diff --git a/midastrader/structs/active_orders.py b/midastrader/structs/active_orders.py index aee0996..39376ee 100644 --- a/midastrader/structs/active_orders.py +++ b/midastrader/structs/active_orders.py @@ -167,18 +167,11 @@ def pretty_print(self, indent: str = "") -> str: print(active_order.pretty_print(indent=" ")) """ return ( - # f"{indent}permId: {self.permId}\n" - # f"{indent}clientId: {self.clientId}\n" f"{indent}orderId: {self.orderId}\n" - # f"{indent}parentId: {self.parentId}\n" - # f"{indent}account: {self.account}\n" f"{indent}instrument: {self.instrument}\n" - # f"{indent}secType: {self.secType}\n" - # f"{indent}exchange: {self.exchange}\n" f"{indent}action: {self.action}\n" f"{indent}orderType: {self.orderType}\n" f"{indent}totalQty: {self.totalQty}\n" - # f"{indent}cashQty: {self.cashQty}\n" f"{indent}lmtPrice: {self.lmtPrice}\n" f"{indent}auxPrice: {self.auxPrice}\n" f"{indent}status: {self.status}\n" @@ -187,5 +180,12 @@ def pretty_print(self, indent: str = "") -> str: f"{indent}avgFillPrice: {self.avgFillPrice}\n" f"{indent}lastFillPrice: {self.lastFillPrice}\n" f"{indent}whyHeld: {self.whyHeld}\n" + # f"{indent}cashQty: {self.cashQty}\n" + # f"{indent}permId: {self.permId}\n" + # f"{indent}clientId: {self.clientId}\n" + # f"{indent}parentId: {self.parentId}\n" + # f"{indent}account: {self.account}\n" + # f"{indent}secType: {self.secType}\n" + # f"{indent}exchange: {self.exchange}\n" # f"{indent}mktCapPrice: {self.mktCapPrice}\n" ) diff --git a/midastrader/structs/events/market_event.py b/midastrader/structs/events/market_event.py index 2c91eca..90dcdd9 100644 --- a/midastrader/structs/events/market_event.py +++ b/midastrader/structs/events/market_event.py @@ -1,5 +1,4 @@ -from typing import Union -from mbinary import OhlcvMsg, BboMsg, RecordMsg +from mbinary import RecordMsg from dataclasses import dataclass, field from midastrader.structs.events.base import SystemEvent @@ -21,7 +20,7 @@ class MarketEvent(SystemEvent): """ timestamp: int - data: RecordMsg # Union[OhlcvMsg, BboMsg] + data: RecordMsg type: str = field(init=False, default="MARKET_DATA") def __post_init__(self): diff --git a/midastrader/structs/signal.py b/midastrader/structs/signal.py index 8992c79..e29c569 100644 --- a/midastrader/structs/signal.py +++ b/midastrader/structs/signal.py @@ -35,8 +35,6 @@ class SignalInstruction: order_type: OrderType action: Action signal_id: int - # trade_id: int = 0 - # leg_id: int = 0 weight: float = 0.0 quantity: float = 0.0 limit_price: Optional[float] = 0.0 @@ -61,8 +59,6 @@ def __post_init__(self): raise TypeError("'action' field must be of type Action enum.") if not isinstance(self.signal_id, int): raise TypeError("'signal_id' field must of type int.") - # if not isinstance(self.leg_id, int): - # raise TypeError("'leg_id' field must be of type int.") if not isinstance(self.quantity, float): raise TypeError("'quantity' field must be of type float.") if self.order_type == OrderType.LIMIT and not isinstance( @@ -81,8 +77,6 @@ def __post_init__(self): # Value Constraint if self.signal_id <= 0: raise ValueError("'signal_id' field must be greater than zero.") - # if self.leg_id <= 0: - # raise ValueError("'leg_id' field must must be greater than zero.") if self.limit_price and self.limit_price <= 0: raise ValueError( "'limit_price' field must must be greater than zero." @@ -104,7 +98,6 @@ def to_dict(self): "order_type": self.order_type.value, "action": self.action.value, "signal_id": self.signal_id, - # "leg_id": self.leg_id, "weight": round(self.weight, 4), "quantity": self.quantity, "limit_price": (self.limit_price if self.limit_price else ""), @@ -180,7 +173,6 @@ def __str__(self) -> str: f"Instrument: {self.instrument}, " f"Order Type: {self.order_type.name}, " f"Action: {self.action.name}, " - # f"Trade ID: {self.trade_id}, " f"Signal ID: {self.signal_id}, " f"Weight: {self.weight}, " f"Quantity: {self.quantity}, " diff --git a/midastrader/structs/symbol.py b/midastrader/structs/symbol.py index 40f5788..e69c34a 100644 --- a/midastrader/structs/symbol.py +++ b/midastrader/structs/symbol.py @@ -1,16 +1,10 @@ -# noqa: C901 -# import pandas as pd -# import pandas_market_calendars as mcal -# from datetime import timedelta from enum import Enum from typing import Optional, Dict, List from ibapi.contract import Contract from abc import ABC, abstractmethod -from dataclasses import dataclass, field +from dataclasses import dataclass from datetime import time, datetime -# from pandas_market_calendars.market_calendar import DEFAULT - from midastrader.structs.orders import Action from midastrader.utils.unix import unix_to_iso @@ -329,9 +323,8 @@ class Symbol(ABC): price_multiplier: float trading_sessions: TradingSession slippage_factor: float - # contract: Contract = field(init=False) - def __post_init__(self): # noqa: C901 + def __post_init__(self): """ Validates the input attributes and enforces constraints on numeric fields. @@ -386,40 +379,6 @@ def ib_contract(self) -> Contract: contract.multiplier = str(self.quantity_multiplier) return contract - # def to_contract_data(self) -> dict: - # """ - # Constructs a dictionary containing key contract details for IB API. - # - # Returns: - # Dict[str, str]: A dictionary with details such as symbol, security type, currency, exchange, and multiplier. - # """ - # return { - # "symbol": self.broker_ticker, - # "secType": self.security_type.value, - # "currency": self.currency.value, - # "exchange": self.exchange.value, - # "multiplier": self.quantity_multiplier, - # } - # - # def to_contract(self) -> Contract: - # """ - # Creates an IB API `Contract` object using the symbol's details. - # - # Returns: - # Contract: A fully initialized `Contract` object. - # - # Raises: - # Exception: If an error occurs during contract creation. - # """ - # try: - # contract_data = self.to_contract_data() - # contract = Contract() - # for key, value in contract_data.items(): - # setattr(contract, key, value) - # return contract - # except Exception as e: - # raise Exception(f"Unexpected error during Contract creation: {e}") - def to_dict(self) -> dict: """ Converts the symbol into a dictionary representation. @@ -499,7 +458,6 @@ def in_day_session(self, timestamp_ns: int) -> bool: dt = datetime.fromisoformat( unix_to_iso(timestamp_ns, tz_info="America/New_York") ) - # time = dt.time() return ( self.trading_sessions.day_open @@ -591,22 +549,9 @@ def __post_init__(self): if not isinstance(self.shares_outstanding, int): raise TypeError("'shares_outstanding' must be of type int.") - # Create contract object - # self.contract = self.to_contract() - def ib_contract(self) -> Contract: return super().ib_contract() - # def to_contract_data(self) -> dict: - # """ - # Constructs a dictionary containing key contract details for the equity. - # - # Returns: - # dict: Contract data including broker ticker, security type, currency, exchange, and multiplier. - # """ - # data = super().to_contract_data() - # return data - def to_dict(self) -> dict: """ Converts the Equity object to a dictionary representation. @@ -741,9 +686,6 @@ def __post_init__(self): if self.tick_size <= 0: raise ValueError("'tickSize' must be greater than zero.") - # Create contract object - # self.contract = self.to_contract() - def ib_contract(self) -> Contract: contract = super().ib_contract() contract.lastTradeDateOrContractMonth = ( @@ -752,20 +694,6 @@ def ib_contract(self) -> Contract: return contract - # - # def to_contract_data(self) -> dict: - # """ - # Generates contract data required for trading or IB API. - # - # Returns: - # dict: A dictionary with contract-specific attributes including last trade date. - # """ - # data = super().to_contract_data() - # data["lastTradeDateOrContractMonth"] = ( - # self.lastTradeDateOrContractMonth - # ) - # return data - def to_dict(self) -> dict: """ Converts the Future object to a dictionary representation. @@ -1080,9 +1008,6 @@ def __post_init__(self): if self.strike_price <= 0: raise ValueError("'strike' must be greater than zero.") - # Create contract object - # self.contract = self.to_contract() - def ib_contract(self) -> Contract: contract = super().ib_contract() contract.lastTradeDateOrContractMonth = ( @@ -1090,26 +1015,9 @@ def ib_contract(self) -> Contract: ) contract.right = self.option_type.value contract.strike = self.strike_price - # data["strike"] = self.strike_price return contract - # def to_contract_data(self) -> dict: - # """ - # Constructs a dictionary representation for creating an IBKR Contract object. - # - # Returns: - # dict: A dictionary containing option-specific contract details such as strike price, - # expiration date, and option type, in addition to base contract details. - # """ - # data = super().to_contract_data() - # data["lastTradeDateOrContractMonth"] = ( - # self.lastTradeDateOrContractMonth - # ) - # data["right"] = self.option_type.value - # data["strike"] = self.strike_price - # return data - def to_dict(self) -> dict: """ Constructs a dictionary representation of the Option object. diff --git a/midastrader/structs/trade.py b/midastrader/structs/trade.py index 9842e13..16f7eed 100644 --- a/midastrader/structs/trade.py +++ b/midastrader/structs/trade.py @@ -4,8 +4,6 @@ from midastrader.structs.symbol import SecurityType -# from midastrader.structs.constants import PRICE_SCALE - @dataclass class Trade: diff --git a/midastrader/utils/date_adjust.py b/midastrader/utils/date_adjust.py deleted file mode 100644 index 657d433..0000000 --- a/midastrader/utils/date_adjust.py +++ /dev/null @@ -1,79 +0,0 @@ -# import pandas as pd -# from pandas.tseries.offsets import CustomBusinessDay -# from pandas.tseries.holiday import USFederalHolidayCalendar -# -# -# def adjust_to_business_time(df, frequency="daily"): -# """ -# Adjust a DataFrame's datetime index to align with business time frequencies: 'daily', 'hourly', or 'minute'. -# -# This function reindexes the DataFrame to align its data with U.S. business days and optionally -# restricts timestamps to business hours for hourly or minute-level frequencies. -# -# Args: -# df (pd.DataFrame): The DataFrame to be adjusted. Its index must be a DatetimeIndex. -# frequency (str, optional): The target frequency for adjustment. -# Options are: -# - 'daily': Aligns to business days. -# - 'hourly': Aligns to hours within business days. -# - 'minute': Aligns to minutes within business days and business hours (9:00 AM to 5:00 PM). -# Defaults to 'daily'. -# -# Returns: -# pd.DataFrame: A DataFrame reindexed to the specified frequency, with missing values forward-filled. -# -# Raises: -# ValueError: If an unsupported frequency is specified. -# -# Example: -# >>> import pandas as pd -# >>> from pandas.tseries.holiday import USFederalHolidayCalendar -# >>> from pandas.tseries.offsets import CustomBusinessDay -# >>> data = {'value': [1, 2, 3]} -# >>> index = pd.to_datetime(['2023-01-02', '2023-01-03', '2023-01-05']) -# >>> df = pd.DataFrame(data, index=index) -# >>> adjust_to_business_time(df, frequency='daily') -# """ -# # Define the business day calendar -# us_business_day = CustomBusinessDay(calendar=USFederalHolidayCalendar()) -# -# # Determine the start and end dates from the DataFrame -# start_date = df.index.min() -# end_date = df.index.max() -# -# # Generate the appropriate date range based on the specified frequency -# if frequency == "daily": -# # Daily frequency, only business days -# target_range = pd.date_range( -# start_date, end_date, freq=us_business_day -# ) -# elif frequency == "hourly": -# # Generate hourly timestamps within business days -# business_days = pd.date_range( -# start_date, end_date, freq=us_business_day -# ) -# target_range = pd.date_range(start_date, end_date, freq="H") -# target_range = target_range[target_range.date.isin(business_days.date)] -# elif frequency == "minute": -# # Generate minute timestamps within business days, assuming 9:00 AM to 5:00 PM as business hours -# business_days = pd.date_range( -# start_date, end_date, freq=us_business_day -# ) -# target_range = pd.date_range( -# start_date, end_date, freq="T" -# ) # 1-minute frequency -# # Filter for business hours; adjust 'hour >= 9 & hour < 17' as needed for specific business hours -# target_range = target_range[ -# (target_range.date.isin(business_days.date)) -# & (target_range.hour >= 9) -# & (target_range.hour < 17) -# ] -# else: -# raise ValueError( -# "Unsupported frequency specified. Choose 'daily', 'hourly', or 'minute'." -# ) -# -# # Reindex the DataFrame to match the target range, forward-filling missing values -# adjusted_df = df.reindex(target_range).ffill() -# -# return adjusted_df