forked from AliceO2Group/AliceO2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathO2SimDevice.h
349 lines (305 loc) · 13.4 KB
/
O2SimDevice.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
/// @author Sandro Wenzel
#ifndef ALICEO2_DEVICES_SIMDEVICE_H_
#define ALICEO2_DEVICES_SIMDEVICE_H_
#include <memory>
#include <fairmq/Message.h>
#include <fairmq/Device.h>
#include <fairmq/Parts.h>
#include <fairlogger/Logger.h>
#include "../macro/o2sim.C"
#include "TVirtualMC.h"
#include "TMessage.h"
#include <DetectorsBase/Stack.h>
#include <DetectorsBase/VMCSeederService.h>
#include <SimulationDataFormat/PrimaryChunk.h>
#include <TRandom.h>
#include <SimConfig/SimConfig.h>
#include <cstring>
#include "PrimaryServerState.h"
/// Helper for logging with the worker index prefixed.
/// Emits @p message at info level as "[W<workerID>] <message>".
///
/// The function is defined in a header at namespace scope, so it is declared
/// inline: without it, including this header from more than one translation
/// unit would yield multiple definitions of the same symbol at link time.
///
/// @param workerID index of the worker, printed inside the "[W...]" tag
/// @param message  text appended after the tag
inline void doLogInfo(int workerID, std::string const& message)
{
  LOG(info) << "[W" << workerID << "] " << message;
}
namespace o2
{
namespace devices
{
/// Thin TMessage subclass used to read objects out of an already received buffer.
/// The constructor forwards (buf, len) to TMessage and then clears the kIsOwner bit.
class TMessageWrapper : public TMessage
{
 public:
  /// @param buf start of the serialized bytes
  /// @param len number of bytes in @p buf
  TMessageWrapper(void* buf, Int_t len)
    : TMessage(buf, len)
  {
    // NOTE(review): clearing kIsOwner presumably keeps the wrapper from freeing buf — confirm against ROOT docs
    ResetBit(kIsOwner);
  }

  ~TMessageWrapper() override = default;
};
// device representing a simulation worker
class O2SimDevice final : public fair::mq::Device
{
public:
/// Default constructor: mVMCApp and mVMC stay nullptr and no simulation run is held
/// until InitTask() sets them up.
O2SimDevice() = default;
/// Constructs a device around an existing VMC application and VMC engine.
/// The two pointers are stored as given; nothing else is initialized here.
/// @param vmcapp stored in mVMCApp
/// @param vmc    stored in mVMC
O2SimDevice(o2::steer::O2MCApplication* vmcapp, TVirtualMC* vmc) : mVMCApp{vmcapp}, mVMC{vmc} {}
/// Destructor (not defaulted): releases the ShmManager instance and logs
/// the shutdown together with a final time stamp and memory stamp.
~O2SimDevice() final
{
  FairSystemInfo memInfo;
  o2::utils::ShmManager::Instance().release();

  LOG(info) << "Shutting down O2SimDevice";
  // the values are queried inside the log statements on purpose (kept exactly as before)
  LOG(info) << "TIME-STAMP " << mTimer.RealTime() << "\t";
  LOG(info) << "MEM-STAMP " << memInfo.GetCurrentMemory() / (1024. * 1024)
            << " " << memInfo.GetMaxMemory() << " MB\n";
}
protected:
/// Overloads the InitTask() method of fair::mq::Device
void InitTask() final
{
// in the initialization phase we will init the simulation
// NOTE: In a fair::mq::Device this is better done here (instead of outside) since
// we have to setup simulation + worker in the same thread (due to many threadlocal variables
// in the simulation) ... at least as long fair::mq::Device is not spawning workers on the master thread
initSim(GetChannels().at("o2sim-primserv-info").at(0), mSimRun);
// set the vmc and app pointers
mVMC = TVirtualMC::GetMC();
mVMCApp = static_cast<o2::steer::O2MCApplication*>(TVirtualMCApplication::Instance());
lateInit();
}
/// Cleanup callback with the (data, hint) signature: deletes the std::string
/// that @p hint points to. @p data is not used.
static void CustomCleanup(void* data, void* hint)
{
  auto* ownedText = static_cast<std::string*>(hint);
  delete ownedText;
}
public:
/// Runs the late initialization step by forwarding to initLate() of the VMC application.
/// mVMCApp is dereferenced without a null check, so it must be set before this is called
/// (InitTask() assigns it right before calling lateInit()).
void lateInit()
{
// late init
mVMCApp->initLate();
}
// should go into a helper
/// Queries the simulation configuration over @p channel and initializes the
/// o2::conf::SimConfig singleton from the answer; also sets the FairLogger
/// verbosity level from the received configuration.
///
/// Sending the request and receiving the reply each use a timeout of 60 s.
/// The deserialized configuration object is held by a unique_ptr so that it is
/// released on every exit path.
///
/// @param channel channel on which the configuration request is sent and the answer received
/// @return true if the configuration was received, deserialized and applied; false otherwise
static bool querySimConfig(fair::mq::Channel& channel)
{
  // (an earlier variant sent the string "configrequest" with CustomCleanup as deleter)
  std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage(O2PrimaryServerInfoRequest::Config));
  std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());

