Skip to content

Commit

Permalink
Add internal abstract communicators in MPI layer
Browse files Browse the repository at this point in the history
This is preparatory work for checkpoint restart
  • Loading branch information
DUCARTON Nicolas authored and nfurmento committed Jan 23, 2025
1 parent c035b14 commit 0c5a61c
Show file tree
Hide file tree
Showing 17 changed files with 664 additions and 58 deletions.
4 changes: 2 additions & 2 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ check:
image: registry.gitlab.inria.fr/starpu/starpu-docker/ci-${NODE}
tags: ['ci.inria.fr', 'linux', 'large']

check_mpich:
check_mpi:
extends: .check_template
image: registry.gitlab.inria.fr/starpu/starpu-docker/ci-ubuntu1804
tags: ['ci.inria.fr', 'linux', 'large']
parallel:
matrix:
- SCRIPT: [./contrib/gitlab/mpich.sh, ./contrib/gitlab/mpich_struct.sh]
- SCRIPT: [./contrib/gitlab/mpich.sh, ./contrib/gitlab/mpich_struct.sh, ./contrib/gitlab/mpi_ulfm.sh]
script:
- ${SCRIPT}

Expand Down
21 changes: 21 additions & 0 deletions contrib/gitlab/mpi_ulfm.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/sh
# StarPU --- Runtime system for heterogeneous multicore architectures.
#
# Copyright (C) 2020-2025 University of Bordeaux, CNRS (LaBRI UMR 5800), Inria
#
# StarPU is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or (at
# your option) any later version.
#
# StarPU is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See the GNU Lesser General Public License in COPYING.LGPL for more details.
#

# Settings read by the generic check job through the environment.
# NOTE(review): presumably STARPU_CHECK_DIRS limits the checks to the mpi
# directory and STARPU_USER_CONFIGURE_OPTIONS is forwarded to configure --
# confirm against contrib/ci.inria.fr/job-1-check.sh.
STARPU_MICROBENCHS_DISABLED=1
STARPU_CHECK_DIRS=mpi
STARPU_USER_CONFIGURE_OPTIONS="--disable-socl --enable-mpi-ft --enable-mpi-ft-stats"
export STARPU_MICROBENCHS_DISABLED STARPU_CHECK_DIRS STARPU_USER_CONFIGURE_OPTIONS

# Hand over to the generic check job; its exit status is the script's status.
./contrib/ci.inria.fr/job-1-check.sh
6 changes: 4 additions & 2 deletions mpi/src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ noinst_HEADERS = \
nmad/starpu_mpi_nmad.h \
load_balancer/policy/data_movements_interface.h \
load_balancer/policy/load_data_interface.h \
load_balancer/policy/load_balancer_policy.h
load_balancer/policy/load_balancer_policy.h \
mpi_failure_tolerance/ulfm/starpu_mpi_ulfm_comm.h

if STARPU_USE_MPI_FT
noinst_HEADERS += \
Expand Down Expand Up @@ -139,7 +140,8 @@ libstarpumpi_@STARPU_EFFECTIVE_VERSION@_la_SOURCES += \
mpi_failure_tolerance/starpu_mpi_ft_service_comms.c \
mpi_failure_tolerance/starpu_mpi_checkpoint_package.c \
mpi_failure_tolerance/starpu_mpi_checkpoint_tracker.c \
mpi_failure_tolerance/starpu_mpi_ft_stats.c
mpi_failure_tolerance/starpu_mpi_ft_stats.c \
mpi_failure_tolerance/ulfm/starpu_mpi_ulfm_comm.c
endif STARPU_USE_MPI_FT

if STARPU_USE_FXT
Expand Down
39 changes: 29 additions & 10 deletions mpi/src/mpi/starpu_mpi_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
#include <starpu_mpi_private.h>
#include <mpi/starpu_mpi_comm.h>
#include <common/list.h>
#include <mpi_failure_tolerance/ulfm/starpu_mpi_ulfm_comm.h>

#ifdef STARPU_USE_MPI_MPI

