Skip to content

Commit

Permalink
v3.3.4
Browse files Browse the repository at this point in the history
  • Loading branch information
chen-001 committed Oct 6, 2022
1 parent 776d0ad commit 0ba9343
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 25 deletions.
4 changes: 2 additions & 2 deletions pure_ocean_breeze/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
一个量化多因子研究的框架,包含数据、回测、因子加工等方面的功能
"""

__updated__ = "2022-10-01 11:35:56"
__version__ = "3.3.3"
__updated__ = "2022-10-02 09:18:37"
__version__ = "3.3.4"
__author__ = "chenzongwei"
__author_email__ = "[email protected]"
__url__ = "https://github.com/chen-001/pure_ocean_breeze"
Expand Down
100 changes: 99 additions & 1 deletion pure_ocean_breeze/data/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
针对一些不常见的文件格式,读取数据文件的一些工具函数,以及其他数据工具
"""

__updated__ = "2022-09-30 22:33:15"
__updated__ = "2022-10-06 10:18:49"

import h5py
import pandas as pd
import tqdm
import datetime
import scipy.io as scio
import numpy as np
import numpy_ext as npext
from functools import reduce, partial
from typing import Callable

try:
import rqdatac
Expand Down Expand Up @@ -345,3 +348,98 @@ def change_index_name(df: pd.DataFrame, name: str = "date") -> pd.DataFrame:
df.columns = [name] + list(df.columns)[1:]
df = set_index_first(df)
return df


def merge_many(dfs: list[pd.DataFrame], names: list = None) -> pd.DataFrame:
    """Merge several wide DataFrames (index=date, columns=code) into one long DataFrame.

    Parameters
    ----------
    dfs : list[pd.DataFrame]
        The wide tables to combine; each has dates on the index and stock
        codes on the columns.
    names : list, optional
        Output column name for each input table's values, by default None,
        which produces ``fac1``, ``fac2``, ...

    Returns
    -------
    pd.DataFrame
        Long-form DataFrame with columns ``['date', 'code', name1, name2, ...]``.
    """
    num = len(dfs)
    if names is None:
        names = [f"fac{i+1}" for i in range(num)]
    # Wide -> long: one (date, code, value) row per cell.
    dfs = [i.stack().reset_index() for i in dfs]
    # Rename positionally so the merge keys exist even when the original
    # index/columns were unnamed (stack/reset_index would otherwise emit
    # 'level_0'/'level_1' and the merge below would KeyError).
    dfs = [
        i.rename(
            columns={
                list(i.columns)[0]: "date",
                list(i.columns)[1]: "code",
                list(i.columns)[-1]: j,
            }
        )
        for i, j in zip(dfs, names)
    ]
    # BUG FIX: reduce was previously called without the sequence argument
    # (`reduce(lambda ...)`), which raises TypeError on every call; pass
    # `dfs` so the long tables are actually merged on (date, code).
    df = reduce(lambda x, y: pd.merge(x, y, on=["date", "code"]), dfs)
    return df


def corr_two_daily(
    df1: pd.DataFrame, df2: pd.DataFrame, rolling_window: int = 20
) -> pd.DataFrame:
    """Rolling time-series correlation of two factors, stock by stock.

    Parameters
    ----------
    df1 : pd.DataFrame
        First factor; index is date, columns are stock codes.
    df2 : pd.DataFrame
        Second factor; index is date, columns are stock codes.
    rolling_window : int, optional
        Length of the rolling window, by default 20.

    Returns
    -------
    pd.DataFrame
        Rolling correlations; index is date, columns are stock codes.
    """

    def corr_in(a, b, c):
        # For each window: pair the window's last date with the Pearson
        # correlation of the two value series over that window.
        last_date = c.iloc[-1]
        rho = np.corrcoef(a, b)[0, 1]
        return last_date, rho

    return func_two_daily(
        df1=df1, df2=df2, func=corr_in, rolling_window=rolling_window
    )


