Skip to content

Commit

Permalink
在透视模块高空要素检验功能中增加了地形mask功能
Browse files Browse the repository at this point in the history
在对高空气象要素进行检验时,如果某一层的部分区域处于地面以下,在检验时理应排除在统计样本之外,改进后增加了对这些功能的支持,可以通过para中的zs和gh参数读取地面的海拔高度和某一层的位势高度数据,从而实现扣除地下数据样本的功能
  • Loading branch information
liucouhua committed Oct 27, 2024
1 parent 9a72160 commit 41c834b
Showing 1 changed file with 66 additions and 14 deletions.
80 changes: 66 additions & 14 deletions meteva/perspact/middel_high.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@
"time_type": "UT", # 预报数据时间类型是北京时,即08时起报
"multiple": 1, # 当文件数据的单位是位势米,通过乘以9.8转换成位势
"move_fo_time":0,
"veri_by":"CRA40"
"veri_by":"CRA40",
"gh":{
"dirm": r"\\10.28.16.234\data2\AI\fuxi\Z\LLL\YYYYMMDDHH\YYYYMMDDHH.TTT.nc", #位势高度的文件路径
"read_method": meteva.base.read_griddata_from_nc, #读取位势高度的函数
"read_para": {}, #读取位势高度所用的函数参数
},
},

"pa": {
Expand All @@ -50,13 +55,24 @@
"time_type": "UT",#预报数据时间类型是北京时,即08时起报
"multiple":1, #当文件数据的单位是位势米,通过乘以9.8转换成位势
"move_fo_time": 12,
"veri_by": "CRA40"
"veri_by": "CRA40",
"gh": {
"dirm": r"\\10.28.16.234\data2\AI\Pangu_ERA5\Z\LLL\YYYYMMDDHH\YYYYMMDDHH.TTT.nc", #位势高度的文件路径
"read_method": meteva.base.read_griddata_from_nc, #读取位势高度的函数
"read_para": {}, #读取位势高度所用的函数参数
},
},
},

"zs": {
"path": r"\\10.28.16.234\data2\AI\fuxi\Z\LLL\YYYYMMDDHH\YYYYMMDDHH.TTT.nc", #地形高度的文件路径
"read_method": meteva.base.read_griddata_from_nc, #读取地形高度的函数
"read_para": {}, #读取地形高度的函数参数
}
}


def task_of_one_p(para,middle_result_path,v_df):
def task_of_one_p(para,middle_result_path,v_df,mid0 = None,zs = None):

time_cost_dict = {"total_cost":0,"veri_cost":0}
time_cost_total0 = time.time()
Expand All @@ -66,7 +82,10 @@ def task_of_one_p(para,middle_result_path,v_df):
else:
marker = meteva.perspact.get_grid_marker(grid0, step=para["step"])
mid_method = para["mid_method"]
mid_list = []
if mid0 is None:
mid_list = []
else:
mid_list = [mid0]
save_k = 0 # 用来在收集的过程间隔一段时间报错一下结果,避免程序错误时没有任何输出导致要重头再来
v_df_group, level_list = meteva.base.group(v_df, g="level")
for i in range(len(level_list)):
Expand Down Expand Up @@ -131,11 +150,15 @@ def task_of_one_p(para,middle_result_path,v_df):
if para_fo1["time_type"].lower() == "bt":
time1_ = time1 + datetime.timedelta(hours=8)
time1_ -= datetime.timedelta(hours=move_fo_time)
path1 = meteva.base.get_path(para_fo1["dirm"], time1_, dh + move_fo_time)
path1 = path1.replace("LLL", str(level))
time_cost0 = time.time()
grd_fo = para_fo1["read_method"](path1, grid=para["grid"], time=time1, dtime=dh, level=level,
data_name=model, show=True, **para_fo1["read_para"])
if para_fo1["dirm"] is not None:
path1 = meteva.base.get_path(para_fo1["dirm"], time1_, dh + move_fo_time)
path1 = path1.replace("LLL", str(level))
time_cost0 = time.time()
grd_fo = para_fo1["read_method"](path1, grid=para["grid"], time=time1, dtime=dh, level=level,
data_name=model, show=True, **para_fo1["read_para"])
else:
grd_fo = para_fo1["read_method"](grid=para["grid"], time=time1, dtime=dh, level=level,
data_name=model, show=True)
time_cost = time.time() - time_cost0
if model not in time_cost_dict.keys():
time_cost_dict[model] = 0
Expand All @@ -144,6 +167,23 @@ def task_of_one_p(para,middle_result_path,v_df):
if grd_fo is not None:
grd_fo.values *= para_fo1["multiple"]
time_cost0 = time.time()