struct _starpu_mpi_comm
{
MPI_Comm comm;
MPI_Comm app_comm;
starpu_mpi_comm internal_comm;
struct _starpu_mpi_envelope *envelope;
MPI_Request request;
int posted;
Expand Down Expand Up @@ -92,9 +94,10 @@ void _starpu_mpi_comm_shutdown()
void _starpu_mpi_comm_register(MPI_Comm comm)
{
struct _starpu_mpi_comm_hashtable *found;
starpu_mpi_comm internal_comm = _starpu_mpi_ulfm_get_mpi_comm_from_key(comm);

STARPU_PTHREAD_RWLOCK_RDLOCK(&_starpu_mpi_comms_mutex);
HASH_FIND(hh, _starpu_mpi_comms_cache, &comm, sizeof(MPI_Comm), found);
HASH_FIND(hh, _starpu_mpi_comms_cache, &internal_comm, sizeof(internal_comm), found);
STARPU_PTHREAD_RWLOCK_UNLOCK(&_starpu_mpi_comms_mutex);
if (found)
{
Expand All @@ -103,7 +106,7 @@ void _starpu_mpi_comm_register(MPI_Comm comm)
}

STARPU_PTHREAD_RWLOCK_WRLOCK(&_starpu_mpi_comms_mutex);
HASH_FIND(hh, _starpu_mpi_comms_cache, &comm, sizeof(MPI_Comm), found);
HASH_FIND(hh, _starpu_mpi_comms_cache, &internal_comm, sizeof(internal_comm), found);
if (found)
{
_STARPU_MPI_DEBUG(10, "comm %ld (%ld) already registered in between\n", (long int)comm, (long int)MPI_COMM_WORLD);
Expand All @@ -119,14 +122,15 @@ void _starpu_mpi_comm_register(MPI_Comm comm)
_STARPU_MPI_DEBUG(10, "registering comm %ld (%ld) number %d\n", (long int)comm, (long int)MPI_COMM_WORLD, _starpu_mpi_comm_nb);
struct _starpu_mpi_comm *_comm;
_STARPU_MPI_CALLOC(_comm, 1, sizeof(struct _starpu_mpi_comm));
_comm->comm = comm;
_comm->internal_comm = internal_comm;
_comm->app_comm = comm;
_STARPU_MPI_CALLOC(_comm->envelope, 1,sizeof(struct _starpu_mpi_envelope));
_comm->posted = 0;
_starpu_mpi_comms[_starpu_mpi_comm_nb] = _comm;
_starpu_mpi_comm_nb++;
struct _starpu_mpi_comm_hashtable *entry;
_STARPU_MPI_MALLOC(entry, sizeof(*entry));
entry->comm = comm;
entry->comm = internal_comm;
HASH_ADD(hh, _starpu_mpi_comms_cache, comm, sizeof(entry->comm), entry);

#ifdef STARPU_SIMGRID
Expand All @@ -148,9 +152,9 @@ void _starpu_mpi_comm_post_recv()
struct _starpu_mpi_comm *_comm = _starpu_mpi_comms[i]; // get the ith _comm;
if (_comm->posted == 0)
{
_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop on comm %d %ld\n", i, (long int)_comm->comm);
_STARPU_MPI_COMM_FROM_DEBUG(_comm->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE, (int64_t)_STARPU_MPI_TAG_ENVELOPE, _comm->comm);
MPI_Irecv(_comm->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE, _comm->comm, &_comm->request);
_STARPU_MPI_DEBUG(3, "Posting a receive to get a data envelop on comm %d %ld\n", i, (long int)_comm->app_comm);
_STARPU_MPI_COMM_FROM_DEBUG(_comm->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE, (int64_t)_STARPU_MPI_TAG_ENVELOPE, _comm->internal_comm);
MPI_Irecv(_comm->envelope, sizeof(struct _starpu_mpi_envelope), MPI_BYTE, MPI_ANY_SOURCE, _STARPU_MPI_TAG_ENVELOPE, _comm->internal_comm, &_comm->request);
#ifdef STARPU_SIMGRID
_starpu_mpi_simgrid_wait_req(&_comm->request, &_comm->status, &_comm->queue, &_comm->done);
#endif
Expand All @@ -160,7 +164,7 @@ void _starpu_mpi_comm_post_recv()
STARPU_PTHREAD_RWLOCK_UNLOCK(&_starpu_mpi_comms_mutex);
}

int _starpu_mpi_comm_test_recv(MPI_Status *status, struct _starpu_mpi_envelope **envelope, MPI_Comm *comm)
int _starpu_mpi_comm_test_recv(MPI_Status *status, struct _starpu_mpi_envelope **envelope, starpu_mpi_comm *comm, MPI_Comm *app_comm)
{
int i=_starpu_mpi_comm_tested;

Expand All @@ -187,7 +191,8 @@ int _starpu_mpi_comm_test_recv(MPI_Status *status, struct _starpu_mpi_envelope *
if (_starpu_mpi_comm_tested == _starpu_mpi_comm_nb)
_starpu_mpi_comm_tested = 0;
*envelope = _comm->envelope;
*comm = _comm->comm;
*comm = _comm->internal_comm;
*app_comm = _comm->app_comm;
STARPU_PTHREAD_RWLOCK_UNLOCK(&_starpu_mpi_comms_mutex);
return 1;
}
Expand Down Expand Up @@ -231,4 +236,18 @@ void _starpu_mpi_comm_cancel_recv()
STARPU_PTHREAD_RWLOCK_UNLOCK(&_starpu_mpi_comms_mutex);
}

/*
 * Tell whether every registered communicator has its `posted` flag set.
 *
 * Walks the _starpu_mpi_comm_nb first entries of _starpu_mpi_comms and counts
 * those whose `posted` field equals 1.
 *
 * Returns 1 when that count equals _starpu_mpi_comm_nb (which is also the
 * case when no communicator is registered), 0 otherwise.
 *
 * NOTE(review): unlike _starpu_mpi_comm_post_recv(), this function does not
 * take _starpu_mpi_comms_mutex; presumably the caller guarantees that the
 * table is not modified concurrently -- confirm.
 */
int _starpu_mpi_comm_all_posted()
{
	int nb_posted = 0;
	int idx = 0;

	while (idx < _starpu_mpi_comm_nb)
	{
		struct _starpu_mpi_comm *cur = _starpu_mpi_comms[idx];

		/* The comparison yields 0 or 1, so it can be accumulated directly. */
		nb_posted += (cur->posted == 1);
		idx++;
	}

	return nb_posted == _starpu_mpi_comm_nb;
}

#endif /* STARPU_USE_MPI_MPI */
3 changes: 2 additions & 1 deletion mpi/src/mpi/starpu_mpi_comm.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ void _starpu_mpi_comm_init(MPI_Comm comm);
void _starpu_mpi_comm_shutdown();
void _starpu_mpi_comm_register(MPI_Comm comm);
void _starpu_mpi_comm_post_recv();
int _starpu_mpi_comm_test_recv(MPI_Status *status, struct _starpu_mpi_envelope **envelope, MPI_Comm *comm);
int _starpu_mpi_comm_test_recv(MPI_Status *status, struct _starpu_mpi_envelope **envelope, starpu_mpi_comm *comm, MPI_Comm *app_comm);
void _starpu_mpi_comm_cancel_recv();
int _starpu_mpi_comm_all_posted();

#ifdef __cplusplus
}
Expand Down
Loading

0 comments on commit 0c5a61c

Please sign in to comment.