def func_two_daily(
    df1: pd.DataFrame, df2: pd.DataFrame, func: Callable, rolling_window: int = 20
) -> pd.DataFrame:
    """Apply ``func`` over rolling windows of two factors, stock by stock.

    Parameters
    ----------
    df1 : pd.DataFrame
        First factor; index is date, columns are stock codes.
    df2 : pd.DataFrame
        Second factor; index is date, columns are stock codes.
    func : Callable
        Function applied to each rolling window; it receives the window's
        two value series and its dates.  NOTE(review): the relabelling to
        ``["date", "corr", "code"]`` below implies ``func`` must return a
        ``(date, value)`` pair per window, as ``corr_two_daily`` does —
        confirm for other callers.
    rolling_window : int, optional
        Rolling window length, by default 20.

    Returns
    -------
    pd.DataFrame
        Computed values; index is date, columns are stock codes.
    """

    # partial() with no bound arguments simply wraps func unchanged.
    the_func = partial(func)

    def func_rolling(df):
        # Per-stock worker: sort that stock's rows chronologically, then
        # run the rolling computation via numpy_ext (6 parallel workers).
        df = df.sort_values(["date"])
        if df.shape[0] > rolling_window:
            df = npext.rolling_apply(
                the_func, rolling_window, df.fac1, df.fac2, df.date, n_jobs=6
            )
        # NOTE(review): groups with <= rolling_window rows fall through
        # unchanged (the raw long-form DataFrame, not rolling results) —
        # confirm the dropna/relabel steps below handle that shape.
        return df

    # Long table with columns date, code, fac1, fac2 (one row per stock-day).
    twins = merge_many([df1, df2])
    tqdm.tqdm.pandas()
    corrs = twins.groupby(["code"]).progress_apply(func_rolling)
    cor = []
    # Re-attach each stock's code, drop incomplete windows, and collect.
    for i in tqdm.tqdm_notebook(range(len(corrs))):
        df = pd.DataFrame(corrs.iloc[i]).dropna().assign(code=corrs.index[i])
        cor.append(df)
    cors = pd.concat(cor)
    cors.columns = ["date", "corr", "code"]
    # Pivot back to wide form: index=date, columns=code.
    cors = cors.pivot(index="date", columns="code", values="corr")
    return cors
47 changes: 25 additions & 22 deletions pure_ocean_breeze/labor/process.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__updated__ = "2022-10-01 11:39:37"
__updated__ = "2022-10-06 10:21:36"

import warnings

Expand Down Expand Up @@ -1372,11 +1372,7 @@ class pure_moon(object):

@classmethod
@lru_cache(maxsize=None)
def __init__(
cls,
startdate: int,
no_read_indu: bool = 0,
):
def __init__(cls, no_read_indu):
cls.homeplace = HomePlace()
# 已经算好的月度st状态文件
cls.sts_monthly_file = homeplace.daily_data_file + "sts_monthly.feather"
Expand Down Expand Up @@ -1409,7 +1405,7 @@ def deal_dummy(industry_dummy):
col = ["code", "date"] + industry_ws
industry_dummy.columns = col
industry_dummy = industry_dummy[
industry_dummy.date >= pd.Timestamp(str(startdate))
industry_dummy.date >= pd.Timestamp("20100101")
]
return industry_dummy

