From 9c1da141bb5ca39de6fc4131d29562478e1bbee1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrew=20Mcdonald=20=D0=9F=D1=80=D0=B8=D1=82=D1=83=D0=BB?= =?UTF-8?q?=D0=B0?= Date: Fri, 25 Mar 2022 13:21:03 -0400 Subject: [PATCH] Adds a param to the stream classes to allow overriding of websocket defaults (#596) * Update stream classes and README * Fix flake8 warnings * Update Readme wording a little * Fix not none check --- README.md | 45 ++++++++++++++++++++++++---- alpaca_trade_api/stream.py | 61 ++++++++++++++++++++++++++++---------- 2 files changed, 84 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 35939c28..60d4c130 100644 --- a/README.md +++ b/README.md @@ -223,12 +223,18 @@ It provides a much faster way to retrieve the historic data for multiple symbols Under the hood we use the [aiohttp](https://docs.aiohttp.org/en/stable/) library.
We provide a code sample to get you started with this new approach and it is located [here](examples/historic_async.py).
Follow along the example code to learn more, and to utilize it to your own needs.
-### Live Stream Data -There are 2 streams available as described [here](https://alpaca.markets/docs/api-documentation/api-v2/market-data/alpaca-data-api-v2/real-time/).
-The free plan is using the `iex` stream, while the paid subscription is using the `sip` stream.
-You could subscribe to bars, trades or quotes and trade updates as well.
-Under the example folder you could find different code [samples](https://github.com/alpacahq/alpaca-trade-api-python/tree/master/examples/websockets) to achieve different goals. Let's see the basic example
-We present a new Streamer class under `alpaca_trade_api.stream` for API V2. + +### Live Stream Market Data +There are 2 streams available as described [here](https://alpaca.markets/docs/market-data/#subscription-plans). + +The free plan is using the `iex` stream, while the paid subscription is using the `sip` stream. + +You can subscribe to bars, trades, quotes, and trade updates for your account as well. +Under the example folder you can find different [code samples](https://github.com/alpacahq/alpaca-trade-api-python/tree/master/examples/websockets) +to achieve different goals. + +Here in this basic example, We use the Stream class under `alpaca_trade_api.stream` for API V2 to subscribe to trade +updates for AAPL and quote updates for IBM. ```py from alpaca_trade_api.stream import Stream @@ -251,9 +257,36 @@ stream.subscribe_trades(trade_callback, 'AAPL') stream.subscribe_quotes(quote_callback, 'IBM') stream.run() +``` + +#### Websockets Config For Live Data +Under the hood our SDK uses the [Websockets library](https://websockets.readthedocs.io/en/stable/index.html) to handle +our websocket connections. Since different environments can have wildly differing requirements for resources we allow you +to pass your own config options to the websockets lib via the `websocket_params` kwarg found on the Stream class. +ie: +```python +# Initiate Class Instance +stream = Stream(, + , + base_url=URL('https://paper-api.alpaca.markets'), + data_feed='iex', + websocket_params = {'ping_interval': 5}, #here we set ping_interval to 5 seconds + ) ``` +If you're curious [this link to their docs](https://websockets.readthedocs.io/en/stable/reference/client.html#opening-a-connection) +shows the values that websockets uses by default as well as any parameters they allow changing. Additionally, if you +don't specify any we set the following defaults on top of the ones the websockets library uses: +```python +{ + "ping_interval": 10, + "ping_timeout": 180, + "max_queue": 1024, +} +``` + + ## Account & Portfolio Management The HTTP API document is located at https://docs.alpaca.markets/ diff --git a/alpaca_trade_api/stream.py b/alpaca_trade_api/stream.py index 3e2ac2ca..f4d16df0 100644 --- a/alpaca_trade_api/stream.py +++ b/alpaca_trade_api/stream.py @@ -2,7 +2,7 @@ from collections import defaultdict import logging import json -from typing import List, Optional +from typing import Dict, List, Optional import msgpack import re import websockets @@ -30,18 +30,26 @@ log = logging.getLogger(__name__) +# Default Params we pass to the websocket constructors +WEBSOCKET_DEFAULTS = { + "ping_interval": 10, + "ping_timeout": 180, + "max_queue": 1024, +} + def _ensure_coroutine(handler): if not asyncio.iscoroutinefunction(handler): raise ValueError('handler must be a coroutine function') -class _DataStream(): +class _DataStream: def __init__(self, endpoint: str, key_id: str, secret_key: str, - raw_data: bool = False) -> None: + raw_data: bool = False, + websocket_params: Optional[Dict] = None) -> None: self._endpoint = endpoint self._key_id = key_id self._secret_key = secret_key @@ -60,14 +68,16 @@ def __init__(self, self._name = 'data' self._should_run = True self._max_frame_size = 32768 + self._websocket_params = websocket_params + + if self._websocket_params is None: + self._websocket_params = WEBSOCKET_DEFAULTS async def _connect(self): self._ws = await websockets.connect( self._endpoint, extra_headers={'Content-Type': 'application/msgpack'}, - ping_interval=10, - ping_timeout=180, - max_queue=1024, + **self._websocket_params ) r = await self._ws.recv() msg = msgpack.unpackb(r) @@ -327,12 +337,14 @@ def __init__(self, secret_key: str, base_url: URL, raw_data: bool, - feed: str = 'iex'): + feed: str = 'iex', + websocket_params: Optional[Dict] = None): base_url = re.sub(r'^http', 'ws', base_url) super().__init__(endpoint=base_url + '/v2/' + feed, key_id=key_id, secret_key=secret_key, raw_data=raw_data, + websocket_params=websocket_params ) self._handlers['statuses'] = {} self._handlers['lulds'] = {} @@ -453,7 +465,8 @@ def __init__(self, secret_key: str, base_url: URL, raw_data: bool, - exchanges: Optional[List[str]] = None): + exchanges: Optional[List[str]] = None, + websocket_params: Optional[Dict] = None): self._key_id = key_id self._secret_key = secret_key base_url = re.sub(r'^http', 'ws', base_url) @@ -467,6 +480,7 @@ def __init__(self, key_id=key_id, secret_key=secret_key, raw_data=raw_data, + websocket_params=websocket_params, ) self._name = 'crypto data' @@ -476,7 +490,8 @@ def __init__(self, key_id: str, secret_key: str, base_url: URL, - raw_data: bool): + raw_data: bool, + websocket_params: Optional[Dict] = None): self._key_id = key_id self._secret_key = secret_key base_url = re.sub(r'^http', 'ws', base_url) @@ -485,6 +500,7 @@ def __init__(self, key_id=key_id, secret_key=secret_key, raw_data=raw_data, + websocket_params=websocket_params ) self._handlers = { 'news': {}, @@ -534,7 +550,8 @@ def __init__(self, key_id: str, secret_key: str, base_url: URL, - raw_data: bool = False): + raw_data: bool = False, + websocket_params: Optional[Dict] = None): self._key_id = key_id self._secret_key = secret_key base_url = re.sub(r'^http', 'ws', base_url) @@ -546,9 +563,16 @@ def __init__(self, self._raw_data = raw_data self._stop_stream_queue = queue.Queue() self._should_run = True + self._websocket_params = websocket_params + + if self._websocket_params is None: + self._websocket_params = WEBSOCKET_DEFAULTS async def _connect(self): - self._ws = await websockets.connect(self._endpoint) + self._ws = await websockets.connect( + self._endpoint, + **self._websocket_params + ) async def _auth(self): await self._ws.send( @@ -675,7 +699,8 @@ def __init__(self, data_stream_url: URL = None, data_feed: str = 'iex', raw_data: bool = False, - crypto_exchanges: Optional[List[str]] = None): + crypto_exchanges: Optional[List[str]] = None, + websocket_params: Optional[Dict] = None): self._key_id, self._secret_key, _ = get_credentials(key_id, secret_key) self._base_url = base_url or get_base_url() self._data_stream_url = data_stream_url or get_data_stream_url() @@ -683,21 +708,25 @@ def __init__(self, self._trading_ws = TradingStream(self._key_id, self._secret_key, self._base_url, - raw_data) + raw_data, + websocket_params=websocket_params) self._data_ws = DataStream(self._key_id, self._secret_key, self._data_stream_url, raw_data, - data_feed.lower()) + data_feed.lower(), + websocket_params=websocket_params) self._crypto_ws = CryptoDataStream(self._key_id, self._secret_key, self._data_stream_url, raw_data, - crypto_exchanges) + crypto_exchanges, + websocket_params=websocket_params) self._news_ws = NewsDataStream(self._key_id, self._secret_key, self._data_stream_url, - raw_data) + raw_data, + websocket_params=websocket_params) def subscribe_trade_updates(self, handler): self._trading_ws.subscribe_trade_updates(handler)