Skip to content

Commit

Permalink
Merge pull request #382 from zhangsp8/master
Browse files Browse the repository at this point in the history
Optimizing communications in catchment lateral flow subroutines
  • Loading branch information
CoLM-SYSU authored Feb 17, 2025
2 parents ec67969 + c345545 commit d604b6d
Show file tree
Hide file tree
Showing 13 changed files with 511 additions and 338 deletions.
85 changes: 45 additions & 40 deletions main/HYDRO/MOD_Catch_BasinNetwork.F90
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ MODULE MOD_Catch_BasinNetwork
! -- instances --
integer :: numbasin
integer, allocatable :: basinindex(:)

integer :: numrivmth
integer, allocatable :: rivermouth(:)

integer :: numbsnhru
type(subset_type) :: basin_hru
Expand All @@ -28,6 +31,7 @@ MODULE MOD_Catch_BasinNetwork
integer, allocatable :: paddr (:)
integer, allocatable :: ndata (:)
integer, allocatable :: ipush (:)
integer, allocatable :: isdrv (:)
CONTAINS
final :: basin_pushdata_free_mem
END type basin_pushdata_type
Expand Down Expand Up @@ -231,13 +235,29 @@ SUBROUTINE build_basin_network ()
ENDIF
ENDDO

allocate (rivermouth (totalnumbasin))
ithis = 0
DO i = totalnumbasin, 1, -1
j = basindown(b_up2down(i))
IF (j <= 0) THEN
ithis = ithis + 1
rivermouth(b_up2down(i)) = ithis
ELSE
rivermouth(b_up2down(i)) = rivermouth(j)
ENDIF
ENDDO

numrivmth = ithis

deallocate (b_up2down)
deallocate (nups_all )
deallocate (orderbsn )
deallocate (nelm_wrk )

ENDIF

CALL mpi_bcast (numrivmth, 1, MPI_INTEGER, p_address_master, p_comm_glb, p_err)

! 3-3: send basin index to workers
IF (p_is_master) THEN

Expand Down Expand Up @@ -266,6 +286,10 @@ SUBROUTINE build_basin_network ()
CALL mpi_send (icache, nbasin, MPI_INTEGER, p_address_worker(iworker), &
mpi_tag_data, p_comm_glb, p_err)

icache = rivermouth(bindex)
CALL mpi_send (icache, nbasin, MPI_INTEGER, p_address_worker(iworker), &
mpi_tag_data, p_comm_glb, p_err)

nhru_in_bsn = nhru_all(bindex)
CALL mpi_send (nhru_in_bsn, nbasin, MPI_INTEGER, p_address_worker(iworker), &
mpi_tag_data, p_comm_glb, p_err)
Expand Down Expand Up @@ -297,6 +321,10 @@ SUBROUTINE build_basin_network ()
CALL mpi_recv (basindown, numbasin, MPI_INTEGER, p_address_master, &
mpi_tag_data, p_comm_glb, p_stat, p_err)

allocate (rivermouth (numbasin))
CALL mpi_recv (rivermouth, numbasin, MPI_INTEGER, p_address_master, &
mpi_tag_data, p_comm_glb, p_stat, p_err)

allocate (nhru_in_bsn (numbasin))
CALL mpi_recv (nhru_in_bsn, numbasin, MPI_INTEGER, p_address_master, &
mpi_tag_data, p_comm_glb, p_stat, p_err)
Expand Down Expand Up @@ -592,15 +620,14 @@ END SUBROUTINE build_basin_network


! ----------
SUBROUTINE worker_push_data_real8 (send_pointer, recv_pointer, accum, vec_send, vec_recv)
SUBROUTINE worker_push_data_real8 (send_pointer, recv_pointer, vec_send, vec_recv)

USE MOD_Precision
USE MOD_SPMD_Task
IMPLICIT NONE

type(basin_pushdata_type) :: send_pointer
type(basin_pushdata_type) :: recv_pointer
logical, intent(in) :: accum

