Skip to content

Commit

Permalink
Adds a param to the stream classes to allow overriding of websocket d…
Browse files Browse the repository at this point in the history
…efaults (#596)

* Update stream classes and README

* Fix flake8 warnings

* Update Readme wording a little

* Fix not none check
  • Loading branch information
drew887 authored Mar 25, 2022
1 parent 6700281 commit 9c1da14
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 22 deletions.
45 changes: 39 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,18 @@ It provides a much faster way to retrieve the historic data for multiple symbols
Under the hood we use the [aiohttp](https://docs.aiohttp.org/en/stable/) library.<br>
We provide a code sample to get you started with this new approach and it is located [here](examples/historic_async.py).<br>
Follow along the example code to learn more, and to utilize it to your own needs.<br>
### Live Stream Data
There are 2 streams available as described [here](https://alpaca.markets/docs/api-documentation/api-v2/market-data/alpaca-data-api-v2/real-time/).<br>
The free plan is using the `iex` stream, while the paid subscription is using the `sip` stream.<br>
You could subscribe to bars, trades or quotes and trade updates as well.<br>
Under the example folder you could find different code [samples](https://github.com/alpacahq/alpaca-trade-api-python/tree/master/examples/websockets) to achieve different goals. Let's see the basic example<br>
We present a new Streamer class under `alpaca_trade_api.stream` for API V2.

### Live Stream Market Data
There are 2 streams available as described [here](https://alpaca.markets/docs/market-data/#subscription-plans).

The free plan is using the `iex` stream, while the paid subscription is using the `sip` stream.

You can subscribe to bars, trades, quotes, and trade updates for your account as well.
Under the example folder you can find different [code samples](https://github.com/alpacahq/alpaca-trade-api-python/tree/master/examples/websockets)
to achieve different goals.

Here in this basic example, We use the Stream class under `alpaca_trade_api.stream` for API V2 to subscribe to trade
updates for AAPL and quote updates for IBM.
```py
from alpaca_trade_api.stream import Stream

Expand All @@ -251,9 +257,36 @@ stream.subscribe_trades(trade_callback, 'AAPL')
stream.subscribe_quotes(quote_callback, 'IBM')

stream.run()
```

#### Websockets Config For Live Data
Under the hood our SDK uses the [Websockets library](https://websockets.readthedocs.io/en/stable/index.html) to handle
our websocket connections. Since different environments can have wildly differing requirements for resources we allow you
to pass your own config options to the websockets lib via the `websocket_params` kwarg found on the Stream class.

ie:
```python
# Initiate Class Instance
stream = Stream(<ALPACA_API_KEY>,
<ALPACA_SECRET_KEY>,
base_url=URL('https://paper-api.alpaca.markets'),
data_feed='iex',
websocket_params = {'ping_interval': 5}, #here we set ping_interval to 5 seconds
)
```

If you're curious [this link to their docs](https://websockets.readthedocs.io/en/stable/reference/client.html#opening-a-connection)
shows the values that websockets uses by default as well as any parameters they allow changing. Additionally, if you
don't specify any we set the following defaults on top of the ones the websockets library uses:
```python
{
"ping_interval": 10,
"ping_timeout": 180,
"max_queue": 1024,
}
```


## Account & Portfolio Management

The HTTP API document is located at https://docs.alpaca.markets/
Expand Down
61 changes: 45 additions & 16 deletions alpaca_trade_api/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from collections import defaultdict
import logging
import json
from typing import List, Optional
from typing import Dict, List, Optional
import msgpack
import re
import websockets
Expand Down Expand Up @@ -30,18 +30,26 @@

log = logging.getLogger(__name__)

# Default Params we pass to the websocket constructors
WEBSOCKET_DEFAULTS = {
"ping_interval": 10,
"ping_timeout": 180,
"max_queue": 1024,
}


def _ensure_coroutine(handler):
if not asyncio.iscoroutinefunction(handler):
raise ValueError('handler must be a coroutine function')


class _DataStream():
class _DataStream:
def __init__(self,
endpoint: str,
key_id: str,
secret_key: str,
raw_data: bool = False) -> None:
raw_data: bool = False,
websocket_params: Optional[Dict] = None) -> None:
self._endpoint = endpoint
self._key_id = key_id
self._secret_key = secret_key
Expand All @@ -60,14 +68,16 @@ def __init__(self,
self._name = 'data'
self._should_run = True
self._max_frame_size = 32768
self._websocket_params = websocket_params

if self._websocket_params is None:
self._websocket_params = WEBSOCKET_DEFAULTS

async def _connect(self):
self._ws = await websockets.connect(
self._endpoint,
extra_headers={'Content-Type': 'application/msgpack'},
ping_interval=10,
ping_timeout=180,
max_queue=1024,
**self._websocket_params
)
r = await self._ws.recv()
msg = msgpack.unpackb(r)
Expand Down Expand Up @@ -327,12 +337,14 @@ def __init__(self,
secret_key: str,
base_url: URL,
raw_data: bool,
feed: str = 'iex'):
feed: str = 'iex',
websocket_params: Optional[Dict] = None):
base_url = re.sub(r'^http', 'ws', base_url)
super().__init__(endpoint=base_url + '/v2/' + feed,
key_id=key_id,
secret_key=secret_key,
raw_data=raw_data,
websocket_params=websocket_params
)
self._handlers['statuses'] = {}
self._handlers['lulds'] = {}
Expand Down Expand Up @@ -453,7 +465,8 @@ def __init__(self,
secret_key: str,
base_url: URL,
raw_data: bool,
exchanges: Optional[List[str]] = None):
exchanges: Optional[List[str]] = None,
websocket_params: Optional[Dict] = None):
self._key_id = key_id
self._secret_key = secret_key
base_url = re.sub(r'^http', 'ws', base_url)
Expand All @@ -467,6 +480,7 @@ def __init__(self,
key_id=key_id,
secret_key=secret_key,
raw_data=raw_data,
websocket_params=websocket_params,
)
self._name = 'crypto data'

Expand All @@ -476,7 +490,8 @@ def __init__(self,
key_id: str,
secret_key: str,
base_url: URL,
raw_data: bool):
raw_data: bool,
websocket_params: Optional[Dict] = None):
self._key_id = key_id
self._secret_key = secret_key
base_url = re.sub(r'^http', 'ws', base_url)
Expand All @@ -485,6 +500,7 @@ def __init__(self,
key_id=key_id,
secret_key=secret_key,
raw_data=raw_data,
websocket_params=websocket_params
)
self._handlers = {
'news': {},
Expand Down Expand Up @@ -534,7 +550,8 @@ def __init__(self,
key_id: str,
secret_key: str,
base_url: URL,
raw_data: bool = False):
raw_data: bool = False,
websocket_params: Optional[Dict] = None):
self._key_id = key_id
self._secret_key = secret_key
base_url = re.sub(r'^http', 'ws', base_url)
Expand All @@ -546,9 +563,16 @@ def __init__(self,
self._raw_data = raw_data
self._stop_stream_queue = queue.Queue()
self._should_run = True
self._websocket_params = websocket_params

if self._websocket_params is None:
self._websocket_params = WEBSOCKET_DEFAULTS

async def _connect(self):
self._ws = await websockets.connect(self._endpoint)
self._ws = await websockets.connect(
self._endpoint,
**self._websocket_params
)

async def _auth(self):
await self._ws.send(
Expand Down Expand Up @@ -675,29 +699,34 @@ def __init__(self,
data_stream_url: URL = None,
data_feed: str = 'iex',
raw_data: bool = False,
crypto_exchanges: Optional[List[str]] = None):
crypto_exchanges: Optional[List[str]] = None,
websocket_params: Optional[Dict] = None):
self._key_id, self._secret_key, _ = get_credentials(key_id, secret_key)
self._base_url = base_url or get_base_url()
self._data_stream_url = data_stream_url or get_data_stream_url()

self._trading_ws = TradingStream(self._key_id,
self._secret_key,
self._base_url,
raw_data)
raw_data,
websocket_params=websocket_params)
self._data_ws = DataStream(self._key_id,
self._secret_key,
self._data_stream_url,
raw_data,
data_feed.lower())
data_feed.lower(),
websocket_params=websocket_params)
self._crypto_ws = CryptoDataStream(self._key_id,
self._secret_key,
self._data_stream_url,
raw_data,
crypto_exchanges)
crypto_exchanges,
websocket_params=websocket_params)
self._news_ws = NewsDataStream(self._key_id,
self._secret_key,
self._data_stream_url,
raw_data)
raw_data,
websocket_params=websocket_params)

def subscribe_trade_updates(self, handler):
self._trading_ws.subscribe_trade_updates(handler)
Expand Down

0 comments on commit 9c1da14

Please sign in to comment.