Skip to content

Commit

Permalink
🐳 chore: backup
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-yang-biz committed Apr 6, 2024
1 parent 3d44e79 commit dec406e
Show file tree
Hide file tree
Showing 16 changed files with 2,150 additions and 1,583 deletions.
9 changes: 2 additions & 7 deletions omega/config/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ redis:
pickle:
ver: 4

tushare:
token: ${TUSHARE_TOKEN}
omega:
http_port: 3180
local_data: /zillionare/import
Expand All @@ -51,13 +53,6 @@ quotes_fetchers:
zarr:
store_path: /zillionare/boards.zarr

influxdb:
url: ${INFLUXDB_URL}
token: ${INFLUXDB_TOKEN}
org: ${INFLUXDB_ORG}
bucket_name: ${INFLUXDB_BUCKET}
enable_compress: true
max_query_size: 15000

notify:
mail_from: ${MAIL_FROM}
Expand Down
3 changes: 3 additions & 0 deletions omega/core/constants.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# -*- coding: utf-8 -*-
# @Author : xiaohuzi
# @Time : 2022-01-11 10:24
import datetime

EPOCH = datetime.date(2005, 1, 4)
TASK_PREFIX = "master.task"
TASK_SECS_PREFIX = "master.task.secs"

Expand Down
3 changes: 1 addition & 2 deletions omega/core/haystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ def update_factors(self, sec: str, factors: pd.Series):
sec: 待更新复权因子的证券代码
factors: 以日期为索引,复权因子为值的Series
"""

for dt, factor in factors.items():
sql = "alter table bars_day update factor = %(v1)s where symbol = %(v2)s and frame = %(v3)s"

self.client.command(sql, {"v1": factor, "v2": sec, "v3": dt})

100 changes: 54 additions & 46 deletions omega/fetchers/akshare.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,60 @@
import datetime

import contextlib
import akshare as ak
import arrow
import pandas as pd
from functools import wraps
from coretypes import SecurityInfoSchema
from pandera.typing import DataFrame
from pandera.typing import DataFrame, Series
from pypinyin import Style, lazy_pinyin
from omega.core.constants import EPOCH
import io


def _get_pinyin_initials(hanz: str)->str:
def _get_pinyin_initials(hanz: str) -> str:
"""将汉字转换为拼音首字母
本辅助函数用以获取证券名的拼音简写。
"""
return "".join(lazy_pinyin(hanz, style = Style.FIRST_LETTER)).upper()
return "".join(lazy_pinyin(hanz, style=Style.FIRST_LETTER)).upper()


def cheat_tqdm():
"""akshare使用tqdm来做进度显示。但是tqdm需要连接到控制台或者GUI,否则会出错。此函数用来欺骗tqdm,使得对akshare的调用能在后台进程中完成"""

def inner(func):
@wraps(func)
def decorated_function(*args, **kwargs):
with contextlib.redirect_stderr(io.StringIO()):
return func(*args, **kwargs)

return decorated_function

return inner

def fetch_stock_list()->DataFrame[SecurityInfoSchema]:

@cheat_tqdm()
def fetch_stock_list() -> DataFrame[SecurityInfoSchema]:
"""获取证券列表
此接口用时以秒计,因此一般需要在工作者进程中调用。
"""
# 获取沪市股票列表
sh = ak.stock_info_sh_name_code()
sh = sh.rename({
"证券简称": "alias",
"上市日期": "ipo",
"证券代码": "code"
}, axis=1)
sh = sh.rename({"证券简称": "alias", "上市日期": "ipo", "证券代码": "code"}, axis=1)

sh["code"] = sh["code"].apply(lambda x: x+".SH")
sh["code"] = sh["code"].apply(lambda x: x + ".SH")
sh["initials"] = sh["alias"].apply(_get_pinyin_initials)
sh["exit"] = None
sh["type"] = "stock"

sh = sh[["code", "alias", "initials", "ipo", "exit", "type"]]

# 补齐退市证券
sh_delisted = ak.stock_info_sh_delist()
sh_delisted = sh_delisted.rename({
"公司代码": "code",
"公司简称": "alias",
"上市日期": "ipo",
"暂停上市日期": "exit"
}, axis=1)
sh_delisted = sh_delisted.rename(
{"公司代码": "code", "公司简称": "alias", "上市日期": "ipo", "暂停上市日期": "exit"}, axis=1
)

sh_delisted["type"] = "stock"
sh_delisted["code"] = sh_delisted["code"].apply(lambda x: x + ".SH")
Expand All @@ -51,13 +63,9 @@ def fetch_stock_list()->DataFrame[SecurityInfoSchema]:

# 获取深市股票列表
sz = ak.stock_info_sz_name_code()
sz = sz.rename({
"A股代码": "code",
"A股简称": "alias",
"A股上市日期": "ipo"
}, axis=1)
sz = sz.rename({"A股代码": "code", "A股简称": "alias", "A股上市日期": "ipo"}, axis=1)

sz["code"] = sz["code"].apply(lambda x: x+".SZ")
sz["code"] = sz["code"].apply(lambda x: x + ".SZ")
sz["initials"] = sz["alias"].apply(_get_pinyin_initials)
sz["type"] = "stock"

Expand All @@ -69,12 +77,9 @@ def fetch_stock_list()->DataFrame[SecurityInfoSchema]:

# 补齐深市退市证券
sz_delisted = ak.stock_info_sz_delist("终止上市公司")
sz_delisted = sz_delisted.rename({
"证券代码": "code",
"证券简称": "alias",
"上市日期": "ipo",
"终止上市日期": "exit"
}, axis=1)
sz_delisted = sz_delisted.rename(
{"证券代码": "code", "证券简称": "alias", "上市日期": "ipo", "终止上市日期": "exit"}, axis=1
)

sz_delisted["type"] = "stock"
sz_delisted["code"] = sz_delisted["code"].apply(lambda x: x + ".SZ")
Expand All @@ -83,13 +88,9 @@ def fetch_stock_list()->DataFrame[SecurityInfoSchema]:

# 北交所
bj = ak.stock_info_bj_name_code()
bj = bj.rename({
"证券代码": "code",
"证券简称": "alias",
"上市日期": "ipo"
}, axis=1)
bj = bj.rename({"证券代码": "code", "证券简称": "alias", "上市日期": "ipo"}, axis=1)

bj["code"] = bj["code"].apply(lambda x: x+".BJ")
bj["code"] = bj["code"].apply(lambda x: x + ".BJ")
bj["initials"] = bj["alias"].apply(_get_pinyin_initials)
bj["type"] = "stock"
bj["exit"] = None
Expand All @@ -98,16 +99,23 @@ def fetch_stock_list()->DataFrame[SecurityInfoSchema]:

# 指数列表
index = ak.index_stock_info()
index = index.rename({
"index_code": "code",
"display_name": "alias",
"publish_date": "ipo"
}, axis=1)
index = index.rename(
{"index_code": "code", "display_name": "alias", "publish_date": "ipo"}, axis=1
)

index["code"] = index["code"].apply(lambda x: x+".SH" if x.startswith("000") else x+".SZ")
index["code"] = index["code"].apply(
lambda x: x + ".SH" if x.startswith("000") else x + ".SZ"
)
index["initials"] = index["alias"].apply(_get_pinyin_initials)
index["exit"] = None
index["type"] = "index"
index["ipo"] = index["ipo"].apply(lambda x: arrow.get(x).date())

return pd.concat([sh, sz, sh_delisted, sz_delisted, index], axis=0) #type: ignore

return pd.concat([sh, sz, sh_delisted, sz_delisted, index], axis=0) # type: ignore


@cheat_tqdm()
def fetch_calendar() -> Series[datetime.date]:
"""获取交易日历,返回从EPOCH以来的日历"""
df = ak.tool_trade_date_hist_sina()
return df[df.trade_date >= EPOCH]["trade_date"]
3 changes: 1 addition & 2 deletions omega/fetchers/tushare.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,4 @@ def fetch_stock_bars_daily(codes: List[str], start: datetime.date, end: datetime
"suspend_type": "suspended",
"turnover_rate": "turnover"
})

df["suspended"] = df.suspended.astype(int)
return df
4 changes: 3 additions & 1 deletion omega/master/tasks/rebuild_unclosed.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ async def _rebuild_min_level_unclosed_bars():
first_frame = bars[0]["frame"].item()
if no_first_bars > 50:
logger.warning("超过50支个股在缓存中没有09:31的分钟线,zillionare刚安装,还没来得及同步?")
DingTalkMessage.text("重建分钟级缓存数据时,超过50支个股在缓存中没有09:31的分钟线,zillionare刚安装,还没来得及同步?")
DingTalkMessage.text(
"重建分钟级缓存数据时,超过50支个股在缓存中没有09:31的分钟线,zillionare刚安装,还没来得及同步?"
)
return False
if first_frame.hour() != 9 and first_frame.minute() != 31:
no_first_bars += 1
Expand Down
4 changes: 2 additions & 2 deletions omega/worker/tasks/task_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ async def get_secs_for_sync(limit: int, n_bars: int, name: str):
Returns:
"""
while True:
step = limit // n_bars # 根据单次api允许获取的条数 和一共多少根k线 计算每次最多可以获取多少个股票的
step = limit // n_bars # 根据单次api允许获取的条数 和一共多少根k线 计算每次最多可以获取多少个股票的
p = cache.sys.pipeline()
p.lrange(name, 0, step - 1)
p.ltrim(name, step, -1)
Expand Down
Loading

0 comments on commit dec406e

Please sign in to comment.