Expand Down Expand Up @@ -2220,7 +2216,7 @@ def __init__(
if only_cap + no_read_indu > 0:
only_cap = no_read_indu = 1
self.shen = pure_moon(
startdate=start,
# startdate=start,
no_read_indu=no_read_indu,
)
self.shen.set_basic_data(
Expand Down Expand Up @@ -2825,17 +2821,19 @@ def select_one_calculate(
if self.clickhouse == 1:
sql_order = f"select {fields} from minute_data.minute_data_{self.kind} where date={date * 100} order by code,date,num"
else:
sql_order = f"select {fields} from minute_data_{self.kind} where date='{date}'"
sql_order = (
f"select {fields} from minute_data_{self.kind} where date='{date}'"
)
if show_time:
df = self.chc.get_data_show_time(sql_order)
else:
df = self.chc.get_data(sql_order)
if self.clickhouse == 1:
df = ((df.set_index("code")) / 100).reset_index()
else:
df.num=df.num.astype(int)
df.date=df.date.astype(int)
df=df.sort_values(['date','num'])
df.num = df.num.astype(int)
df.date = df.date.astype(int)
df = df.sort_values(["date", "num"])
tqdm.tqdm.pandas()
df = df.groupby(["date", "code"]).progress_apply(the_func)
df = df.to_frame("fac").reset_index()
Expand All @@ -2847,17 +2845,19 @@ def select_one_calculate(
if self.clickhouse == 1:
sql_order = f"select {fields} from minute_data.minute_data_{self.kind} where date={date * 100} order by code,date,num"
else:
sql_order = f"select {fields} from minute_data_{self.kind} where date='{date}'"
sql_order = (
f"select {fields} from minute_data_{self.kind} where date='{date}'"
)
if show_time:
df = self.chc.get_data_show_time(sql_order)
else:
df = self.chc.get_data(sql_order)
if self.clickhouse == 1:
df = ((df.set_index("code")) / 100).reset_index()
else:
df.num=df.num.astype(int)
df.date=df.date.astype(int)
df=df.sort_values(['date','num'])
df.num = df.num.astype(int)
df.date = df.date.astype(int)
df = df.sort_values(["date", "num"])
df = df.groupby(["date", "code"]).apply(the_func)
df = df.to_frame("fac").reset_index()
df.columns = ["date", "code", "fac"]
Expand Down Expand Up @@ -2907,9 +2907,9 @@ def select_many_calculate(
if self.clickhouse == 1:
df = ((df.set_index("code")) / 100).reset_index()
else:
df.num=df.num.astype(int)
df.date=df.date.astype(int)
df=df.sort_values(['date','num'])
df.num = df.num.astype(int)
df.date = df.date.astype(int)
df = df.sort_values(["date", "num"])
tqdm.tqdm.pandas()
df = df.groupby(["date", "code"]).progress_apply(the_func)
df = df.to_frame("fac").reset_index()
Expand Down Expand Up @@ -2950,9 +2950,9 @@ def select_many_calculate(
if self.clickhouse == 1:
df = ((df.set_index("code")) / 100).reset_index()
else:
df.num=df.num.astype(int)
df.date=df.date.astype(int)
df=df.sort_values(['date','num'])
df.num = df.num.astype(int)
df.date = df.date.astype(int)
df = df.sort_values(["date", "num"])
df = df.groupby(["date", "code"]).apply(the_func)
df = df.to_frame("fac").reset_index()
df.columns = ["date", "code", "fac"]
Expand Down Expand Up @@ -3968,6 +3968,7 @@ def sort_a_with_b_func(self):


class pure_fama(object):
# @lru_cache(maxsize=None)
def __init__(
self,
factors: list[pd.DataFrame],
Expand Down Expand Up @@ -4056,6 +4057,8 @@ def __init__(
self.rets_long = pd.concat(self.factors_rets_long, axis=1)
self.rets_short = pd.concat(self.factors_rets_short, axis=1)
self.__factors_rets = self.rets_long - self.rets_short
if add_market_series is not None:
add_market = 1
if add_market:
if add_market_series is None:
closes = read_market(close=1, every_stock=0, start=start).to_frame(
Expand Down
6 changes: 6 additions & 0 deletions 更新日志/version3.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## 更新日志🗓 — v3

* v3.3.4 — 2022.10.06
> 1. 删去了回测类pure_moon和pure_moonnight中起始日期startdate参数,以避免输入因子值起点不同,会导致缓存失效的bug
> 1. 优化了pure_fama的参数逻辑,在输入add_market_series参数时,自动将add_market参数指定为1
> 1. 新增了merge_many函数,将多个index为时间,columns为股票代码的dataframe拼接在一起,变成一个长表
> 1. 新增函数func_two_daily,用于对两个index为时间,columns为股票代码的dataframe,每只股票下,各自沿着时间序列,做某个函数操作,最终得到一个index为时间,columns为股票代码的dataframe
> 1. 新增func_two_daily的特例函数,corr_two_daily,求两个因子同一股票滚动窗口下的时序相关系数
* v3.3.3 — 2022.10.01
> 1. 将读取300、500、1000指数的行情read_index_three改为从分钟数据读取
> 1. 给读取市场行情(中证全指)行情read_market增加从questdb读取
Expand Down

0 comments on commit 0ba9343

Please sign in to comment.