From 51ff598abf59d7e3f7d013fe502e38167187a9e8 Mon Sep 17 00:00:00 2001 From: "lin.dongzhao" <542698096@qq.com> Date: Thu, 14 Mar 2024 15:13:02 +0800 Subject: [PATCH] =?UTF-8?q?update=20bundle=E8=87=AA=E5=8A=A8=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E7=BB=84=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rqalpha/data/base_data_source/data_source.py | 5 ++ rqalpha/data/bundle.py | 69 +++++++++++-------- rqalpha/data/data_proxy.py | 5 +- .../sys_simulation/test_simulation_broker.py | 3 + 4 files changed, 51 insertions(+), 31 deletions(-) diff --git a/rqalpha/data/base_data_source/data_source.py b/rqalpha/data/base_data_source/data_source.py index b5118ee51..62145e33d 100644 --- a/rqalpha/data/base_data_source/data_source.py +++ b/rqalpha/data/base_data_source/data_source.py @@ -400,3 +400,8 @@ def history_ticks(self, instrument, count, dt): def get_algo_bar(self, id_or_ins, start_min, end_min, dt): raise NotImplementedError("open source rqalpha not support algo order") + + def get_open_auction_volume(self, instrument, dt): + # type: (Instrument, datetime.datetime) -> float + volume = self.get_open_auction_bar(instrument, dt)['volume'] + return volume diff --git a/rqalpha/data/bundle.py b/rqalpha/data/bundle.py index 5e72c464d..a15fe79be 100644 --- a/rqalpha/data/bundle.py +++ b/rqalpha/data/bundle.py @@ -31,6 +31,7 @@ from rqalpha.utils.i18n import gettext as _ from rqalpha.utils.logger import system_log from rqalpha.const import INSTRUMENT_TYPE +from rqalpha.utils.functools import lru_cache START_DATE = 20050104 END_DATE = 29991231 @@ -629,52 +630,66 @@ def update_futures_trading_parameters(path, end_date): class AutomaticUpdateBundle(object): - def __init__(self, path): - # type: (str, str) -> None - self.path = path + def __init__(self, path, filename, rqdata_api, fields): + # type: (str, str, str, List[str]) -> None + self._file = os.path.join(path, filename) self._trading_dates = None + self._filename = filename + self._rqdata_api = rqdata_api + self._fields = fields try: import rqdatac self.rqdata_init = True except ImportError: system_log.info(_("RQData is not installed, relevant data cannot be updated automatically, and some functions will be limited.")) self.rqdata_init = False + + def get_data(self, instrument, dt): + # type: (Instrument, datetime.datetime) -> numpy.ndarray or None + dt = convert_date_to_date_int(dt) + data = self._get_data_all_time(instrument) + if data is None: + return data + else: + data = data[data['datetime'] == dt] + return data + + @lru_cache(128) + def _get_data_all_time(self, instrument): + # type: (Instrument) -> numpy.ndarray or None + self._auto_update_task(instrument) + with h5py.File(self._file, "r") as h5: + try: + data = h5[instrument.order_book_id][:] + except KeyError: + return None + return data - def auto_update_task(self, filename, instrument, rqdata_api, fields): + def _auto_update_task(self, instrument): """ - 在 rqalpha 策略运行过程中自动更新所需的数据 - - :param filename: 存储数据的 h5 文件 - :type filename: `str` + 在 rqalpha 策略运行过程中自动更新所需的日线数据 :param instrument: 合约对象 :type instrument: `Instrument` - - :param rqdata_api: 更新数据所使用的 rqdatac API - :type rqdata_api: `str` - - :param fields: 需要更新的数据字段,需与 rqdata_api 可调取的字段相匹配 - :type fields: `List[str]` """ - file = os.path.join(self.path, filename) order_book_id = instrument.order_book_id start_date = START_DATE - if not os.path.exists(file): - h5 = h5py.File(file, "w") + if not os.path.exists(self._file): + h5 = h5py.File(self._file, "w") else: try: - h5 = h5py.File(file, "a") + h5 = h5py.File(self._file, "a") except OSError: raise OSError("File {} update failed, if it is using, please update later, " - "or you can delete then update again".format(file)) + "or you can delete then update again".format(self._file)) if order_book_id in h5: last_date = datetime.datetime.strptime(str(h5[order_book_id][-1]['datetime']), "%Y%m%d").date() if datetime.date.today() == last_date or rqdatac.get_previous_trading_date(datetime.date.today()) == last_date: h5.close() return start_date = rqdatac.get_next_trading_date(last_date) - - df = self._get_df(rqdata_api, instrument, start_date, fields) + + df = self._get_df(instrument, start_date) if not (df is None or df.empty): if order_book_id in h5: data = np.array( @@ -685,18 +700,18 @@ def auto_update_task(self, filename, instrument, rqdata_api, fields): h5.create_dataset(order_book_id, data=data) else: h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records()) - h5.close() + h5.close() - def _get_df(self, rqdata_api, instrument, start_date, fields): - # type: (str, Instrument, datetime.date, List[str]) -> Dataframe + def _get_df(self, instrument, start_date): + # type: (Instrument, datetime.date) -> Dataframe if instrument.type ==INSTRUMENT_TYPE.FUTURE and instrument.de_listed_date == datetime.datetime(2999, 12, 31): - command = "rqdatac.{}(dic['order_book_id'], dic['start_date'], dic['end_date'], {})".format(rqdata_api, fields) + command = "rqdatac.{}(dic['order_book_id'], dic['start_date'], dic['end_date'], {})".format(self._rqdata_api, self._fields) df = self._get_dominant_df(command, instrument.underlying_symbol, start_date) else: - command = "rqdatac.{}(instrument.order_book_id, start_date, datetime.date.today(), fields)".format(rqdata_api) + command = "rqdatac.{}(instrument.order_book_id, start_date, datetime.date.today(), self._fields)".format(self._rqdata_api) df = eval(command) if not (df is None or df.empty): - df = df[fields] # rqdatac.get_open_auction_info get Futures's data will auto add 'open_interest' and 'prev_settlement' + df = df[self._fields] # rqdatac.get_open_auction_info get Futures's data will auto add 'open_interest' and 'prev_settlement' df.reset_index(inplace=True) df["order_book_id"] = instrument.order_book_id time_parameter = list(set(['datetime', 'date', 'trading_date']).intersection(set(df.columns)))[0] diff --git a/rqalpha/data/data_proxy.py b/rqalpha/data/data_proxy.py index 0825db8f3..daff3425e 100644 --- a/rqalpha/data/data_proxy.py +++ b/rqalpha/data/data_proxy.py @@ -188,10 +188,7 @@ def get_open_auction_bar(self, order_book_id, dt): def get_open_auction_volume(self, order_book_id, dt): instrument = self.instruments(order_book_id) - try: - volume = self._data_source.get_open_auction_volume(instrument, dt) - except NotImplementedError: - volume = self.get_open_auction_bar(order_book_id, dt).volume + volume = self._data_source.get_open_auction_volume(instrument, dt) return volume def history(self, order_book_id, bar_count, frequency, field, dt): diff --git a/tests/api_tests/mod/sys_simulation/test_simulation_broker.py b/tests/api_tests/mod/sys_simulation/test_simulation_broker.py index 9dfa19290..1b27c2380 100644 --- a/tests/api_tests/mod/sys_simulation/test_simulation_broker.py +++ b/tests/api_tests/mod/sys_simulation/test_simulation_broker.py @@ -43,6 +43,9 @@ def test_open_auction_match(): __config__ = { + "base": { + "auto_update_bundle": False, + }, "mod": { "sys_simulation": { "volume_limit": True,