From 61774fd3332b690d086fdcb03377b5b88910d66e Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 8 Mar 2024 09:35:09 +0100 Subject: [PATCH 1/9] DPL: make sure O2_SIGNPOST_ID_FROM_POINTER allows for const pointers --- Framework/Foundation/include/Framework/Signpost.h | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/Framework/Foundation/include/Framework/Signpost.h b/Framework/Foundation/include/Framework/Signpost.h index 7750687088a1d..9ea8aea1192a9 100644 --- a/Framework/Foundation/include/Framework/Signpost.h +++ b/Framework/Foundation/include/Framework/Signpost.h @@ -202,8 +202,6 @@ struct _o2_log_t { bool _o2_lock_free_stack_push(_o2_lock_free_stack& stack, const int& value, bool spin = false); bool _o2_lock_free_stack_pop(_o2_lock_free_stack& stack, int& value, bool spin = false); -//_o2_signpost_id_t _o2_signpost_id_generate_local(_o2_log_t* log); -//_o2_signpost_id_t _o2_signpost_id_make_with_pointer(_o2_log_t* log, void* pointer); void* _o2_log_create(char const* name, int stacktrace); void _o2_signpost_event_emit(_o2_log_t* log, _o2_signpost_id_t id, char const* name, char const* const format, ...); void _o2_signpost_interval_begin(_o2_log_t* log, _o2_signpost_id_t id, char const* name, char const* const format, ...); @@ -224,7 +222,7 @@ inline _o2_signpost_id_t _o2_signpost_id_generate_local(_o2_log_t* log) // Generate a unique id for a signpost. Do not use this directly, use O2_SIGNPOST_ID_FROM_POINTER instead. // Notice that this will fail for pointers to bytes as it might overlap with the id above. 
-inline _o2_signpost_id_t _o2_signpost_id_make_with_pointer(_o2_log_t* log, void* pointer) +inline _o2_signpost_id_t _o2_signpost_id_make_with_pointer(_o2_log_t* log, void const* pointer) { assert(((int64_t)pointer & 1) != 1); _o2_signpost_id_t uniqueId{(int64_t)pointer}; From cf3c2402f1e6e85d3128d300262a13d2e4e447fc Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 8 Mar 2024 09:35:09 +0100 Subject: [PATCH 2/9] DPL: replace Tracy allocation profiler with Signposts --- Framework/Core/src/DataAllocator.cxx | 7 ++++ Framework/Core/src/DataProcessingDevice.cxx | 42 +++++++++++++++------ Framework/Core/src/LifetimeHelpers.cxx | 8 +++- 3 files changed, 44 insertions(+), 13 deletions(-) diff --git a/Framework/Core/src/DataAllocator.cxx b/Framework/Core/src/DataAllocator.cxx index de500cd6d6c4e..fe38283d5e2de 100644 --- a/Framework/Core/src/DataAllocator.cxx +++ b/Framework/Core/src/DataAllocator.cxx @@ -35,6 +35,7 @@ #include O2_DECLARE_DYNAMIC_LOG(stream_context); +O2_DECLARE_DYNAMIC_LOG(parts); namespace o2::framework { @@ -129,6 +130,8 @@ void DataAllocator::addPartToContext(RouteIndex routeIndex, fair::mq::MessagePtr o2::header::SerializationMethod serializationMethod) { auto headerMessage = headerMessageFromOutput(spec, routeIndex, serializationMethod, 0); + O2_SIGNPOST_ID_FROM_POINTER(pid, parts, headerMessage->GetData()); + O2_SIGNPOST_START(parts, pid, "parts", "addPartToContext %p", headerMessage->GetData()); // FIXME: this is kind of ugly, we know that we can change the content of the // header message because we have just created it, but the API declares it const @@ -150,6 +153,8 @@ void DataAllocator::adopt(const Output& spec, std::string* ptr) // the correct payload size is set later when sending the // StringContext, see DataProcessor::doSend auto header = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodNone, 0); + O2_SIGNPOST_ID_FROM_POINTER(pid, parts, header->GetData()); + 
O2_SIGNPOST_START(parts, pid, "parts", "addPartToContext %p", header->GetData()); mRegistry.get().addString(std::move(header), std::move(payload), routeIndex); assert(payload.get() == nullptr); } @@ -206,6 +211,8 @@ void DataAllocator::adopt(const Output& spec, LifetimeHolder& tb) auto& timingInfo = mRegistry.get(); RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice); auto header = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodArrow, 0); + O2_SIGNPOST_ID_FROM_POINTER(pid, parts, header->GetData()); + O2_SIGNPOST_START(parts, pid, "parts", "adopt %p", header->GetData()); auto& context = mRegistry.get(); auto creator = [transport = context.proxy().getOutputTransport(routeIndex)](size_t s) -> std::unique_ptr { diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 3e944b30ed11f..ca4ca71f50c2c 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -92,7 +92,10 @@ struct formatter : ostream_format }; } // namespace fmt +// A log to use for general device logging O2_DECLARE_DYNAMIC_LOG(device); +// Special log to keep track of the lifetime of the parts +O2_DECLARE_DYNAMIC_LOG(parts); using namespace o2::framework; using ConfigurationInterface = o2::configuration::ConfigurationInterface; @@ -1804,9 +1807,14 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& LOGP(error, "DataHeader payloadSize mismatch"); continue; } - TracyPlot("payload size", (int64_t)dh->payloadSize); auto dph = o2::header::get(headerData); - TracyAlloc(parts.At(pi + 1)->GetData(), parts.At(pi + 1)->GetSize()); + // We only deal with the tracking of parts if the log is enabled. + // This is because in principle we should track the size of each of + // the parts and sum it up. Not for now. 
+ if (O2_LOG_ENABLED(parts) == true) { + O2_SIGNPOST_ID_FROM_POINTER(pid, parts, headerData); + O2_SIGNPOST_START(parts, pid, "parts", "Processing DataHeader with splitPayloadParts %d and splitPayloadIndex %d", dh->splitPayloadParts, dh->splitPayloadIndex); + } if (!dph) { insertInputInfo(pi, 2, InputType::Invalid); LOGP(error, "Header stack does not contain DataProcessingHeader"); @@ -2180,10 +2188,12 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v // Function to cleanup record. For the moment we // simply use it to keep track of input messages // which are not needed, to display them in the GUI. -#ifdef TRACY_ENABLE auto cleanupRecord = [](InputRecord& record) { - for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) { - DataRef input = record.getByPos(ii); + if (O2_LOG_ENABLED(parts) == false) { + return; + } + for (size_t pi = 0, pe = record.size(); pi < pe; ++pi) { + DataRef input = record.getByPos(pi); if (input.header == nullptr) { continue; } @@ -2196,10 +2206,21 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v if (!dh) { continue; } - TracyFree(input.payload); + // We use the address of the first header of a split payload + // to identify the interval. + O2_SIGNPOST_ID_FROM_POINTER(pid, parts, dh); + O2_SIGNPOST_END(parts, pid, "parts", "Cleaning up parts associated to %p", dh); + + // No split parts, we simply skip the payload + if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) { + // this is indicating a sequence of payloads following the header + // FIXME: we will probably also set the DataHeader version + pi += dh->splitPayloadParts - 1; + } else { + size_t pi = pi + (dh->splitPayloadParts > 0 ? 
dh->splitPayloadParts : 1) * 2; + } } }; -#endif auto switchState = [ref](StreamingState newState) { auto& control = ref.get(); @@ -2432,6 +2453,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v // We forward inputs only when we consume them. If we simply Process them, // we keep them for next message arriving. if (action.op == CompletionPolicy::CompletionOp::Consume) { + cleanupRecord(record); context.postDispatchingCallbacks(processContext); ref.get().call(o2::framework::ServiceRegistryRef{ref}); } @@ -2441,11 +2463,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false, action.op == CompletionPolicy::CompletionOp::Consume); } context.postForwardingCallbacks(processContext); - if (action.op == CompletionPolicy::CompletionOp::Consume) { -#ifdef TRACY_ENABLE - cleanupRecord(record); -#endif - } else if (action.op == CompletionPolicy::CompletionOp::Process) { + if (action.op == CompletionPolicy::CompletionOp::Process) { cleanTimers(action.slot, record); } } diff --git a/Framework/Core/src/LifetimeHelpers.cxx b/Framework/Core/src/LifetimeHelpers.cxx index 1aa53fa0493ca..8c8605d41849c 100644 --- a/Framework/Core/src/LifetimeHelpers.cxx +++ b/Framework/Core/src/LifetimeHelpers.cxx @@ -25,6 +25,7 @@ #include "Framework/FairMQDeviceProxy.h" #include "Framework/Formatters.h" #include "Framework/DeviceState.h" +#include "Framework/Signpost.h" #include "Headers/DataHeader.h" #include "Headers/DataHeaderHelpers.h" @@ -44,6 +45,8 @@ using namespace o2::header; using namespace fair; +O2_DECLARE_DYNAMIC_LOG(parts); + namespace o2::framework { @@ -411,7 +414,6 @@ ExpirationHandler::Handler LifetimeHelpers::enumerate(ConcreteDataMatcher const& assert(!ref.payload); auto timestamp = VariableContextHelpers::getTimeslice(variables).value; - LOGP(debug, "Enumerating record"); DataHeader dh; dh.dataOrigin = 
matcher.origin; dh.dataDescription = matcher.description; @@ -432,6 +434,8 @@ ExpirationHandler::Handler LifetimeHelpers::enumerate(ConcreteDataMatcher const& auto&& transport = deviceProxy.getInputChannel(channelIndex)->Transport(); auto channelAlloc = o2::pmr::getTransportAllocator(transport); auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); + O2_SIGNPOST_ID_FROM_POINTER(hid, parts, header->GetData()); + O2_SIGNPOST_START(parts, hid, "parts", "Enumerating part %p with timestamp %zu", header->GetData(), timestamp); ref.header = std::move(header); auto payload = transport->CreateMessage(sizeof(counter_t)); @@ -486,6 +490,8 @@ ExpirationHandler::Handler LifetimeHelpers::dummy(ConcreteDataMatcher const& mat auto channelAlloc = o2::pmr::getTransportAllocator(transport); auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); ref.header = std::move(header); + O2_SIGNPOST_ID_FROM_POINTER(hid, parts, header->GetData()); + O2_SIGNPOST_START(parts, hid, "parts", "Enumerating part %p with timestamp %zu", header->GetData(), timestamp); auto payload = transport->CreateMessage(0); ref.payload = std::move(payload); }; From afb25c5bf77078b0e7900692455033aee4f23d27 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 8 Mar 2024 09:35:09 +0100 Subject: [PATCH 3/9] DPL: use signposts for OOB callback --- Framework/Core/src/DataProcessingDevice.cxx | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index ca4ca71f50c2c..ae7c19022e556 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -318,7 +318,7 @@ void on_socket_polled(uv_poll_t* poller, int status, int events) O2_SIGNPOST_END(device, sid, "socket_state", "Socket disconnected in context %{public}s", context->name); } break; case UV_PRIORITIZED: { - 
O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Data pending on socket for context %{public}s", context->name); + O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Socket prioritized for context %{public}s", context->name); } break; } // We do nothing, all the logic for now stays in DataProcessingDevice::doRun() @@ -326,6 +326,7 @@ void on_socket_polled(uv_poll_t* poller, int status, int events) void on_out_of_band_polled(uv_poll_t* poller, int status, int events) { + O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller); auto* context = (PollerContext*)poller->data; context->state->loopReason |= DeviceState::OOB_ACTIVITY; if (status < 0) { @@ -334,32 +335,27 @@ void on_out_of_band_polled(uv_poll_t* poller, int status, int events) } switch (events) { case UV_READABLE: { - ZoneScopedN("socket readable event"); + O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name); context->state->loopReason |= DeviceState::DATA_INCOMING; assert(context->channelInfo); - LOGP(debug, "oob socket {} polled UV_READABLE.", - context->name, - context->channelInfo->hasPendingEvents); context->channelInfo->readPolled = true; } break; case UV_WRITABLE: { - ZoneScopedN("socket writeable"); + O2_SIGNPOST_END(device, sid, "socket_state", "OOB socket connected for channel %{public}s", context->name); if (context->read) { - LOG(debug) << "socket polled UV_CONNECT" << context->name; + O2_SIGNPOST_START(device, sid, "socket_state", "OOB socket connected for read in context %{public}s", context->name); uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_out_of_band_polled); } else { - LOG(debug) << "socket polled UV_WRITABLE" << context->name; + O2_SIGNPOST_START(device, sid, "socket_state", "OOB socket connected for write for channel %{public}s", context->name); context->state->loopReason |= DeviceState::DATA_OUTGOING; } } break; case UV_DISCONNECT: { - ZoneScopedN("socket disconnect"); - LOG(debug) << "socket 
polled UV_DISCONNECT"; + O2_SIGNPOST_END(device, sid, "socket_state", "OOB socket disconnected in context %{public}s", context->name); uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled); } break; case UV_PRIORITIZED: { - ZoneScopedN("socket prioritized"); - LOG(debug) << "socket polled UV_PRIORITIZED"; + O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "OOB socket prioritized for context %{public}s", context->name); } break; } // We do nothing, all the logic for now stays in DataProcessingDevice::doRun() From 7175a2746927054ef2904398967e602111b9ffd0 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 8 Mar 2024 09:35:09 +0100 Subject: [PATCH 4/9] DPL: rename TracyLockable to O2_LOCKABLE --- .../Core/include/Framework/DataRelayer.h | 2 +- Framework/Core/include/Framework/DataSender.h | 2 +- Framework/Core/src/DataRelayer.cxx | 32 +++++++++---------- Framework/Core/src/DataSender.cxx | 2 +- Framework/Core/src/ServiceRegistry.cxx | 6 ++-- .../Foundation/include/Framework/Tracing.h | 4 +-- 6 files changed, 24 insertions(+), 24 deletions(-) diff --git a/Framework/Core/include/Framework/DataRelayer.h b/Framework/Core/include/Framework/DataRelayer.h index 8fdedbdd1119e..1ebcf549d6a29 100644 --- a/Framework/Core/include/Framework/DataRelayer.h +++ b/Framework/Core/include/Framework/DataRelayer.h @@ -194,7 +194,7 @@ class DataRelayer std::vector mPruneOps; size_t mMaxLanes; - TracyLockableN(std::recursive_mutex, mMutex, "data relayer mutex"); + O2_LOCKABLE_NAMED(std::recursive_mutex, mMutex, "data relayer mutex"); }; } // namespace o2::framework diff --git a/Framework/Core/include/Framework/DataSender.h b/Framework/Core/include/Framework/DataSender.h index 7cdc0ea537bea..2937bd80f78f8 100644 --- a/Framework/Core/include/Framework/DataSender.h +++ b/Framework/Core/include/Framework/DataSender.h @@ -61,7 +61,7 @@ class DataSender std::vector mPresent; std::vector mPresentDefaults; - TracyLockableN(std::recursive_mutex, 
mMutex, "data relayer mutex"); + O2_LOCKABLE_NAMED(std::recursive_mutex, mMutex, "data relayer mutex"); }; } // namespace o2::framework diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index 0b459dd155a21..1daf4bbd2a20b 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -70,7 +70,7 @@ DataRelayer::DataRelayer(const CompletionPolicy& policy, mInputMatchers{DataRelayerHelpers::createInputMatchers(routes)}, mMaxLanes{InputRouteHelpers::maxLanes(routes)} { - std::scoped_lock lock(mMutex); + std::scoped_lock lock(mMutex); if (policy.configureRelayer == nullptr) { static int pipelineLength = DefaultsHelpers::pipelineLength(); @@ -100,7 +100,7 @@ DataRelayer::DataRelayer(const CompletionPolicy& policy, TimesliceId DataRelayer::getTimesliceForSlot(TimesliceSlot slot) { - std::scoped_lock lock(mMutex); + std::scoped_lock lock(mMutex); auto& variables = mTimesliceIndex.getVariablesForSlot(slot); return VariableContextHelpers::getTimeslice(variables); } @@ -109,7 +109,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector lock(mMutex); + std::scoped_lock lock(mMutex); auto& deviceProxy = services.get(); ActivityStats activity; @@ -394,7 +394,7 @@ DataRelayer::RelayChoice size_t nPayloads, std::function&, TimesliceIndex::OldestOutputInfo)> onDrop) { - std::scoped_lock lock(mMutex); + std::scoped_lock lock(mMutex); DataProcessingHeader const* dph = o2::header::get(rawHeader); // IMPLEMENTATION DETAILS // @@ -616,7 +616,7 @@ DataRelayer::RelayChoice void DataRelayer::getReadyToProcess(std::vector& completed) { LOGP(debug, "DataRelayer::getReadyToProcess"); - std::scoped_lock lock(mMutex); + std::scoped_lock lock(mMutex); // THE STATE const auto& cache = mCache; @@ -749,7 +749,7 @@ void DataRelayer::getReadyToProcess(std::vector& comp void DataRelayer::updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus) { - std::scoped_lock lock(mMutex); + 
std::scoped_lock lock(mMutex); const auto numInputTypes = mDistinctRoutesIndex.size(); auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics, @@ -767,7 +767,7 @@ void DataRelayer::updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStat std::vector DataRelayer::consumeAllInputsForTimeslice(TimesliceSlot slot) { - std::scoped_lock lock(mMutex); + std::scoped_lock lock(mMutex); const auto numInputTypes = mDistinctRoutesIndex.size(); // State of the computation @@ -821,7 +821,7 @@ std::vector DataRelayer::consumeAllInputsForTimeslice std::vector DataRelayer::consumeExistingInputsForTimeslice(TimesliceSlot slot) { - std::scoped_lock lock(mMutex); + std::scoped_lock lock(mMutex); const auto numInputTypes = mDistinctRoutesIndex.size(); // State of the computation @@ -864,7 +864,7 @@ std::vector DataRelayer::consumeExistingInputsForTime void DataRelayer::clear() { - std::scoped_lock lock(mMutex); + std::scoped_lock lock(mMutex); for (auto& cache : mCache) { cache.clear(); @@ -886,7 +886,7 @@ size_t /// the time pipelining. 
void DataRelayer::setPipelineLength(size_t s) { - std::scoped_lock lock(mMutex); + std::scoped_lock lock(mMutex); mTimesliceIndex.resize(s); mVariableContextes.resize(s); @@ -895,7 +895,7 @@ void DataRelayer::setPipelineLength(size_t s) void DataRelayer::publishMetrics() { - std::scoped_lock lock(mMutex); + std::scoped_lock lock(mMutex); auto numInputTypes = mDistinctRoutesIndex.size(); // FIXME: many of the DataRelayer function rely on allocated cache, so its @@ -932,31 +932,31 @@ void DataRelayer::publishMetrics() uint32_t DataRelayer::getFirstTFOrbitForSlot(TimesliceSlot slot) { - std::scoped_lock lock(mMutex); + std::scoped_lock lock(mMutex); return VariableContextHelpers::getFirstTFOrbit(mTimesliceIndex.getVariablesForSlot(slot)); } uint32_t DataRelayer::getFirstTFCounterForSlot(TimesliceSlot slot) { - std::scoped_lock lock(mMutex); + std::scoped_lock lock(mMutex); return VariableContextHelpers::getFirstTFCounter(mTimesliceIndex.getVariablesForSlot(slot)); } uint32_t DataRelayer::getRunNumberForSlot(TimesliceSlot slot) { - std::scoped_lock lock(mMutex); + std::scoped_lock lock(mMutex); return VariableContextHelpers::getRunNumber(mTimesliceIndex.getVariablesForSlot(slot)); } uint64_t DataRelayer::getCreationTimeForSlot(TimesliceSlot slot) { - std::scoped_lock lock(mMutex); + std::scoped_lock lock(mMutex); return VariableContextHelpers::getCreationTime(mTimesliceIndex.getVariablesForSlot(slot)); } void DataRelayer::sendContextState() { - std::scoped_lock lock(mMutex); + std::scoped_lock lock(mMutex); auto& states = mContext.get(); for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) { auto slot = TimesliceSlot{ci}; diff --git a/Framework/Core/src/DataSender.cxx b/Framework/Core/src/DataSender.cxx index f0753102e2d40..3acce1c4b9704 100644 --- a/Framework/Core/src/DataSender.cxx +++ b/Framework/Core/src/DataSender.cxx @@ -51,7 +51,7 @@ DataSender::DataSender(ServiceRegistryRef registry) mSpec{registry.get()}, 
mDistinctRoutesIndex{createDistinctOutputRouteIndex(mSpec.outputs)} { - std::scoped_lock lock(mMutex); + std::scoped_lock lock(mMutex); auto numInputTypes = mDistinctRoutesIndex.size(); auto& routes = mSpec.outputs; diff --git a/Framework/Core/src/ServiceRegistry.cxx b/Framework/Core/src/ServiceRegistry.cxx index 7b77c521fe6a7..09921f6b4a7d4 100644 --- a/Framework/Core/src/ServiceRegistry.cxx +++ b/Framework/Core/src/ServiceRegistry.cxx @@ -271,7 +271,7 @@ void ServiceRegistry::postRenderGUICallbacks(ServiceRegistryRef ref) void ServiceRegistry::bindService(ServiceRegistry::Salt salt, ServiceSpec const& spec, void* service) const { - static TracyLockableN(std::mutex, bindMutex, "bind mutex"); + static O2_LOCKABLE_NAMED(std::mutex, bindMutex, "bind mutex"); // Stream services need to store their callbacks in the stream context. // This is to make sure we invoke the correct callback only once per // stream, since they could bind multiple times. @@ -280,12 +280,12 @@ void ServiceRegistry::bindService(ServiceRegistry::Salt salt, ServiceSpec const& if (spec.kind == ServiceKind::Stream) { ServiceRegistryRef ref{const_cast(*this), salt}; auto& streamContext = ref.get(); - std::scoped_lock lock(bindMutex); + std::scoped_lock lock(bindMutex); auto& dataProcessorContext = ref.get(); ContextHelpers::bindStreamService(dataProcessorContext, streamContext, spec, service); } else { ServiceRegistryRef ref{const_cast(*this), salt}; - std::scoped_lock lock(bindMutex); + std::scoped_lock lock(bindMutex); if (ref.active()) { auto& dataProcessorContext = ref.get(); ContextHelpers::bindProcessorService(dataProcessorContext, spec, service); diff --git a/Framework/Foundation/include/Framework/Tracing.h b/Framework/Foundation/include/Framework/Tracing.h index f5bcecd3889d7..6c9c65f4423e5 100644 --- a/Framework/Foundation/include/Framework/Tracing.h +++ b/Framework/Foundation/include/Framework/Tracing.h @@ -39,8 +39,8 @@ #define TracyAppInfo(...) 
\ while (false) { \ } -#define TracyLockableN(T, V, N) T V -#define LockableBase(T) T +#define O2_LOCKABLE_NAMED(T, V, N) T V +#define O2_LOCKABLE(T) T #endif #endif // O2_FRAMEWORK_TRACING_H_ From 178e632ca967462a43420e470d4631f65b4c538f Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 8 Mar 2024 09:35:09 +0100 Subject: [PATCH 5/9] DPL: do not use Tracy anymore Tracy usage never took off, so I am removing all the explicit calls to it and I will replace them with Signpost usage, where it still makes sense. --- .../Core/src/CommonMessageBackendsHelpers.h | 1 - Framework/Core/src/CommonServices.cxx | 1 - Framework/Core/src/DataProcessingDevice.cxx | 29 ------------------- 3 files changed, 31 deletions(-) diff --git a/Framework/Core/src/CommonMessageBackendsHelpers.h b/Framework/Core/src/CommonMessageBackendsHelpers.h index 10d8a9c5d347a..a3684906ff838 100644 --- a/Framework/Core/src/CommonMessageBackendsHelpers.h +++ b/Framework/Core/src/CommonMessageBackendsHelpers.h @@ -38,7 +38,6 @@ struct CommonMessageBackendsHelpers { static ServiceProcessingCallback sendCallback() { return [](ProcessingContext& ctx, void* service) { - ZoneScopedN("send message callback"); T* context = reinterpret_cast(service); DataProcessor::doSend(ctx.services().get(), *context, ctx.services()); }; diff --git a/Framework/Core/src/CommonServices.cxx b/Framework/Core/src/CommonServices.cxx index a0f7fcdc88201..0d0828729ef42 100644 --- a/Framework/Core/src/CommonServices.cxx +++ b/Framework/Core/src/CommonServices.cxx @@ -741,7 +741,6 @@ auto sendRelayerMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) stats.updateStats({static_cast(static_cast(ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE) + (runningWorkflow.shmSegmentId % 512)), DataProcessingStats::Op::SetIfPositive, freeMemory}); } - ZoneScopedN("send metrics"); auto device = registry.get().device(); int64_t totalBytesIn = 0; diff --git a/Framework/Core/src/DataProcessingDevice.cxx 
b/Framework/Core/src/DataProcessingDevice.cxx index ae7c19022e556..649aba2d6b502 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -115,14 +115,12 @@ struct ServiceKindExtractor { /// Watching stdin for commands probably a better approach. void on_idle_timer(uv_timer_t* handle) { - ZoneScopedN("Idle timer"); auto* state = (DeviceState*)handle->data; state->loopReason |= DeviceState::TIMER_EXPIRED; } void on_transition_requested_expired(uv_timer_t* handle) { - ZoneScopedN("Transition expired"); auto* state = (DeviceState*)handle->data; state->loopReason |= DeviceState::TIMER_EXPIRED; LOGP(info, "Timer expired. Forcing transition to READY"); @@ -222,12 +220,10 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi // one with the thread id. For the moment we simply use the first one. void run_callback(uv_work_t* handle) { - ZoneScopedN("run_callback"); auto* task = (TaskStreamInfo*)handle->data; auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)}; DataProcessingDevice::doPrepare(ref); DataProcessingDevice::doRun(ref); - // FrameMark; } // Once the processing in a thread is done, this is executed on the main thread. 
@@ -268,7 +264,6 @@ void run_completion(uv_work_t* handle, int status) quotaEvaluator.handleExpired(reportExpiredOffer); quotaEvaluator.dispose(task->id.index); task->running = false; - ZoneScopedN("run_completion"); } // Context for polling @@ -377,8 +372,6 @@ void DataProcessingDevice::Init() context.statefulProcess = nullptr; context.error = spec.algorithm.onError; context.initError = spec.algorithm.onInitError; - TracyAppInfo(spec.name.data(), spec.name.size()); - ZoneScopedN("DataProcessingDevice::Init"); auto configStore = DeviceConfigurationHelpers::getConfiguration(mServiceRegistry, spec.name.c_str(), spec.options); if (configStore == nullptr) { @@ -412,7 +405,6 @@ void DataProcessingDevice::Init() if (context.initError) { context.initErrorHandling = [&errorCallback = context.initError, &serviceRegistry = mServiceRegistry](RuntimeErrorRef e) { - ZoneScopedN("Error handling"); /// FIXME: we should pass the salt in, so that the message /// can access information which were stored in the stream. ServiceRegistryRef ref{serviceRegistry, ServiceRegistry::globalDeviceSalt()}; @@ -426,7 +418,6 @@ void DataProcessingDevice::Init() }; } else { context.initErrorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e) { - ZoneScopedN("Error handling"); auto& err = error_from_ref(e); /// FIXME: we should pass the salt in, so that the message /// can access information which were stored in the stream. 
@@ -449,7 +440,6 @@ void DataProcessingDevice::Init() try { context.statefulProcess = context.init(initContext); } catch (o2::framework::RuntimeErrorRef e) { - ZoneScopedN("error handling"); if (context.initErrorHandling) { (context.initErrorHandling)(e); } @@ -458,14 +448,12 @@ void DataProcessingDevice::Init() try { context.statefulProcess = context.init(initContext); } catch (std::exception& ex) { - ZoneScopedN("error handling"); /// Convert a standard exception to a RuntimeErrorRef /// Notice how this will lose the backtrace information /// and report the exception coming from here. auto e = runtime_error(ex.what()); (context.initErrorHandling)(e); } catch (o2::framework::RuntimeErrorRef e) { - ZoneScopedN("error handling"); (context.initErrorHandling)(e); } } @@ -621,7 +609,6 @@ static auto toBeforwardedMessageSet = [](std::vector& cachedForwar // FIXME: do it in a smarter way than O(N^2) static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector& currentSetOfInputs, TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) { - ZoneScopedN("forward inputs"); auto& proxy = registry.get(); // we collect all messages per forward in a map and send them together std::vector forwardedParts; @@ -1078,7 +1065,6 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont if (context.error != nullptr) { context.errorHandling = [&errorCallback = context.error, &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) { - ZoneScopedN("Error handling"); /// FIXME: we should pass the salt in, so that the message /// can access information which were stored in the stream. 
ServiceRegistryRef ref{serviceRegistry, ServiceRegistry::globalDeviceSalt()}; @@ -1093,7 +1079,6 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont } else { context.errorHandling = [&errorPolicy = mProcessingPolicies.error, &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) { - ZoneScopedN("Error handling"); auto& err = error_from_ref(e); /// FIXME: we should pass the salt in, so that the message /// can access information which were stored in the stream. @@ -1242,8 +1227,6 @@ void DataProcessingDevice::Run() // so that devices which do not have a timer can still start an // enumeration. { - ZoneScopedN("uv idle"); - TracyPlot("past activity", (int64_t)mWasActive); ServiceRegistryRef ref{mServiceRegistry}; ref.get().flushPending(mServiceRegistry); auto shouldNotWait = (mWasActive && @@ -1303,7 +1286,6 @@ void DataProcessingDevice::Run() if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) { state.transitionHandling = TransitionHandlingState::Expired; } - TracyPlot("shouldNotWait", (int)shouldNotWait); if (state.severityStack.empty() == false) { fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back()); state.severityStack.pop_back(); @@ -1349,7 +1331,6 @@ void DataProcessingDevice::Run() fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back()); state.severityStack.pop_back(); } - TracyPlot("loopReason", (int64_t)(uint64_t)state.loopReason); LOGP(debug, "Loop reason mask {:b} & {:b} = {:b}", state.loopReason, state.tracingFlags, state.loopReason & state.tracingFlags); @@ -1433,7 +1414,6 @@ void DataProcessingDevice::Run() } else { mWasActive = false; } - FrameMark; } auto& spec = ref.get(); /// Cleanup messages which are still pending on exit. @@ -1448,12 +1428,10 @@ void DataProcessingDevice::Run() /// non-data triggers like those which are time based. 
void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) { - ZoneScopedN("DataProcessingDevice::doPrepare"); auto& context = ref.get(); *context.wasActive = false; { - ZoneScopedN("CallbackService::Id::ClockTick"); ref.get().call(); } // Whether or not we had something to do. @@ -1734,7 +1712,6 @@ struct WaitBackpressurePolicy { void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& info) { auto& context = ref.get(); - ZoneScopedN("DataProcessingDevice::handleData"); enum struct InputType : int { Invalid = 0, @@ -1763,7 +1740,6 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& auto& parts = info.parts; stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()}); - TracyPlot("messages received", (int64_t)parts.Size()); std::vector results; // we can reserve the upper limit results.reserve(parts.Size() / 2); @@ -2060,7 +2036,6 @@ void update_maximum(std::atomic& maximum_value, T const& value) noexcept bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::vector& completed) { auto& context = ref.get(); - ZoneScopedN("DataProcessingDevice::tryDispatchComputation"); LOGP(debug, "DataProcessingDevice::tryDispatchComputation"); // This is the actual hidden state for the outer loop. 
In case we decide we // want to support multithreaded dispatching of operations, I can simply @@ -2135,7 +2110,6 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v auto prepareAllocatorForCurrentTimeSlice = [ref](TimesliceSlot i) -> void { auto& relayer = ref.get(); auto& timingInfo = ref.get(); - ZoneScopedN("DataProcessingDevice::prepareForCurrentTimeslice"); auto timeslice = relayer.getTimesliceForSlot(i); timingInfo.timeslice = timeslice.value; @@ -2422,21 +2396,18 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v try { runNoCatch(action); } catch (o2::framework::RuntimeErrorRef e) { - ZoneScopedN("error handling"); (context.errorHandling)(e, record); } } else { try { runNoCatch(action); } catch (std::exception& ex) { - ZoneScopedN("error handling"); /// Convert a standard exception to a RuntimeErrorRef /// Notice how this will lose the backtrace information /// and report the exception coming from here. auto e = runtime_error(ex.what()); (context.errorHandling)(e, record); } catch (o2::framework::RuntimeErrorRef e) { - ZoneScopedN("error handling"); (context.errorHandling)(e, record); } } From 7c3408d03132c468097d0357cafafa11d984e390 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 8 Mar 2024 09:35:09 +0100 Subject: [PATCH 6/9] DPL: drop unused Tracy APIs ZoneScoped*, TracyAppInfo, FrameMark, TracyAlloc, TracyPlot were all removed and will be replaced by Signposts as needed. 
--- .../Foundation/include/Framework/Tracing.h | 24 ------------------- 1 file changed, 24 deletions(-) diff --git a/Framework/Foundation/include/Framework/Tracing.h b/Framework/Foundation/include/Framework/Tracing.h index 6c9c65f4423e5..91c3b99eb8dd0 100644 --- a/Framework/Foundation/include/Framework/Tracing.h +++ b/Framework/Foundation/include/Framework/Tracing.h @@ -15,30 +15,6 @@ #define DPL_HAS_TRACING #include #else -#define ZoneScoped \ - while (false) { \ - } -#define FrameMark \ - while (false) { \ - } -#define TracyPlot(...) \ - while (false) { \ - } -#define ZoneScopedN(...) \ - while (false) { \ - } -#define ZoneScopedNS(...) \ - while (false) { \ - } -#define TracyAlloc(...) \ - while (false) { \ - } -#define TracyFree(...) \ - while (false) { \ - } -#define TracyAppInfo(...) \ - while (false) { \ - } #define O2_LOCKABLE_NAMED(T, V, N) T V #define O2_LOCKABLE(T) T #endif From 2a57f1a6a3e0c680225f2bf07f44f9c4b1beae1e Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 8 Mar 2024 09:35:09 +0100 Subject: [PATCH 7/9] DPL: drop tracy support from the GUI No one is using it and Signposts are probably a better alternative. 
--- .../GUISupport/src/FrameworkGUIDeviceInspector.cxx | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx b/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx index eb62fcc09d338..3ef8b99ed26bb 100644 --- a/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx +++ b/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx @@ -261,9 +261,6 @@ void displayDeviceInspector(DeviceSpec const& spec, ImGui::Text("Pid: %d (exit status: %d)", info.pid, info.exitStatus); } ImGui::Text("Device state: %s", info.deviceState.data()); -#ifdef DPL_ENABLE_TRACING - ImGui::Text("Tracy Port: %d", info.tracyPort); -#endif ImGui::Text("Rank: %zu/%zu%%%zu/%zu", spec.rank, spec.nSlots, spec.inputTimesliceId, spec.maxInputTimeslices); if (ImGui::Button(ICON_FA_BUG "Attach debugger")) { @@ -324,16 +321,6 @@ void displayDeviceInspector(DeviceSpec const& spec, } #endif -#if DPL_ENABLE_TRACING - ImGui::SameLine(); - if (ImGui::Button("Tracy")) { - std::string tracyPort = std::to_string(info.tracyPort); - auto cmd = fmt::format("tracy-profiler -p {} -a 127.0.0.1 &", info.tracyPort); - LOG(debug) << cmd; - int retVal = system(cmd.c_str()); - (void)retVal; - } -#endif if (control.controller) { if (ImGui::Button("Offer SHM")) { control.controller->write("/shm-offer 1000", strlen("/shm-offer 1000")); From 101fc92ec1d9d6227b7744dd80939052cbfe5e92 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 8 Mar 2024 09:35:09 +0100 Subject: [PATCH 8/9] DPL: drop tracy support for good --- Framework/Core/src/DataProcessingDevice.cxx | 4 ---- Framework/Foundation/include/Framework/Tracing.h | 7 ++++--- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 649aba2d6b502..58ea6524f0b7d 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx 
@@ -8,10 +8,6 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifdef DPL_ENABLE_TRACING -#define TRACY_ENABLE -#include -#endif #include "Framework/AsyncQueue.h" #include "Framework/DataProcessingDevice.h" #include "Framework/ChannelMatching.h" diff --git a/Framework/Foundation/include/Framework/Tracing.h b/Framework/Foundation/include/Framework/Tracing.h index 91c3b99eb8dd0..72c052ca6c161 100644 --- a/Framework/Foundation/include/Framework/Tracing.h +++ b/Framework/Foundation/include/Framework/Tracing.h @@ -11,9 +11,10 @@ #ifndef O2_FRAMEWORK_TRACING_H_ #define O2_FRAMEWORK_TRACING_H_ -#if DPL_ENABLE_TRACING && __has_include() -#define DPL_HAS_TRACING -#include +#if DPL_ENABLE_TRACING +// FIXME: not implemented yet in terms of Signposts +#define O2_LOCKABLE_NAMED(T, V, N) T V +#define O2_LOCKABLE(T) T #else #define O2_LOCKABLE_NAMED(T, V, N) T V #define O2_LOCKABLE(T) T From 2fbedffecc63d304c86d56e8b6d64f1fb82194ef Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 8 Mar 2024 09:35:09 +0100 Subject: [PATCH 9/9] [FEAT] Drop support for Tracy --- Framework/CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/Framework/CHANGELOG.md b/Framework/CHANGELOG.md index e7278733de246..044274d9515c4 100644 --- a/Framework/CHANGELOG.md +++ b/Framework/CHANGELOG.md @@ -1,3 +1,12 @@ +# 2024-02-22: Drop Tracy support + +Tracy support never took off, so I am dropping it. This was mostly because people do not know about it and having a per-process profile GUI was very impractical. Moreover, needing an extra compile-time flag meant one most likely did not have the support compiled in when needed. 
+I have therefore decided to replace it with signposts, which hopefully will see better adoption thanks +to the integration with Instruments on macOS and the easy way they can be enabled dynamically. + +We could then reintroduce Tracy support as a hook on top of signposts, if really needed. + # 2024-02-16: Improved Signposts. In particular: