Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

# Fixed the "TypeError: cannot pickle '_thread.lock' object" issue. T… #7

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions optionchain_stream/instrument_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ def __init__(self, api_key):
self.fno_file = 'https://archives.nseindia.com/content/fo/fo_mktlots.csv'
self.kite = KiteConnect(api_key=api_key)
self.headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
self.redis_db = InstrumentDumpFetch()
self.redis_db = InstrumentDumpFetch.get_conn()
self.counter = 0


def filter_redis_dump(self):
Expand Down Expand Up @@ -51,30 +52,32 @@ def filter_redis_dump(self):
elif 'LIMITED' in underlying_name:
underlying_name = underlying_name.replace('LIMITED', '')

optionInstrument.append({'symbol':row['SYMBOL '].rstrip(),
optionInstrument.append({'symbol':row['SYMBOL '].rstrip(),
'underlying':underlying_name.rstrip()})

for optContract in optionInstrument:
# Create list of strike price for specific symbol
contractToken[optContract['symbol']] = []
for contract in instruments:
if ((contract['name'] == optContract['symbol'] or \
contract['tradingsymbol'] == optContract['symbol'] or \
contract['name'] == optContract['underlying']) and \
(contract['segment'] == 'NFO-OPT' or
(contract['segment'] == 'NFO-OPT' or
(contract['instrument_type'] == 'EQ' and contract['exchange'] == 'NSE'))):
contractToken[optContract['symbol']].append({'strike':contract['strike'],
'type':contract['instrument_type'], 'expiry':str(contract['expiry']),
'token':contract['instrument_token']})
self.redis_db = InstrumentDumpFetch.get_conn()
self.redis_db.data_dump(optContract['symbol'], contractToken[optContract['symbol']])

def fetch_contract(self, symbol, expiry, underlying):
"""
Fetch strike and token detail for requested symbol
Param symbol:(string) - Option contract symbol
"""
token_list = []
# Fetch all active opt and EQ contracts for requested symbol
self.redis_db = InstrumentDumpFetch.get_conn()
optionData = self.redis_db.symbol_data(symbol)
for strike_detail in optionData:
# For underlying request fetch both EQ and option contracts
Expand All @@ -87,23 +90,26 @@ def fetch_contract(self, symbol, expiry, underlying):
if strike_detail['expiry'] == str(expiry):
token_list.append(strike_detail['token'])
return token_list

def fetch_token_detail(self, token):
"""
Fetch contract name for requested instrument token
Param token:(integer) - Instrument token
Param token:(integer) - Instrument token
"""
self.redis_db = InstrumentDumpFetch.get_conn()
return self.redis_db.fetch_token(token)

def store_option_data(self, tradingsymbol, token, optionData):
"""
Store option chain data for requested symbol
Param tradingsymbol:(string) - Option contract symbol
Param token:(string) - Expiry date of the requested symbol
Param optionData:(string) - Complete data dump for required option symbol
"""

self.redis_db = InstrumentDumpFetch.get_conn()
return self.redis_db.store_optiondata(tradingsymbol, token, optionData)

def generate_optionChain(self, token_list):
"""
Fetch all option contracts for requested symbol
Expand All @@ -112,8 +118,13 @@ def generate_optionChain(self, token_list):
optionChain = []
# Iterate though list of tokens for respective symbol and fetch respective strike data
for instrumentToken in token_list:
self.counter += 1
# print("counter = ",(self.counter))
# print('parent process id:', os.getppid())
# print('Internal process id:', os.getpid())
# Fetch market depth data for respective strike
optionInstrument = self.fetch_token_detail(instrumentToken)
self.redis_db = InstrumentDumpFetch.get_conn()
optionData = self.redis_db.fetch_option_data(optionInstrument['symbol'], instrumentToken)
optionChain.append(optionData)
return optionChain
18 changes: 12 additions & 6 deletions optionchain_stream/option_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
from optionchain_stream.websocket import WebsocketClient
from optionchain_stream.instrument_file import InstrumentMaster


class OptionChain():
"""
Wrapper class to fetch option chain steaming data
"""
def __init__(self, symbol, expiry, api_key, api_secret=None, request_token=None, access_token=None, underlying=False):

def __init__(self, symbol, expiry, api_key, api_secret=None, request_token=None, access_token=None,
underlying=False):
self.symbol = symbol
self.expiry = expiry
self.api_key = api_key
Expand All @@ -24,10 +27,10 @@ def sync_instruments(self):
Sync master instrument to redis
"""
self.instrumentClass.filter_redis_dump()

def create_option_chain(self):
"""
Wrapper method to fetch sreaming option chain for requested symbol/expiry
Wrapper method to fetch streaming option chain for requested symbol/expiry
"""
# Assign/generate access_token using request_token and api_secret
if self.api_secret and self.request_token:
Expand All @@ -36,10 +39,13 @@ def create_option_chain(self):
self.access_token = self.data["access_token"]
elif self.access_token:
self.access_token = self.access_token

self.socketClient = WebsocketClient(self.symbol, self.expiry, self.api_key, self.access_token, self.underlying)
try:
self.socketClient = WebsocketClient(self.symbol, self.expiry, self.api_key, self.access_token, self.underlying)
except Exception as e:
print('%s Order modify to market failed: %s', self.socketClient, str(e))
raise Exception(str(e))
# create streaming websocket data
self.socketClient.queue_callBacks()
# Keep fetching streaming Queue
while 1:
yield self.socketClient.q.get()
yield self.socketClient.q.get()
54 changes: 38 additions & 16 deletions optionchain_stream/redis_instrument.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
@author: rakeshr
"""

"""
Dump all exchange instruments/contract data to Redis
Retrive strike and instrument token detail from redis for each symbol search
Expand All @@ -10,43 +11,63 @@
import json

class InstrumentDumpFetch():

def __init__(self):

__conn = None

@staticmethod
def __init__():
# Default redis port connection
# Port no and host be edited by user or will make both as acceptable argument in later release
self.conn = redis.StrictRedis(host='localhost', port=6379)
# Port no and host be edited by user or will make both as acceptable argument in later release
print('This is init')
if InstrumentDumpFetch.__conn != None:
raise Exception("This class is a singleton!")
else:
InstrumentDumpFetch.__conn = redis.Redis(host='localhost', port=6379)


def data_dump(self, symbol, instrument_data):
@staticmethod
def get_conn():
if InstrumentDumpFetch.__conn == None:
InstrumentDumpFetch()
return InstrumentDumpFetch

@staticmethod
def data_dump(symbol, instrument_data):
"""
Dump specific exchange complete instrument data
Param symbol:(string) - Option contract symbol
Param instrument_data:(dictionary) - List of dict for specific option contract containing all strike, etc
Param instrument_data:(dictionary) - List of dict for specific option contract containing all strike, etc
"""
self.conn.set(symbol, json.dumps(instrument_data))

def symbol_data(self, symbol):
InstrumentDumpFetch.__conn.set(symbol, json.dumps(instrument_data))


@staticmethod
def symbol_data(symbol):
"""
Return instrument detail for required symbol
Param symbol:(string) - Option contract symbol to be searched
"""
try:
contract_detail = json.loads(self.conn.get(symbol))
contract_detail = json.loads(InstrumentDumpFetch.__conn.get(symbol))
except TypeError:
raise Exception('Key not found - {}'.format(symbol))
return contract_detail

def fetch_token(self, token):
@staticmethod
def fetch_token(token):
"""
Fetch contract name for requested instrument token
Param token:(integer) - Instrument token
Param token:(integer) - Instrument token
"""
try:
token_instrument = json.loads(self.conn.get(token))
token_instrument = json.loads(InstrumentDumpFetch.__conn.get(token))
except Exception as e:
raise Exception('Error {}'.format(e))
return token_instrument

def store_optiondata(self, tradingsymbol, token, optionData):
@staticmethod
def store_optiondata(tradingsymbol, token, optionData):
"""
Store option chain data for requested symbol
Param symbol:(string) - Option contract symbol
Expand All @@ -55,19 +76,20 @@ def store_optiondata(self, tradingsymbol, token, optionData):
"""
optionChainKey = '{}:{}'.format(tradingsymbol, token)
try:
self.conn.set(optionChainKey, json.dumps(optionData))
InstrumentDumpFetch.__conn.set(optionChainKey, json.dumps(optionData))
except Exception as e:
raise Exception('Error - {}'.format(e))

def fetch_option_data(self, tradingsymbol, token):
@staticmethod
def fetch_option_data(tradingsymbol, token):
"""
Fetch stored option data
Param symbol:(string) - Option contract symbol
Param token:(integer) - Instrument token
"""
optionContractKey = '{}:{}'.format(tradingsymbol, token)
try:
token_data = json.loads(self.conn.get(optionContractKey))
token_data = json.loads(InstrumentDumpFetch.__conn.get(optionContractKey))
except Exception as e:
raise Exception('Error - {}'.format(e))
return token_data
28 changes: 16 additions & 12 deletions optionchain_stream/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,45 +7,46 @@
"""

import logging, time
from multiprocessing import Process, Queue
#from multiprocessing import Process, Queue, freeze_support
from kiteconnect import KiteTicker
from optionchain_stream.instrument_file import InstrumentMaster
import multiprocessing as mp


class WebsocketClient:
def __init__(self, symbol, expiry, api_key, acess_token, underlying):
mp.freeze_support()
# Create kite ticker instance
self.kws = KiteTicker(api_key, acess_token, debug=True)
self.symbol = symbol
self.expiry = expiry
self.underlying = underlying
self.instrumentClass = InstrumentMaster(api_key)
self.token_list = self.instrumentClass.fetch_contract(self.symbol, str(self.expiry), self.underlying)
self.q = Queue()
self.q = mp.Queue()

def form_option_chain(self, q):
"""
Wrapper method around fetch and create option chain
"""
while 1:
complete_option_data = self.instrumentClass.generate_optionChain(self.token_list)
# Store queue data
q.put(complete_option_data)

def on_ticks(self, ws, ticks):
"""
Push each tick to DB
"""
"""
for tick in ticks:
contract_detail = self.instrumentClass.fetch_token_detail(tick['instrument_token'])
# For EQ underlying instrument don't fetch OI and volume(for INDICES) value
if contract_detail['type'] == 'EQ':
optionData = {'token':tick['instrument_token'], 'symbol':contract_detail['symbol'],
'last_price':tick['last_price'], 'change':tick['change']}
optionData = {'token': tick['instrument_token'], 'symbol': contract_detail['symbol'],
'last_price': tick['last_price'], 'change': tick['change']}
else:
optionData = {'token':tick['instrument_token'], 'symbol':contract_detail['symbol'],
'last_price':tick['last_price'], 'volume':tick['volume'], 'change':tick['change'],
'oi':tick['oi']}
optionData = {'token': tick['instrument_token'], 'symbol': contract_detail['symbol'],
'last_price': tick['last_price'], 'volume': tick['volume'], 'change': tick['change'],
'oi': tick['oi']}

# Store each tick to redis with symbol and token as key pair
self.instrumentClass.store_option_data(contract_detail['symbol'], tick['instrument_token'], optionData)
Expand All @@ -65,7 +66,7 @@ def on_noreconnect(self, ws):

def on_reconnect(self, ws, attempt_count):
logging.debug("Reconnecting the websocket: {}".format(attempt_count))

def assign_callBacks(self):
# Assign all the callbacks
self.kws.on_ticks = self.on_ticks
Expand All @@ -80,11 +81,14 @@ def queue_callBacks(self):
"""
Wrapper around ticker callbacks with multiprocess Queue
"""
mp.set_start_method('spawn', force=True)
print('Queue Call backs')

# Process to keep updating real time tick to DB
Process(target=self.assign_callBacks,).start()
mp.Process(target=self.assign_callBacks, args=()).start()
# Delay to let intial instrument DB sync
# For option chain to fetch value
# Required only during initial run
time.sleep(2)
# Process to fetch option chain in real time from Redis
Process(target=self.form_option_chain,args=(self.q, )).start()
mp.Process(target=self.form_option_chain, args=(self.q,)).start()