Skip to content

Commit

Permalink
update bundle自动更新组件
Browse files Browse the repository at this point in the history
  • Loading branch information
Lin-Dongzhao committed Mar 14, 2024
1 parent de75f46 commit 51ff598
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 31 deletions.
5 changes: 5 additions & 0 deletions rqalpha/data/base_data_source/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,3 +400,8 @@ def history_ticks(self, instrument, count, dt):

def get_algo_bar(self, id_or_ins, start_min, end_min, dt):
raise NotImplementedError("open source rqalpha not support algo order")

def get_open_auction_volume(self, instrument, dt):
# type: (Instrument, datetime.datetime) -> float
volume = self.get_open_auction_bar(instrument, dt)['volume']
return volume
69 changes: 42 additions & 27 deletions rqalpha/data/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from rqalpha.utils.i18n import gettext as _
from rqalpha.utils.logger import system_log
from rqalpha.const import INSTRUMENT_TYPE
from rqalpha.utils.functools import lru_cache

START_DATE = 20050104
END_DATE = 29991231
Expand Down Expand Up @@ -629,52 +630,66 @@ def update_futures_trading_parameters(path, end_date):


class AutomaticUpdateBundle(object):
def __init__(self, path):
# type: (str, str) -> None
self.path = path
def __init__(self, path, filename, rqdata_api, fields):
# type: (str, str, str, List[str]) -> None
self._file = os.path.join(path, filename)
self._trading_dates = None
self._filename = filename
self._rqdata_api = rqdata_api
self._fields = fields
try:
import rqdatac
self.rqdata_init = True
except ImportError:
system_log.info(_("RQData is not installed, relevant data cannot be updated automatically, and some functions will be limited."))
self.rqdata_init = False

def get_data(self, instrument, dt):
# type: (Instrument, datetime.datetime) -> numpy.ndarray or None
dt = convert_date_to_date_int(dt)
data = self._get_data_all_time(instrument)
if data is None:
return data
else:
data = data[data['datetime'] == dt]
return data

@lru_cache(128)
def _get_data_all_time(self, instrument):
# type: (Instrument) -> numpy.ndarray or None
self._auto_update_task(instrument)
with h5py.File(self._file, "r") as h5:
try:
data = h5[instrument.order_book_id][:]
except KeyError:
return None
return data

def auto_update_task(self, filename, instrument, rqdata_api, fields):
def _auto_update_task(self, instrument):
"""
在 rqalpha 策略运行过程中自动更新所需的数据
:param filename: 存储数据的 h5 文件
:type filename: `str`
在 rqalpha 策略运行过程中自动更新所需的日线数据
:param instrument: 合约对象
:type instrument: `Instrument`
:param rqdata_api: 更新数据所使用的 rqdatac API
:type rqdata_api: `str`
:param fields: 需要更新的数据字段,需与 rqdata_api 可调取的字段相匹配
:type fields: `List[str]`
"""
file = os.path.join(self.path, filename)
order_book_id = instrument.order_book_id
start_date = START_DATE
if not os.path.exists(file):
h5 = h5py.File(file, "w")
if not os.path.exists(self._file):
h5 = h5py.File(self._file, "w")
else:
try:
h5 = h5py.File(file, "a")
h5 = h5py.File(self._file, "a")
except OSError:
raise OSError("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(file))
"or you can delete then update again".format(self._file))
if order_book_id in h5:
last_date = datetime.datetime.strptime(str(h5[order_book_id][-1]['datetime']), "%Y%m%d").date()
if datetime.date.today() == last_date or rqdatac.get_previous_trading_date(datetime.date.today()) == last_date:
h5.close()
return
start_date = rqdatac.get_next_trading_date(last_date)
df = self._get_df(rqdata_api, instrument, start_date, fields)

df = self._get_df(instrument, start_date)
if not (df is None or df.empty):
if order_book_id in h5:
data = np.array(
Expand All @@ -685,18 +700,18 @@ def auto_update_task(self, filename, instrument, rqdata_api, fields):
h5.create_dataset(order_book_id, data=data)
else:
h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records())
h5.close()
h5.close()

def _get_df(self, rqdata_api, instrument, start_date, fields):
# type: (str, Instrument, datetime.date, List[str]) -> Dataframe
def _get_df(self, instrument, start_date):
# type: (Instrument, datetime.date) -> Dataframe
if instrument.type ==INSTRUMENT_TYPE.FUTURE and instrument.de_listed_date == datetime.datetime(2999, 12, 31):
command = "rqdatac.{}(dic['order_book_id'], dic['start_date'], dic['end_date'], {})".format(rqdata_api, fields)
command = "rqdatac.{}(dic['order_book_id'], dic['start_date'], dic['end_date'], {})".format(self._rqdata_api, self._fields)
df = self._get_dominant_df(command, instrument.underlying_symbol, start_date)
else:
command = "rqdatac.{}(instrument.order_book_id, start_date, datetime.date.today(), fields)".format(rqdata_api)
command = "rqdatac.{}(instrument.order_book_id, start_date, datetime.date.today(), self._fields)".format(self._rqdata_api)
df = eval(command)
if not (df is None or df.empty):
df = df[fields] # rqdatac.get_open_auction_info get Futures's data will auto add 'open_interest' and 'prev_settlement'
df = df[self._fields] # rqdatac.get_open_auction_info get Futures's data will auto add 'open_interest' and 'prev_settlement'
df.reset_index(inplace=True)
df["order_book_id"] = instrument.order_book_id
time_parameter = list(set(['datetime', 'date', 'trading_date']).intersection(set(df.columns)))[0]
Expand Down
5 changes: 1 addition & 4 deletions rqalpha/data/data_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,7 @@ def get_open_auction_bar(self, order_book_id, dt):

def get_open_auction_volume(self, order_book_id, dt):
instrument = self.instruments(order_book_id)
try:
volume = self._data_source.get_open_auction_volume(instrument, dt)
except NotImplementedError:
volume = self.get_open_auction_bar(order_book_id, dt).volume
volume = self._data_source.get_open_auction_volume(instrument, dt)
return volume

def history(self, order_book_id, bar_count, frequency, field, dt):
Expand Down
3 changes: 3 additions & 0 deletions tests/api_tests/mod/sys_simulation/test_simulation_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@

def test_open_auction_match():
__config__ = {
"base": {
"auto_update_bundle": False,
},
"mod": {
"sys_simulation": {
"volume_limit": True,
Expand Down

0 comments on commit 51ff598

Please sign in to comment.