Skip to content

Commit

Permalink
GPU: Remove support for host helper threads (no longer used)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidrohr authored and ktf committed Feb 4, 2025
1 parent 15a7e2f commit dc7e8e9
Show file tree
Hide file tree
Showing 11 changed files with 8 additions and 300 deletions.
2 changes: 1 addition & 1 deletion Common/Topologies/o2prototype_topology.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ The following parameters need adjustment when extending the FLP-EPN configuratio
</decltask>

<decltask id="tracker">
<exe reachable="true">$ALICEO2_INSTALL_DIR/bin/aliceHLTWrapper Tracker_%collectionIndex%_%taskIndex% 1 --dds --poll-period 100 --input type=pull,size=5000,method=connect,property=EPNReceiverOutputAddress,count=1 --output type=push,size=500,method=bind,property=TrackingOutputAddress,min-port=48000 --library libAliHLTTPC.so --component TPCCATracker --run 167808 --parameter '-GlobalTracking -allowGPU -GPUHelperThreads 4 -loglevel=0x7c'</exe>
<exe reachable="true">$ALICEO2_INSTALL_DIR/bin/aliceHLTWrapper Tracker_%collectionIndex%_%taskIndex% 1 --dds --poll-period 100 --input type=pull,size=5000,method=connect,property=EPNReceiverOutputAddress,count=1 --output type=push,size=500,method=bind,property=TrackingOutputAddress,min-port=48000 --library libAliHLTTPC.so --component TPCCATracker --run 167808 --parameter '-GlobalTracking -allowGPU -loglevel=0x7c'</exe>
<!-- <requirement></requirement> -->
<properties>
<id access="read">EPNReceiverOutputAddress</id>
Expand Down
3 changes: 0 additions & 3 deletions GPU/GPUTracking/Base/GPUReconstruction.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,6 @@ int32_t GPUReconstruction::InitPhaseBeforeDevice()
if (!(mRecoSteps.stepsGPUMask & GPUDataTypes::RecoStep::TPCMerging)) {
mProcessingSettings.mergerSortTracks = false;
}
if (!IsGPU()) {
mProcessingSettings.nDeviceHelperThreads = 0;
}

