From de8e8c8b08f4156970a93c169e9665478ea7f2f0 Mon Sep 17 00:00:00 2001 From: Martin Pelteshki <39273158+Al4ise@users.noreply.github.com> Date: Fri, 1 Nov 2024 15:07:39 +0200 Subject: [PATCH] revert backtesting optimizations until issues are resolved --- lumibot/backtesting/backtesting_broker.py | 68 ++++--------- lumibot/entities/data.py | 115 +++++++++++++++------- lumibot/entities/dataline.py | 5 +- lumibot/strategies/strategy_executor.py | 5 +- lumibot/tools/polygon_helper.py | 87 +++++----------- 5 files changed, 127 insertions(+), 153 deletions(-) diff --git a/lumibot/backtesting/backtesting_broker.py b/lumibot/backtesting/backtesting_broker.py index d509e32b6..82a6a8b37 100644 --- a/lumibot/backtesting/backtesting_broker.py +++ b/lumibot/backtesting/backtesting_broker.py @@ -117,34 +117,20 @@ def should_continue(self): def is_market_open(self): """Return True if market is open else false""" now = self.datetime - increased_idx = False - if not hasattr(self, '_cached_idx_right'): - idx = self._trading_days.index.searchsorted(now, side='right') - self._cached_idx_right = idx - else: - if now > self._previous_market_close_right: - self._cached_idx_right += 1 - increased_idx = True - - idx = self._cached_idx_right - - if hasattr(self, '_previous_market_close_right') and not increased_idx: - market_close = self._previous_market_close_right - market_open = self._previous_market_open_right - else: - if idx >= len(self._trading_days): - logging.error("Cannot predict future") - return 0 + # As the index is sorted, use searchsorted to find the relevant day + idx = self._trading_days.index.searchsorted(now, side='right') - # The index of the trading_day is used as the market close time - market_close = self._trading_days.index[idx] + # Check that the index is not out of bounds + if idx >= len(self._trading_days): + logging.error("Cannot predict future") + return False - # Retrieve market open time using .at since idx is a valid datetime index - market_open = self._trading_days.at[market_close, 'market_open'] + # The index of the trading_day is used as the market close time + market_close = self._trading_days.index[idx] - self._previous_market_open_right = market_open - self._previous_market_close_right = market_close + # Retrieve market open time using .at since idx is a valid datetime index + market_open = self._trading_days.at[market_close, 'market_open'] # Check if 'now' is within the trading hours of the located day return market_open <= now < market_close @@ -185,36 +171,18 @@ def get_time_to_open(self): def get_time_to_close(self): """Return the remaining time for the market to close in seconds""" now = self.datetime - increased_idx = False # Use searchsorted for efficient searching and reduce unnecessary DataFrame access - if not hasattr(self, '_cached_idx'): - idx = self._trading_days.index.searchsorted(now, side='left') - self._cached_idx = idx - else: - if now > self._previous_market_close: - self._cached_idx += 1 - increased_idx = True - - idx = self._cached_idx - - if hasattr(self, '_previous_market_close') and not increased_idx: - # Skip computation if done already - market_close = self._previous_market_close - market_open = self._previous_market_open - - else: - if idx >= len(self._trading_days): - logging.error("Cannot predict future") - return 0 + idx = self._trading_days.index.searchsorted(now, side='left') - # Directly access the data needed using more efficient methods - market_close_time = self._trading_days.index[idx] - market_open = self._trading_days.at[market_close_time, 'market_open'] - market_close = market_close_time # Assuming this is a scalar value directly from the index + if idx >= len(self._trading_days): + logging.error("Cannot predict future") + return 0 - self._previous_market_open = market_open - self._previous_market_close = market_close + # Directly access the data needed using more efficient methods + market_close_time = self._trading_days.index[idx] + market_open = self._trading_days.at[market_close_time, 'market_open'] + market_close = market_close_time # Assuming this is a scalar value directly from the index if now < market_open: return None diff --git a/lumibot/entities/data.py b/lumibot/entities/data.py index dcf866dcf..b644e9ad8 100644 --- a/lumibot/entities/data.py +++ b/lumibot/entities/data.py @@ -8,7 +8,7 @@ from .asset import Asset from .dataline import Dataline -from functools import lru_cache + class Data: """Input and manage Pandas dataframes for backtesting. @@ -278,16 +278,19 @@ def repair_times_and_fill(self, idx): idx = idx[(idx >= self.datetime_start) & (idx <= self.datetime_end)] # After all time series merged, adjust the local dataframe to reindex and fill nan's. - self.df = self.df.reindex(idx, method="ffill") - self.df.loc[self.df["volume"].isna(), "volume"] = 0 - self.df.loc[:, ~self.df.columns.isin(["open", "high", "low"])] = self.df.loc[ - :, ~self.df.columns.isin(["open", "high", "low"]) + df = self.df.reindex(idx, method="ffill") + df.loc[df["volume"].isna(), "volume"] = 0 + df.loc[:, ~df.columns.isin(["open", "high", "low"])] = df.loc[ + :, ~df.columns.isin(["open", "high", "low"]) ].ffill() for col in ["open", "high", "low"]: - self.df.loc[self.df[col].isna(), col] = self.df.loc[self.df[col].isna(), "close"] + df.loc[df[col].isna(), col] = df.loc[df[col].isna(), "close"] + + self.df = df - iter_index = pd.Series(self.df.index) + iter_index = pd.Series(df.index) self.iter_index = pd.Series(iter_index.index, index=iter_index) + self.iter_index_dict = self.iter_index.to_dict() self.datalines = dict() self.to_datalines() @@ -298,7 +301,8 @@ def to_datalines(self): "datetime": Dataline( self.asset, "datetime", - self.df.index + self.df.index.to_numpy(), + self.df.index.dtype, ) } ) @@ -310,21 +314,67 @@ def to_datalines(self): column: Dataline( self.asset, column, - self.df[column] - ) - } - ) + self.df[column].to_numpy(), + self.df[column].dtype, + ) + } + ) setattr(self, column, self.datalines[column].dataline) - + + def get_iter_count(self, dt): + # Return the index location for a given datetime. + + # Check if the date is in the dataframe, if not then get the last + # known data (this speeds up the process) + i = None + + # Check if we have the iter_index_dict, if not then repair the times and fill (which will create the iter_index_dict) + if getattr(self, "iter_index_dict", None) is None: + self.repair_times_and_fill(self.df.index) + + # Search for dt in self.iter_index_dict + if dt in self.iter_index_dict: + i = self.iter_index_dict[dt] + else: + # If not found, get the last known data + i = self.iter_index.asof(dt) + + return i + def check_data(func): # Validates if the provided date, length, timeshift, and timestep # will return data. Runs function if data, returns None if no data. def checker(self, *args, **kwargs): + if type(kwargs.get("length", 1)) not in [int, float]: + raise TypeError(f"Length must be an integer. {type(kwargs.get('length', 1))} was provided.") + + dt = args[0] + + # Check if the iter date is outside of this data's date range. + if dt < self.datetime_start: + raise ValueError( + f"The date you are looking for ({dt}) for ({self.asset}) is outside of the data's date range ({self.datetime_start} to {self.datetime_end}). This could be because the data for this asset does not exist for the date you are looking for, or something else." + ) - # Search for dt in self.iter_index - if getattr(self, "iter_index", None) is None: + # Search for dt in self.iter_index_dict + if getattr(self, "iter_index_dict", None) is None: self.repair_times_and_fill(self.df.index) + if dt in self.iter_index_dict: + i = self.iter_index_dict[dt] + else: + # If not found, get the last known data + i = self.iter_index.asof(dt) + + length = kwargs.get("length", 1) + timeshift = kwargs.get("timeshift", 0) + data_index = i + 1 - length - timeshift + is_data = data_index >= 0 + if not is_data: + # Log a warning + logging.warning( + f"The date you are looking for ({dt}) is outside of the data's date range ({self.datetime_start} to {self.datetime_end}) after accounting for a length of {kwargs.get('length', 1)} and a timeshift of {kwargs.get('timeshift', 0)}. Keep in mind that the length you are requesting must also be available in your data, in this case we are {data_index} rows away from the data you need." + ) res = func(self, *args, **kwargs) # print(f"Results last price: {res}") @@ -332,11 +382,6 @@ def checker(self, *args, **kwargs): return checker - @lru_cache(maxsize=32) - @check_data - def get_iter_count(self, dt): - return self.iter_index.index.searchsorted(dt, side='right') - 1 - @check_data def get_last_price(self, dt, length=1, timeshift=0): """Returns the last known price of the data. @@ -357,8 +402,8 @@ def get_last_price(self, dt, length=1, timeshift=0): float """ iter_count = self.get_iter_count(dt) - open_price = self.datalines["open"].dataline.iloc[iter_count] - close_price = self.datalines["close"].dataline.iloc[iter_count] + open_price = self.datalines["open"].dataline[iter_count] + close_price = self.datalines["close"].dataline[iter_count] price = close_price if dt > self.datalines["datetime"].dataline[iter_count] else open_price return price @@ -382,19 +427,19 @@ def get_quote(self, dt, length=1, timeshift=0): dict """ iter_count = self.get_iter_count(dt) - open = round(self.datalines["open"].dataline.iloc[iter_count], 2) - high = round(self.datalines["high"].dataline.iloc[iter_count], 2) - low = round(self.datalines["low"].dataline.iloc[iter_count], 2) - close = round(self.datalines["close"].dataline.iloc[iter_count], 2) - bid = round(self.datalines["bid"].dataline.iloc[iter_count], 2) - ask = round(self.datalines["ask"].dataline.iloc[iter_count], 2) - volume = round(self.datalines["volume"].dataline.iloc[iter_count], 0) - bid_size = round(self.datalines["bid_size"].dataline.iloc[iter_count], 0) - bid_condition = round(self.datalines["bid_condition"].dataline.iloc[iter_count], 0) - bid_exchange = round(self.datalines["bid_exchange"].dataline.iloc[iter_count], 0) - ask_size = round(self.datalines["ask_size"].dataline.iloc[iter_count], 0) - ask_condition = round(self.datalines["ask_condition"].dataline.iloc[iter_count], 0) - ask_exchange = round(self.datalines["ask_exchange"].dataline.iloc[iter_count], 0) + open = round(self.datalines["open"].dataline[iter_count], 2) + high = round(self.datalines["high"].dataline[iter_count], 2) + low = round(self.datalines["low"].dataline[iter_count], 2) + close = round(self.datalines["close"].dataline[iter_count], 2) + bid = round(self.datalines["bid"].dataline[iter_count], 2) + ask = round(self.datalines["ask"].dataline[iter_count], 2) + volume = round(self.datalines["volume"].dataline[iter_count], 0) + bid_size = round(self.datalines["bid_size"].dataline[iter_count], 0) + bid_condition = round(self.datalines["bid_condition"].dataline[iter_count], 0) + bid_exchange = round(self.datalines["bid_exchange"].dataline[iter_count], 0) + ask_size = round(self.datalines["ask_size"].dataline[iter_count], 0) + ask_condition = round(self.datalines["ask_condition"].dataline[iter_count], 0) + ask_exchange = round(self.datalines["ask_exchange"].dataline[iter_count], 0) return { "open": open, diff --git a/lumibot/entities/dataline.py b/lumibot/entities/dataline.py index b164dffc3..9b70514f3 100644 --- a/lumibot/entities/dataline.py +++ b/lumibot/entities/dataline.py @@ -1,5 +1,6 @@ class Dataline: - def __init__(self, asset, name, dataline): + def __init__(self, asset, name, dataline, dtype): self.asset = asset self.name = name - self.dataline = dataline \ No newline at end of file + self.dataline = dataline + self.dtype = dtype diff --git a/lumibot/strategies/strategy_executor.py b/lumibot/strategies/strategy_executor.py index 4e1905912..fed49f902 100644 --- a/lumibot/strategies/strategy_executor.py +++ b/lumibot/strategies/strategy_executor.py @@ -1032,10 +1032,7 @@ def run(self): market = self.broker.market # Get the trading days based on the market that the strategy is trading on - if self.strategy.is_backtesting: - self.broker._trading_days = get_trading_days(market=market, start_date=self.strategy._backtesting_start) - else: - self.broker._trading_days = get_trading_days(market) + self.broker._trading_days = get_trading_days(market) # Sort the trading days by market close time so that we can search them faster self.broker._trading_days.sort_values('market_close', inplace=True) # Ensure sorted order diff --git a/lumibot/tools/polygon_helper.py b/lumibot/tools/polygon_helper.py index 47d215ff2..3325a04c1 100644 --- a/lumibot/tools/polygon_helper.py +++ b/lumibot/tools/polygon_helper.py @@ -22,7 +22,6 @@ from lumibot.entities import Asset from lumibot import LUMIBOT_DEFAULT_PYTZ from lumibot.credentials import POLYGON_API_KEY -import bisect MAX_POLYGON_DAYS = 30 @@ -30,81 +29,48 @@ schedule_cache = {} buffered_schedules = {} -from datetime import timedelta -import pandas as pd - -# Initialize caches globally -buffered_schedules = {} -schedule_cache = {} def get_cached_schedule(cal, start_date, end_date, buffer_days=30): """ - Fetch schedule with a buffer at the end to reduce the number of API calls. - - Parameters: - - cal: pandas market calendar object with a 'name' attribute and a 'schedule' method. - - start_date (str or datetime-like): The start date for the schedule. - - end_date (str or datetime-like): The end date for the schedule. - - buffer_days (int): Number of buffer days to extend the end_date to minimize API calls. - - Returns: - - pd.DataFrame: The schedule DataFrame filtered between start_date and end_date. + Fetch schedule with a buffer at the end. This is done to reduce the number of calls to the calendar API (which is slow). """ - global buffered_schedules, schedule_cache - - # Validate buffer_days - if buffer_days < 0: - logging.error("buffer_days must be non-negative.") - return None + global buffered_schedules - # Convert start_date and end_date to pd.Timestamp - start_ts = pd.to_datetime(start_date) - end_ts = pd.to_datetime(end_date) + buffer_end = end_date + timedelta(days=buffer_days) + cache_key = (cal.name, start_date, end_date) - if start_ts > end_ts: - logging.error("start_date must be earlier than or equal to end_date.") - return None - - # Define the cache key - cache_key = (cal.name, start_ts, end_ts) - - # Return from cache if available + # Check if the required range is in the schedule cache if cache_key in schedule_cache: return schedule_cache[cache_key] - # Define buffer_end to extend the schedule range - buffer_end = end_ts + timedelta(days=buffer_days) - - # Retrieve the buffered schedule for the calendar if it exists - buffered_schedule = buffered_schedules.get(cal.name) - - if buffered_schedule is not None: - schedule_min = buffered_schedule.index[0] - schedule_max = buffered_schedule.index[-1] - - # Check if the buffered schedule covers the required range - if schedule_min <= start_ts and schedule_max >= end_ts: - # Use .loc for efficient slicing - filtered_schedule = buffered_schedule.loc[start_ts:end_ts] - # Cache and return + # Convert start_date and end_date to pd.Timestamp for comparison + start_timestamp = pd.Timestamp(start_date) + end_timestamp = pd.Timestamp(end_date) + + # Check if we have the buffered schedule for this calendar + if cal.name in buffered_schedules: + buffered_schedule = buffered_schedules[cal.name] + # Check if the current buffered schedule covers the required range + if buffered_schedule.index.min() <= start_timestamp and buffered_schedule.index.max() >= end_timestamp: + filtered_schedule = buffered_schedule[(buffered_schedule.index >= start_timestamp) & ( + buffered_schedule.index <= end_timestamp)] schedule_cache[cache_key] = filtered_schedule return filtered_schedule - # Fetch new buffered schedule from the calendar API - # Assuming cal.schedule returns a DataFrame with a sorted DatetimeIndex - new_buffered_schedule = cal.schedule(start_date=start_date, end_date=buffer_end) + # Fetch and cache the new buffered schedule + buffered_schedule = cal.schedule(start_date=start_date, end_date=buffer_end) + buffered_schedules[cal.name] = buffered_schedule # Store the buffered schedule for this calendar - # Update the buffered_schedules with the new buffered schedule - buffered_schedules[cal.name] = new_buffered_schedule + # Filter the schedule to only include the requested date range + filtered_schedule = buffered_schedule[(buffered_schedule.index >= start_timestamp) + & (buffered_schedule.index <= end_timestamp)] - # Filter the new buffered schedule to the requested date range using .loc - filtered_schedule = new_buffered_schedule.loc[start_ts:end_ts] - - # Cache the filtered schedule for future requests + # Cache the filtered schedule for quick lookup schedule_cache[cache_key] = filtered_schedule return filtered_schedule + def get_price_data_from_polygon( api_key: str, asset: Asset, @@ -434,10 +400,7 @@ def get_missing_dates(df_all, asset, start, end): # For Options, don't need any dates passed the expiration date if asset.asset_type == "option": - # Find the index where asset.expiration would be inserted to keep trading_dates sorted - index = bisect.bisect_right(trading_dates, asset.expiration) - # Slice the list to include only dates up to asset.expiration - trading_dates = trading_dates[:index] + trading_dates = [x for x in trading_dates if x <= asset.expiration] if df_all is None or not len(df_all) or df_all.empty: return trading_dates