  int timeoutinMS = 60000; // wait for 60s max --> should be fast reply
  if (channel.Send(request, timeoutinMS) <= 0) {
    LOG(error) << "Could not send configuration request within " << timeoutinMS << "ms\n";
    return false;
  }

  LOG(info) << "Waiting for configuration answer ";
  if (channel.Receive(reply, timeoutinMS) <= 0) {
    LOG(error) << "No configuration received within " << timeoutinMS << "ms\n";
    return false;
  }
  LOG(info) << "Configuration answer received, containing " << reply->GetSize() << " bytes ";

  // the answer is a TMessage containing the simulation Configuration
  auto message = std::make_unique<o2::devices::TMessageWrapper>(reply->GetData(), reply->GetSize());
  std::unique_ptr<o2::conf::SimConfigData> config(
    static_cast<o2::conf::SimConfigData*>(message->ReadObjectAny(message->GetClass())));
  if (!config) {
    LOG(error) << "Could not deserialize the configuration answer";
    return false;
  }

  LOG(info) << "COMMUNICATED ENGINE " << config->mMCEngine;
  auto& conf = o2::conf::SimConfig::Instance();
  conf.resetFromConfigData(*config);
  FairLogger::GetLogger()->SetLogVerbosityLevel(conf.getLogVerbosity().c_str());
  return true;
}
/// Initializes the simulation classes after querying the configuration on @p channel.
///
/// @param channel channel handed to querySimConfig()
/// @param simptr  takes ownership of the FairRunSim returned by o2sim_init(true)
/// @return false if the configuration query failed (nothing else is done in that case), true otherwise
static bool initSim(fair::mq::Channel& channel, std::unique_ptr<FairRunSim>& simptr)
{
  const bool configured = querySimConfig(channel);
  if (!configured) {
    return false;
  }

  LOG(info) << "Setting up the simulation ...";
  simptr.reset(o2sim_init(true));

  FairSystemInfo memInfo;

  // to finish initialization (trigger further cross section table building etc) -- which especially
  // G4 is doing at the first ProcessRun
  // The goal is to have everything setup before we fork
  TVirtualMC::GetMC()->ProcessRun(0);

  LOG(info) << "MEM-STAMP END OF SIM INIT" << memInfo.GetCurrentMemory() / (1024. * 1024) << " "
            << memInfo.GetMaxMemory() << " MB\n";
  return true;
}
/// Asks the primary server for its state over @p statuschannel and reports whether
/// work can be fetched.
///
/// A status request is sent and the answer awaited (2 s timeout each). While the
/// server reports Initializing or WaitingEvent, the probe is repeated after a 1 s sleep.
///
/// @param statuschannel channel used for the status request/reply
/// @param workerID      only used for the "[W<id>]" prefix of log lines
/// @return true if the server reports ReadyToServe; false if it reports Idle or an
///         unknown state, if the request could not be sent, if no answer arrived,
///         or if the answer is too short to hold the state
bool isWorkAvailable(fair::mq::Channel& statuschannel, int workerID = -1)
{
  std::stringstream str;
  str << "[W" << workerID << "]";
  auto workerStr = str.str();

  int timeoutinMS = 2000; // wait for 2s max
  bool reprobe = true;
  while (reprobe) {
    reprobe = false;
    int i = -1;
    fair::mq::MessagePtr request(statuschannel.NewSimpleMessage(O2PrimaryServerInfoRequest::Status));
    fair::mq::MessagePtr reply(statuschannel.NewSimpleMessage(i));

    auto sendcode = statuschannel.Send(request, timeoutinMS);
    if (sendcode <= 0) {
      LOG(error) << workerStr << " STATUS REQUEST COULD NOT BE SENT. Return code " << sendcode;
      break;
    }

    LOG(info) << workerStr << " Waiting for status answer ";
    auto code = statuschannel.Receive(reply, timeoutinMS);
    if (code <= 0) {
      LOG(error) << workerStr << " STATUS REQUEST UNSUCCESSFUL";
      break;
    }

    // never read past the end of a truncated answer
    if (reply->GetSize() < sizeof(int)) {
      LOG(error) << workerStr << " STATUS ANSWER TOO SHORT (" << reply->GetSize() << " bytes)";
      break;
    }
    int state = 0;
    std::memcpy(&state, reply->GetData(), sizeof(state));

    if (state == static_cast<int>(o2::O2PrimaryServerState::ReadyToServe)) {
      LOG(info) << workerStr << " SERVER IS SERVING";
      return true;
    } else if (state == static_cast<int>(o2::O2PrimaryServerState::Initializing)) {
      LOG(info) << workerStr << " SERVER IS STILL INITIALIZING";
      reprobe = true;
      sleep(1);
    } else if (state == static_cast<int>(o2::O2PrimaryServerState::WaitingEvent)) {
      LOG(info) << workerStr << " SERVER IS WAITING FOR EVENT";
      reprobe = true;
      sleep(1);
    } else if (state == static_cast<int>(o2::O2PrimaryServerState::Idle)) {
      LOG(info) << workerStr << " SERVER IS IDLE";
      return false;
    } else {
      LOG(info) << workerStr << " SERVER STATE UNKNOWN OR STOPPED";
    }
  }
  return false;
}
bool Kernel(int workerID, fair::mq::Channel& requestchannel, fair::mq::Channel& dataoutchannel, fair::mq::Channel* statuschannel = nullptr)
{
static int counter = 0;
bool reproducibleSim = true;
if (getenv("O2_DISABLE_REPRODUCIBLE_SIM")) {
reproducibleSim = false;
}
// Mainly for debugging reasons, we allow to transport
// a specific event + eventpart. This allows to reproduce and debug bugs faster, once
// we know in which precise chunk they occur. The expected format for the environment variable
// is "eventnum:partid".
auto eventselection = getenv("O2SIM_RESTRICT_EVENTPART");
int focus_on_event = -1;
int focus_on_part = -1;
if (eventselection) {
auto splitString = [](const std::string& str) {
std::pair<std::string, std::string> parts;
size_t pos = str.find(':');
if (pos != std::string::npos) {
parts.first = str.substr(0, pos);
parts.second = str.substr(pos + 1);
}
return parts;
};
auto p = splitString(eventselection);
focus_on_event = std::atoi(p.first.c_str());
focus_on_part = std::atoi(p.second.c_str());
}
fair::mq::MessagePtr request(requestchannel.NewSimpleMessage(PrimaryChunkRequest{workerID, -1, counter++})); // <-- don't need content; channel means -> give primaries
fair::mq::Parts reply;
mVMCApp->setSimDataChannel(&dataoutchannel);
// we log info with workerID prepended
auto workerStr = [workerID]() {
std::stringstream str;
str << "[W" << workerID << "]";
return str.str();
};
doLogInfo(workerID, "Requesting work chunk");
int timeoutinMS = 2000;
auto sendcode = requestchannel.Send(request, timeoutinMS);
if (sendcode > 0) {
doLogInfo(workerID, "Waiting for answer");
// asking for primary generation
auto code = requestchannel.Receive(reply);
if (code > 0) {
doLogInfo(workerID, "Primary chunk received");
auto rawmessage = std::move(reply.At(0));
auto header = *(o2::PrimaryChunkAnswer*)(rawmessage->GetData());
if (!header.payload_attached) {
doLogInfo(workerID, "No payload; Server in stage " + std::string(PrimStateToString[(int)header.serverstate]));
// if no payload attached we inspect the server state, to see what to do
if (header.serverstate == O2PrimaryServerState::Initializing || header.serverstate == O2PrimaryServerState::WaitingEvent) {
sleep(1); // back-off and retry
return true;
}
// we need to decide what to do when the server is idle ---> if this happens immediately after a new batch request it means that the server might just lag a bit behind
return false;
} else {
auto payload = std::move(reply.At(1));
// wrap incoming bytes as a TMessageWrapper which offers "adoption" of a buffer
auto message = new TMessageWrapper(payload->GetData(), payload->GetSize());
auto chunk = static_cast<o2::data::PrimaryChunk*>(message->ReadObjectAny(message->GetClass()));
bool goon = true;
// no particles and eventID == -1 --> indication for no more work
if (chunk->mParticles.size() == 0 && chunk->mSubEventInfo.eventID == -1) {
doLogInfo(workerID, "No particles in reply : quitting kernel");
goon = false;
}
if (goon) {
auto info = chunk->mSubEventInfo;
LOG(info) << workerStr() << " Processing " << chunk->mParticles.size() << " primary particles "
<< "for event " << info.eventID << "/" << info.maxEvents << " "
<< "part " << info.part << "/" << info.nparts;
if (eventselection == nullptr || (focus_on_event == info.eventID && focus_on_part == info.part)) {
mVMCApp->setPrimaries(chunk->mParticles);
} else {
// nothing to transport here
mVMCApp->setPrimaries(std::vector<TParticle>{});
LOG(info) << workerStr() << " This chunk will be skipped";
}
mVMCApp->setSubEventInfo(&info);
if (reproducibleSim) {
LOG(info) << workerStr() << " Setting seed for this sub-event to " << chunk->mSubEventInfo.seed;
gRandom->SetSeed(chunk->mSubEventInfo.seed);
o2::base::VMCSeederService::instance().setSeed();
}
// Process one event
auto& conf = o2::conf::SimConfig::Instance();
if (strcmp(conf.getMCEngine().c_str(), "TGeant4") == 0 || strcmp(conf.getMCEngine().c_str(), "O2TrivialMCEngine") == 0) {
// this is preferred and necessary for Geant4
// since repeated "ProcessRun" might have significant overheads
mVMC->ProcessEvent();
} else {
// for Geant3 calling ProcessEvent is not enough
// as some hooks are not called
mVMC->ProcessRun(1);
}
FairSystemInfo sysinfo;
LOG(info) << workerStr() << " TIME-STAMP " << mTimer.RealTime() << "\t";
mTimer.Continue();
LOG(info) << workerStr() << " MEM-STAMP " << sysinfo.GetCurrentMemory() / (1024. * 1024) << " "
<< sysinfo.GetMaxMemory() << " MB\n";
}
delete message;
delete chunk;
}
} else {
LOG(info) << workerStr() << " No primary answer received from server (within timeout). Return code " << code;
}
} else {
LOG(info) << workerStr() << " Requesting work from server not possible. Return code " << sendcode;
return false;
}
return true;
}
protected:
/// Overloads the ConditionalRun() method of fair::mq::Device.
/// Performs one Kernel() step with worker id -1, using channel "primary-get"[0] to
/// request primaries and channel "simdata"[0] as data output channel.
/// @return the result of Kernel()
bool ConditionalRun() final
{
return Kernel(-1, GetChannels().at("primary-get").at(0), GetChannels().at("simdata").at(0));
}
/// Overloads the PostRun() method of fair::mq::Device; only emits a shutdown log line.
void PostRun() final
{
  LOG(info) << "Shutting down ";
}
private:
// NOTE(review): the trailing "//!" markers look like ROOT's annotation for members that are
// not streamed — confirm against the ROOT I/O documentation before changing them.
// stopwatch read for the TIME-STAMP log lines (destructor and Kernel) and resumed in Kernel
TStopwatch mTimer; //!
// VMC application; set by the two-argument constructor or in InitTask(), not deleted by this class
o2::steer::O2MCApplication* mVMCApp = nullptr; //!
// VMC engine; set by the two-argument constructor or in InitTask(), not deleted by this class
TVirtualMC* mVMC = nullptr; //!
// simulation run object, filled by initSim() during InitTask()
std::unique_ptr<FairRunSim> mSimRun; //!
};
} // namespace devices
} // namespace o2
#endif