if (mProcessingSettings.debugLevel > 3 || !IsGPU() || mProcessingSettings.deterministicGPUReconstruction) {
mProcessingSettings.delayedOutput = false;
Expand Down
10 changes: 1 addition & 9 deletions GPU/GPUTracking/Base/GPUReconstructionCPU.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#define GPURECONSTRUCTIONICPU_H

#include "GPUReconstruction.h"
#include "GPUReconstructionHelpers.h"
#include "GPUConstantMem.h"
#include <stdexcept>
#include "utils/timer.h"
Expand Down Expand Up @@ -117,13 +116,6 @@ class GPUReconstructionCPU : public GPUReconstructionKernels<GPUReconstructionCP
virtual void RecordMarker(deviceEvent* ev, int32_t stream) {}
virtual void SynchronizeGPU() {}
virtual void ReleaseEvent(deviceEvent ev) {}
// Helper-thread hooks. The defaults in this class are no-ops: the int32_t-returning
// hooks return 0 and the void hooks have empty bodies. GPUReconstructionDeviceBase
// declares overrides for all of them.
// Start / stop the helper threads; the default returns 0 without doing anything.
virtual int32_t StartHelperThreads() { return 0; }
virtual int32_t StopHelperThreads() { return 0; }
// Hand a delegate member function, its object and an item count to the helper threads; default: nothing happens.
virtual void RunHelperThreads(int32_t (GPUReconstructionHelpers::helperDelegateBase::*function)(int32_t, int32_t, GPUReconstructionHelpers::helperParam*), GPUReconstructionHelpers::helperDelegateBase* functionCls, int32_t count) {}
// Wait for the helper threads; default: returns immediately.
virtual void WaitForHelperThreads() {}
// Per-thread error / progress queries; the defaults always report 0.
virtual int32_t HelperError(int32_t iThread) const { return 0; }
virtual int32_t HelperDone(int32_t iThread) const { return 0; }
// Reset the helper threads; default: nothing happens.
virtual void ResetHelperThreads(int32_t helpers) {}

size_t TransferMemoryResourceToGPU(GPUMemoryResource* res, int32_t stream = -1, deviceEvent* ev = nullptr, deviceEvent* evList = nullptr, int32_t nEvents = 1) { return TransferMemoryInternal(res, stream, ev, evList, nEvents, true, res->Ptr(), res->PtrDevice()); }
size_t TransferMemoryResourceToHost(GPUMemoryResource* res, int32_t stream = -1, deviceEvent* ev = nullptr, deviceEvent* evList = nullptr, int32_t nEvents = 1) { return TransferMemoryInternal(res, stream, ev, evList, nEvents, false, res->PtrDevice(), res->Ptr()); }
Expand Down Expand Up @@ -294,7 +286,7 @@ HighResTimer& GPUReconstructionCPU::getTimer(const char* name, int32_t num)
static int32_t id = getNextTimerId();
timerMeta* timer = getTimerById(id);
if (timer == nullptr) {
int32_t max = std::max<int32_t>({getOMPMaxThreads(), mProcessingSettings.nDeviceHelperThreads + 1, mProcessingSettings.nStreams});
int32_t max = std::max<int32_t>({getOMPMaxThreads(), mProcessingSettings.nStreams});
timer = insertTimer(id, name, J, max, 1, RecoStep::NoRecoStep);
}
if (num == -1) {
Expand Down
139 changes: 0 additions & 139 deletions GPU/GPUTracking/Base/GPUReconstructionDeviceBase.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -41,57 +41,6 @@ GPUReconstructionDeviceBase::GPUReconstructionDeviceBase(const GPUSettingsDevice

GPUReconstructionDeviceBase::~GPUReconstructionDeviceBase() = default;

void* GPUReconstructionDeviceBase::helperWrapper_static(void* arg)
{
GPUReconstructionHelpers::helperParam* par = (GPUReconstructionHelpers::helperParam*)arg;
GPUReconstructionDeviceBase* cls = par->cls;
return cls->helperWrapper(par);
}

// Main loop of one helper thread.
// par is this thread's control block. par->mutex[0] is what the thread blocks on while
// waiting for work (RunHelperThreads / StopHelperThreads unlock it); par->mutex[1] is
// unlocked by this thread to signal that a batch is finished or that it is terminating.
// The function does not return normally: it ends through pthread_exit().
void* GPUReconstructionDeviceBase::helperWrapper(GPUReconstructionHelpers::helperParam* par)
{
if (mProcessingSettings.debugLevel >= 3) {
GPUInfo("\tHelper thread %d starting", par->num);
}

// cpu_set_t mask; //TODO add option
// CPU_ZERO(&mask);
// CPU_SET(par->num * 2 + 2, &mask);
// sched_setaffinity(0, sizeof(mask), &mask);

// Block until the first job is posted. StartHelperThreads locks both mutexes before
// creating the thread, so this lock only succeeds once another thread unlocks mutex[0].
par->mutex[0].lock();
while (par->terminate == false) {
// Items are interleaved: this thread starts at index num + 1 and advances by
// nDeviceHelperThreads + 1, i.e. one slot per helper plus one extra slot
// (presumably the calling thread's share - NOTE(review): confirm with the caller).
for (int32_t i = par->num + 1; i < par->count; i += mProcessingSettings.nDeviceHelperThreads + 1) {
// if (mProcessingSettings.debugLevel >= 3) GPUInfo("\tHelper Thread %d Running, Slice %d+%d, Phase %d", par->num, i, par->phase);
// Invoke the delegate member function; a non-zero result is recorded as an error but
// does not stop the loop.
if ((par->functionCls->*par->function)(i, par->num + 1, par)) {
par->error = 1;
}
// A reset request aborts the batch before `done` is advanced for this item.
if (par->reset) {
break;
}
par->done = i + 1;
// if (mProcessingSettings.debugLevel >= 3) GPUInfo("\tHelper Thread %d Finished, Slice %d+%d, Phase %d", par->num, i, par->phase);
}
// Clears the reset flag and unlocks mutex[1] to signal that this batch is over.
ResetThisHelperThread(par);
// Wait for the next job or for the terminate request.
par->mutex[0].lock();
}
if (mProcessingSettings.debugLevel >= 3) {
GPUInfo("\tHelper thread %d terminating", par->num);
}
// Signal StopHelperThreads, which locks mutex[1] before joining this thread.
par->mutex[1].unlock();
pthread_exit(nullptr);
// Not reached: pthread_exit() does not return to its caller.
return (nullptr);
}

// Called by a helper thread at the end of a batch: logs if a reset had been requested,
// clears the request and unlocks mutex[1] of the thread's control block.
void GPUReconstructionDeviceBase::ResetThisHelperThread(GPUReconstructionHelpers::helperParam* par)
{
  auto& ctl = *par;
  if (ctl.reset) {
    GPUImportant("GPU Helper Thread %d reseting", ctl.num);
  }
  ctl.reset = false;
  ctl.mutex[1].unlock();
}

int32_t GPUReconstructionDeviceBase::GetGlobalLock(void*& pLock)
{
#ifdef _WIN32
Expand Down Expand Up @@ -138,86 +87,6 @@ void GPUReconstructionDeviceBase::ReleaseGlobalLock(void* sem)
#endif
}

void GPUReconstructionDeviceBase::ResetHelperThreads(int32_t helpers)
{
GPUImportant("Error occurred, GPU tracker helper threads will be reset (Number of threads %d (%d))", mProcessingSettings.nDeviceHelperThreads, mNSlaveThreads);
SynchronizeGPU();
for (int32_t i = 0; i < mProcessingSettings.nDeviceHelperThreads; i++) {
mHelperParams[i].reset = true;
if (helpers || i >= mProcessingSettings.nDeviceHelperThreads) {
pthread_mutex_lock(&((pthread_mutex_t*)mHelperParams[i].mutex)[1]);
}
}
GPUImportant("GPU Tracker helper threads have ben reset");
}

// Allocates one control block per configured helper thread and starts the threads.
// Both mutexes of every control block are locked before the thread is created, so a
// new helper blocks until work is posted. Returns 0 on success, 1 on failure (after
// calling ExitDevice()).
int32_t GPUReconstructionDeviceBase::StartHelperThreads()
{
  const int32_t nThreads = mProcessingSettings.nDeviceHelperThreads;
  if (nThreads) {
    mHelperParams = new GPUReconstructionHelpers::helperParam[nThreads];
    // NOTE(review): a plain new[] expression reports failure by throwing std::bad_alloc,
    // so this check is only effective with a non-throwing allocator; kept as a safeguard.
    if (mHelperParams == nullptr) {
      GPUError("Memory allocation error");
      ExitDevice();
      return (1);
    }
    for (int32_t i = 0; i < nThreads; i++) {
      mHelperParams[i].cls = this;
      mHelperParams[i].terminate = false;
      mHelperParams[i].reset = false;
      mHelperParams[i].num = i;
      for (int32_t j = 0; j < 2; j++) {
        mHelperParams[i].mutex[j].lock();
      }

      if (pthread_create(&mHelperParams[i].threadId, nullptr, helperWrapper_static, &mHelperParams[i])) {
        GPUError("Error starting slave thread");
        // No thread owns slot i, so release the mutexes locked above.
        for (int32_t j = 0; j < 2; j++) {
          mHelperParams[i].mutex[j].unlock();
        }
        // Record the threads that are already running so that ExitDevice() ->
        // StopHelperThreads() terminates and joins them and frees the control blocks.
        // Previously mNSlaveThreads was still 0 here and they were left behind.
        mNSlaveThreads = i;
        if (i == 0) {
          // StopHelperThreads() only frees the array when threads are running.
          delete[] mHelperParams;
          mHelperParams = nullptr;
        }
        ExitDevice();
        return (1);
      }
    }
  }
  mNSlaveThreads = nThreads;
  return (0);
}

// Terminates and joins all running helper threads and frees their control blocks.
// For each thread: set `terminate`, unlock mutex[0] to wake it, lock mutex[1] (which the
// thread unlocks just before exiting), then join. Returns 0 on success and 1 if a join
// fails; in that case the remaining threads and the control blocks are left untouched.
int32_t GPUReconstructionDeviceBase::StopHelperThreads()
{
  if (mNSlaveThreads) {
    for (int32_t i = 0; i < mNSlaveThreads; i++) {
      mHelperParams[i].terminate = true;
      mHelperParams[i].mutex[0].unlock();
      mHelperParams[i].mutex[1].lock();
      if (pthread_join(mHelperParams[i].threadId, nullptr)) {
        GPUError("Error waiting for thread to terminate");
        return (1);
      }
    }
    delete[] mHelperParams;
    // Do not leave a dangling pointer behind: HelperError()/HelperDone() index this
    // array, and the member's declared default is nullptr.
    mHelperParams = nullptr;
  }
  mNSlaveThreads = 0;
  return (0);
}

void GPUReconstructionDeviceBase::WaitForHelperThreads()
{
for (int32_t i = 0; i < mProcessingSettings.nDeviceHelperThreads; i++) {
pthread_mutex_lock(&((pthread_mutex_t*)mHelperParams[i].mutex)[1]);
}
}

// Posts a new batch to all helper threads.
// function / functionCls: delegate member function and the object it is invoked on.
// count: upper bound (exclusive) of the item indices the helpers iterate over.
// Each control block gets its progress and error state cleared and the job description
// stored; unlocking mutex[0] then releases the helper waiting on it.
void GPUReconstructionDeviceBase::RunHelperThreads(int32_t (GPUReconstructionHelpers::helperDelegateBase::*function)(int32_t i, int32_t t, GPUReconstructionHelpers::helperParam* p), GPUReconstructionHelpers::helperDelegateBase* functionCls, int32_t count)
{
  for (int32_t i = 0; i < mProcessingSettings.nDeviceHelperThreads; i++) {
    auto& param = mHelperParams[i];
    param.done = 0;
    param.error = 0;
    param.function = function;
    param.functionCls = functionCls;
    param.count = count;
    // Uses the mutex's own unlock() member, as StopHelperThreads does, rather than
    // reinterpreting the array as pthread_mutex_t.
    param.mutex[0].unlock();
  }
}

int32_t GPUReconstructionDeviceBase::InitDevice()
{
// cpu_set_t mask;
Expand Down Expand Up @@ -262,10 +131,6 @@ int32_t GPUReconstructionDeviceBase::InitDevice()
mProcShadow.mMemoryResProcessors = RegisterMemoryAllocation(&mProcShadow, &GPUProcessorProcessors::SetPointersDeviceProcessor, GPUMemoryResource::MEMORY_PERMANENT | GPUMemoryResource::MEMORY_HOST, "Processors");
AllocateRegisteredMemory(mProcShadow.mMemoryResProcessors);

if (StartHelperThreads()) {
return (1);
}

if (mMaster == nullptr || mProcessingSettings.debugLevel >= 2) {
GPUInfo("GPU Tracker initialization successfull"); // Verbosity reduced because GPU backend will print GPUImportant message!
}
Expand All @@ -282,10 +147,6 @@ void* GPUReconstructionDeviceBase::GPUProcessorProcessors::SetPointersDeviceProc

int32_t GPUReconstructionDeviceBase::ExitDevice()
{
if (StopHelperThreads()) {
return (1);
}

int32_t retVal = ExitDevice_Runtime();
mProcessorsShadow = nullptr;
mHostMemoryPool = mHostMemoryBase = mDeviceMemoryPool = mDeviceMemoryBase = mHostMemoryPoolEnd = mDeviceMemoryPoolEnd = mHostMemoryPermanent = mDeviceMemoryPermanent = nullptr;
Expand Down
17 changes: 1 addition & 16 deletions GPU/GPUTracking/Base/GPUReconstructionDeviceBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#include "GPUReconstructionCPU.h"
#include <pthread.h>
#include "GPUReconstructionHelpers.h"
#include "GPUChain.h"
#include <vector>

Expand Down Expand Up @@ -61,24 +60,10 @@ class GPUReconstructionDeviceBase : public GPUReconstructionCPU
size_t GPUMemCpyAlways(bool onGpu, void* dst, const void* src, size_t size, int32_t stream, int32_t toGPU, deviceEvent* ev = nullptr, deviceEvent* evList = nullptr, int32_t nEvents = 1) override;
size_t WriteToConstantMemory(size_t offset, const void* src, size_t size, int32_t stream = -1, deviceEvent* ev = nullptr) override = 0;

int32_t StartHelperThreads() override;
int32_t StopHelperThreads() override;
void RunHelperThreads(int32_t (GPUReconstructionHelpers::helperDelegateBase::*function)(int32_t, int32_t, GPUReconstructionHelpers::helperParam*), GPUReconstructionHelpers::helperDelegateBase* functionCls, int32_t count) override;
// Return the `error` / `done` field of helper thread iThread's control block.
// iThread is used as an unchecked index into mHelperParams; callers must pass a valid
// helper index while the control blocks are allocated.
int32_t HelperError(int32_t iThread) const override { return mHelperParams[iThread].error; }
int32_t HelperDone(int32_t iThread) const override { return mHelperParams[iThread].done; }
void WaitForHelperThreads() override;
void ResetHelperThreads(int32_t helpers) override;
void ResetThisHelperThread(GPUReconstructionHelpers::helperParam* par);

int32_t GetGlobalLock(void*& pLock);
void ReleaseGlobalLock(void* sem);

static void* helperWrapper_static(void* arg);
void* helperWrapper(GPUReconstructionHelpers::helperParam* par);

int32_t mDeviceId = -1; // Device ID used by backend
GPUReconstructionHelpers::helperParam* mHelperParams = nullptr; // Control Struct for helper threads
int32_t mNSlaveThreads = 0; // Number of slave threads currently active
int32_t mDeviceId = -1; // Device ID used by backend

struct DebugEvents {
deviceEvent DebugStart, DebugStop; // Debug timer events
Expand Down
50 changes: 0 additions & 50 deletions GPU/GPUTracking/Base/GPUReconstructionHelpers.h

This file was deleted.

1 change: 0 additions & 1 deletion GPU/GPUTracking/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ set(HDRS_INSTALL
Base/GPUConstantMem.h
Base/GPUParam.inc
Base/GPUParamRTC.h
Base/GPUReconstructionHelpers.h
Base/GPUReconstructionIncludes.h
Base/GPUReconstructionIncludesITS.h
Base/GPUReconstructionKernelMacros.h
Expand Down
1 change: 0 additions & 1 deletion GPU/GPUTracking/Definitions/GPUSettingsList.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ AddOption(registerStandaloneInputMemory, bool, false, "registerInputMemory", 0,
AddOption(ompThreads, int32_t, -1, "omp", 't', "Number of OMP threads to run (-1: all)", min(-1), message("Using %s OMP threads"))
AddOption(ompKernels, uint8_t, 2, "", 0, "Parallelize with OMP inside kernels instead of over slices, 2 for nested parallelization over TPC sectors and inside kernels")
AddOption(ompAutoNThreads, bool, true, "", 0, "Auto-adjust number of OMP threads, decreasing the number for small input data")
AddOption(nDeviceHelperThreads, int32_t, 1, "", 0, "Number of CPU helper threads for CPU processing")
AddOption(nStreams, int8_t, 8, "", 0, "Number of GPU streams / command queues")
AddOption(nTPCClustererLanes, int8_t, -1, "", 0, "Number of TPC clusterers that can run in parallel (-1 = autoset)")
AddOption(overrideClusterizerFragmentLen, int32_t, -1, "", 0, "Force the cluster max fragment len to a certain value (-1 = autodetect)")
Expand Down
13 changes: 0 additions & 13 deletions GPU/GPUTracking/Global/GPUChain.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#define GPUCHAIN_H

#include "GPUReconstructionCPU.h"
#include "GPUReconstructionHelpers.h"

namespace o2
{
Expand Down Expand Up @@ -111,12 +110,6 @@ class GPUChain
}
}
inline void StreamWaitForEvents(int32_t stream, deviceEvent* evList, int32_t nEvents = 1) { mRec->StreamWaitForEvents(stream, evList, nEvents); }
// Helper-thread convenience wrappers; each one forwards to the method of the same name on mRec.
// RunHelperThreads is declared here and defined after the class.
template <class T>
void RunHelperThreads(T function, GPUReconstructionHelpers::helperDelegateBase* functionCls, int32_t count);
inline void WaitForHelperThreads() { mRec->WaitForHelperThreads(); }
inline int32_t HelperError(int32_t iThread) const { return mRec->HelperError(iThread); }
inline int32_t HelperDone(int32_t iThread) const { return mRec->HelperDone(iThread); }
inline void ResetHelperThreads(int32_t helpers) { mRec->ResetHelperThreads(helpers); }
inline int32_t GPUDebug(const char* state = "UNKNOWN", int32_t stream = -1) { return mRec->GPUDebug(state, stream); }
// nEvents is forced to 0 if evList == nullptr
inline void TransferMemoryResourceToGPU(RecoStep step, GPUMemoryResource* res, int32_t stream = -1, deviceEvent* ev = nullptr, deviceEvent* evList = nullptr, int32_t nEvents = 1) { timeCpy(step, true, &GPUReconstructionCPU::TransferMemoryResourceToGPU, res, stream, ev, evList, nEvents); }
Expand Down Expand Up @@ -242,12 +235,6 @@ class GPUChain
void timeCpy(RecoStep step, int32_t toGPU, S T::*func, Args... args);
};

template <class T>
inline void GPUChain::RunHelperThreads(T function, GPUReconstructionHelpers::helperDelegateBase* functionCls, int32_t count)
{
mRec->RunHelperThreads((int32_t(GPUReconstructionHelpers::helperDelegateBase::*)(int32_t, int32_t, GPUReconstructionHelpers::helperParam*))function, functionCls, count);
}

template <bool Always, class T, class S, typename... Args>
inline void GPUChain::timeCpy(RecoStep step, int32_t toGPU, S T::*func, Args... args)
{
Expand Down
7 changes: 1 addition & 6 deletions GPU/GPUTracking/Global/GPUChainTracking.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#define GPUCHAINTRACKING_H

#include "GPUChain.h"
#include "GPUReconstructionHelpers.h"
#include "GPUDataTypes.h"
#include <atomic>
#include <mutex>
Expand Down Expand Up @@ -68,7 +67,7 @@ struct GPUTPCCFChainContext;
struct GPUNewCalibValues;
struct GPUTriggerOutputs;

class GPUChainTracking : public GPUChain, GPUReconstructionHelpers::helperDelegateBase
class GPUChainTracking : public GPUChain
{
friend class GPUReconstruction;

Expand Down Expand Up @@ -314,15 +313,11 @@ class GPUChainTracking : public GPUChain, GPUReconstructionHelpers::helperDelega
void RunTPCClusterFilter(o2::tpc::ClusterNativeAccess* clusters, std::function<o2::tpc::ClusterNative*(size_t)> allocator, bool applyClusterCuts);
bool NeedTPCClustersOnGPU();

std::atomic_flag mLockAtomicOutputBuffer = ATOMIC_FLAG_INIT;
std::mutex mMutexUpdateCalib;
std::unique_ptr<GPUChainTrackingFinalContext> mPipelineFinalizationCtx;
GPUChainTrackingFinalContext* mPipelineNotifyCtx = nullptr;
std::function<void()> mWaitForFinalInputs;

int32_t HelperReadEvent(int32_t iSlice, int32_t threadId, GPUReconstructionHelpers::helperParam* par);
int32_t HelperOutput(int32_t iSlice, int32_t threadId, GPUReconstructionHelpers::helperParam* par);

int32_t OutputStream() const { return mRec->NStreams() - 2; }
};
} // namespace gpu
Expand Down
Loading

0 comments on commit dc7e8e9

Please sign in to comment.