#读取位势高度用于判断是否在地下
if zs is not None:
if para_fo1["gh"]["dirm"] is not None:
path1 = meteva.base.get_path(para_fo1["gh"]["dirm"], time1_, dh + move_fo_time)
path1 = path1.replace("LLL", str(level))
time_cost0 = time.time()
grd_gh = para_fo1["gh"]["read_method"](path1, grid=para["grid"], time=time1, dtime=dh,
level=level,
data_name=model, show=True, **para_fo1["gh"]["read_para"])
else:
grd_gh = para_fo1["gh"]["read_method"](grid=para["grid"], time=time1, dtime=dh, level=level,
data_name=model, show=True)

grd_fo.values[grd_gh.values<zs.values] = np.nan


df_mid = meteva.perspact.middle_df_grd(grd_obs, grd_fo, mid_method, marker=marker,
grade_list=para["grade_list"], compare=para["compare"])
time_cost = time.time() - time_cost0
Expand Down Expand Up @@ -177,7 +217,7 @@ def task_of_one_p(para,middle_result_path,v_df):
if not key in ["total_cost","veri_cost"]:
print("读取"+key+"耗时:"+str(time_cost_dict[key]))

return
return mid_all


def middle_of_score(para):
Expand All @@ -197,6 +237,7 @@ def middle_of_score(para):
mid_list = []
# 创建一个空的DataFrame,用来记录哪些中间量已经收集
df1 = pd.DataFrame(data=None, columns=['level', 'time', 'dtime', 'member'])
mid0 = None
if os.path.exists(middle_result_path):
if not para["recover"]:
mid0 = pd.read_hdf(middle_result_path)
Expand Down Expand Up @@ -244,8 +285,18 @@ def middle_of_score(para):
"member":v_member_list
})

zs = None
#读取地形高度
if "zs" in para.keys():
if "read_method" in para["zs"]:
if para["zs"]["read_method"] is not None:
zs = para["zs"]["read_method"](para["zs"]["path"],grid = para["grid"],**para["zs"]["read_para"])
if zs is None:
print("地形高度数据读取失败,请检查地形数据文件是否存在,地形数据读取函数和地形数据格式是否吻合")

if "cpu" not in para.keys() or para["cpu"] ==1:
task_of_one_p(para,middle_result_path,v_df)
task_of_one_p(para,middle_result_path,v_df,mid0 = mid0,zs= zs)

else:
#多进程并行方案

Expand All @@ -262,7 +313,7 @@ def middle_of_score(para):
pathlib.Path(middle_result_dir_sep).mkdir(parents=True, exist_ok=True) #创建文件夹


v_df["ob_time"] = v_df["time"] + v_df["dtime"]*np.timedelta64(1,"h")
v_df["ob_time"] = v_df["time"].astype("datetime64[us]") + v_df["dtime"]*np.timedelta64(1,"h")
v_df.sort_values(by=["level","ob_time"],inplace=True) #按层次、实况时间排序,可以尽可能让共用实况的任务靠近
ntask = len(v_df.index)
cpu = para["cpu"]
Expand All @@ -278,15 +329,16 @@ def middle_of_score(para):
middle_result_path_list.append(middle_result_dir_sep+"/"+str(i)+".h5")

#采用多进程统计中间量
meteva.base.multi_run(cpu,task_of_one_p,para=para,middle_result_path=middle_result_path_list,v_df = v_df_list)
meteva.base.multi_run(cpu,task_of_one_p,para=para,middle_result_path=middle_result_path_list,v_df = v_df_list,zs = zs)

#将中间量的结果进行合并
df_list = []
df_list = [mid0]
file_list = os.listdir(middle_result_dir_sep)
for file1 in file_list:
path = middle_result_dir_sep + "/" + file1
df = pd.read_hdf(path)
df_list.append(df)
os.remove(path)

mid_all = meteva.base.concat(df_list)
mid_all.sort_values(by=["time","dtime","level","member"],inplace=True)
Expand Down

0 comments on commit 41c834b

Please sign in to comment.