Skip to content

Commit

Permalink
improved yahoo downloads for large amounts of assets
Browse files Browse the repository at this point in the history
  • Loading branch information
grzesir committed Jan 14, 2025
1 parent 91a95a6 commit a16216d
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 52 deletions.
40 changes: 23 additions & 17 deletions lumibot/data_sources/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
import traceback
import time

import pandas as pd
Expand Down Expand Up @@ -307,42 +308,47 @@ def get_bars(
length,
timestep="minute",
timeshift=None,
chunk_size=10,
max_workers=200,
chunk_size=2,
max_workers=2,
quote=None,
exchange=None,
include_after_hours=True,
):
"""Get bars for the list of assets"""

def process_chunk(chunk):
"""Process a chunk of assets."""
chunk_result = {}
for asset in chunk:
chunk_result[asset] = self.get_historical_prices(
asset,
length,
timestep=timestep,
timeshift=timeshift,
quote=quote,
exchange=exchange,
include_after_hours=include_after_hours,
)
try:
chunk_result[asset] = self.get_historical_prices(
asset,
length,
timestep=timestep,
timeshift=timeshift,
quote=quote,
exchange=exchange,
include_after_hours=include_after_hours,
)

# Sleep to prevent rate limiting
time.sleep(0.1)
except Exception as e:
# Log the error and its full traceback for this asset, then store None so the remaining assets in the chunk are still processed
logging.warning(f"Error retrieving data for {asset.symbol}: {e}")
tb = traceback.format_exc()
logging.warning(tb) # This prints the traceback
chunk_result[asset] = None
return chunk_result

# Convert strings to Asset objects
assets = [Asset(symbol=a) if isinstance(a, str) else a for a in assets]

# Chunking the assets
# Chunk the assets
chunks = [assets[i : i + chunk_size] for i in range(0, len(assets), chunk_size)]

# Initialize ThreadPoolExecutor
results = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit tasks
futures = [executor.submit(process_chunk, chunk) for chunk in chunks]

# Collect results as they complete
for future in as_completed(futures):
results.update(future.result())

Expand Down
8 changes: 4 additions & 4 deletions lumibot/entities/bars.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ def __init__(self, df, source, asset, quote=None, raw=None):
self._raw = raw

if "dividend" in df.columns:
df["price_change"] = df["close"].pct_change()
df["dividend_yield"] = df["dividend"] / df["close"]
df["return"] = df["dividend_yield"] + df["price_change"]
df.loc[:, "price_change"] = df["close"].pct_change()
df.loc[:, "dividend_yield"] = df["dividend"] / df["close"]
df.loc[:, "return"] = df["dividend_yield"] + df["price_change"]
else:
df["return"] = df["close"].pct_change()
df.loc[:, "return"] = df["close"].pct_change()

self.df = df

Expand Down
129 changes: 99 additions & 30 deletions lumibot/tools/yahoo_helper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
import pickle
import time
from datetime import datetime, timedelta

import pandas as pd
Expand All @@ -11,6 +12,7 @@
from .helpers import get_lumibot_datetime

INFO_DATA = "info"
INVALID_SYMBOLS = set()


class _YahooData:
Expand Down Expand Up @@ -91,6 +93,10 @@ def dump_pickle_file(symbol, type, data):

@staticmethod
def format_df(df, auto_adjust):
# Nothing to format when df is None or empty; return it unchanged
if df is None or df.empty:
return df

if auto_adjust:
del df["Adj Ratio"]
del df["Close"]
Expand Down Expand Up @@ -169,41 +175,104 @@ def get_symbol_last_price(symbol):

return df["Close"].iloc[-1]

@staticmethod
def download_symbol_data(symbol, interval="1d"):
ticker = yf.Ticker(symbol)
try:
if interval == "1m":
# Yahoo only supports 1 minute interval for past 7 days
df = ticker.history(interval=interval, start=get_lumibot_datetime() - timedelta(days=7), auto_adjust=False)
elif interval == "15m":
# Yahoo only supports 15 minute interval for past 60 days
df = ticker.history(interval=interval, start=get_lumibot_datetime() - timedelta(days=60), auto_adjust=False)
else:
df = ticker.history(interval=interval, period="max", auto_adjust=False)
except Exception as e:
logging.debug(f"Error while downloading symbol day data for {symbol}, returning empty dataframe for now.")
logging.debug(e)
"""
Attempts to download historical data from yfinance for the specified symbol and interval.
Retries up to 3 times, sleeping 1s and then 2s between attempts, whenever ticker.history() raises or returns empty/None data (e.g. because of transient rate limits).
If all attempts fail, marks the symbol as invalid (added to INVALID_SYMBOLS) so that later calls skip it, and returns None.
If symbol info is unavailable, we just skip timezone adjustments (do not return None).
"""

