Major changes:
1. Bring up the ThetaData feature as a data source for backtesting

Small feature changes:
1. Introduced an automatic holiday and weekend detection feature in thetadata_helper.py.
   When a weekend or holiday is detected, skip streaming data from ThetaData (a rough sketch follows this list).
2. Handled error messages: on skipped days, do not print any error messages.
3. If there is a data-stream error on a normal trading day, still print the error as the original logic does.
4. Debugged the ThetaData feature: added a summer vs. winter (DST) timezone adjustment and subtracted 1 minute from ThetaData timestamps.
5. thetadata_helper.py now only considers trading days as the start day when acquiring data from ThetaData; no further history is needed.
6. Time-sync handling: ThetaData works with New York time, but Polygon works with UTC time. Also, ThetaData is 1 minute behind Polygon data (see the sketch after this list).
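
The weekend/holiday skip and the time-sync handling in items 1, 4, and 6 above can be pictured roughly as follows. Both snippets are illustrative sketches, not the committed code: the helper names is_trading_day, should_request_data, and align_theta_index are hypothetical, and the use of pandas_market_calendars is an assumption; the actual logic lives in lumibot/tools/thetadata_helper.py.

# Rough sketch of the weekend/holiday skip (hypothetical helper names).
import datetime

import pandas_market_calendars as mcal  # assumed calendar library

NYSE = mcal.get_calendar("NYSE")

def is_trading_day(day: datetime.date) -> bool:
    """Return True if the NYSE is open on `day`; weekends and holidays return False."""
    return not NYSE.schedule(start_date=day, end_date=day).empty

def should_request_data(day: datetime.date) -> bool:
    # On weekends and holidays, skip streaming from ThetaData entirely,
    # so no spurious error messages are printed for these expected gaps.
    return is_trading_day(day)

The New York vs. UTC alignment, including the one-minute offset, amounts to something like:

# Rough sketch of the timezone plus one-minute alignment (assumed function name).
import pandas as pd

def align_theta_index(df: pd.DataFrame) -> pd.DataFrame:
    """Shift a ThetaData bar index from New York time to UTC and move it back
    one minute so it lines up with Polygon's UTC timestamps."""
    # tz_localize picks the correct EST/EDT offset, which covers the summer vs.
    # winter adjustment automatically.
    idx = df.index.tz_localize("America/New_York").tz_convert("UTC")
    df.index = idx - pd.Timedelta(minutes=1)  # the one-minute ThetaData/Polygon offset
    return df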

Signed-off-by: Haochi Li <[email protected]>
haochili committed Aug 7, 2024
1 parent b0cdb10 commit 06da7d7
Showing 13 changed files with 371 additions and 162 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/cicd.yaml
@@ -20,6 +20,8 @@ jobs:
AIOHTTP_NO_EXTENSIONS: 1
POLYGON_API_KEY: ${{secrets.POLYGON_API_KEY}} # Required for Polygon API BackTests
POLYGON_IS_PAID_SUBSCRIPTION: $POLYGON_IS_PAID_SUBSCRIPTION
THETADATA_USERNAME: ${{secrets.THETADATA_USERNAME}}
THETADATA_PASSWORD: ${{secrets.THETADATA_PASSWORD}}
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.11
19 changes: 12 additions & 7 deletions lumibot/backtesting/backtesting_broker.py
@@ -16,11 +16,13 @@ class BacktestingBroker(Broker):
# Metainfo
IS_BACKTESTING_BROKER = True

def __init__(self, data_source, connect_stream=True, max_workers=20, config=None, **kwargs):
super().__init__(name="backtesting", data_source=data_source, connect_stream=connect_stream, **kwargs)
def __init__(self, data_source, option_source=None, connect_stream=True, max_workers=20, config=None, **kwargs):
super().__init__(name="backtesting", data_source=data_source,
option_source=option_source, connect_stream=connect_stream, **kwargs)
# Calling init methods
self.max_workers = max_workers
self.market = "NASDAQ"
self.option_source = option_source

# Legacy strategy.backtest code will always pass in a config even for Brokers that don't need it, so
# catch it here and ignore it in this class. Child classes that need it should error check it themselves.
@@ -91,8 +93,9 @@ def _update_datetime(self, update_dt, cash=None, portfolio_value=None):
new_datetime = self.datetime + timedelta(seconds=update_dt)
else:
new_datetime = update_dt

self.data_source._update_datetime(new_datetime, cash=cash, portfolio_value=portfolio_value)
if self.option_source:
self.option_source._update_datetime(new_datetime, cash=cash, portfolio_value=portfolio_value)
logging.info(f"Current backtesting datetime {self.datetime}")

# =========Clock functions=====================
@@ -166,10 +169,10 @@ def get_time_to_open(self):
def get_time_to_close(self):
"""Return the remaining time for the market to close in seconds"""
now = self.datetime

# Use searchsorted for efficient searching and reduce unnecessary DataFrame access
idx = self._trading_days.index.searchsorted(now, side='left')

if idx >= len(self._trading_days):
logging.error("Cannot predict future")
return 0
@@ -493,14 +496,14 @@ def process_expired_option_contracts(self, strategy):
"""
if self.data_source.SOURCE != "PANDAS":
return

# If it's the same day as the expiration, we need to check the time to see if it's after market close
time_to_close = self.get_time_to_close()

# If the time to close is None, then the market is not open so we should not sell the contracts
if time_to_close is None:
return

# Calculate the number of seconds before market close
seconds_before_closing = strategy.minutes_before_closing * 60

@@ -634,6 +637,8 @@ def process_pending_orders(self, strategy):
low = df["low"].iloc[0]
close = df["close"].iloc[0]
volume = df["volume"].iloc[0]
print(f"in BacktestingBroker, asset: {asset}")
print(f"in BacktestingBroker, OHLCV: {open}, {high}, {low}, {close}, {volume}")

#############################
# Determine transaction price.
5 changes: 0 additions & 5 deletions lumibot/backtesting/polygon_backtesting.py
@@ -78,7 +78,6 @@ def _update_pandas_data(self, asset, quote, length, timestep, start_dt=None):
start_datetime, ts_unit = self.get_start_datetime_and_ts_unit(
length, timestep, start_dt, start_buffer=START_BUFFER
)

# Check if we have data for this asset
if search_asset in self.pandas_data:
asset_data = self.pandas_data[search_asset]
@@ -170,10 +169,8 @@ def _update_pandas_data(self, asset, quote, length, timestep, start_dt=None):

if (df is None) or df.empty:
return

data = Data(asset_separated, df, timestep=ts_unit, quote=quote_asset)
pandas_data_update = self._set_pandas_data_keys([data])

# Add the keys to the self.pandas_data dictionary
self.pandas_data.update(pandas_data_update)
if PolygonDataBacktesting.MAX_STORAGE_BYTES:
@@ -191,10 +188,8 @@ def _pull_source_symbol_bars(
):
# Get the current datetime and calculate the start datetime
current_dt = self.get_datetime()

# Get data from Polygon
self._update_pandas_data(asset, quote, length, timestep, current_dt)

return super()._pull_source_symbol_bars(
asset, length, timestep, timeshift, quote, exchange, include_after_hours
)
36 changes: 30 additions & 6 deletions lumibot/backtesting/thetadata_backtesting.py
@@ -6,13 +6,14 @@
from lumibot.data_sources import PandasData
from lumibot.entities import Asset, Data
from lumibot.tools import thetadata_helper
import subprocess

START_BUFFER = timedelta(days=5)


class ThetaDataBacktesting(PandasData):
"""
Backtesting implementation of Polygon
Backtesting implementation of ThetaData
"""

def __init__(
@@ -28,6 +29,25 @@ def __init__(

self._username = username
self._password = password
self.kill_processes_by_name("ThetaTerminal.jar")

def kill_processes_by_name(self, keyword):
try:
# Find all processes related to the keyword
result = subprocess.run(['pgrep', '-f', keyword], capture_output=True, text=True)
pids = result.stdout.strip().split('\n')

if pids:
for pid in pids:
if pid: # Ensure the PID is not empty
logging.info(f"Killing process with PID: {pid}")
subprocess.run(['kill', '-9', pid])
logging.info(f"All processes related to '{keyword}' have been killed.")
else:
logging.info(f"No processes found related to '{keyword}'.")

except Exception as e:
print(f"An error occurred during kill process: {e}")

def update_pandas_data(self, asset, quote, length, timestep, start_dt=None):
"""
@@ -106,9 +126,10 @@ def update_pandas_data(self, asset, quote, length, timestep, start_dt=None):
# We don't have enough data, so we need to get more (but in minutes)
ts_unit = "minute"

# Download data from Polygon
# Download data from ThetaData
try:
# Get data from Polygon
# Get data from ThetaData
date_time_now = self.get_datetime()
df = thetadata_helper.get_price_data(
self._username,
self._password,
@@ -117,9 +138,12 @@ def update_pandas_data(self, asset, quote, length, timestep, start_dt=None):
self.datetime_end,
timespan=ts_unit,
quote_asset=quote_asset,
dt=self.get_datetime()
)
# save df to csv file
# df.to_csv(f"theta_csv/wrong{date_time_now}_{asset.strike}_{asset.expiration}_{asset.right}.csv")
except Exception as e:
logging.error(traceback.format_exc())
logging.info(traceback.format_exc())
raise Exception("Error getting data from ThetaData") from e

if df is None:
@@ -187,13 +211,13 @@ def get_last_price(self, asset, timestep="minute", quote=None, exchange=None, **
self.pandas_data.update(pandas_data_update)
self._data_store.update(pandas_data_update)
except Exception as e:
print(f"Error get_last_price from Polygon: {e}")
print(f"Error get_last_price from ThetaData: {e}")

return super().get_last_price(asset=asset, quote=quote, exchange=exchange)

def get_chains(self, asset):
"""
Integrates the Polygon client library into the LumiBot backtest for Options Data in the same
Integrates the ThetaData client library into the LumiBot backtest for Options Data in the same
structure as Interactive Brokers options chain data
Parameters
13 changes: 10 additions & 3 deletions lumibot/brokers/broker.py
@@ -17,6 +17,7 @@
from lumibot.entities import Asset, Order, Position
from lumibot.trading_builtins import SafeList


class CustomLoggerAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
# Check if the level is enabled to avoid formatting costs if not necessary
@@ -31,6 +32,7 @@ def update_strategy_name(self, new_strategy_name):
# Pre-format part of the log message that's static or changes infrequently
self.formatted_prefix = f'[{new_strategy_name}]'


class Broker(ABC):
# Metainfo
IS_BACKTESTING_BROKER = False
@@ -43,7 +45,7 @@ class Broker(ABC):
CASH_SETTLED = "cash_settled"
ERROR_ORDER = "error"

def __init__(self, name="", connect_stream=True, data_source: DataSource = None, config=None, max_workers=20):
def __init__(self, name="", connect_stream=True, data_source: DataSource = None, option_source: DataSource = None, config=None, max_workers=20):
"""Broker constructor"""
# Shared Variables between threads
self.name = name
@@ -63,6 +65,7 @@ def __init__(self, name="", connect_stream=True, data_source: DataSource = None,
self._config = config
self._strategy_name = ""
self.data_source = data_source
self.option_source = option_source
self.max_workers = min(max_workers, 200)
self.quote_assets = set() # Quote positions will never be removed from tracking during sync operations

@@ -273,7 +276,10 @@ def get_last_price(self, asset: Asset, quote=None, exchange=None) -> float:
float
The last known price of the asset.
"""
return self.data_source.get_last_price(asset, quote=quote, exchange=exchange)
if self.option_source and asset.asset_type == "option":
return self.option_source.get_last_price(asset, quote=quote, exchange=exchange)
else:
return self.data_source.get_last_price(asset, quote=quote, exchange=exchange)

def get_last_prices(self, assets, quote=None, exchange=None):
"""
@@ -1116,7 +1122,8 @@ def _process_trade_event(self, stored_order, type_event, price=None, filled_quan
)

if filled_quantity is not None:
error = ValueError(f"filled_quantity must be a positive integer or float, received {filled_quantity} instead")
error = ValueError(
f"filled_quantity must be a positive integer or float, received {filled_quantity} instead")
try:
if not isinstance(filled_quantity, float):
filled_quantity = float(filled_quantity)
43 changes: 1 addition & 42 deletions lumibot/data_sources/pandas_data.py
@@ -3,7 +3,6 @@
from datetime import date, timedelta

import pandas as pd

from lumibot.data_sources import DataSourceBacktesting
from lumibot.entities import Asset, AssetsMapping, Bars

@@ -250,6 +249,7 @@ def _pull_source_symbol_bars(
now = self.get_datetime()
try:
res = data.get_bars(now, length=length, timestep=timestep, timeshift=timeshift)
print(f"\npandas_data.py:_pull_source_symbol_bars after data.get_bars, time:{now}")
# Return None if data.get_bars returns a ValueError
except ValueError as e:
logging.info(f"Error getting bars for {asset}: {e}")
@@ -388,43 +388,6 @@ def get_start_datetime_and_ts_unit(self, length, timestep, start_dt=None, start_
str
The timestep unit.
"""


<<<<<<< HEAD
=======
strikes = []
for store_item, data in self._data_store.items():
store_asset = store_item[0]
if (
store_asset.asset_type == "option"
and store_asset.symbol == asset.symbol
and store_asset.expiration == asset.expiration
and store_asset.right == asset.right
):
strikes.append(float(store_asset.strike))

return sorted(list(set(strikes)))

def get_start_datetime_and_ts_unit(self, length, timestep, start_dt=None, start_buffer=timedelta(days=5)):
"""
Get the start datetime for the data.
Parameters
----------
length : int
The number of data points to get.
timestep : str
The timestep to use. For example, "1minute" or "1hour" or "1day".
Returns
-------
datetime
The start datetime.
str
The timestep unit.
"""
>>>>>>> thetadata almost complete. tests passing
# Convert timestep string to timedelta and get start datetime
td, ts_unit = self.convert_timestep_str_to_timedelta(timestep)

@@ -441,7 +404,6 @@ def get_start_datetime_and_ts_unit(self, length, timestep, start_dt=None, start_
start_datetime = start_datetime - start_buffer

return start_datetime, ts_unit
<<<<<<< HEAD

def get_historical_prices(
self, asset, length, timestep="", timeshift=None, quote=None, exchange=None, include_after_hours=True
@@ -452,7 +414,6 @@

if not timestep:
timestep = self.get_timestep()

response = self._pull_source_symbol_bars(
asset,
length,
@@ -469,5 +430,3 @@

bars = self._parse_source_symbol_bars(response, asset, quote=quote, length=length)
return bars
=======
>>>>>>> thetadata almost complete. tests passing
