diff --git a/CMakeLists.txt b/CMakeLists.txt index 1313c48bbe..5676ec92df 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -249,8 +249,8 @@ endif() if(SLS_USE_SANITIZER) target_compile_options(slsProjectOptions INTERFACE -fsanitize=address,undefined -fno-omit-frame-pointer) target_link_libraries(slsProjectOptions INTERFACE -fsanitize=address,undefined) - # target_compile_options(slsProjectOptions INTERFACE -fsanitize=thread) - # target_link_libraries(slsProjectOptions INTERFACE -fsanitize=thread) + #target_compile_options(slsProjectOptions INTERFACE -fsanitize=thread -fno-omit-frame-pointer) + #target_link_libraries(slsProjectOptions INTERFACE -fsanitize=thread) endif() diff --git a/python/scripts/frameSynchronizerPullSocket.py b/python/scripts/frameSynchronizerPullSocket.py new file mode 100644 index 0000000000..4c13815ab6 --- /dev/null +++ b/python/scripts/frameSynchronizerPullSocket.py @@ -0,0 +1,19 @@ +# SPDX-License-Identifier: LGPL-3.0-or-other +# Copyright (C) 2021 Contributors to the SLS Detector Package +# Script to get combined zmq packets from frame synchronizer using pull zmq sockets +import json +import zmq + +c = zmq.Context() +s = c.socket(zmq.PULL) +s.connect("tcp://127.0.0.1:5555") + +while True: + m = s.recv_multipart() + for p in m: + if p.startswith(b"{"): + print(p.decode().strip()) + else: + print("binary") + print("--------") + diff --git a/slsReceiverSoftware/CMakeLists.txt b/slsReceiverSoftware/CMakeLists.txt index 8ff2b1e6bd..543ff7467e 100755 --- a/slsReceiverSoftware/CMakeLists.txt +++ b/slsReceiverSoftware/CMakeLists.txt @@ -138,13 +138,34 @@ if (SLS_USE_RECEIVER_BINARIES) slsProjectWarnings ) - install(TARGETS slsReceiver slsMultiReceiver + add_executable(slsFrameSynchronizer + src/FrameSynchronizerApp.cpp + ) + + set_target_properties(slsFrameSynchronizer PROPERTIES + RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin + ) + if((CMAKE_BUILD_TYPE STREQUAL "Release") AND SLS_LTO_AVAILABLE) + set_property(TARGET slsFrameSynchronizer PROPERTY INTERPROCEDURAL_OPTIMIZATION True) + endif() + + target_link_libraries(slsFrameSynchronizer + PUBLIC + slsReceiverStatic + pthread + rt + PRIVATE + slsProjectWarnings + "$" + ) + + install(TARGETS slsReceiver slsMultiReceiver slsFrameSynchronizer EXPORT "${TARGETS_EXPORT_NAME}" RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/sls -) + ) endif(SLS_USE_RECEIVER_BINARIES) diff --git a/slsReceiverSoftware/include/sls/Receiver.h b/slsReceiverSoftware/include/sls/Receiver.h index f3389c80ec..eb7c4b2143 100644 --- a/slsReceiverSoftware/include/sls/Receiver.h +++ b/slsReceiverSoftware/include/sls/Receiver.h @@ -45,9 +45,8 @@ class Receiver : private virtual slsDetectorDefs { * Call back arguments are: * - startCallbackHeader metadata */ - void registerCallBackStartAcquisition(int (*func)(const startCallbackHeader, - void *), - void *arg); + void registerCallBackStartAcquisition( + void (*func)(const startCallbackHeader, void *), void *arg); /** * Call back for acquisition finished diff --git a/slsReceiverSoftware/src/ClientInterface.cpp b/slsReceiverSoftware/src/ClientInterface.cpp index ce87021516..47914b7bc4 100644 --- a/slsReceiverSoftware/src/ClientInterface.cpp +++ b/slsReceiverSoftware/src/ClientInterface.cpp @@ -33,6 +33,8 @@ using Interface = ServerInterface; #define gettid() syscall(SYS_gettid) #endif +std::mutex ClientInterface::callbackMutex; + ClientInterface::~ClientInterface() { killTcpThread = true; LOG(logINFO) << "Shutting down TCP Socket on port " << portNumber; @@ -54,13 +56,15 @@ std::string ClientInterface::getReceiverVersion() { return APIRECEIVER; } /***callback functions***/ void ClientInterface::registerCallBackStartAcquisition( - int (*func)(const startCallbackHeader, void *), void *arg) { + void (*func)(const startCallbackHeader, void *), void *arg) { + std::lock_guard lock(callbackMutex); startAcquisitionCallBack = func; pStartAcquisition = arg; } void ClientInterface::registerCallBackAcquisitionFinished( void (*func)(const endCallbackHeader, void *), void *arg) { + std::lock_guard lock(callbackMutex); acquisitionFinishedCallBack = func; pAcquisitionFinished = arg; } @@ -69,6 +73,7 @@ void ClientInterface::registerCallBackRawDataReady( void (*func)(sls_receiver_header &, dataCallbackHeader, char *, size_t &, void *), void *arg) { + std::lock_guard lock(callbackMutex); rawDataReadyCallBack = func; pRawDataReady = arg; } @@ -461,15 +466,18 @@ void ClientInterface::setDetectorType(detectorType arg) { std::string(e.what()) + ']'); } // callbacks after (in setdetectortype, the object is reinitialized) - if (startAcquisitionCallBack != nullptr) - impl()->registerCallBackStartAcquisition(startAcquisitionCallBack, - pStartAcquisition); - if (acquisitionFinishedCallBack != nullptr) - impl()->registerCallBackAcquisitionFinished(acquisitionFinishedCallBack, - pAcquisitionFinished); - if (rawDataReadyCallBack != nullptr) - impl()->registerCallBackRawDataReady(rawDataReadyCallBack, - pRawDataReady); + { + std::lock_guard lock(callbackMutex); + if (startAcquisitionCallBack != nullptr) + impl()->registerCallBackStartAcquisition(startAcquisitionCallBack, + pStartAcquisition); + if (acquisitionFinishedCallBack != nullptr) + impl()->registerCallBackAcquisitionFinished( + acquisitionFinishedCallBack, pAcquisitionFinished); + if (rawDataReadyCallBack != nullptr) + impl()->registerCallBackRawDataReady(rawDataReadyCallBack, + pRawDataReady); + } impl()->setThreadIds(parentThreadId, tcpThreadId); } diff --git a/slsReceiverSoftware/src/ClientInterface.h b/slsReceiverSoftware/src/ClientInterface.h index 8bde90c641..0d96318708 100644 --- a/slsReceiverSoftware/src/ClientInterface.h +++ b/slsReceiverSoftware/src/ClientInterface.h @@ -34,9 +34,8 @@ class ClientInterface : private virtual slsDetectorDefs { //***callback functions*** /** params: file path, file name, file index, image size */ - void registerCallBackStartAcquisition(int (*func)(const startCallbackHeader, - void *), - void *arg); + void registerCallBackStartAcquisition( + void (*func)(const startCallbackHeader, void *), void *arg); /** params: total frames caught */ void registerCallBackAcquisitionFinished( @@ -180,8 +179,8 @@ class ClientInterface : private virtual slsDetectorDefs { //***callback parameters*** - int (*startAcquisitionCallBack)(const startCallbackHeader, - void *) = nullptr; + void (*startAcquisitionCallBack)(const startCallbackHeader, + void *) = nullptr; void *pStartAcquisition{nullptr}; void (*acquisitionFinishedCallBack)(const endCallbackHeader, void *) = nullptr; @@ -194,6 +193,8 @@ class ClientInterface : private virtual slsDetectorDefs { pid_t tcpThreadId{0}; std::vector udpips = std::vector(MAX_NUMBER_OF_LISTENING_THREADS); + // necessary if Receiver objects using threads with callbacks + static std::mutex callbackMutex; }; } // namespace sls diff --git a/slsReceiverSoftware/src/FrameSynchronizerApp.cpp b/slsReceiverSoftware/src/FrameSynchronizerApp.cpp new file mode 100644 index 0000000000..790a30b7b0 --- /dev/null +++ b/slsReceiverSoftware/src/FrameSynchronizerApp.cpp @@ -0,0 +1,621 @@ +// SPDX-License-Identifier: LGPL-3.0-or-other +// Copyright (C) 2021 Contributors to the SLS Detector Package +/* Creates the slsFrameSynchronizer for running multiple receivers in different + * threads form a single binary that will spit out zmq streams without + * reconstructing image. Sample python script for pull socket for this combiner + * in python/scripts folder. TODO: Not handling empty frames from one socket + */ +#include "sls/Receiver.h" +#include "sls/ToString.h" +#include "sls/container_utils.h" +#include "sls/logger.h" +#include "sls/sls_detector_defs.h" + +#include //SIGINT +#include +#include +#include +#include +#include +#include +#include +#include //wait +#include +#include + +#include +#include +#include + +std::vector threads; +std::vector semaphores; +sls::TLogLevel printHeadersLevel = sls::logDEBUG; + +/** Define Colors to print data call back in different colors for different + * recievers */ +#define PRINT_IN_COLOR(c, f, ...) \ + printf("\033[%dm" f RESET, 30 + c + 1, ##__VA_ARGS__) + +/** Structure handling different threads */ +using ZmqMsgList = std::vector; +using FrameMap = std::map; +using PortFrameMap = std::map; +struct FrameStatus { + bool starting = true; + bool terminate = false; + int num_receivers; + sem_t available; + std::mutex mtx; + ZmqMsgList headers; + PortFrameMap frames; + ZmqMsgList ends; + + FrameStatus(bool start, bool term, int num_recv) + : starting(start), terminate(term), num_receivers(num_recv) { + sem_init(&available, 0, 0); + } +}; +FrameStatus *global_frame_status = nullptr; + +/** + * Control+C Interrupt Handler + * to let all the processes know to exit properly + */ +void sigInterruptHandler(int p) { + for (size_t i = 0; i != semaphores.size(); ++i) { + sem_post(semaphores[i]); + } +} + +void cleanup() { + if (global_frame_status) { + std::lock_guard lock(global_frame_status->mtx); + for (auto &outer_pair : global_frame_status->frames) { + for (auto &inner_pair : outer_pair.second) { + for (zmq_msg_t *msg : inner_pair.second) { + if (msg) { + zmq_msg_close(msg); + delete msg; + } + } + inner_pair.second.clear(); + } + outer_pair.second.clear(); + } + global_frame_status->frames.clear(); + } +} + +/** + * prints usage of this example program + */ +std::string getHelpMessage() { + std::ostringstream os; + os << "\nUsage:\n" + "./slsFrameSynchronizer [start tcp port] [num recevers] [print " + "callback headers (optional)]\n" + << "\t - tcp port has to be non-zero and 16 bit\n" + << "\t - print callback headers option is 0 (disabled) by default\n"; + return os.str(); +} + +void zmq_free(void *data, void *hint) { delete[] static_cast(data); } + +void print_frames(const PortFrameMap &frame_port_map) { + LOG(sls::logDEBUG) << "Printing frames"; + for (const auto &it : frame_port_map) { + uint16_t udpPort = it.first; + const auto &frame_map = it.second; + LOG(sls::logDEBUG) << "UDP port: " << udpPort; + for (const auto &frame : frame_map) { + uint64_t fnum = frame.first; + const auto &msg_list = frame.second; + LOG(sls::logDEBUG) + << " acq index: " << fnum << '[' << msg_list.size() << ']'; + } + } +} + +/** Valid frame numbers mean they exist across all ports or + * has at least a larger fnum in the port with the missing fnum **/ +std::set get_valid_fnums(const PortFrameMap &port_frame_map) { + // empty list + std::set valid_fnums; + if (port_frame_map.empty()) { + return valid_fnums; + } + + // collect all unique frame numbers from all ports + std::set unique_fnums; + for (auto it = port_frame_map.begin(); it != port_frame_map.begin(); ++it) { + const FrameMap &frame_map = it->second; + for (auto frame = frame_map.begin(); frame != frame_map.end(); + ++frame) { + unique_fnums.insert(frame->first); + } + } + + // collect valid frame numbers + for (auto &fnum : unique_fnums) { + bool is_valid = true; + for (auto it = port_frame_map.begin(); it != port_frame_map.end(); + ++it) { + uint16_t port = it->first; + const FrameMap &frame_map = it->second; + auto frame = frame_map.find(fnum); + // invalid: fnum missing in one port + if (frame == frame_map.end()) { + LOG(sls::logDEBUG) + << "Fnum " << fnum << " is missing in port " << port; + // invalid: fnum greater than all in that port + auto last_frame = std::prev(frame_map.end()); + auto last_fnum = last_frame->first; + if (fnum > last_fnum) { + LOG(sls::logDEBUG) << "And no larger fnum found. Fnum " + << fnum << " is invalid.\n"; + is_valid = false; + break; + } + } + } + if (is_valid) { + valid_fnums.insert(fnum); + } + } + + return valid_fnums; +} + +int zmq_send_multipart(void *socket, const ZmqMsgList &messages) { + size_t num_messages = messages.size(); + for (size_t i = 0; i != num_messages; ++i) { + zmq_msg_t *msg = messages[i]; + // determine flags: ZMQ_SNDMORE for all messages except the last + int flags = (i == num_messages - 1) ? 0 : ZMQ_SNDMORE; + if (zmq_msg_send(msg, socket, flags) == -1) { + LOG(sls::logERROR) + << "Error sending message: " << zmq_strerror(zmq_errno()); + return slsDetectorDefs::FAIL; + } + } + return slsDetectorDefs::OK; +} + +void Correlate(FrameStatus *stat) { + void *context = zmq_ctx_new(); + + void *socket = zmq_socket(context, ZMQ_PUSH); + int rc = zmq_bind(socket, "tcp://*:5555"); + if (rc != 0) { + LOG(sls::logERROR) << "failed to bind"; + } + + while (true) { + sem_wait(&(stat->available)); + { + std::lock_guard lock(stat->mtx); + + if (stat->terminate) { + break; + } + + if (stat->starting) { + // sending all start packets + if (static_cast(stat->headers.size()) == + stat->num_receivers) { + stat->starting = false; + // clean up + zmq_send_multipart(socket, stat->headers); + for (zmq_msg_t *msg : stat->headers) { + if (msg) { + zmq_msg_close(msg); + delete msg; + } + } + stat->headers.clear(); + } + } else { + // print_frames(stat->frames); + auto valid_fnums = get_valid_fnums(stat->frames); + // sending all valid fnum data packets + for (const auto &fnum : valid_fnums) { + ZmqMsgList msg_list; + PortFrameMap &port_frame_map = stat->frames; + for (auto it = port_frame_map.begin(); + it != port_frame_map.end(); ++it) { + uint16_t port = it->first; + const FrameMap &frame_map = it->second; + auto frame = frame_map.find(fnum); + if (frame != frame_map.end()) { + msg_list.insert(msg_list.end(), + stat->frames[port][fnum].begin(), + stat->frames[port][fnum].end()); + // clean up + for (zmq_msg_t *msg : stat->frames[port][fnum]) { + if (msg) { + zmq_msg_close(msg); + delete msg; + } + } + stat->frames[port].erase(fnum); + } + } + LOG(printHeadersLevel) + << "Sending data packets for fnum " << fnum; + zmq_send_multipart(socket, msg_list); + } + } + // sending all end packets + if (static_cast(stat->ends.size()) == stat->num_receivers) { + zmq_send_multipart(socket, stat->ends); + // clean up + for (zmq_msg_t *msg : stat->ends) { + if (msg) { + zmq_msg_close(msg); + delete msg; + } + } + stat->ends.clear(); + } + } + } + zmq_close(socket); + zmq_ctx_destroy(context); +} + +void StartAcquisitionCallback( + const slsDetectorDefs::startCallbackHeader callbackHeader, + void *objectPointer) { + LOG(printHeadersLevel) + << "Start Acquisition:" + << "\n\t[" + << "\n\tUDP Port : " << sls::ToString(callbackHeader.udpPort) + << "\n\tDynamic Range : " << callbackHeader.dynamicRange + << "\n\tDetector Shape : " + << sls::ToString(callbackHeader.detectorShape) + << "\n\tImage Size : " << callbackHeader.imageSize + << "\n\tFile Path : " << callbackHeader.filePath + << "\n\tFile Name : " << callbackHeader.fileName + << "\n\tFile Index : " << callbackHeader.fileIndex + << "\n\tQuad Enable : " << callbackHeader.quad + << "\n\tAdditional Json Header : " + << sls::ToString(callbackHeader.addJsonHeader).c_str() << "\n\t]"; + std::ostringstream oss; + oss << "{\"htype\":\"header\"" + << ", \"udpPorts\":" << sls::ToString(callbackHeader.udpPort) + << ", \"bitmode\":" << callbackHeader.dynamicRange + << ", \"filePath\":\"" << callbackHeader.filePath + << "\", \"fileName\":\"" << callbackHeader.fileName + << "\", \"fileIndex\":" << callbackHeader.fileIndex + << ", \"detshape\":" << sls::ToString(callbackHeader.detectorShape) + << ", \"size\":" << callbackHeader.imageSize + << ", \"quad\":" << (callbackHeader.quad ? 1 : 0); + + if (!callbackHeader.addJsonHeader.empty()) { + oss << ", \"addJsonHeader\": {"; + for (auto it = callbackHeader.addJsonHeader.begin(); + it != callbackHeader.addJsonHeader.end(); ++it) { + if (it != callbackHeader.addJsonHeader.begin()) { + oss << ", "; + } + oss << "\"" << it->first.c_str() << "\":\"" << it->second.c_str() + << "\""; + } + oss << " } "; + } + oss << "}\n"; + + std::string message = oss.str(); + LOG(sls::logDEBUG) << "Start Acquisition message:" << std::endl << message; + + int length = message.length(); + char *hdata = new char[length]; + memcpy(hdata, message.c_str(), length); + zmq_msg_t *hmsg = new zmq_msg_t; + zmq_msg_init_data(hmsg, hdata, length, zmq_free, NULL); + + // push zmq msg into stat to be processed + FrameStatus *stat = static_cast(objectPointer); + { + std::lock_guard lock(stat->mtx); + stat->headers.push_back(hmsg); + stat->starting = true; + // clean up old frames + for (int port : callbackHeader.udpPort) { + for (auto &frame_map : stat->frames[port]) { + for (zmq_msg_t *msg : frame_map.second) { + if (msg) { + zmq_msg_close(msg); + delete msg; + } + } + frame_map.second.clear(); + } + stat->frames[port].clear(); + } + } + sem_post(&stat->available); +} + +void AcquisitionFinishedCallback( + const slsDetectorDefs::endCallbackHeader callbackHeader, + void *objectPointer) { + LOG(printHeadersLevel) << "Acquisition Finished:" + << "\n\t[" + << "\n\tUDP Port : " + << sls::ToString(callbackHeader.udpPort) + << "\n\tComplete Frames : " + << sls::ToString(callbackHeader.completeFrames) + << "\n\tLast Frame Index : " + << sls::ToString(callbackHeader.lastFrameIndex) + << "\n\t]"; + std::ostringstream oss; + oss << "{\"htype\":\"series_end\"" + << ", \"udpPorts\":" << sls::ToString(callbackHeader.udpPort) + << ", \"comleteFrames\":" + << sls::ToString(callbackHeader.completeFrames) + << ", \"lastFrameIndex\":" + << sls::ToString(callbackHeader.lastFrameIndex) << "}\n"; + std::string message = oss.str(); + int length = message.length(); + LOG(sls::logDEBUG) << "Acquisition Finished message:" << std::endl + << message; + + char *hdata = new char[length]; + memcpy(hdata, message.c_str(), length); + zmq_msg_t *hmsg = new zmq_msg_t; + zmq_msg_init_data(hmsg, hdata, length, zmq_free, NULL); + + // push zmq msg into stat to be processed + FrameStatus *stat = static_cast(objectPointer); + { + std::lock_guard lock(stat->mtx); + stat->ends.push_back(hmsg); + } + sem_post(&stat->available); +} + +void GetDataCallback(slsDetectorDefs::sls_receiver_header &header, + slsDetectorDefs::dataCallbackHeader callbackHeader, + char *dataPointer, size_t &imageSize, + void *objectPointer) { + + slsDetectorDefs::sls_detector_header detectorHeader = header.detHeader; + + if (printHeadersLevel < sls::logDEBUG) { + // print in different color for each udp port + PRINT_IN_COLOR((callbackHeader.udpPort % 10), + "Data: " + "\n\tCallback Header: " + "\n\t[" + "\n\tUDP Port: %u" + "\n\tShape: [%u, %u]" + "\n\tAcq Index : %lu" + "\n\tFrame Index :%lu" + "\n\tProgress : %.2f%%" + "\n\tComplete Image :%s" + "\n\tFlip Rows :%s" + "\n\tAdditional Json Header : %s" + "\n\t]" + "\n\ttReceiver Header: " + "\n\t[" + "\n\tFrame Number : %lu" + "\n\tExposure Length :%u" + "\n\tPackets Caught :%u" + "\n\tDetector Specific 1: %lu" + "\n\tTimestamp : %lu" + "\n\tModule Id :%u" + "\n\tRow : %u" + "\n\tColumn :%u" + "\n\tDetector Specific 2 : %u" + "\n\tDetector Specific 3 : %u" + "\n\tDetector Specific 4 : %u" + "\n\tDetector Type : %s" + "\n\tVersion: %u" + "\n\t]" + "\n\tFirst Byte Data: 0x%x" + "\n\tImage Size: %zu\n\n", + callbackHeader.udpPort, callbackHeader.shape.x, + callbackHeader.shape.y, callbackHeader.acqIndex, + callbackHeader.frameIndex, callbackHeader.progress, + sls::ToString(callbackHeader.completeImage).c_str(), + sls::ToString(callbackHeader.flipRows).c_str(), + sls::ToString(callbackHeader.addJsonHeader).c_str(), + detectorHeader.frameNumber, detectorHeader.expLength, + detectorHeader.packetNumber, detectorHeader.detSpec1, + detectorHeader.timestamp, detectorHeader.modId, + detectorHeader.row, detectorHeader.column, + detectorHeader.detSpec2, detectorHeader.detSpec3, + detectorHeader.detSpec4, + sls::ToString(detectorHeader.detType).c_str(), + detectorHeader.version, + // header->packetsMask.to_string().c_str(), + ((uint8_t)(*((uint8_t *)(dataPointer)))), imageSize); + } + std::ostringstream oss; + oss << "{\"htype\":\"module\"" + << ", \"port\":" << callbackHeader.udpPort + << ", \"shape\":" << sls::ToString(callbackHeader.shape) + << ", \"acqIndex\":" << callbackHeader.acqIndex + << ", \"frameIndex\":" << callbackHeader.frameIndex + << ", \"flipRows\":" << (callbackHeader.flipRows ? 1 : 0) + << ", \"progress\":" << callbackHeader.progress + << ", \"completeImage\":" << (callbackHeader.completeImage ? 1 : 0) + << ", \"imageSize\":" << imageSize + << ", \"frameNumber\":" << detectorHeader.frameNumber + << ", \"expLength\":" << detectorHeader.expLength + << ", \"packetNumber\":" << detectorHeader.packetNumber + << ", \"detSpec1\":" << detectorHeader.detSpec1 + << ", \"timestamp\":" << detectorHeader.timestamp + << ", \"modId\":" << detectorHeader.modId + << ", \"row\":" << detectorHeader.row + << ", \"column\":" << detectorHeader.column + << ", \"detSpec2\":" << detectorHeader.detSpec2 + << ", \"detSpec3\":" << detectorHeader.detSpec3 + << ", \"detSpec4\":" << detectorHeader.detSpec4 + << ", \"detType\":" << static_cast(detectorHeader.detType) + << ", \"version\":" << static_cast(detectorHeader.version); + + if (!callbackHeader.addJsonHeader.empty()) { + oss << ", \"addJsonHeader\": {"; + for (auto it = callbackHeader.addJsonHeader.begin(); + it != callbackHeader.addJsonHeader.end(); ++it) { + if (it != callbackHeader.addJsonHeader.begin()) { + oss << ", "; + } + oss << "\"" << it->first.c_str() << "\":\"" << it->second.c_str() + << "\""; + } + oss << " } "; + } + oss << "}\n"; + std::string message = oss.str(); + LOG(sls::logDEBUG) << "Data message:" << std::endl << message; + + // creating header part of data packet + int length = message.length(); + char *hdata = new char[length]; + memcpy(hdata, message.c_str(), length); + zmq_msg_t *hmsg = new zmq_msg_t; + zmq_msg_init_data(hmsg, hdata, length, zmq_free, NULL); + + // created data part of data packet + char *data = new char[imageSize]; + zmq_msg_t *msg = new zmq_msg_t; + zmq_msg_init_data(msg, data, imageSize, zmq_free, NULL); + + // push both parts into stat to be processed + FrameStatus *stat = static_cast(objectPointer); + { + std::lock_guard lock(stat->mtx); + stat->frames[callbackHeader.udpPort][header.detHeader.frameNumber] + .push_back(hmsg); + stat->frames[callbackHeader.udpPort][header.detHeader.frameNumber] + .push_back(msg); + } + sem_post(&stat->available); +} + +/** + * Example of main program using the Receiver class + * + * - Defines in file for: + * - Default Number of receivers is 1 + * - Default Start TCP port is 1954 + */ +int main(int argc, char *argv[]) { + + /** - set default values */ + int numReceivers = 1; + uint16_t startTCPPort = DEFAULT_TCP_RX_PORTNO; + bool printHeaders = false; + + /** - get number of receivers and start tcp port from command line + * arguments */ + if (argc > 1) { + try { + if (argc == 3 || argc == 4) { + startTCPPort = sls::StringTo(argv[1]); + if (startTCPPort == 0) { + throw std::runtime_error("Invalid start tcp port"); + } + numReceivers = std::stoi(argv[2]); + if (numReceivers > 1024) { + cprintf(RED, + "Did you mix up the order of the arguments?\n%s\n", + getHelpMessage().c_str()); + return EXIT_FAILURE; + } + if (numReceivers == 0) { + cprintf(RED, "Invalid number of receivers.\n%s\n", + getHelpMessage().c_str()); + return EXIT_FAILURE; + } + if (argc == 4) { + printHeaders = sls::StringTo(argv[3]); + if (printHeaders) { + printHeadersLevel = sls::logINFOBLUE; + } + } + } else + throw std::runtime_error("Invalid number of arguments"); + } catch (const std::exception &e) { + cprintf(RED, "Error: %s\n%s\n", e.what(), getHelpMessage().c_str()); + return EXIT_FAILURE; + } + } + + cprintf(RESET, "Number of Receivers: %d\n", numReceivers); + cprintf(RESET, "Start TCP Port: %hu\n", startTCPPort); + cprintf(RESET, "Print Callback Headers: %s\n\n", + (printHeaders ? "Enabled" : "Disabled")); + + /** - Catch signal SIGINT to close files and call destructors properly */ + struct sigaction sa; + sa.sa_flags = 0; // no flags + sa.sa_handler = sigInterruptHandler; // handler function + sigemptyset(&sa.sa_mask); // dont block additional signals during invocation + // of handler + if (sigaction(SIGINT, &sa, nullptr) == -1) { + cprintf(RED, "Could not set handler function for SIGINT\n"); + } + + /** - Ignore SIG_PIPE, prevents global signal handler, handle locally, + instead of a server crashing due to client crash when writing, it just + gives error */ + struct sigaction asa; + asa.sa_flags = 0; // no flags + asa.sa_handler = SIG_IGN; // handler function + sigemptyset(&asa.sa_mask); // dont block additional signals during + // invocation of handler + if (sigaction(SIGPIPE, &asa, nullptr) == -1) { + cprintf(RED, "Could not set handler function for SIGPIPE\n"); + } + + FrameStatus stat{true, false, numReceivers}; + // store pointer for signal handler + global_frame_status = &stat; + + // thread synchronizing all packets + void *user_data = static_cast(&stat); + std::thread combinerThread(Correlate, &stat); + + for (int i = 0; i != numReceivers; ++i) { + sem_t *semaphore = new sem_t; + sem_init(semaphore, 1, 0); + semaphores.push_back(semaphore); + + uint16_t port = startTCPPort + i; + threads.emplace_back([i, semaphore, port, user_data]() { + sls::Receiver receiver(port); + receiver.registerCallBackStartAcquisition(StartAcquisitionCallback, + user_data); + receiver.registerCallBackAcquisitionFinished( + AcquisitionFinishedCallback, user_data); + receiver.registerCallBackRawDataReady(GetDataCallback, user_data); + /** - as long as no Ctrl+C */ + sem_wait(semaphore); + sem_destroy(semaphore); + delete semaphore; + + // clean up frames + if (i == 0) + cleanup(); + }); + } + + for (auto &thread : threads) { + thread.join(); + } + + { + std::lock_guard lock(stat.mtx); + stat.terminate = true; + sem_post(&stat.available); + } + combinerThread.join(); + sem_destroy(&stat.available); + + LOG(sls::logINFOBLUE) << "Goodbye!"; + return EXIT_SUCCESS; +} diff --git a/slsReceiverSoftware/src/Implementation.cpp b/slsReceiverSoftware/src/Implementation.cpp index f7aed96ef7..dbacf6eafc 100644 --- a/slsReceiverSoftware/src/Implementation.cpp +++ b/slsReceiverSoftware/src/Implementation.cpp @@ -1802,7 +1802,7 @@ void Implementation::setTransceiverEnableMask(uint32_t mask) { * * * ************************************************/ void Implementation::registerCallBackStartAcquisition( - int (*func)(const startCallbackHeader, void *), void *arg) { + void (*func)(const startCallbackHeader, void *), void *arg) { startAcquisitionCallBack = func; pStartAcquisition = arg; } diff --git a/slsReceiverSoftware/src/Implementation.h b/slsReceiverSoftware/src/Implementation.h index 671f6deb2a..d32b491ea8 100644 --- a/slsReceiverSoftware/src/Implementation.h +++ b/slsReceiverSoftware/src/Implementation.h @@ -265,9 +265,8 @@ class Implementation : private virtual slsDetectorDefs { * * * ************************************************/ /** params: file path, file name, file index, image size */ - void registerCallBackStartAcquisition(int (*func)(const startCallbackHeader, - void *), - void *arg); + void registerCallBackStartAcquisition( + void (*func)(const startCallbackHeader, void *), void *arg); /** params: total frames caught */ void registerCallBackAcquisitionFinished( void (*func)(const endCallbackHeader, void *), void *arg); @@ -374,7 +373,8 @@ class Implementation : private virtual slsDetectorDefs { int ctbDbitOffset{0}; // callbacks - int (*startAcquisitionCallBack)(const startCallbackHeader, void *){nullptr}; + void (*startAcquisitionCallBack)(const startCallbackHeader, + void *){nullptr}; void *pStartAcquisition{nullptr}; void (*acquisitionFinishedCallBack)(const endCallbackHeader, void *){nullptr}; diff --git a/slsReceiverSoftware/src/MultiReceiverApp.cpp b/slsReceiverSoftware/src/MultiReceiverApp.cpp index 7e6b3656ba..061c9a4412 100644 --- a/slsReceiverSoftware/src/MultiReceiverApp.cpp +++ b/slsReceiverSoftware/src/MultiReceiverApp.cpp @@ -53,8 +53,8 @@ std::string getHelpMessage() { * enabled) if registerCallBackRawDataReady or * registerCallBackRawDataModifyReady registered, users get data */ -int StartAcq(const slsDetectorDefs::startCallbackHeader callbackHeader, - void *objectPointer) { +void StartAcq(const slsDetectorDefs::startCallbackHeader callbackHeader, + void *objectPointer) { LOG(sls::logINFOBLUE) << "#### Start Acquisition:" << "\n\t[" << "\n\tUDP Port : " @@ -71,7 +71,6 @@ int StartAcq(const slsDetectorDefs::startCallbackHeader callbackHeader, << "\n\tAdditional Json Header : " << sls::ToString(callbackHeader.addJsonHeader) << "\n\t]"; - return 0; } /** Acquisition Finished Call back */ diff --git a/slsReceiverSoftware/src/Receiver.cpp b/slsReceiverSoftware/src/Receiver.cpp index d7c6763667..d2d5593d5b 100644 --- a/slsReceiverSoftware/src/Receiver.cpp +++ b/slsReceiverSoftware/src/Receiver.cpp @@ -134,7 +134,7 @@ std::string Receiver::getReceiverVersion() { } void Receiver::registerCallBackStartAcquisition( - int (*func)(const startCallbackHeader, void *), void *arg) { + void (*func)(const startCallbackHeader, void *), void *arg) { tcpipInterface->registerCallBackStartAcquisition(func, arg); } diff --git a/slsSupportLib/include/sls/logger.h b/slsSupportLib/include/sls/logger.h index 437f7e9054..9b949d8027 100644 --- a/slsSupportLib/include/sls/logger.h +++ b/slsSupportLib/include/sls/logger.h @@ -16,6 +16,8 @@ enum TLogLevel { logINFOBLUE, logINFOGREEN, logINFORED, + logINFOCYAN, + logINFOMAGENTA, logINFO, logDEBUG, logDEBUG1, @@ -60,16 +62,25 @@ class Logger { // Danger this buffer need as many elements as TLogLevel static const char *Color(TLogLevel level) noexcept { static const char *const colors[] = { - RED BOLD, YELLOW BOLD, BLUE, GREEN, RED, RESET, - RESET, RESET, RESET, RESET, RESET, RESET}; + RED BOLD, YELLOW BOLD, BLUE, GREEN, RED, CYAN, MAGENTA, + RESET, RESET, RESET, RESET, RESET, RESET, RESET}; + // out of bounds + if (level < 0 || level >= sizeof(colors) / sizeof(colors[0])) { + return RESET; + } return colors[level]; } // Danger this buffer need as many elements as TLogLevel static std::string ToString(TLogLevel level) { static const char *const buffer[] = { - "ERROR", "WARNING", "INFO", "INFO", "INFO", "INFO", - "DEBUG", "DEBUG1", "DEBUG2", "DEBUG3", "DEBUG4", "DEBUG5"}; + "ERROR", "WARNING", "INFO", "INFO", "INFO", + "INFO", "INFO", "INFO", "DEBUG", "DEBUG1", + "DEBUG2", "DEBUG3", "DEBUG4", "DEBUG5"}; + // out of bounds + if (level < 0 || level >= sizeof(buffer) / sizeof(buffer[0])) { + return "UNKNOWN"; + } return buffer[level]; }