Skip to content

Commit

Permalink
add parallel options
Browse files Browse the repository at this point in the history
  • Loading branch information
James Halgren committed Jan 2, 2023
1 parent 07cf754 commit e8caaf9
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 19 deletions.
40 changes: 28 additions & 12 deletions ngen_forcing/TestConvert_NWMForcing_to_Ngen.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ def get_forcing_dict_newway(


def get_forcing_dict_newway_parallel(
feature_list,
folder_prefix,
file_list,
):
feature_list,
folder_prefix,
file_list,
para="thread",
para_n=2,
):

reng = "rasterio"
_xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
Expand All @@ -67,8 +69,14 @@ def get_forcing_dict_newway_parallel(
)
filehandles = [xr.open_dataset("data/" + f) for f in file_list]

with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
# with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
if para == "process":
pool = concurrent.futures.ProcessPoolExecutor
elif para == "thread":
pool = concurrent.futures.ThreadPoolExecutor
else:
pool = concurrent.futures.ThreadPoolExecutor

with pool(max_workers=para_n) as executor:
stats = []
future_list = []

Expand Down Expand Up @@ -130,10 +138,12 @@ def get_forcing_dict_newway_inverted(


def get_forcing_dict_newway_inverted_parallel(
feature_list,
folder_prefix,
file_list,
):
feature_list,
folder_prefix,
file_list,
para="thread",
para_n=2,
):

import concurrent.futures

Expand Down Expand Up @@ -161,8 +171,14 @@ def get_forcing_dict_newway_inverted_parallel(
stats = []
future_list = []

with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
# with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
if para == "process":
pool = concurrent.futures.ProcessPoolExecutor
elif para == "thread":
pool = concurrent.futures.ThreadPoolExecutor
else:
pool = concurrent.futures.ThreadPoolExecutor

with pool(max_workers=para_n) as executor:

for f in filehandles:
print(f"{i}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r")
Expand Down
39 changes: 32 additions & 7 deletions ngen_forcing/process_nwm_forcing.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ def main():
"SWDOWN",
]

file_list = list_of_files[0:30]
gpkg_subset = gpkg_divides[0:2000]
#file_list = list_of_files[0:3]
#gpkg_subset = gpkg_divides[0:20]
# file_list = list_of_files[0:30]
# gpkg_subset = gpkg_divides[0:2000]
file_list = list_of_files[0:3]
gpkg_subset = gpkg_divides[0:200]
feature_list = gpkg_subset.geometry.to_list()

# start_time = time.time()
Expand All @@ -132,14 +132,26 @@ def main():
print(time.time() - start_time)

start_time = time.time()
print(f"Working on the new way with threading. (It's not much better.)")
print(f"Working on the new way with threading parallel.")
fd3 = get_forcing_dict_newway_parallel(
feature_list,
folder_prefix,
file_list,
)
para="thread",
para_n=16,
)
print(time.time() - start_time)

start_time = time.time()
print(f"Working on the new way with process parallel.")
fd3 = get_forcing_dict_newway_parallel(
feature_list,
folder_prefix,
file_list,
para="process",
para_n=16,
)
print(time.time() - start_time)

start_time = time.time()
print(f"Working on the new way with loops reversed.")
Expand All @@ -151,11 +163,24 @@ def main():
print(time.time() - start_time)

start_time = time.time()
print(f"Working on the new way with loops reversed with threading.")
print(f"Working on the new way with loops reversed with threading parallel.")
fd4 = get_forcing_dict_newway_inverted_parallel(
feature_list,
folder_prefix,
file_list,
para="thread",
para_n=16,
)
print(time.time() - start_time)

start_time = time.time()
print(f"Working on the new way with loops reversed with process parallel.")
fd4 = get_forcing_dict_newway_inverted_parallel(
feature_list,
folder_prefix,
file_list,
para="process",
para_n=16,
)
print(time.time() - start_time)

Expand Down

0 comments on commit e8caaf9

Please sign in to comment.