From 2e74423e3dd0d46a5a36cd1cde0ba8e0f8d97798 Mon Sep 17 00:00:00 2001 From: Jakob Blomer Date: Mon, 3 Feb 2025 13:13:18 +0100 Subject: [PATCH] enable UNIX sockets for TParallelMergingFile (cherry picked from commit 7beb0a4bceabd455f69ac4efc8f16c97c32d5b67) --- net/net/src/TParallelMergingFile.cxx | 41 ++++++++++++++++++---------- tutorials/net/parallelMergeClient.C | 11 +++++--- tutorials/net/parallelMergeServer.C | 28 ++++++++++++++----- 3 files changed, 55 insertions(+), 25 deletions(-) diff --git a/net/net/src/TParallelMergingFile.cxx b/net/net/src/TParallelMergingFile.cxx index 9972209073082..ef94964d24ad6 100644 --- a/net/net/src/TParallelMergingFile.cxx +++ b/net/net/src/TParallelMergingFile.cxx @@ -77,20 +77,33 @@ Bool_t TParallelMergingFile::UploadAndReset() { // Open connection to server if (fSocket == 0) { - const char *host = fServerLocation.GetHost(); - Int_t port = fServerLocation.GetPort(); - if (host == 0 || host[0] == '\0') { - host = "localhost"; - } - if (port <= 0) { - port = 1095; - } - fSocket = new TSocket(host,port); - if (!fSocket->IsValid()) { - Error("UploadAndReset","Could not contact the server %s:%d\n",host,port); - delete fSocket; - fSocket = 0; - return kFALSE; + const char *path = fServerLocation.GetFile(); + if (path && strlen(path) > 0 && path[0] == '/') { + // UNIX domain socket + fSocket = new TSocket(path); + if (!fSocket->IsValid()) { + Error("UploadAndReset", "Could not contact the server %s\n", path); + delete fSocket; + fSocket = 0; + return kFALSE; + } + } else { + // TCP socket + const char *host = fServerLocation.GetHost(); + Int_t port = fServerLocation.GetPort(); + if (host == 0 || host[0] == '\0') { + host = "localhost"; + } + if (port <= 0) { + port = 1095; + } + fSocket = new TSocket(host, port); + if (!fSocket->IsValid()) { + Error("UploadAndReset", "Could not contact the server %s:%d\n", host, port); + delete fSocket; + fSocket = 0; + return kFALSE; + } } // Wait till we get the start message // server tells us who we are diff --git a/tutorials/net/parallelMergeClient.C b/tutorials/net/parallelMergeClient.C index e49d449eeb262..f0b41f8e46e5c 100644 --- a/tutorials/net/parallelMergeClient.C +++ b/tutorials/net/parallelMergeClient.C @@ -6,8 +6,8 @@ /// To run this demo do the following: /// - Open at least 2 windows /// - Start ROOT in the first windows -/// - Execute in the first window: .x fastMergeServer.C -/// - Execute in the other windows: root.exe -b -l -q .x treeClient.C +/// - Execute in the first window: .x parallelMergeServer.C +/// - Execute in the other windows: root.exe -b -l -q .x 'parallelMergeClient.C("")' /// (You can put it in the background if wanted). /// If you want to run the hserv.C on a different host, just change /// "localhost" in the TSocket ctor below to the desired hostname. @@ -25,11 +25,14 @@ #include "TRandom.h" #include "TError.h" -void parallelMergeClient() +#include + +void parallelMergeClient(const std::string &socketPath) { gBenchmark->Start("treeClient"); - TParallelMergingFile *file = (TParallelMergingFile*)TFile::Open("mergedClient.root?pmerge=localhost:1095","RECREATE"); + TParallelMergingFile *file = + (TParallelMergingFile *)TFile::Open((std::string("mergedClient.root?pmerge=") + socketPath).c_str(), "RECREATE"); file->Write(); file->UploadAndReset(); // We do this early to get assigned an index. diff --git a/tutorials/net/parallelMergeServer.C b/tutorials/net/parallelMergeServer.C index cb0a9c270fb05..702841d827a7d 100644 --- a/tutorials/net/parallelMergeServer.C +++ b/tutorials/net/parallelMergeServer.C @@ -14,8 +14,8 @@ /// To run this demo do the following: /// - Open three windows /// - Start ROOT in all three windows -/// - Execute in the first window: .x hserv2.C -/// - Execute in the second and third windows: .x hclient.C +/// - Execute in the first window: .x parallelMergeServer.C +/// - Execute in the second and third windows: .x parallelMergeClient.C("") /// /// \macro_code /// @@ -319,10 +319,23 @@ struct ParallelFileMerger : public TObject }; void parallelMergeServer(bool cache = false) { - // Open a server socket looking for connections on a named service or - // on a specified port. - //TServerSocket *ss = new TServerSocket("rootserv", kTRUE); - TServerSocket *ss = new TServerSocket(1095, kTRUE, 100); + // Open a server socket looking for connections on a named service + TString socketPath = "rootserv."; // prefix for temporary file in the temp folder + // Get a unique, temporary file name for the socket. We remove and close the file + // immediatly in order to reopen it as a socket. There is a race here: between + // the removal and the creation of the socket, the file could have been recreated. + // But it is unlikely (due to the random letters in the name) and harmless: the socket + // cannot be created in this case. + FILE *dummy = gSystem->TempFileName(socketPath); + if (!dummy) { + Error("fastMergeServer", "Cannot create temporary file for socket\n"); + return; + } + + std::string strSocketPath(socketPath.View()); + remove(strSocketPath.c_str()); + fclose(dummy); + TServerSocket *ss = new TServerSocket(socketPath); if (!ss->IsValid()) { return; } @@ -343,7 +356,7 @@ void parallelMergeServer(bool cache = false) { kProtocolVersion = 1 }; - printf("fastMergeServerHist ready to accept connections\n"); + printf("fastMergeServerHist ready to accept connections on %s\n", strSocketPath.c_str()); while (true) { TMessage *mess; TSocket *s; @@ -438,5 +451,6 @@ void parallelMergeServer(bool cache = false) { mergers.Delete(); delete mon; + remove(strSocketPath.c_str()); delete ss; }