Skip to content

Commit

Permalink
pr update
Browse files Browse the repository at this point in the history
  • Loading branch information
Lin-Dongzhao committed Mar 15, 2024
1 parent 51ff598 commit cd87697
Showing 1 changed file with 43 additions and 74 deletions.
117 changes: 43 additions & 74 deletions rqalpha/data/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from rqalpha.utils.logger import system_log
from rqalpha.const import INSTRUMENT_TYPE
from rqalpha.utils.functools import lru_cache
from rqalpha.environment import Environment

START_DATE = 20050104
END_DATE = 29991231
Expand Down Expand Up @@ -631,12 +632,13 @@ def update_futures_trading_parameters(path, end_date):

class AutomaticUpdateBundle(object):
def __init__(self, path, filename, rqdata_api, fields):
# type: (str, str, str, List[str]) -> None
# type: (str, str, function, List[str]) -> None
self._file = os.path.join(path, filename)
self._trading_dates = None
self._filename = filename
self._rqdata_api = rqdata_api
self._fields = fields
self._env = None
try:
import rqdatac
self.rqdata_init = True
Expand All @@ -651,7 +653,10 @@ def get_data(self, instrument, dt):
if data is None:
return data
else:
data = data[data['datetime'] == dt]
try:
data = data[np.searchsorted(data['trading_dt'], dt)]
except IndexError:
data = None
return data

