diff --git a/alpaca_backtrader_api/alpacadata.py b/alpaca_backtrader_api/alpacadata.py index cb889c46..e037d965 100644 --- a/alpaca_backtrader_api/alpacadata.py +++ b/alpaca_backtrader_api/alpacadata.py @@ -3,6 +3,7 @@ from datetime import datetime, timedelta +import pytz from backtrader.feed import DataBase from backtrader import date2num, num2date from backtrader.utils.py3 import queue, with_metaclass @@ -133,6 +134,7 @@ class AlpacaData(with_metaclass(MetaAlpacaData, DataBase)): ('reconnect', True), ('reconnections', -1), # forever ('reconntimeout', 5.0), + ('tz', pytz.timezone('US/Eastern')), ) _store = alpacastore.AlpacaStore @@ -217,11 +219,11 @@ def _st_start(self, instart=True, tmout=None): self.put_notification(self.DELAYED) dtend = None if self.todate < float('inf'): - dtend = num2date(self.todate) + dtend = self.p.tz.localize(num2date(self.todate)) dtbegin = None if self.fromdate > float('-inf'): - dtbegin = num2date(self.fromdate) + dtbegin = self.p.tz.localize(num2date(self.fromdate)) self.qhist = self.o.candles( self.p.dataname, dtbegin, dtend, @@ -333,12 +335,12 @@ def _load(self): # len == 1 ... forwarded for the 1st time dtbegin = self.datetime.datetime(-1) elif self.fromdate > float('-inf'): - dtbegin = num2date(self.fromdate) + dtbegin = self.p.tz.localize(num2date(self.fromdate)) else: # 1st bar and no begin set # passing None to fetch max possible in 1 request dtbegin = None - dtend = datetime.utcfromtimestamp(int(msg['time'])) + dtend = datetime.fromtimestamp(int(msg['time']), tz=pytz.utc) self.qhist = self.o.candles( self.p.dataname, dtbegin, dtend, diff --git a/alpaca_backtrader_api/alpacastore.py b/alpaca_backtrader_api/alpacastore.py index c229cacd..7bfdedfa 100644 --- a/alpaca_backtrader_api/alpacastore.py +++ b/alpaca_backtrader_api/alpacastore.py @@ -183,6 +183,7 @@ async def on_trade(self, conn, stream, msg): class MetaSingleton(MetaParams): '''Metaclass to make a metaclassed class a singleton''' + def __init__(cls, name, bases, dct): super(MetaSingleton, cls).__init__(name, bases, dct) cls._singleton = None @@ -466,7 +467,10 @@ def _make_sure_dates_are_initialized_properly(self, dtbegin, dtend, if not dtend: dtend = pd.Timestamp('now', tz=NY) else: - dtend = pd.Timestamp(pytz.timezone('UTC').localize(dtend)) + if dtend.tzinfo: + dtend = pd.Timestamp(dtend.astimezone(pytz.timezone(NY))) + else: + dtend = pd.Timestamp(pytz.timezone(NY).localize(dtend)) if granularity == Granularity.Minute: calendar = trading_calendars.get_calendar(name='NYSE') while not calendar.is_open_on_minute(dtend): @@ -480,7 +484,11 @@ def _make_sure_dates_are_initialized_properly(self, dtbegin, dtend, delta = timedelta(days=days) dtbegin = dtend - delta else: - dtbegin = pd.Timestamp(pytz.timezone('UTC').localize(dtbegin)) + if dtbegin.tzinfo: + dtbegin = pd.Timestamp(dtbegin.astimezone(pytz.timezone(NY))) + else: + dtbegin = pd.Timestamp(pytz.timezone(NY).localize(dtbegin)) + while dtbegin > dtend: # if we start the script during market hours we could get this # situation. this resolves that. @@ -506,6 +514,7 @@ def get_aggs_from_polygon(self, time window of 2 weeks, and split the calls until we get all required data """ + def _clear_out_of_market_hours(df): """ only interested in samples between 9:30, 16:00 NY time @@ -575,6 +584,7 @@ def get_aggs_from_alpaca(self, but we need to manipulate it to be able to work with it smoothly and return data the same way polygon does """ + def _iterate_api_calls(): """ you could get max 1000 samples from the server. if we need more @@ -847,6 +857,7 @@ def _check_if_transaction_occurred(order_id): if trans is None: break self._process_transaction(order_id, trans) + while True: try: if self.q_ordercreate.empty(): @@ -934,7 +945,7 @@ def _transaction(self, trans): self._transpend[oid].append(trans) self._process_transaction(oid, trans) - _X_ORDER_FILLED = ('partially_filled', 'filled', ) + _X_ORDER_FILLED = ('partially_filled', 'filled',) def _process_transaction(self, oid, trans): try: