forked from rhashimoto/poolqueue
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMPI.hpp
176 lines (151 loc) · 6.25 KB
/
MPI.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/*
Copyright 2015 Shoestring Research, LLC. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef poolqueue_MPI_hpp
#define poolqueue_MPI_hpp
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <typeinfo>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/serialization/access.hpp>
#include <boost/serialization/export.hpp>
#include "Promise.hpp"
#include "ThreadPool.hpp"
namespace poolqueue {
namespace detail {
    // Declared here only so that MPI can name these classes as
    // friends; their definitions are not part of this header.
    class ResultProcedure;
    class ExitProcedure;
} // namespace detail
// Base interface for a serializable functor that reports nothing
// back: it does not fulfil a completion Promise. Handing a
// Procedure to MPI::call() is fire-and-forget, i.e. the caller
// receives neither a return value nor a completion notification.
class Procedure {
public:
    virtual ~Procedure() = default;

    // The work to perform; supplied by each concrete subclass.
    virtual void operator()() const = 0;

private:
    // Lets boost::serialization reach the private serialize() hook.
    friend class boost::serialization::access;

    // The base class has no data members, so nothing is archived.
    template<class Archive>
    void serialize(Archive& /*archive*/, const unsigned int /*version*/) {
    }
};
// Base interface for a serializable functor that *does* fulfil a
// completion Promise. Whatever type is delivered through the
// returned Promise must be serializable and must have been
// registered with registerType<T>(). Exceptions cannot be returned.
class Function {
public:
    // Error describing an unregistered type returned from a
    // Function. what() yields a fixed prefix followed by the text
    // given at construction. Where this is thrown is not visible in
    // this header.
    class UnregisteredReturnType : public std::bad_cast {
    public:
        UnregisteredReturnType(const std::string& msg)
            : msg_("Unregistered type returned from Function: " + msg) {
        }

        const char *what() const noexcept override {
            return msg_.c_str();
        }

    private:
        // Complete message, built once so what() never allocates.
        const std::string msg_;
    };

    virtual ~Function() = default;

    // The work to perform; the returned Promise carries the result.
    virtual Promise operator()() const = 0;

private:
    // Lets boost::serialization reach the private serialize() hook.
    friend class boost::serialization::access;

    // The base class has no data members, so nothing is archived.
    template<class Archive>
    void serialize(Archive& /*archive*/, const unsigned int /*version*/) {
    }
};
class MPI {
public:
static int rank();
static int size();
static std::string processName();
// To make a remote call, define a subclass of Procedure or
// Function that can be serialized using boost::serialization
// via a pointer to the base class. Details are in the Boost
// docs, see especially:
//
// http://www.boost.org/doc/libs/1_57_0/libs/serialization/doc/serialization.html#derivedpointers
// http://www.boost.org/doc/libs/1_57_0/libs/serialization/doc/special.html#export
//
// At runtime, use call() to invoke a functor instance on an
// MPI rank. The original functor instance is not referenced
// after call() returns (i.e. there is no dangling reference
// if it is immediately destroyed). In the case of a
// Function, the returned Promise can be used for
// synchronization and access to the returned value.
static void call(int rank, const Procedure& f);
static Promise call(int rank, const Function& f);
// Procedure and Function subclass instances execute on a
// ThreadPool for each MPI rank. This function provides
// access to the local pool.
static ThreadPool& pool();
// This invokes a function on the MPI thread for low-level
// MPI (or boost::mpi) usage, as MPI implementations are
// not necessarily thread-safe.
static void post(const std::function<void()>& f);
// This establishes a synchronization point for all MPI
// ranks. The returned Promise is resolved when all ranks
// have executed the call (note that the call itself is
// non-blocking).
static Promise synchronize();
// Because the underlying MPI library may not be thread-safe,
// polling is used to check for new messages (a check is also
// made each time a message is sent). setPollInterval() allows
// the default polling interval (20 ms) to be adjusted.
template<typename Duration>
static void setPollInterval(const Duration& interval) {
const auto micros = std::chrono::duration_cast<std::chrono::microseconds>(interval);
setPollInterval(micros);
}
static void setPollInterval(const std::chrono::microseconds& interval);
// This registers a serializable type so it can be returned
// from a Function subclass via a Promise. Types must be
// registered in the same order on all ranks.
//
// Care must be taken to ensure that a type is registered on
// a rank before remote calls that use that type are received
// from other ranks. The recommended pattern is to register
// all types before any Functions are called, followed by a
// synchronize().
template<typename T>
static void registerType() {
registerType(
typeid(T),
&MPI::saveValue<T>,
&MPI::loadValue<T>);
}
private:
struct Pimpl;
friend class detail::ExitProcedure;
friend class detail::ResultProcedure;
typedef boost::archive::binary_iarchive IArchive;
typedef boost::archive::binary_oarchive OArchive;
template<typename T>
static void saveValue(OArchive& ar, const Promise::Value& value) {
ar << value.cast<const T&>();;
}
template<typename T>
static void loadValue(IArchive& ar, Promise::Value& value) {
value = T();
ar >> value.cast<T&>();
}
typedef std::function<void(OArchive&, const Promise::Value&)> SaveFunc;
typedef std::function<void(IArchive&, Promise::Value&)> LoadFunc;
static void registerType(
const std::type_info& type,
const SaveFunc& saveFunc,
const LoadFunc& loadFunc);
static const SaveFunc& getSaveFunc(const std::type_info& type);
static const LoadFunc& getLoadFunc(const uint32_t index);
};
} // namespace poolqueue
#endif // poolqueue_MPI_hpp