Skip to content

Commit

Permalink
✨ add case files.
Browse files Browse the repository at this point in the history
  • Loading branch information
perillaroc committed Jan 17, 2022
1 parent 9e1b785 commit ad34c5c
Show file tree
Hide file tree
Showing 45 changed files with 2,624 additions and 1 deletion.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,8 @@ playground/
celerybeat-schedule
celerybeat.pid

.vscode
.vscode


sample
output
Empty file.
Empty file.
Empty file.
7 changes: 7 additions & 0 deletions reki_data_tool/postprocess/grid/gfs/abroad/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# CMACAST 国外低分辨率数据

抽取要素 + 重新排序 + 插值

wgrib2 + gribpost.exe

原任务:50秒
7 changes: 7 additions & 0 deletions reki_data_tool/postprocess/grid/gfs/downscaling/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# 降尺度数据

抽取要素 + 重新排序 + 区域裁剪 + 插值

gribpost.exe

原任务:5分钟
5 changes: 5 additions & 0 deletions reki_data_tool/postprocess/grid/gfs/ne/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# GFS 东北半球数据

全要素场 + 保持原有顺序 + 区域裁剪

原任务:串行,6分钟
Empty file.
45 changes: 45 additions & 0 deletions reki_data_tool/postprocess/grid/gfs/ne/gfs_ne_grib2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from pathlib import Path

import numpy as np
import pandas as pd
import xarray as xr

from loguru import logger
import eccodes
from tqdm.auto import tqdm

from reki.data_finder import find_local_file
from reki.format.grib.eccodes import load_message_from_file
from reki.format.grib.eccodes.operator import extract_region


def main():
file_path = find_local_file(
"grapes_gfs_gmf/grib2/orig",
start_time=pd.to_datetime("2021-09-01 00:00:00"),
forecast_time=pd.Timedelta(hours=24)
)

output_directory = "/g11/wangdp/project/work/data/playground/operation/gfs/ne/output"
output_file_path = Path(output_directory, "ne.grb2")

logger.info("count...")
with open(file_path, "rb") as f:
total_count = eccodes.codes_count_in_file(f)
logger.info(f"total count: {total_count}")
logger.info("count..done")

with open(output_file_path, "wb") as f:
for i in tqdm(range(1, total_count+1)):
message = load_message_from_file(file_path, count=i)
message = extract_region(
message,
0, 180, 89.875, 0.125
)
message_bytes = eccodes.codes_get_message(message)
f.write(message_bytes)
eccodes.codes_release(message)


if __name__ == "__main__":
main()
74 changes: 74 additions & 0 deletions reki_data_tool/postprocess/grid/gfs/ne/gfs_ne_grib2_dask.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from pathlib import Path

import numpy as np
import pandas as pd
import xarray as xr

from loguru import logger
import eccodes
from tqdm.auto import tqdm

import dask
from dask.distributed import Client, progress

from reki.data_finder import find_local_file
from reki.format.grib.eccodes import load_message_from_file
from reki.format.grib.eccodes.operator import extract_region


def get_message_bytes(
file_path,
count,
) -> bytes:
message = load_message_from_file(file_path, count=count)
message = extract_region(
message,
0, 180, 89.875, 0.125
)
message_bytes = eccodes.codes_get_message(message)
eccodes.codes_release(message)
return message_bytes


def main():
file_path = find_local_file(
"grapes_gfs_gmf/grib2/orig",
start_time=pd.to_datetime("2021-12-15 00:00:00"),
forecast_time=pd.Timedelta(hours=24)
)

output_directory = "/g11/wangdp/project/work/data/playground/operation/gfs/ne/output"
output_file_path = Path(output_directory, "ne_dask.grb2")

logger.info("count...")
with open(file_path, "rb") as f:
total_count = eccodes.codes_count_in_file(f)
logger.info(f"total count: {total_count}")
logger.info("count..done")

client = Client(
threads_per_worker=1,
)
print(client)

bytes_futures = []
for i in range(1, total_count+1):
f = client.submit(get_message_bytes, file_path, i)
bytes_futures.append(f)

with open(output_file_path, "wb") as f:
for i, fut in enumerate(bytes_futures):
message_bytes = client.gather(fut)
del fut
logger.info(f"writing message...{i + 1}/{total_count}")
f.write(message_bytes)
del message_bytes

client.close()


if __name__ == "__main__":
start_time = pd.Timestamp.now()
main()
end_time = pd.Timestamp.now()
print(end_time - start_time)
77 changes: 77 additions & 0 deletions reki_data_tool/postprocess/grid/gfs/ne/gfs_ne_grib2_dask_v2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from pathlib import Path

import numpy as np
import pandas as pd
import xarray as xr

from loguru import logger
import eccodes
from tqdm.auto import tqdm

import dask
from dask.distributed import Client, progress

from reki.data_finder import find_local_file
from reki.format.grib.eccodes import load_message_from_file
from reki.format.grib.eccodes.operator import extract_region


def get_message_bytes(
file_path,
count,
) -> bytes:
message = load_message_from_file(file_path, count=count)
# extract_region(
# message,
# 0, 180, 89.875, 0.125
# )
message_bytes = eccodes.codes_get_message(message)
eccodes.codes_release(message)
return b'1'


def main():
file_path = find_local_file(
"grapes_gfs_gmf/grib2/orig",
start_time=pd.to_datetime("2021-09-01 00:00:00"),
forecast_time=pd.Timedelta(hours=24)
)

output_directory = "/g11/wangdp/project/work/data/playground/operation/gfs/ne/output"
output_file_path = Path(output_directory, "ne_dask.grb2")

logger.info("count...")
with open(file_path, "rb") as f:
total_count = eccodes.codes_count_in_file(f)
logger.info(f"total count: {total_count}")
logger.info("count..done")

client = Client(
threads_per_worker=1,
)
print(client)

def get_object(x):
return x

with open(output_file_path, "wb") as f:
for batch in np.array_split(np.arange(1, total_count+1), np.ceil(total_count/32)):
bytes_lazy = []
for i in batch:
fut = dask.delayed(get_message_bytes)(file_path, i)
bytes_lazy.append(fut)
b = dask.delayed(get_object)(bytes_lazy)
b_future = b.persist()
bytes_result = b_future.compute()
del b_future

for i, b in enumerate(bytes_result):
logger.info(f"writing message...{i + batch[0]}/{total_count}")
f.write(b)
del b

client.close()


if __name__ == "__main__":
main()
5 changes: 5 additions & 0 deletions reki_data_tool/postprocess/grid/gfs/winter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# 冬奥数据

抽取要素

原任务:2秒
5 changes: 5 additions & 0 deletions reki_data_tool/postprocess/grid/gfs/wxzx/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# GFS卫星中心数据

抽取要素 + 重新排序 + 插值

原任务:串行,8分钟
Empty file.
Loading

0 comments on commit ad34c5c

Please sign in to comment.