Skip to content

Commit

Permalink
stream: New custom stream added to implement broker polling for order…
Browse files Browse the repository at this point in the history
… updates.
  • Loading branch information
davidlatte committed Jan 30, 2024
1 parent 964d6d2 commit 9264ae6
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 82 deletions.
3 changes: 3 additions & 0 deletions lumibot/brokers/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ def get_historical_account_value(self):
"""
pass

# =========Streaming functions=======================

@abstractmethod
def _get_stream_object(self):
"""
Expand Down Expand Up @@ -166,6 +168,7 @@ def _pull_broker_open_orders(self):
pass

# =========Market functions=======================

def get_last_price(self, asset: Asset, quote=None, exchange=None) -> float:
"""
Takes an asset and returns the last known price
Expand Down
251 changes: 187 additions & 64 deletions lumibot/brokers/tradier.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
import logging

from lumiwealth_tradier import Tradier as _Tradier
import traceback

from lumibot.brokers import Broker
from lumibot.data_sources.tradier_data import TradierData
from lumibot.entities import Asset, Order, Position
from lumibot.tools.helpers import create_options_symbol, parse_symbol
from lumibot.trading_builtins import PollingStream
from lumiwealth_tradier import Tradier as _Tradier


class Tradier(Broker):
"""
Broker that connects to Tradier API to place orders and retrieve data
Broker that connects to Tradier API to place orders and retrieve data. Tradier API only supports Order streaming
for live accounts, paper trading accounts must use a 'polling' method to retrieve order updates. This class will
still use a CustomStream object to process order updates (which can be confusing!), but this will more seamlessly
match what other LumiBrokers are doing without requiring changes to the stategy_executor. The CustomStream object
will be used to process order updates from the broker when the 'parse_broker_order' function is called, which
works because the strategy_executor calls sync_broker() for the start of every trading iteration. This
polling method will also work for Live accounts, so it will be used by default. However, future updates will be
made to natively support streaming for Live accounts.
"""
POLL_EVENT = PollingStream.POLL_EVENT

def __init__(
self,
Expand All @@ -22,6 +31,7 @@ def __init__(
max_workers=20,
connect_stream=True,
data_source=None,
polling_interval=10.0,
):
# Check if the user provided both config file and keys
if (access_token is not None or account_number is not None or paper is not None) and config is not None:
Expand Down Expand Up @@ -53,6 +63,7 @@ def __init__(
self._tradier_access_token = access_token
self._tradier_account_number = account_number
self._tradier_paper = paper
self.polling_interval = polling_interval

self.market = "NYSE" # The default market is NYSE.

Expand All @@ -76,9 +87,6 @@ def __init__(
connect_stream=connect_stream,
)

def validate_credentials(self):
pass

def cancel_order(self, order: Order):
"""Cancels an order at the broker. Nothing will be done for orders that are already cancelled or filled."""
# Check if the order is already cancelled or filled
Expand Down Expand Up @@ -121,58 +129,20 @@ def _submit_order(self, order: Order):
tag=tag,
)
elif order.asset.asset_type == "option":
side = order.side

# Convert the side to the Tradier side for options orders if necessary
if side == "buy" or side == "sell":
# Check if we currently own the option
position = self._pull_position(None, order.asset)

# Check if we own the option then we need to sell to close or buy to close
if position is not None:
if position.quantity > 0 and side == "sell":
side = "sell_to_close"
elif position.quantity > 0 and side == "buy":
side = "buy_to_open"
elif position.quantity < 0 and side == "buy":
side = "buy_to_close"
elif position.quantity < 0 and side == "sell":
side = "sell_to_open"
else:
logging.error(
f"Unable to determine the correct side for the order. "
f"Position: {position}, Order: {order}"
)
return None

# Otherwise, we don't own the option so we need to buy to open or sell to open
else:
if side == "buy":
side = "buy_to_open"
elif side == "sell":
side = "sell_to_open"
else:
logging.error(
f"Unable to determine the correct side for the order. "
f"Position: {position}, Order: {order}"
)
return None

# Check if the sie is a valid Tradier side
if side not in ["buy_to_open", "buy_to_close", "sell_to_open", "sell_to_close"]:
logging.error(f"Invalid order side for Tradier: {side}")
return None

tradier_side = self._lumi_side2tradier(order)
stock_symbol = order.asset.symbol
option_symbol = create_options_symbol(
order.asset.symbol, order.asset.expiration, order.asset.right, order.asset.strike
)

symbol_data = parse_symbol(option_symbol)
stock_symbol = symbol_data["stock_symbol"]
if not tradier_side or not option_symbol:
logging.error(f"Unable to parse order {order} for Tradier.")
return None

order_response = self.tradier.orders.order_option(
stock_symbol,
option_symbol,
side,
tradier_side,
order.quantity,
order_type=order.type,
duration=order.time_in_force,
Expand Down Expand Up @@ -204,15 +174,6 @@ def _get_balances_at_broker(self, quote_asset: Asset):
def get_historical_account_value(self):
pass

def _get_stream_object(self):
pass

def _register_stream_events(self):
pass

def _run_stream(self):
pass

def _pull_positions(self, strategy):
positions_df = self.tradier.account.get_positions()

Expand Down Expand Up @@ -285,11 +246,173 @@ def _pull_position(self, strategy, asset):

return None

def _parse_broker_order(self, response, strategy_name, strategy_object=None):
pass
def _parse_broker_order(self, response: dict, strategy_name: str, strategy_object=None):
"""
Parse a broker order representation to a Lumi order object. Once the Lumi order has been created, it will
be dispatched to our "stream" queue for processing until a time when Live Streaming can be implemented.
:param response: The output from TradierAPI call returned _by pull_broker_order()
:param strategy_name: The name of the strategy that placed the order
:param strategy_object: The strategy object that placed the order
"""
asset = Asset(
symbol=response["symbol"],
asset_type=response["class"],
expiration=response["expiration_date"],
right=response["option_type"],
strike=response["strike_price"],
)
order = Order(
strategy=strategy_name,
asset=asset,
)

# There has been an update to this order's status or quantity, to send it to the queue for processing
self.stream.dispatch(

order=order,
)
return order

def _pull_broker_order(self, identifier):
pass
"""
This function pulls a single order from the broker by its identifier. Order is converted to a dictionary,
and then returned. It is expected that the caller will convert the dictionary to an Order object by
calling parse_broker_order() on the dictionary. Parsing the order will also dispatch it to the stream for
processing.
"""
orders = self.tradier.orders.get_order(identifier).to_dict('records')
return orders[0] if len(orders) > 0 else None

def _pull_broker_open_orders(self):
pass
"""
This function pulls all open orders from the broker. Orders are converted to a list of dictionaries,
and then returned. It is expected that the caller will convert each dictionary to an Order object by
calling parse_broker_order() on the dictionary. Parsing the order will also dispatch it to the stream for
processing.
"""
df = self.tradier.orders.get_orders().to_dict('records')
df_open = df[df['status'].isin(['open', 'partially_filled', 'pending'])]
return df_open.to_dict('records')

def _lumi_side2tradier(self, order: Order) -> str:
side = order.side
if order.asset.asset_type == "stock":
return side

# Convert the side to the Tradier side for options orders if necessary
if side == "buy" or side == "sell":
# Check if we currently own the option
position = self._pull_position(order.strategy, order.asset)

# Check if we own the option then we need to sell to close or buy to close
if position is not None:
if position.quantity > 0 and side == "sell":
side = "sell_to_close"
elif position.quantity >= 0 and side == "buy":
side = "buy_to_open"
elif position.quantity < 0 and side == "buy":
side = "buy_to_close"
elif position.quantity <= 0 and side == "sell":
side = "sell_to_open"
else:
logging.error(
f"Unable to determine the correct side for the order. "
f"Position: {position}, Order: {order}"
)

# Otherwise, we don't own the option so we need to buy to open or sell to open
else:
side = "buy_to_open" if side == "buy" else "sell_to_open"

# Check if the side is a valid Tradier side
if side not in ["buy_to_open", "buy_to_close", "sell_to_open", "sell_to_close"]:
logging.error(f"Invalid option order side for Tradier: {order.side}")
return ''

return side

@staticmethod
def _tradier_side2lumi(side):
"""
Converts a Tradier side to a Lumi side.
Valid Stock Sides: buy, buy_to_cover, sell, sell_short
Valid Option Sides: buy_to_open, buy_to_close, sell_to_open, sell_to_close
"""
if "buy" in side:
return "buy"
elif "sell" in side:
return "sell"
else:
raise ValueError(f"Invalid side {side} for Tradier.")

# ==========Processing streams data=======================

def _get_stream_object(self):
"""get the broker stream connection"""
stream = PollingStream(self.polling_interval)
return stream

def _register_stream_events(self):
"""Register the function on_trade_event
to be executed on each trade_update event"""
broker = self

@broker.stream.add_action(broker.POLL_EVENT)
def on_trade_event_poll():
# Get current orders from the broker and dispatch them to the stream for processing
pass

@broker.stream.add_action(broker.NEW_ORDER)
def on_trade_event_new(order):
try:
broker._process_trade_event(
order,
broker.NEW_ORDER,
)
return True
except:
logging.error(traceback.format_exc())

@broker.stream.add_action(broker.FILLED_ORDER)
def on_trade_event_fill(order, price, filled_quantity):
try:
broker._process_trade_event(
order,
broker.FILLED_ORDER,
price=price,
filled_quantity=filled_quantity,
multiplier=order.asset.multiplier,
)
return True
except:
logging.error(traceback.format_exc())

@broker.stream.add_action(broker.CANCELED_ORDER)
def on_trade_event_cancel(order):
try:
broker._process_trade_event(
order,
broker.CANCELED_ORDER,
)
return True
except:
logging.error(traceback.format_exc())

@broker.stream.add_action(broker.CASH_SETTLED)
def on_trade_event(order, price, filled_quantity):
try:
broker._process_trade_event(
order,
broker.CASH_SETTLED,
price=price,
filled_quantity=filled_quantity,
multiplier=order.asset.multiplier,
)
return True
except:
logging.error(traceback.format_exc())

def _run_stream(self):
self._stream_established()
self.stream._run()
11 changes: 11 additions & 0 deletions lumibot/entities/order.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ def __init__(
self.custom_params = custom_params
self._trail_stop_price = None
self.tag = tag
self.avg_fill_price = 0.0 # The weighted average filled price for this order. Calculated if not given by broker
self.broker_create_date = None # The datetime the order was created by the broker
self.broker_update_date = None # The datetime the order was last updated by the broker

# Options:
self.exchange = exchange
Expand Down Expand Up @@ -477,6 +480,14 @@ def _set_type(
self.limit_price = check_price(limit_price, "limit_price must be positive float.")
self.stop_price = check_price(stop_price, "stop_price must be positive float.")

@property
def avg_fill_price(self):
return self._avg_fill_price

@avg_fill_price.setter
def avg_fill_price(self, value):
self._avg_fill_price = round(float(value), 2) if value else 0.0

@property
def status(self):
return self._status
Expand Down
2 changes: 1 addition & 1 deletion lumibot/trading_builtins/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from .custom_stream import CustomStream
from .custom_stream import CustomStream, PollingStream
from .safe_list import SafeList
Loading

0 comments on commit 9264ae6

Please sign in to comment.