Skip to content

Commit

Permalink
v3.7.4
Browse files Browse the repository at this point in the history
  • Loading branch information
chen-001 committed Feb 22, 2023
1 parent ab30acc commit a603a1c
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 15 deletions.
4 changes: 2 additions & 2 deletions pure_ocean_breeze/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
一个量化多因子研究的框架,包含数据、回测、因子加工等方面的功能
"""

__updated__ = "2023-02-13 10:02:13"
__version__ = "3.7.3"
__updated__ = "2023-02-22 18:05:27"
__version__ = "3.7.4"
__author__ = "chenzongwei"
__author_email__ = "[email protected]"
__url__ = "https://github.com/chen-001/pure_ocean_breeze"
Expand Down
29 changes: 27 additions & 2 deletions pure_ocean_breeze/data/database.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__updated__ = "2023-02-03 10:41:21"
__updated__ = "2023-02-22 20:16:11"

import pandas as pd
import pymysql
Expand All @@ -14,6 +14,7 @@
from typing import Union
from psycopg2.extensions import register_adapter, AsIs
from tenacity import retry, stop_after_attempt
import questdb.ingress as qdbing
from pure_ocean_breeze.state.states import STATES


Expand Down Expand Up @@ -822,7 +823,7 @@ def __addapt_numpy_float64(self, numpy_float64):
def __addapt_numpy_int64(self, numpy_int64):
return AsIs(numpy_int64)

def write_via_df(
def write_via_df_old(
self,
df: pd.DataFrame,
table: str,
Expand Down Expand Up @@ -890,6 +891,30 @@ def write_via_df(
cursor.close()
cursor.close()

def write_via_df(
self,
df: pd.DataFrame,
table_name: str,
symbols: Union[str, bool, list[int], list[str]] = None,
) -> None:
"""通过questdb的python库直接将dataframe写入quested数据库
Parameters
----------
df : pd.DataFrame
要写入的dataframe
table_name : str
questdb中该表的表名
symbols : Union[str, bool, list[int], list[str]], optional
为symbols的那些列的名称, by default None
"""
if symbols is not None:
with qdbing.Sender(self.host, 9009) as sender:
sender.dataframe(df, table_name=table_name, symbols=symbols)
else:
with qdbing.Sender(self.host, 9009) as sender:
sender.dataframe(df, table_name=table_name)

def write_via_csv(self, df: pd.DataFrame, table: str, index_id: str = None) -> None:
"""以csv中转的方式,将pd.DataFrame写入Questdb,这一方法的速度约为直接写入的20倍以上,建议使用此方法
Expand Down
6 changes: 3 additions & 3 deletions pure_ocean_breeze/data/write_data.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__updated__ = "2023-02-19 22:46:25"
__updated__ = "2023-02-22 20:19:10"

import time

Expand Down Expand Up @@ -159,7 +159,7 @@ def database_update_minute_data_to_clickhouse_and_questdb(kind: str,web_port:str
ts.date = ts.date.astype(int).astype(str)
ts.num = ts.num.astype(int).astype(str)
qdb = Questdb(web_port=web_port)
qdb.write_via_csv(ts, f"minute_data_{kind}")
qdb.write_via_df(ts, f"minute_data_{kind}")
# 获取剩余使用额
user2 = round(rqdatac.user.get_quota()["bytes_used"] / 1024 / 1024, 2)
user12 = round(user2 - user1, 2)
Expand Down Expand Up @@ -334,7 +334,7 @@ def database_update_minute_data_to_questdb(kind: str,web_port:str='9001') -> Non
ts.date = ts.date.astype(int).astype(str)
ts.num = ts.num.astype(int).astype(str)
# 数据写入数据库
qdb.write_via_csv(ts, f"minute_data_{kind}")
qdb.write_via_df(ts, f"minute_data_{kind}")
# 获取剩余使用额
user2 = round(rqdatac.user.get_quota()["bytes_used"] / 1024 / 1024, 2)
user12 = round(user2 - user1, 2)
Expand Down
13 changes: 5 additions & 8 deletions pure_ocean_breeze/labor/process.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__updated__ = "2023-02-03 10:39:00"
__updated__ = "2023-02-22 20:19:44"

import warnings

Expand Down Expand Up @@ -3268,7 +3268,7 @@ def select_one_calculate(
df.index = pd.to_datetime(df.index.astype(str), format="%Y%m%d")
to_save = df.stack().reset_index()
to_save.columns = ["date", "code", "fac"]
self.factor_steps.write_via_csv(to_save, self.factor_file_pinyin)
self.factor_steps.write_via_df(to_save, self.factor_file_pinyin)
return df

def select_many_calculate(
Expand Down Expand Up @@ -3298,9 +3298,6 @@ def select_many_calculate(
show_time=show_time,
)
factor_new.append(df_first)
to_save = df_first.stack().reset_index()
to_save.columns = ["date", "code", "fac"]
self.factor_steps.write_via_csv(to_save, self.factor_file_pinyin)

if tqdm_inside == 1:
# 开始计算因子值
Expand Down Expand Up @@ -3328,7 +3325,7 @@ def select_many_calculate(
factor_new.append(df)
to_save = df.stack().reset_index()
to_save.columns = ["date", "code", "fac"]
self.factor_steps.write_via_csv(to_save, self.factor_file_pinyin)
self.factor_steps.write_via_df(to_save, self.factor_file_pinyin)
else:
# 开始计算因子值
for date1, date2 in tqdm.auto.tqdm(cuts, desc="不知乘月几人归,落月摇情满江树。"):
Expand Down Expand Up @@ -3357,7 +3354,7 @@ def select_many_calculate(
factor_new.append(df)
to_save = df.stack().reset_index()
to_save.columns = ["date", "code", "fac"]
self.factor_steps.write_via_csv(to_save, self.factor_file_pinyin)
self.factor_steps.write_via_df(to_save, self.factor_file_pinyin)
else:
pairs = self.forward_dates(dates, many_days=many_days)
for date1, date2 in tqdm.auto.tqdm(
Expand Down Expand Up @@ -3392,7 +3389,7 @@ def select_many_calculate(
factor_new.append(df)
to_save = df.stack().reset_index()
to_save.columns = ["date", "code", "fac"]
self.factor_steps.write_via_csv(to_save, self.factor_file_pinyin)
self.factor_steps.write_via_df(to_save, self.factor_file_pinyin)
if len(factor_new) > 0:
factor_new = pd.concat(factor_new)
return factor_new
Expand Down
3 changes: 3 additions & 0 deletions 更新日志/version3.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 更新日志🗓 — v3

* v3.7.4 — 2023.2.22
> 1. Questdb的写入方式将原write_via_df改为write_via_df_old,新增了通过questdb.ingress.Sender写入的方式write_via_df,并将所有写入方式都改为了write_via_df
> 1. 修复了使用分钟数据计算因子值时,单日数据重复写入的bug
* v3.7.3 — 2023.2.21
> 1. 删去了do_on_dfs装饰器对get_list_std函数的支持
> 1. 修复了database_update_minute_data_to_clickhouse_and_questdb和database_update_minute_data_to_questdb在端口号为9000的设备上的写入bug,并增加了web_port参数,用于手动指定端口参数
Expand Down

0 comments on commit a603a1c

Please sign in to comment.