Sometimes, why the session halts? #41

Open
hzadonis opened this issue Jun 15, 2018 · 9 comments

Comments

@hzadonis

Hi, Masters:
These days I have run into an issue: sometimes the pyrfa session stops receiving data. It does not always happen. Yesterday it ran smoothly without a problem, but today it has halted twice.
When it halts, I check the same RIC on TREP and the data there keeps refreshing. Do you have any suggestions? Thanks!

Regards
Zheng

@wiwat-tharateeraparb
Contributor

Hi, make sure you call dispatchEventQueue() fast enough.

@hzadonis
Author

dispatchEventQueue(100)
Does that mean the interval is 0.1 second? Is that fast enough? I currently subscribe to 10 RICs, each in its own thread. If this passes testing, I will subscribe to more than 3000 RICs.

@wiwat-tharateeraparb
Contributor

100 means the call blocks for up to 100 ms. If data arrives, it is dispatched from the queue; if no data arrives within 100 ms, the call returns.

PyRFA is thread-safe, but I'm not sure what happened since I haven't seen your program. Is there anything in the log file?
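
For illustration, the timeout behaviour described above can be mimicked with a small stand-in. `StandInFeed` and its `push` method are hypothetical and are not PyRFA code; the class only reproduces the described semantics with `queue.Queue`, and returning an empty tuple when nothing arrives is an assumption of this sketch.

```python
import queue
import time


class StandInFeed:
    """Hypothetical stand-in, NOT PyRFA: mimics the described timeout behaviour."""

    def __init__(self):
        self._events = queue.Queue()

    def push(self, update):
        # Hypothetical helper that simulates an update arriving from the feed.
        self._events.put(update)

    def dispatchEventQueue(self, timeout_ms):
        # Wait up to timeout_ms for the first update; give up if none arrives.
        try:
            first = self._events.get(timeout=timeout_ms / 1000.0)
        except queue.Empty:
            return ()  # assumption: an empty result signals "no data"
        updates = [first]
        # Drain whatever else is already queued without waiting again.
        while True:
            try:
                updates.append(self._events.get_nowait())
            except queue.Empty:
                return tuple(updates)


feed = StandInFeed()

start = time.monotonic()
empty = feed.dispatchEventQueue(100)   # nothing queued: waits about 100 ms
waited = time.monotonic() - start

feed.push({"RIC": "EXAMPLE.RIC", "MTYPE": "UPDATE"})
start = time.monotonic()
got = feed.dispatchEventQueue(100)     # data already queued: returns right away
quick = time.monotonic() - start
```

In this sketch the timeout is an upper bound on the wait, not a fixed polling interval: the call returns as soon as data is available.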

@hzadonis
Author

import pyrfa
import logging
import logging.handlers
import datetime
import numpy
import threading
import sys
import time
import csv

from qpython import qconnection
from qpython.qcollection import qlist
from qpython.qtype import QException, QTIMESTAMP_LIST, QTIMESPAN_LIST, QSYMBOL_LIST, QFLOAT_LIST, QINT_LIST

class PublisherThread(threading.Thread):  # PublisherThread is a subclass of threading.Thread

    def __init__(self, pubtrdq, pubtrdp, subric):
        super(PublisherThread, self).__init__()
        self.q = pubtrdq
        self.p = pubtrdp
        self.ric = subric
        self._stopper = threading.Event()
        # Other instance attributes (for example self.name or self.price) could be defined here for use by the other methods.

    def stop(self):
        self._stopper.set()

    def stopped(self):
        return self._stopper.is_set()

    def run(self):
        while not self.stopped():
            try:
                # Publish data to the TickerPlant.
                print(self.ric + ' subscribing to TREP...')
                self.get_marketPrice_from_trep()    # Fetches real-time trade data; the marketPrice name is historical, so do not be misled by it.
                #self.get_marketByOrder_from_trep()  # Fetches real-time quote data.
            except QException as e:
                print('>*< excepted')
                print(e)
                logger.info(e)
            except Exception:
                # Log the traceback before stopping; a bare "except:" here hid the reason the thread ended.
                logger.exception('Unexpected error for %s, stopping thread.', self.ric)
                self.stop()

    def get_marketPrice_from_trep(self):
        # Define the fields to view:
        self.p.setView('DSPLY_NAME, PRCTCK_1, TRADE_DATE, SALTIM_MS, TRDPRC_1, TRDVOL_1')
        # To view all fields instead:
        # self.p.setView()
        self.p.marketPriceRequest(self.ric)  # The subscribed RIC is passed in as the argument
        # Initialise the values used for the insert so a partial update cannot raise NameError.
        direction = ['BUY']
        quotedate = quotetime = None
        price = [numpy.float32()]
        volume = [numpy.int32()]
        while not self.stopped():  # Also exits once stop() has been called
            updates = None
            try:
                updates = self.p.dispatchEventQueue(100)
            except KeyboardInterrupt:
                self.stop()
            if updates:
                print("")
                for u in updates:
                    ric = [u['RIC']]
                    if u['MTYPE'] == 'REFRESH':
                        # During testing, REFRESH messages carried only the ServiceName and RIC fields and no other useful data, so they are not inserted into kdb+.
                        update_able = False
                    else:  # Only messages whose MTYPE is not REFRESH are inserted into kdb+.
                        update_able = True
                    for k, v in u.items():  # k is the field name, v is its value
                        fid = self.p.getFieldID(k)  # Look up the FID from the field name
                        if fid == 14:   # PRCTCK_1: used to infer the trade direction
                            if v == '⇩':
                                direction = ['SELL']
                            elif v == '⇧':
                                direction = ['BUY']
                            else:
                                direction = ['BUY']
                        elif fid == 16:      # TRADE_DATE: a str such as '31 MAY 2018', converted to an ISO date
                            quotedate = datetime.datetime.strptime(v, "%d %b %Y").strftime("%Y-%m-%d")
                        elif fid == 6:    # TRDPRC_1
                            price = [numpy.float32(v)]
                        elif fid == 178:  # TRDVOL_1
                            volume = [numpy.int32(v)]  # numpy.int was removed in NumPy 1.24
                        elif fid == 3854:    # SALTIM_MS: an int in milliseconds, for example 20526000
                            quotetime = datetime.datetime.utcfromtimestamp(v / 1000).strftime("%H:%M:%S.%f")    # Convert the SALTIM_MS value to a GMT time of day
                        else:
                            pass

                        '''
                        # Optional log output; leave disabled when not needed:
                        if update_able:
                            if type(v) is float:  # v is a float
                                print("%20s %g" % (k + ' (' + str(fid) + ')', v))  # %20s pads k to 20 characters; %g picks %e or %f by magnitude
                                logger.info("%20s %g" % (k + ' (' + str(fid) + ')', v))  # Write k and v to the log
                            else:  # otherwise:
                                print("%20s %s" % (k + ' (' + str(fid) + ')', v))  # %20s pads k to 20 characters; %s formats as a string
                                logger.info("%20s %s" % (k + ' (' + str(fid) + ')', v))  # Write k and v to the log
                        else:
                            pass
                        '''

                    if update_able:
                        if quotedate is None or quotetime is None:
                            # No trade date or time seen yet, so no timestamp can be built.
                            logger.info('Skipping update for %s without TRADE_DATE/SALTIM_MS.', ric[0])
                            continue
                        quotestamp = [numpy.datetime64(quotedate + 'T' + quotetime, 'ns')]
                        pkgdata = [qlist(quotestamp, qtype=QTIMESTAMP_LIST), qlist(ric, qtype=QSYMBOL_LIST),
                                   qlist(direction, qtype=QSYMBOL_LIST),
                                   qlist(price, qtype=QFLOAT_LIST), qlist(volume, qtype=QINT_LIST)]
                        print(pkgdata)
                        self.q.sync('updtrade', numpy.bytes_('trade'), pkgdata)   # Insert pkgdata into the trade table (numpy.string_ was removed in NumPy 2.0)
                        # Reset price and volume right after the insert so stale values are not paired with a later update that lacks them.
                        price = [numpy.float32()]
                        volume = [numpy.int32()]
                    else:
                        pass
                        # logger.info('There is REFRESH tag, maybe no active realtime data.')

if __name__ == '__main__':
    # logger is assigned at module level here, so the thread class above can use it for logging.
    LOG_FILE = '../log/RTFeed.log'

    # handler = logging.handlers.RotatingFileHandler(LOG_FILE, maxBytes=1024*1024, backupCount=5, encoding='utf-8') # Create the handler; encoding is utf-8 because of special characters
    handler = logging.handlers.RotatingFileHandler(LOG_FILE, encoding='utf-8')  # Create the handler; encoding is utf-8 because of special characters
    fmt = '%(asctime)s - %(filename)s:%(lineno)s - %(name)s - %(message)s'

    formatter = logging.Formatter(fmt)  # Create the formatter
    handler.setFormatter(formatter)  # Attach the formatter to the handler

    logger = logging.getLogger('RTFeed')  # Get the logger named RTFeed
    logger.addHandler(handler)  # Attach the handler to the logger
    logger.setLevel(logging.INFO)

    logger.info('RTFeed is about to run and write log file.')

    p = None
    threads = []
    try:
        # Connect PyRFA to TREP:
        p = pyrfa.Pyrfa()
        p.createConfigDb("../control/RTFeed.cfg")  # Configuration file to use
        p.acquireSession("Session1")  # Session section of the configuration file to read
        p.createOMMConsumer()
        p.login()
        p.directoryRequest()
        p.dictionaryRequest()

        # Connect to the TP; in the owntick example the TP runs on local port 8099
        with qconnection.QConnection(host='localhost', port=8099) as q:
            print('^oo^ is running...')
            #print(q)   # Would print ":localhost:8099"
            print('IPC Version: %s. Is connected: %s' % (q.protocol_version, q.is_connected()))
            print('Press <ENTER> to close application')
            print('TickerPlant Server port 8099 connected.')
            logger.info('TickerPlant Server port 8099 connected.')

            # Read the list of RICs to subscribe to and pass each one to its own thread:
            with open('../control/AShareList.csv', 'r') as f:
                for ric in csv.reader(f):
                    t = PublisherThread(q, p, ric[0])   # q: qconnection object, p: pyrfa object, ric[0]: a RIC name from the CSV file
                    t.start()
                    threads.append(t)

            sys.stdin.readline()

            # Stop and join every thread, not only the last one created.
            for t in threads:
                t.stop()
            for t in threads:
                t.join()
    except Exception:
        # Log the real error; the TickerPlant on port 8099 not being reachable is only one possible cause.
        logger.exception('RTFeed stopped on an error. Exit RTFeed.')
    finally:
        # The "with" block above already closes the connection to the TP.
        # Close the TREP subscriptions:
        if p is not None:
            p.marketPriceCloseAllRequest()
            p.marketByOrderCloseAllRequest()
            p.marketByPriceCloseAllRequest()

@hzadonis
Author

Once the problem occurs, the process seems to halt completely, with no further log output.

@wiwat-tharateeraparb
Contributor

It may have nothing to do with dispatchEventQueue after all. Can you try/catch get_marketPrice_from_trep() to see if there is any error parsing the incoming updates?
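
A minimal sketch of that suggestion, assuming each update is a plain dict keyed by the field names from the view in the posted program. `parse_trade`, `parse_all`, and the sample dicts are hypothetical names made up for this example; the point is only that a failed parse is logged with its traceback instead of silently ending the thread.

```python
import datetime
import logging

logger = logging.getLogger("RTFeed.parse")


def parse_trade(update):
    """Convert one update dict to (timestamp string, price, volume)."""
    day = datetime.datetime.strptime(update["TRADE_DATE"], "%d %b %Y").date()
    # SALTIM_MS is treated as milliseconds since midnight GMT.
    offset = datetime.timedelta(milliseconds=int(update["SALTIM_MS"]))
    stamp = datetime.datetime.combine(day, datetime.time()) + offset
    return (
        stamp.isoformat(timespec="milliseconds"),
        float(update["TRDPRC_1"]),
        int(update["TRDVOL_1"]),
    )


def parse_all(updates):
    """Parse every update, logging and counting failures instead of raising."""
    parsed, failed = [], 0
    for u in updates:
        try:
            parsed.append(parse_trade(u))
        except (KeyError, ValueError, TypeError):
            failed += 1
            logger.exception("Could not parse update: %r", u)
    return parsed, failed


# Sample data invented for the example; the second update lacks most fields.
good = {"RIC": "EXAMPLE.RIC", "TRADE_DATE": "31 MAY 2018",
        "SALTIM_MS": 20526000, "TRDPRC_1": 10.5, "TRDVOL_1": 300}
partial = {"RIC": "EXAMPLE.RIC", "TRDPRC_1": 10.6}

parsed, failed = parse_all([good, partial])
```

With this shape, an update that carries only some of the fields shows up in the log as a traceback and a failure count rather than as a thread that quietly stops.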

@wiwat-tharateeraparb
Contributor

Or try dispatchEventQueue() with no timeout argument. It does not block and returns immediately.
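
A rough sketch of what a non-blocking poll loop could look like. `StandInFeed` is a hypothetical stand-in and not PyRFA; it only imitates a call that returns immediately. The `poll` helper, the `idle_sleep` value, and the short sleep on an empty result are assumptions of this example, added so the loop does not spin a CPU core while idle.

```python
import collections
import threading
import time


class StandInFeed:
    """Hypothetical stand-in, NOT PyRFA: the no-argument call returns immediately."""

    def __init__(self, updates):
        self._pending = collections.deque(updates)

    def dispatchEventQueue(self):
        # Hand back whatever is pending right now, possibly nothing.
        drained = tuple(self._pending)
        self._pending.clear()
        return drained


def poll(feed, stop_event, handle, idle_sleep=0.01):
    """Poll the feed until stop_event is set, passing each update to handle."""
    while not stop_event.is_set():
        updates = feed.dispatchEventQueue()  # never blocks in this sketch
        if updates:
            for u in updates:
                handle(u)
        else:
            time.sleep(idle_sleep)  # back off briefly when there is no data


seen = []
stop = threading.Event()
feed = StandInFeed([{"RIC": "A"}, {"RIC": "B"}])
worker = threading.Thread(target=poll, args=(feed, stop, seen.append))
worker.start()
time.sleep(0.1)   # let the worker drain the two sample updates
stop.set()
worker.join(timeout=1)
```

Because the loop checks the stop event on every pass, the thread ends promptly when asked to, which a loop that only waits on the feed cannot guarantee.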

@hzadonis
Author

Thanks, let me try that.

@hzadonis
Author

I ran the process with dispatchEventQueue() for 2 days, and the halt did not occur.
