diff --git a/optionchain_stream/instrument_file.py b/optionchain_stream/instrument_file.py index 57240ef..d3cd36c 100644 --- a/optionchain_stream/instrument_file.py +++ b/optionchain_stream/instrument_file.py @@ -17,7 +17,8 @@ def __init__(self, api_key): self.fno_file = 'https://archives.nseindia.com/content/fo/fo_mktlots.csv' self.kite = KiteConnect(api_key=api_key) self.headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'} - self.redis_db = InstrumentDumpFetch() + self.redis_db = InstrumentDumpFetch.get_conn() + self.counter = 0 def filter_redis_dump(self): @@ -51,9 +52,9 @@ def filter_redis_dump(self): elif 'LIMITED' in underlying_name: underlying_name = underlying_name.replace('LIMITED', '') - optionInstrument.append({'symbol':row['SYMBOL '].rstrip(), + optionInstrument.append({'symbol':row['SYMBOL '].rstrip(), 'underlying':underlying_name.rstrip()}) - + for optContract in optionInstrument: # Create list of strike price for specific symbol contractToken[optContract['symbol']] = [] @@ -61,13 +62,14 @@ def filter_redis_dump(self): if ((contract['name'] == optContract['symbol'] or \ contract['tradingsymbol'] == optContract['symbol'] or \ contract['name'] == optContract['underlying']) and \ - (contract['segment'] == 'NFO-OPT' or + (contract['segment'] == 'NFO-OPT' or (contract['instrument_type'] == 'EQ' and contract['exchange'] == 'NSE'))): contractToken[optContract['symbol']].append({'strike':contract['strike'], 'type':contract['instrument_type'], 'expiry':str(contract['expiry']), 'token':contract['instrument_token']}) + self.redis_db = InstrumentDumpFetch.get_conn() self.redis_db.data_dump(optContract['symbol'], contractToken[optContract['symbol']]) - + def fetch_contract(self, symbol, expiry, underlying): """ Fetch strike and token detail for requested symbol @@ -75,6 +77,7 @@ def fetch_contract(self, symbol, expiry, underlying): """ token_list = [] # Fetch all active opt and EQ contracts for requested symbol + self.redis_db = InstrumentDumpFetch.get_conn() optionData = self.redis_db.symbol_data(symbol) for strike_detail in optionData: # For underlying request fetch both EQ and option contracts @@ -87,14 +90,15 @@ def fetch_contract(self, symbol, expiry, underlying): if strike_detail['expiry'] == str(expiry): token_list.append(strike_detail['token']) return token_list - + def fetch_token_detail(self, token): """ Fetch contract name for requested instrument token - Param token:(integer) - Instrument token + Param token:(integer) - Instrument token """ + self.redis_db = InstrumentDumpFetch.get_conn() return self.redis_db.fetch_token(token) - + def store_option_data(self, tradingsymbol, token, optionData): """ Store option chain data for requested symbol @@ -102,8 +106,10 @@ def store_option_data(self, tradingsymbol, token, optionData): Param token:(string) - Expiry date of the requested symbol Param optionData:(string) - Complete data dump for required option symbol """ + + self.redis_db = InstrumentDumpFetch.get_conn() return self.redis_db.store_optiondata(tradingsymbol, token, optionData) - + def generate_optionChain(self, token_list): """ Fetch all option contracts for requested symbol @@ -112,8 +118,13 @@ def generate_optionChain(self, token_list): optionChain = [] # Iterate though list of tokens for respective symbol and fetch respective strike data for instrumentToken in token_list: + self.counter += 1 + # print("counter = ",(self.counter)) + # print('parent process id:', os.getppid()) + # print('Internal process id:', os.getpid()) # Fetch market depth data for respective strike optionInstrument = self.fetch_token_detail(instrumentToken) + self.redis_db = InstrumentDumpFetch.get_conn() optionData = self.redis_db.fetch_option_data(optionInstrument['symbol'], instrumentToken) optionChain.append(optionData) return optionChain \ No newline at end of file diff --git a/optionchain_stream/option_chain.py b/optionchain_stream/option_chain.py index e6eff4e..dc12cd8 100644 --- a/optionchain_stream/option_chain.py +++ b/optionchain_stream/option_chain.py @@ -5,11 +5,14 @@ from optionchain_stream.websocket import WebsocketClient from optionchain_stream.instrument_file import InstrumentMaster + class OptionChain(): """ Wrapper class to fetch option chain steaming data """ - def __init__(self, symbol, expiry, api_key, api_secret=None, request_token=None, access_token=None, underlying=False): + + def __init__(self, symbol, expiry, api_key, api_secret=None, request_token=None, access_token=None, + underlying=False): self.symbol = symbol self.expiry = expiry self.api_key = api_key @@ -24,10 +27,10 @@ def sync_instruments(self): Sync master instrument to redis """ self.instrumentClass.filter_redis_dump() - + def create_option_chain(self): """ - Wrapper method to fetch sreaming option chain for requested symbol/expiry + Wrapper method to fetch streaming option chain for requested symbol/expiry """ # Assign/generate access_token using request_token and api_secret if self.api_secret and self.request_token: @@ -36,10 +39,13 @@ def create_option_chain(self): self.access_token = self.data["access_token"] elif self.access_token: self.access_token = self.access_token - - self.socketClient = WebsocketClient(self.symbol, self.expiry, self.api_key, self.access_token, self.underlying) + try: + self.socketClient = WebsocketClient(self.symbol, self.expiry, self.api_key, self.access_token, self.underlying) + except Exception as e: + print('%s Order modify to market failed: %s', self.socketClient, str(e)) + raise Exception(str(e)) # create streaming websocket data self.socketClient.queue_callBacks() # Keep fetching streaming Queue while 1: - yield self.socketClient.q.get() \ No newline at end of file + yield self.socketClient.q.get() diff --git a/optionchain_stream/redis_instrument.py b/optionchain_stream/redis_instrument.py index de82f20..3268a72 100644 --- a/optionchain_stream/redis_instrument.py +++ b/optionchain_stream/redis_instrument.py @@ -1,6 +1,7 @@ """ @author: rakeshr """ + """ Dump all exchange instruments/contract data to Redis Retrive strike and instrument token detail from redis for each symbol search @@ -10,43 +11,63 @@ import json class InstrumentDumpFetch(): - - def __init__(self): + + __conn = None + + @staticmethod + def __init__(): # Default redis port connection - # Port no and host be edited by user or will make both as acceptable argument in later release - self.conn = redis.StrictRedis(host='localhost', port=6379) + # Port no and host be edited by user or will make both as acceptable argument in later release + print('This is init') + if InstrumentDumpFetch.__conn != None: + raise Exception("This class is a singleton!") + else: + InstrumentDumpFetch.__conn = redis.Redis(host='localhost', port=6379) + - def data_dump(self, symbol, instrument_data): + @staticmethod + def get_conn(): + if InstrumentDumpFetch.__conn == None: + InstrumentDumpFetch() + return InstrumentDumpFetch + + @staticmethod + def data_dump(symbol, instrument_data): """ Dump specific exchange complete instrument data Param symbol:(string) - Option contract symbol - Param instrument_data:(dictionary) - List of dict for specific option contract containing all strike, etc + Param instrument_data:(dictionary) - List of dict for specific option contract containing all strike, etc """ - self.conn.set(symbol, json.dumps(instrument_data)) - def symbol_data(self, symbol): + InstrumentDumpFetch.__conn.set(symbol, json.dumps(instrument_data)) + + + @staticmethod + def symbol_data(symbol): """ Return instrument detail for required symbol Param symbol:(string) - Option contract symbol to be searched """ try: - contract_detail = json.loads(self.conn.get(symbol)) + contract_detail = json.loads(InstrumentDumpFetch.__conn.get(symbol)) except TypeError: raise Exception('Key not found - {}'.format(symbol)) return contract_detail - def fetch_token(self, token): + @staticmethod + def fetch_token(token): """ Fetch contract name for requested instrument token - Param token:(integer) - Instrument token + Param token:(integer) - Instrument token """ try: - token_instrument = json.loads(self.conn.get(token)) + token_instrument = json.loads(InstrumentDumpFetch.__conn.get(token)) except Exception as e: raise Exception('Error {}'.format(e)) return token_instrument - def store_optiondata(self, tradingsymbol, token, optionData): + @staticmethod + def store_optiondata(tradingsymbol, token, optionData): """ Store option chain data for requested symbol Param symbol:(string) - Option contract symbol @@ -55,11 +76,12 @@ def store_optiondata(self, tradingsymbol, token, optionData): """ optionChainKey = '{}:{}'.format(tradingsymbol, token) try: - self.conn.set(optionChainKey, json.dumps(optionData)) + InstrumentDumpFetch.__conn.set(optionChainKey, json.dumps(optionData)) except Exception as e: raise Exception('Error - {}'.format(e)) - def fetch_option_data(self, tradingsymbol, token): + @staticmethod + def fetch_option_data(tradingsymbol, token): """ Fetch stored option data Param symbol:(string) - Option contract symbol @@ -67,7 +89,7 @@ def fetch_option_data(self, tradingsymbol, token): """ optionContractKey = '{}:{}'.format(tradingsymbol, token) try: - token_data = json.loads(self.conn.get(optionContractKey)) + token_data = json.loads(InstrumentDumpFetch.__conn.get(optionContractKey)) except Exception as e: raise Exception('Error - {}'.format(e)) return token_data \ No newline at end of file diff --git a/optionchain_stream/websocket.py b/optionchain_stream/websocket.py index c231a13..be11e17 100644 --- a/optionchain_stream/websocket.py +++ b/optionchain_stream/websocket.py @@ -7,13 +7,15 @@ """ import logging, time -from multiprocessing import Process, Queue +#from multiprocessing import Process, Queue, freeze_support from kiteconnect import KiteTicker from optionchain_stream.instrument_file import InstrumentMaster +import multiprocessing as mp class WebsocketClient: def __init__(self, symbol, expiry, api_key, acess_token, underlying): + mp.freeze_support() # Create kite ticker instance self.kws = KiteTicker(api_key, acess_token, debug=True) self.symbol = symbol @@ -21,7 +23,7 @@ def __init__(self, symbol, expiry, api_key, acess_token, underlying): self.underlying = underlying self.instrumentClass = InstrumentMaster(api_key) self.token_list = self.instrumentClass.fetch_contract(self.symbol, str(self.expiry), self.underlying) - self.q = Queue() + self.q = mp.Queue() def form_option_chain(self, q): """ @@ -29,23 +31,22 @@ def form_option_chain(self, q): """ while 1: complete_option_data = self.instrumentClass.generate_optionChain(self.token_list) - # Store queue data q.put(complete_option_data) def on_ticks(self, ws, ticks): """ Push each tick to DB - """ + """ for tick in ticks: contract_detail = self.instrumentClass.fetch_token_detail(tick['instrument_token']) # For EQ underlying instrument don't fetch OI and volume(for INDICES) value if contract_detail['type'] == 'EQ': - optionData = {'token':tick['instrument_token'], 'symbol':contract_detail['symbol'], - 'last_price':tick['last_price'], 'change':tick['change']} + optionData = {'token': tick['instrument_token'], 'symbol': contract_detail['symbol'], + 'last_price': tick['last_price'], 'change': tick['change']} else: - optionData = {'token':tick['instrument_token'], 'symbol':contract_detail['symbol'], - 'last_price':tick['last_price'], 'volume':tick['volume'], 'change':tick['change'], - 'oi':tick['oi']} + optionData = {'token': tick['instrument_token'], 'symbol': contract_detail['symbol'], + 'last_price': tick['last_price'], 'volume': tick['volume'], 'change': tick['change'], + 'oi': tick['oi']} # Store each tick to redis with symbol and token as key pair self.instrumentClass.store_option_data(contract_detail['symbol'], tick['instrument_token'], optionData) @@ -65,7 +66,7 @@ def on_noreconnect(self, ws): def on_reconnect(self, ws, attempt_count): logging.debug("Reconnecting the websocket: {}".format(attempt_count)) - + def assign_callBacks(self): # Assign all the callbacks self.kws.on_ticks = self.on_ticks @@ -80,11 +81,14 @@ def queue_callBacks(self): """ Wrapper around ticker callbacks with multiprocess Queue """ + mp.set_start_method('spawn', force=True) + print('Queue Call backs') + # Process to keep updating real time tick to DB - Process(target=self.assign_callBacks,).start() + mp.Process(target=self.assign_callBacks, args=()).start() # Delay to let intial instrument DB sync # For option chain to fetch value # Required only during initial run time.sleep(2) # Process to fetch option chain in real time from Redis - Process(target=self.form_option_chain,args=(self.q, )).start() \ No newline at end of file + mp.Process(target=self.form_option_chain, args=(self.q,)).start()