Skip to content

Commit

Permalink
enable UNIX sockets for TParallelMergingFile
Browse files Browse the repository at this point in the history
(cherry picked from commit 7beb0a4)
  • Loading branch information
jblomer authored and dpiparo committed Feb 6, 2025
1 parent aa568a4 commit 2e74423
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 25 deletions.
41 changes: 27 additions & 14 deletions net/net/src/TParallelMergingFile.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,33 @@ Bool_t TParallelMergingFile::UploadAndReset()
{
// Open connection to server
if (fSocket == 0) {
const char *host = fServerLocation.GetHost();
Int_t port = fServerLocation.GetPort();
if (host == 0 || host[0] == '\0') {
host = "localhost";
}
if (port <= 0) {
port = 1095;
}
fSocket = new TSocket(host,port);
if (!fSocket->IsValid()) {
Error("UploadAndReset","Could not contact the server %s:%d\n",host,port);
delete fSocket;
fSocket = 0;
return kFALSE;
const char *path = fServerLocation.GetFile();
if (path && strlen(path) > 0 && path[0] == '/') {
// UNIX domain socket
fSocket = new TSocket(path);
if (!fSocket->IsValid()) {
Error("UploadAndReset", "Could not contact the server %s\n", path);
delete fSocket;
fSocket = 0;
return kFALSE;
}
} else {
// TCP socket
const char *host = fServerLocation.GetHost();
Int_t port = fServerLocation.GetPort();
if (host == 0 || host[0] == '\0') {
host = "localhost";
}
if (port <= 0) {
port = 1095;
}
fSocket = new TSocket(host, port);
if (!fSocket->IsValid()) {
Error("UploadAndReset", "Could not contact the server %s:%d\n", host, port);
delete fSocket;
fSocket = 0;
return kFALSE;
}
}
// Wait till we get the start message
// server tells us who we are
Expand Down
11 changes: 7 additions & 4 deletions tutorials/net/parallelMergeClient.C
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
/// To run this demo do the following:
/// - Open at least 2 windows
/// - Start ROOT in the first windows
/// - Execute in the first window: .x fastMergeServer.C
/// - Execute in the other windows: root.exe -b -l -q .x treeClient.C
/// - Execute in the first window: .x parallelMergeServer.C
/// - Execute in the other windows: root.exe -b -l -q .x 'parallelMergeClient.C("<socket path printed by server>")'
/// (You can put it in the background if wanted).
/// If you want to run the hserv.C on a different host, just change
/// "localhost" in the TSocket ctor below to the desired hostname.
Expand All @@ -25,11 +25,14 @@
#include "TRandom.h"
#include "TError.h"

void parallelMergeClient()
#include <string>

void parallelMergeClient(const std::string &socketPath)
{
gBenchmark->Start("treeClient");

TParallelMergingFile *file = (TParallelMergingFile*)TFile::Open("mergedClient.root?pmerge=localhost:1095","RECREATE");
TParallelMergingFile *file =
(TParallelMergingFile *)TFile::Open((std::string("mergedClient.root?pmerge=") + socketPath).c_str(), "RECREATE");

file->Write();
file->UploadAndReset(); // We do this early to get assigned an index.
Expand Down
28 changes: 21 additions & 7 deletions tutorials/net/parallelMergeServer.C
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
/// To run this demo do the following:
/// - Open three windows
/// - Start ROOT in all three windows
/// - Execute in the first window: .x hserv2.C
/// - Execute in the second and third windows: .x hclient.C
/// - Execute in the first window: .x parallelMergeServer.C
/// - Execute in the second and third windows: .x parallelMergeClient.C("<socket path printed by the server>")
///
/// \macro_code
///
Expand Down Expand Up @@ -319,10 +319,23 @@ struct ParallelFileMerger : public TObject
};

void parallelMergeServer(bool cache = false) {
// Open a server socket looking for connections on a named service or
// on a specified port.
//TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
TServerSocket *ss = new TServerSocket(1095, kTRUE, 100);
// Open a server socket looking for connections on a named service
TString socketPath = "rootserv."; // prefix for temporary file in the temp folder
// Get a unique, temporary file name for the socket. We remove and close the file
// immediatly in order to reopen it as a socket. There is a race here: between
// the removal and the creation of the socket, the file could have been recreated.
// But it is unlikely (due to the random letters in the name) and harmless: the socket
// cannot be created in this case.
FILE *dummy = gSystem->TempFileName(socketPath);
if (!dummy) {
Error("fastMergeServer", "Cannot create temporary file for socket\n");
return;
}

std::string strSocketPath(socketPath.View());
remove(strSocketPath.c_str());
fclose(dummy);
TServerSocket *ss = new TServerSocket(socketPath);
if (!ss->IsValid()) {
return;
}
Expand All @@ -343,7 +356,7 @@ void parallelMergeServer(bool cache = false) {
kProtocolVersion = 1
};

printf("fastMergeServerHist ready to accept connections\n");
printf("fastMergeServerHist ready to accept connections on %s\n", strSocketPath.c_str());
while (true) {
TMessage *mess;
TSocket *s;
Expand Down Expand Up @@ -438,5 +451,6 @@ void parallelMergeServer(bool cache = false) {

mergers.Delete();
delete mon;
remove(strSocketPath.c_str());
delete ss;
}

0 comments on commit 2e74423

Please sign in to comment.