# If we've already marked this symbol invalid, skip further calls
if symbol in INVALID_SYMBOLS:
logging.debug(f"{symbol} is already marked invalid. Skipping yfinance calls.")
return None

# Adjust the time when we are getting daily stock data to the beginning of the day
# This way the times line up when backtesting daily data
info = YahooHelper.get_symbol_info(symbol)
if info.get("info") and info.get("info").get("market") == "us_market":
# Check if the timezone is already set, if not set it to the default timezone
if df.index.tzinfo is None:
df.index = df.index.tz_localize(info.get("info").get("exchangeTimezoneName"))
else:
df.index = df.index.tz_convert(info.get("info").get("exchangeTimezoneName"))
df.index = df.index.map(lambda t: t.replace(hour=16, minute=0))
elif info.get("info") and info.get("info").get("market") == "ccc_market":
# Check if the timezone is already set, if not set it to the default timezone
if df.index.tzinfo is None:
df.index = df.index.tz_localize(info.get("info").get("exchangeTimezoneName"))
ticker = yf.Ticker(symbol)

# --- HISTORICAL DATA RETRY LOGIC ---
max_retries = 3
sleep_sec = 1
df = None

for attempt in range(1, max_retries + 1):
try:
if interval == "1m":
df = ticker.history(
interval=interval,
start=get_lumibot_datetime() - timedelta(days=7),
auto_adjust=False
)
elif interval == "15m":
df = ticker.history(
interval=interval,
start=get_lumibot_datetime() - timedelta(days=60),
auto_adjust=False
)
else:
df = ticker.history(
interval=interval,
period="max",
auto_adjust=False
)
except Exception as e:
logging.debug(f"{symbol}: Exception from ticker.history(): {e}")
if attempt < max_retries:
logging.debug(f"{symbol}: Attempt {attempt} failed. Sleeping {sleep_sec}s, then retry.")
time.sleep(sleep_sec)
sleep_sec *= 2
continue
else:
logging.debug(f"{symbol}: All {max_retries} attempts failed. Marking invalid.")
INVALID_SYMBOLS.add(symbol)
return None

if df is None or df.empty:
logging.debug(f"{symbol}: Attempt {attempt} returned empty or None data.")
if attempt < max_retries:
logging.debug(f"{symbol}: Sleeping {sleep_sec}s, then retry.")
time.sleep(sleep_sec)
sleep_sec *= 2
else:
logging.debug(f"{symbol}: Data still empty after {max_retries} attempts. Marking invalid.")
INVALID_SYMBOLS.add(symbol)
return None
else:
df.index = df.index.tz_convert(info.get("info").get("exchangeTimezoneName"))
df.index = df.index.map(lambda t: t.replace(hour=23, minute=59))
# Successfully got data, so break out of the loop
break

# --- SYMBOL INFO (OPTIONAL) ---
info = None
try:
info = YahooHelper.get_symbol_info(symbol)
except Exception as e:
logging.debug(f"{symbol}: Exception from get_symbol_info(): {e}")

# If we have valid info, handle timezone adjustments.
# Using sub_info to avoid accessing .get() on None.
if info and isinstance(info, dict):
sub_info = info.get("info", {})
if isinstance(sub_info, dict):
market = sub_info.get("market", "")
tz_name = sub_info.get("exchangeTimezoneName", None)

# US market
if market == "us_market" and tz_name:
if df.index.tzinfo is None:
df.index = df.index.tz_localize(tz_name)
else:
df.index = df.index.tz_convert(tz_name)
df.index = df.index.map(lambda t: t.replace(hour=16, minute=0))

# Crypto/CCC market
elif market == "ccc_market" and tz_name:
if df.index.tzinfo is None:
df.index = df.index.tz_localize(tz_name)
else:
df.index = df.index.tz_convert(tz_name)
df.index = df.index.map(lambda t: t.replace(hour=23, minute=59))

# Finally, run any custom DataFrame processing
df = YahooHelper.process_df(df, asset_info=info)
return df

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="lumibot",
version="3.8.25",
version="3.8.26",
author="Robert Grzesik",
author_email="[email protected]",
description="Backtesting and Trading Library, Made by Lumiwealth",
Expand Down

0 comments on commit a16216d

Please sign in to comment.