real(r8), intent(in) :: vec_send(:)
real(r8), intent(inout) :: vec_recv(:)
Expand All @@ -619,14 +646,7 @@ SUBROUTINE worker_push_data_real8 (send_pointer, recv_pointer, accum, vec_send,
IF (p_is_worker) THEN

IF (send_pointer%nself > 0) THEN
IF (.not. accum) THEN
vec_recv(recv_pointer%iself) = vec_send(send_pointer%iself)
ELSE
DO i = 1, send_pointer%nself
vec_recv(recv_pointer%iself(i)) = &
vec_recv(recv_pointer%iself(i)) + vec_send(send_pointer%iself(i))
ENDDO
ENDIF
vec_recv(recv_pointer%iself) = vec_send(send_pointer%iself)
ENDIF

#ifdef USEMPI
Expand Down Expand Up @@ -675,17 +695,8 @@ SUBROUTINE worker_push_data_real8 (send_pointer, recv_pointer, accum, vec_send,
ENDIF

IF (recv_pointer%nproc > 0) THEN

CALL mpi_waitall(recv_pointer%nproc, req_recv, MPI_STATUSES_IGNORE, p_err)

IF (accum) THEN
DO i = 1, ndatarecv
vec_recv(recv_pointer%ipush(i)) = &
vec_recv(recv_pointer%ipush(i)) + recvcache(i)
ENDDO
ELSE
vec_recv(recv_pointer%ipush) = recvcache
ENDIF
vec_recv(recv_pointer%ipush) = recvcache
ENDIF

IF (send_pointer%nproc > 0) THEN
Expand All @@ -705,15 +716,14 @@ SUBROUTINE worker_push_data_real8 (send_pointer, recv_pointer, accum, vec_send,
END SUBROUTINE worker_push_data_real8

! ----------
SUBROUTINE worker_push_data_int32 (send_pointer, recv_pointer, accum, vec_send, vec_recv)
SUBROUTINE worker_push_data_int32 (send_pointer, recv_pointer, vec_send, vec_recv)

USE MOD_Precision
USE MOD_SPMD_Task
IMPLICIT NONE

type(basin_pushdata_type) :: send_pointer
type(basin_pushdata_type) :: recv_pointer
logical, intent(in) :: accum

integer, intent(in) :: vec_send(:)
integer, intent(inout) :: vec_recv(:)
Expand All @@ -732,14 +742,7 @@ SUBROUTINE worker_push_data_int32 (send_pointer, recv_pointer, accum, vec_send,
IF (p_is_worker) THEN

IF (send_pointer%nself > 0) THEN
IF (.not. accum) THEN
vec_recv(recv_pointer%iself) = vec_send(send_pointer%iself)
ELSE
DO i = 1, send_pointer%nself
vec_recv(recv_pointer%iself(i)) = &
vec_recv(recv_pointer%iself(i)) + vec_send(send_pointer%iself(i))
ENDDO
ENDIF
vec_recv(recv_pointer%iself) = vec_send(send_pointer%iself)
ENDIF

#ifdef USEMPI
Expand Down Expand Up @@ -788,17 +791,8 @@ SUBROUTINE worker_push_data_int32 (send_pointer, recv_pointer, accum, vec_send,
ENDIF

IF (recv_pointer%nproc > 0) THEN

CALL mpi_waitall(recv_pointer%nproc, req_recv, MPI_STATUSES_IGNORE, p_err)

IF (accum) THEN
DO i = 1, ndatarecv
vec_recv(recv_pointer%ipush(i)) = &
vec_recv(recv_pointer%ipush(i)) + recvcache(i)
ENDDO
ELSE
vec_recv(recv_pointer%ipush) = recvcache
ENDIF
vec_recv(recv_pointer%ipush) = recvcache
ENDIF

IF (send_pointer%nproc > 0) THEN
Expand Down Expand Up @@ -1003,6 +997,16 @@ SUBROUTINE worker_push_subset_data (send_pointer, recv_pointer, &

END SUBROUTINE worker_push_subset_data

! ---------
SUBROUTINE basin_network_final ()

IMPLICIT NONE

IF (allocated(basinindex)) deallocate(basinindex)
IF (allocated(rivermouth)) deallocate(rivermouth)

END SUBROUTINE basin_network_final

! ---------
SUBROUTINE basin_pushdata_free_mem (this)

Expand All @@ -1013,6 +1017,7 @@ SUBROUTINE basin_pushdata_free_mem (this)
IF (allocated(this%paddr)) deallocate(this%paddr)
IF (allocated(this%ndata)) deallocate(this%ndata)
IF (allocated(this%ipush)) deallocate(this%ipush)
IF (allocated(this%isdrv)) deallocate(this%isdrv)

END SUBROUTINE basin_pushdata_free_mem

Expand Down
6 changes: 3 additions & 3 deletions main/HYDRO/MOD_Catch_Hist.F90
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ SUBROUTINE hist_basin_out (file_hist, idate)
END WHERE
ENDIF

CALL worker_push_data (iam_bsn, iam_elm, .false., a_wdsrf_bsn, a_wdsrf_elm)
CALL worker_push_data (iam_bsn, iam_elm, a_wdsrf_bsn, a_wdsrf_elm)

CALL vector_write_basin (&
file_hist_basin, a_wdsrf_elm, numelm, totalnumelm, 'wdsrf_bsn', 'basin', elm_data_address, &
Expand All @@ -177,7 +177,7 @@ SUBROUTINE hist_basin_out (file_hist, idate)
END WHERE
ENDIF

CALL worker_push_data (iam_bsn, iam_elm, .false., a_veloc_riv, a_veloc_elm)
CALL worker_push_data (iam_bsn, iam_elm, a_veloc_riv, a_veloc_elm)

CALL vector_write_basin (&
file_hist_basin, a_veloc_elm, numelm, totalnumelm, 'veloc_riv', 'basin', elm_data_address, &
Expand All @@ -190,7 +190,7 @@ SUBROUTINE hist_basin_out (file_hist, idate)
END WHERE
ENDIF

CALL worker_push_data (iam_bsn, iam_elm, .false., a_discharge, a_dschg_elm)
CALL worker_push_data (iam_bsn, iam_elm, a_discharge, a_dschg_elm)

CALL vector_write_basin (&
file_hist_basin, a_dschg_elm, numelm, totalnumelm, 'discharge', 'basin', elm_data_address, &
Expand Down
7 changes: 4 additions & 3 deletions main/HYDRO/MOD_Catch_LateralFlow.F90
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ SUBROUTINE lateral_flow (deltime)
#ifdef RangeCheck
IF (p_is_worker .and. (p_iam_worker == 0)) THEN
write(*,'(/,A)') 'Checking Lateral Flow Variables ...'
write(*,'(A,F12.5,A)') 'River Lake Flow average timestep: ', &
write(*,'(A,F12.5,A)') 'River Lake Flow minimum average timestep: ', &
dt_average/nsubstep, ' seconds'
ENDIF

Expand Down Expand Up @@ -265,7 +265,7 @@ SUBROUTINE lateral_flow (deltime)
CALL mpi_allreduce (MPI_IN_PLACE, toldis, 1, MPI_REAL8, MPI_SUM, p_comm_worker, p_err)
#endif
IF (p_iam_worker == 0) THEN
write(*,'(A,F10.2,A,ES10.3,A,ES10.3,A)') 'Total surface water error: ', dtolw, &
write(*,'(A,F12.2,A,ES8.1,A,ES10.3,A)') 'Total surface water error: ', dtolw, &
'(m^3) in area ', landarea, '(m^2), discharge ', toldis, '(m^3)'
ENDIF

Expand All @@ -275,7 +275,7 @@ SUBROUTINE lateral_flow (deltime)
CALL mpi_allreduce (MPI_IN_PLACE, dtolw, 1, MPI_REAL8, MPI_SUM, p_comm_worker, p_err)
#endif
IF (p_iam_worker == 0) THEN
write(*,'(A,F10.2,A,ES10.3,A)') 'Total ground water error: ', dtolw, &
write(*,'(A,F12.2,A,ES8.1,A)') 'Total ground water error: ', dtolw, &
'(m^3) in area ', landarea, '(m^2)'
ENDIF
ENDIF
Expand All @@ -291,6 +291,7 @@ SUBROUTINE lateral_flow_final ()

CALL river_lake_network_final ()
CALL subsurface_network_final ()
CALL basin_network_final ()

#ifdef CoLMDEBUG
IF (allocated(patcharea)) deallocate(patcharea)
Expand Down
Loading

0 comments on commit d604b6d

Please sign in to comment.