@lru_cache(128)
Expand All @@ -672,92 +677,56 @@ def _auto_update_task(self, instrument):
:param instrument: 合约对象
:type instrument: `Instrument`
"""
self._env = Environment.get_instance()
order_book_id = instrument.order_book_id
start_date = START_DATE
if not os.path.exists(self._file):
h5 = h5py.File(self._file, "w")
try:
df = self._get_df(instrument, start_date)
if not (df is None or df.empty):
h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records())
finally:
h5.close()
else:
try:
h5 = h5py.File(self._file, "a")
except OSError:
raise OSError("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(self._file))
if order_book_id in h5:
last_date = datetime.datetime.strptime(str(h5[order_book_id][-1]['datetime']), "%Y%m%d").date()
if datetime.date.today() == last_date or rqdatac.get_previous_trading_date(datetime.date.today()) == last_date:
h5.close()
return
start_date = rqdatac.get_next_trading_date(last_date)

df = self._get_df(instrument, start_date)
if not (df is None or df.empty):
if order_book_id in h5:
data = np.array(
[tuple(i) for i in chain(h5[order_book_id][:], df.loc[order_book_id].to_records())],
dtype=h5[order_book_id].dtype
)
del h5[order_book_id]
h5.create_dataset(order_book_id, data=data)
else:
h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records())
h5.close()
except OSError as e:
raise OSError(_("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(self._file))) from e
try:
if order_book_id in h5:
last_date = datetime.datetime.strptime(str(h5[order_book_id][-1]['trading_dt']), "%Y%m%d").date()
start_date = self._env.data_proxy._data_source.get_next_trading_date(last_date).date()
if start_date > datetime.date.today():
return
df = self._get_df(instrument, start_date)
if not (df is None or df.empty):
if order_book_id in h5:
data = np.array(
[tuple(i) for i in chain(h5[order_book_id][:], df.loc[order_book_id].to_records())],
dtype=h5[order_book_id].dtype
)
del h5[order_book_id]
h5.create_dataset(order_book_id, data=data)
else:
h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records())
finally:
h5.close()

def _get_df(self, instrument, start_date):
# type: (Instrument, datetime.date) -> Dataframe
if instrument.type ==INSTRUMENT_TYPE.FUTURE and instrument.de_listed_date == datetime.datetime(2999, 12, 31):
command = "rqdatac.{}(dic['order_book_id'], dic['start_date'], dic['end_date'], {})".format(self._rqdata_api, self._fields)
df = self._get_dominant_df(command, instrument.underlying_symbol, start_date)
else:
command = "rqdatac.{}(instrument.order_book_id, start_date, datetime.date.today(), self._fields)".format(self._rqdata_api)
df = eval(command)
df = self._rqdata_api(instrument.order_book_id, start_date, datetime.date.today(), self._fields)
if not (df is None or df.empty):
df = df[self._fields] # rqdatac.get_open_auction_info get Futures's data will auto add 'open_interest' and 'prev_settlement'
df.reset_index(inplace=True)
df["order_book_id"] = instrument.order_book_id
time_parameter = list(set(['datetime', 'date', 'trading_date']).intersection(set(df.columns)))[0]
if time_parameter == "datetime":
if instrument.type in [INSTRUMENT_TYPE.FUTURE, INSTRUMENT_TYPE.OPTION]:
# 有夜盘的合约,datetime 的日期可能与所属交易日期不一致
self._trading_dates = rqdatac.get_trading_dates(start_date, datetime.date.today())
df['datetime'] = df[time_parameter].map(self._update_datetime)
else:
df['datetime'] = df[time_parameter].map(convert_date_to_date_int)
else:
df['datetime'] = df[time_parameter].map(convert_date_to_date_int)
del df[time_parameter]
df.set_index(["order_book_id", "datetime"], inplace=True)
return df

def _update_datetime(self, dt):
    """Return the integer trading date that the bar timestamp ``dt`` belongs to.

    Bars stamped before 17:00 are attributed to their own calendar date.
    Bars stamped at 17:00 or later are attributed to the first entry of
    ``self._trading_dates`` found on or after the following calendar day,
    because for contracts with a night session the calendar date of a bar
    can differ from the trading date it belongs to.

    :param dt: bar timestamp exposing ``hour`` and supporting ``timedelta``
        addition (presumably ``datetime.datetime`` / ``pandas.Timestamp``
        -- TODO confirm against caller)
    :return: whatever ``convert_date_to_date_int`` returns for the resolved date
    :raises ValueError: if no known trading date lies on or after the day
        following ``dt``
    """
    if dt.hour < 17:
        return convert_date_to_date_int(dt)

    one_day = datetime.timedelta(days=1)
    trading_dates = self._trading_dates
    candidate = (dt + one_day).date()
    if candidate not in trading_dates:
        # `_trading_dates` is filled from get_trading_dates(start_date, today),
        # so an evening bar stamped today has no later entry to roll to.
        # Without this bound the scan below would never terminate.
        # NOTE(review): assumes the entries are datetime.date objects -- confirm.
        latest = max(trading_dates, default=None)
        while candidate not in trading_dates:
            if latest is None or candidate > latest:
                raise ValueError(
                    "no trading date on or after {} is available for bar {}".format(
                        (dt + one_day).date(), dt
                    )
                )
            candidate = candidate + one_day
    return convert_date_to_date_int(candidate)

def _get_dominant_df(self, command, underlying_symbol, start_date):

def slice_df(df):
    """Split a dominant-contract series into consecutive per-contract date ranges.

    :param df: date-indexed sequence of dominant contract ids, walked with
        ``.items()`` (presumably the pandas Series returned by
        ``rqdatac.futures.get_dominant`` -- TODO confirm); ``None`` or an
        empty series yields an empty list.
    :return: list of dicts with keys ``order_book_id``, ``start_date`` and
        ``end_date`` (both dates inclusive), one per run of identical
        consecutive contract ids, in index order.
    """
    slice_list = []
    if df is None or len(df) == 0:
        return slice_list

    # Use explicit positional access; `df[0]` on a date-indexed Series is
    # ambiguous between label and position.
    order_book_id = df.iloc[0]
    start_date = end_date = df.index[0]
    for date, dominant in df.items():
        if dominant != order_book_id:
            # The contract changed: close the run that ended on the previous date.
            slice_list.append(
                {"order_book_id": order_book_id, "start_date": start_date, "end_date": end_date}
            )
            order_book_id = dominant
            start_date = date
        end_date = date
    # Always emit the final run, including a one-day run created when the
    # dominant contract switches on the very last date.
    slice_list.append(
        {"order_book_id": order_book_id, "start_date": start_date, "end_date": end_date}
    )
    return slice_list

dominant_df = rqdatac.futures.get_dominant(underlying_symbol, start_date, datetime.date.today())
slice_list = slice_df(dominant_df)
df = pd.DataFrame()
for dic in slice_list:
df = pd.concat([df, eval(command)])
if time_parameter == "datetime" and instrument.type != INSTRUMENT_TYPE.CS:
# 股票无夜盘,且在市时间较长,考虑省略获取交易日的操作
df[time_parameter] = df[time_parameter].map(self._env.data_proxy._data_source.get_future_trading_date)
df['trading_dt'] = df[time_parameter].map(convert_date_to_date_int)
del df[time_parameter]
df.set_index(["order_book_id", "trading_dt"], inplace=True)
return df

0 comments on commit cd87697

Please sign in to comment.