Skip to content

Commit

Permalink
v3.7.7
Browse files Browse the repository at this point in the history
  • Loading branch information
chen-001 committed Feb 23, 2023
1 parent 3db4954 commit fd64045
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 77 deletions.
4 changes: 2 additions & 2 deletions pure_ocean_breeze/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
一个量化多因子研究的框架,包含数据、回测、因子加工等方面的功能
"""

__updated__ = "2023-02-23 14:42:32"
__version__ = "3.7.6"
__updated__ = "2023-02-23 16:17:02"
__version__ = "3.7.7"
__author__ = "chenzongwei"
__author_email__ = "[email protected]"
__url__ = "https://github.com/chen-001/pure_ocean_breeze"
Expand Down
4 changes: 2 additions & 2 deletions pure_ocean_breeze/data/database.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__updated__ = "2023-02-23 15:01:44"
__updated__ = "2023-02-23 16:16:34"

import pandas as pd
import pymysql
Expand Down Expand Up @@ -952,7 +952,7 @@ def get_data_with_tuple(

def eval_it(x):
if "," in x.iloc[0]:
x = pd.eval(x)
x = x.apply(lambda y:[float(i) if y not in ['nan', ' nan'] else np.nan for i in y[1:-1].split(',')])
else:
x = x.astype(float)
return x
Expand Down
132 changes: 61 additions & 71 deletions pure_ocean_breeze/labor/process.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__updated__ = "2023-02-23 12:34:42"
__updated__ = "2023-02-23 22:46:58"

import warnings

Expand Down Expand Up @@ -33,6 +33,7 @@
import tradetime as tt
import cufflinks as cf
import deprecation
from mpire import WorkerPool
from pure_ocean_breeze import __version__

cf.set_config_file(offline=True)
Expand Down Expand Up @@ -2665,7 +2666,6 @@ def __call__(self, monthly=False):
except Exception:
return self.monthly_factors.copy()


def wide_to_long(self, df, i):
"""将宽数据转化为长数据,用于因子表转化和拼接"""
df = df.stack().reset_index()
Expand Down Expand Up @@ -3131,8 +3131,8 @@ def select_many_calculate(
fields: str = "*",
chunksize: int = 10,
show_time: bool = 0,
tqdm_inside: bool = 0,
many_days: int = 1,
n_jobs: int = 1,
) -> None:
the_func = partial(func)
factor_new = []
Expand All @@ -3152,71 +3152,48 @@ def select_many_calculate(
)
factor_new.append(df_first)

if tqdm_inside == 1:
# 开始计算因子值
for date1, date2 in cuts:
if self.clickhouse == 1:
sql_order = f"select {fields} from minute_data.minute_data_{self.kind} where date>{dates[date1] * 100} and date<={dates[date2] * 100} order by code,date,num"
else:
sql_order = f"select {fields} from minute_data_{self.kind} where cast(date as int)>{dates[date1]} and cast(date as int)<={dates[date2]}"
if show_time:
df = self.chc.get_data_show_time(sql_order)
else:
df = self.chc.get_data(sql_order)
if self.clickhouse == 1:
df = ((df.set_index("code")) / 100).reset_index()
else:
df.num = df.num.astype(int)
df.date = df.date.astype(int)
df = df.sort_values(["date", "num"])
tqdm.auto.tqdm.pandas()
df = df.groupby(self.groupby_target).progress_apply(the_func)
def cal_one(date1, date2):
if self.clickhouse == 1:
sql_order = f"select {fields} from minute_data.minute_data_{self.kind} where date>{dates[date1] * 100} and date<={dates[date2] * 100} order by code,date,num"
else:
sql_order = f"select {fields} from minute_data_{self.kind} where cast(date as int)>{dates[date1]} and cast(date as int)<={dates[date2]} order by code,date,num"
if show_time:
df = self.chc.get_data_show_time(sql_order)
else:
df = self.chc.get_data(sql_order)
if self.clickhouse == 1:
df = ((df.set_index("code")) / 100).reset_index()
else:
df.num = df.num.astype(int)
df.date = df.date.astype(int)
df = df.sort_values(["date", "num"])
df = df.groupby(self.groupby_target).apply(the_func)
if self.groupby_target == ["date", "code"]:
df = df.to_frame("fac").reset_index()
df.columns = ["date", "code", "fac"]
df = df.pivot(columns="code", index="date", values="fac")
df.index = pd.to_datetime(df.index.astype(str), format="%Y%m%d")
factor_new.append(df)
to_save = df.stack().reset_index()
to_save.columns = ["date", "code", "fac"]
self.factor_steps.write_via_df(
to_save, self.factor_file_pinyin, tuple_col="fac"
)
else:
df = df.reset_index()
df = df.pivot(columns="code", index="date", values="fac")
df.index = pd.to_datetime(df.index.astype(str), format="%Y%m%d")
to_save = df.stack().reset_index()
to_save.columns = ["date", "code", "fac"]
self.factor_steps.write_via_df(
to_save, self.factor_file_pinyin, tuple_col="fac"
)
return df

if n_jobs > 1:
with WorkerPool(n_jobs=n_jobs) as pool:
factor_new_more = pool.map(cal_one, cuts, progress_bar=True)
factor_new = factor_new + factor_new_more
else:
# 开始计算因子值
for date1, date2 in tqdm.auto.tqdm(cuts, desc="不知乘月几人归,落月摇情满江树。"):
if self.clickhouse == 1:
sql_order = f"select {fields} from minute_data.minute_data_{self.kind} where date>{dates[date1] * 100} and date<={dates[date2] * 100} order by code,date,num"
else:
sql_order = f"select {fields} from minute_data_{self.kind} where cast(date as int)>{dates[date1]} and cast(date as int)<={dates[date2]} order by code,date,num"
if show_time:
df = self.chc.get_data_show_time(sql_order)
else:
df = self.chc.get_data(sql_order)
if self.clickhouse == 1:
df = ((df.set_index("code")) / 100).reset_index()
else:
df.num = df.num.astype(int)
df.date = df.date.astype(int)
df = df.sort_values(["date", "num"])
df = df.groupby(self.groupby_target).apply(the_func)
if self.groupby_target == ["date", "code"]:
df = df.to_frame("fac").reset_index()
df.columns = ["date", "code", "fac"]
else:
df = df.reset_index()
df = df.pivot(columns="code", index="date", values="fac")
df.index = pd.to_datetime(df.index.astype(str), format="%Y%m%d")
df = cal_one(date1, date2)
factor_new.append(df)
to_save = df.stack().reset_index()
to_save.columns = ["date", "code", "fac"]
self.factor_steps.write_via_df(
to_save, self.factor_file_pinyin, tuple_col="fac"
)
else:
pairs = self.forward_dates(dates, many_days=many_days)
for date1, date2 in tqdm.auto.tqdm(
list(zip(pairs, dates)), desc="知不可乎骤得,托遗响于悲风。"
):

def cal_two(date1, date2):
if date1 is not None:
if self.clickhouse == 1:
sql_order = f"select {fields} from minute_data.minute_data_{self.kind} where date>{date1*100} and date<={date2*100} order by code,date,num"
Expand All @@ -3243,12 +3220,25 @@ def select_many_calculate(
df.columns = ["code", "fac", "date"]
df = df.pivot(columns="code", index="date", values="fac")
df.index = pd.to_datetime(df.index.astype(str), format="%Y%m%d")
factor_new.append(df)
to_save = df.stack().reset_index()
to_save.columns = ["date", "code", "fac"]
self.factor_steps.write_via_df(
to_save, self.factor_file_pinyin, tuple_col="fac"
)
return df

pairs = self.forward_dates(dates, many_days=many_days)
cuts2 = tuple(zip(pairs, dates))
if n_jobs > 1:
with WorkerPool(n_jobs=n_jobs) as pool:
factor_new_more = pool.map(cal_two, cuts2, progress_bar=True)
factor_new = factor_new + factor_new_more
else:
# 开始计算因子值
for date1, date2 in tqdm.auto.tqdm(cuts2, desc="知不可乎骤得,托遗响于悲风。"):
df = cal_two(date1, date2)
factor_new.append(df)

if len(factor_new) > 0:
factor_new = pd.concat(factor_new)
return factor_new
Expand All @@ -3262,8 +3252,8 @@ def select_any_calculate(
fields: str = "*",
chunksize: int = 10,
show_time: bool = 0,
tqdm_inside: bool = 0,
many_days: int = 1,
n_jobs: int = 1,
) -> None:
if len(dates) == 1 and many_days == 1:
res = self.select_one_calculate(
Expand All @@ -3279,9 +3269,11 @@ def select_any_calculate(
fields=fields,
chunksize=chunksize,
show_time=show_time,
tqdm_inside=tqdm_inside,
many_days=many_days,
n_jobs=n_jobs,
)
if res is not None:
self.factor_new.append(res)
return res

@staticmethod
Expand Down Expand Up @@ -3344,8 +3336,8 @@ def get_daily_factors(
fields: str = "*",
chunksize: int = 10,
show_time: bool = 0,
tqdm_inside: bool = 0,
many_days: int = 1,
n_jobs: int = 1,
) -> None:
"""每次抽取chunksize天的截面上全部股票的分钟数据
对每天的股票的数据计算因子值
Expand All @@ -3361,10 +3353,10 @@ def get_daily_factors(
每次读取的截面上的天数, by default 10
show_time : bool, optional
展示每次读取数据所需要的时间, by default 0
tqdm_inside : bool, optional
将进度条加在内部,而非外部,建议仅chunksize较大时使用, by default 0
many_days: int, optional
many_days : int, optional
计算某天的因子值时,需要使用之前多少天的数据
n_jobs : int, optional
并行数量,不建议设置为大于2的数,此外当此参数大于1时,请使用questdb数据库来读取分钟数据, by default 1
"""
if len(self.dates_new) > 0:
for interval in self.dates_new_intervals:
Expand All @@ -3374,11 +3366,9 @@ def get_daily_factors(
fields=fields,
chunksize=chunksize,
show_time=show_time,
tqdm_inside=tqdm_inside,
many_days=many_days,
n_jobs=n_jobs,
)
if df is not None:
self.factor_new.append(df)
self.factor_new = pd.concat(self.factor_new)
# 拼接新的和旧的
self.factor = pd.concat([self.factor_old, self.factor_new]).sort_index()
Expand Down
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__updated__ = "2023-02-23 11:36:59"
__updated__ = "2023-02-23 22:44:04"

from setuptools import setup
import setuptools
Expand Down Expand Up @@ -27,7 +27,7 @@ def get_version(package):
url="https://github.com/chen-001/pure_ocean_breeze.git",
project_urls={"Documentation": "https://chen-001.github.io/pure_ocean_breeze/"},
install_requires=[
"numpy",
# "numpy",
"pandas",
"scipy",
"statsmodels",
Expand All @@ -54,6 +54,7 @@ def get_version(package):
"tradetime",
"deprecation",
"questdb",
"mpire",
],
python_requires=">=3",
license="MIT",
Expand Down
5 changes: 5 additions & 0 deletions 更新日志/version3.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
## 更新日志🗓 — v3

* v3.7.7 — 2023.2.23
> 1. 修复了Questdb的get_data_with_tuple方法,读取数据异常的bug
> 1. 对pure_fall_frequent的get_daily_factors方法新增了n_jobs参数,用于开启基于mpire的并行加速
> 1. 删去了pure_fall_frequent的get_daily_factors方法中,tqdm_inside的参数,只能在外部添加总进度条
> 1. 新增了mpire依赖库,暂时删去了numpy的依赖库
* v3.7.6 — 2023.2.23
> 1. 修复了pure_fall_frequent打断后重读备份表的bug
> 1. Questdb新增了copy_all_tables和upload_all_copies方法,用于备份和恢复数据库
Expand Down

0 comments on commit fd64045

Please sign in to comment.