diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 6f35e229..c6f2daea 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -71,6 +71,8 @@ #define ZMQ_INPUT_MAX_FRAME_QUEUE 500 +// Default makes it match with old hard-coded settings. +#define ZMQ_INPUT_QUEUE_RESTART_DEPTH ZMQ_INPUT_MAX_FRAME_QUEUE - 3 typedef std::complex complexf; @@ -121,6 +123,7 @@ int launch_modulator(int argc, char* argv[]) std::string inputName = ""; std::string inputTransport = "file"; unsigned inputMaxFramesQueued = ZMQ_INPUT_MAX_FRAME_QUEUE; + unsigned inputQueueRestartDepth = ZMQ_INPUT_QUEUE_RESTART_DEPTH; std::string outputName; int useZeroMQOutput = 0; @@ -370,6 +373,8 @@ int launch_modulator(int argc, char* argv[]) inputTransport = pt.get("input.transport", "file"); inputMaxFramesQueued = pt.get("input.max_frames_queued", ZMQ_INPUT_MAX_FRAME_QUEUE); + inputQueueRestartDepth = pt.get("input.restart_depth", + ZMQ_INPUT_QUEUE_RESTART_DEPTH); inputName = pt.get("input.source", "/dev/stdin"); @@ -701,7 +706,7 @@ int launch_modulator(int argc, char* argv[]) ret = -1; throw std::runtime_error("Unable to open input"); #else - inputZeroMQReader->Open(inputName, inputMaxFramesQueued); + inputZeroMQReader->Open(inputName, inputMaxFramesQueued, inputQueueRestartDepth); m.inputReader = inputZeroMQReader.get(); #endif } @@ -812,7 +817,7 @@ int launch_modulator(int argc, char* argv[]) run_again = true; // Create a new input reader inputZeroMQReader = make_shared(); - inputZeroMQReader->Open(inputName, inputMaxFramesQueued); + inputZeroMQReader->Open(inputName, inputMaxFramesQueued, inputQueueRestartDepth); m.inputReader = inputZeroMQReader.get(); #endif } diff --git a/src/InputReader.h b/src/InputReader.h index daacc9eb..8d184618 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -150,6 +150,7 @@ struct InputZeroMQThreadData ThreadsafeQueue > > *in_messages; std::string uri; size_t max_queued_frames; + size_t restart_queue_depth; }; class InputZeroMQWorker @@ -194,7 +195,7 @@ class InputZeroMQReader : public InputReader worker_.Stop(); } - int Open(const std::string& uri, size_t max_queued_frames); + int Open(const std::string& uri, size_t max_queued_frames, size_t restart_depth = 0); int GetNextFrame(void* buffer); diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 820a4ff9..cc474994 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -63,7 +63,7 @@ struct zmq_dab_message_t uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144]; }; -int InputZeroMQReader::Open(const std::string& uri, size_t max_queued_frames) +int InputZeroMQReader::Open(const std::string& uri, size_t max_queued_frames, size_t restart_depth) { // The URL might start with zmq+tcp:// if (uri.substr(0, 4) == "zmq+") { @@ -75,6 +75,7 @@ int InputZeroMQReader::Open(const std::string& uri, size_t max_queued_frames) workerdata_.uri = uri_; workerdata_.max_queued_frames = max_queued_frames; + workerdata_.restart_queue_depth = restart_depth; // launch receiver thread worker_.Start(&workerdata_); @@ -134,20 +135,31 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) // zmq sockets are not thread safe. That's why // we create it here, and not at object creation. - const int hwm = 1; + const int hwm = 30; const int linger = 0; try { - subscriber.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); - subscriber.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); - subscriber.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + subscriber.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); + subscriber.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); + subscriber.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); subscriber.connect(workerdata->uri.c_str()); subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages + etiLog.level(trace) << "ZMQ,workerdata->max_queued_frames: " << workerdata->max_queued_frames; + etiLog.level(trace) << "ZMQ,workerdata->restart_queue_depth: " << workerdata->restart_queue_depth; + + size_t throwFrames = 0; while (running) { zmq::message_t incoming; subscriber.recv(&incoming); + if(throwFrames > 0) + { // Drop first throwFrames number of frames received. + // It is most likely old frames from frame source HWM-buffer. + --throwFrames; + continue; + } + if (m_to_drop) { queue_size = workerdata->in_messages->size(); if (queue_size > 4) { @@ -200,24 +212,50 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) workerdata->in_messages->notify(); if (!buffer_full) { - etiLog.level(warn) << "ZeroMQ buffer overfull !"; + etiLog.level(warn) << "ZeroMQ buffer overflow!"; buffer_full = true; - throw std::runtime_error("ZMQ input full"); + + //throw std::runtime_error("ZMQ input full"); } queue_size = workerdata->in_messages->size(); - /* Drop three more incoming ETI frames before - * we start accepting them again, to guarantee - * that we keep transmission frame vs. ETI frame - * phase. - */ - m_to_drop = 3; + // /* Drop three more incoming ETI frames before + // * we start accepting them again, to guarantee + // * that we keep transmission frame vs. ETI frame + // * phase. + // */ + //m_to_drop = 3; + + // Make sure we drop a multiple of 8 frames. + // There are 4 in each ZMQ-msg, with NUM_FRAMES_PER_ZMQ_MESSAGE = 4 as default. + // We drop the current msg with 4 frames and need to drop another to keep phase. + // Plus the amount we need to reach wanted start depth. + m_to_drop = (8 / NUM_FRAMES_PER_ZMQ_MESSAGE) - 1; // To keep frame phase for all modes. + + // Add amount of frames to reach wanted depth in a multiple of 8. + if(workerdata->restart_queue_depth != 0) // We have a restart_queue_depth value defined. + { + m_to_drop += ((workerdata->max_queued_frames - workerdata->restart_queue_depth) / 8) * (8 / NUM_FRAMES_PER_ZMQ_MESSAGE); + } + else // We use half the max_queued_frames as default restart value when restart_queue_depth isn't defined. + { + m_to_drop += ((workerdata->max_queued_frames / 2) / 8) * (8 / NUM_FRAMES_PER_ZMQ_MESSAGE); + } + + if(m_to_drop % (8 / NUM_FRAMES_PER_ZMQ_MESSAGE) == 0) + { + --m_to_drop; + if(m_to_drop < 0) + { + m_to_drop = 0; + } + } } if (queue_size < 5) { - etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !"; + etiLog.level(warn) << "ZMQ,ZeroMQ buffer low: " << queue_size << " elements !"; } } } diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 9ee93b49..573d79d3 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -619,6 +619,8 @@ void UHDWorker::process() num_underflows = 0; num_late_packets = 0; + num_consecutive_underflow_msgs = 0; + process_extra_frame = false; while (uwd->running) { md.has_time_spec = false; @@ -630,6 +632,18 @@ void UHDWorker::process() etiLog.log(trace, "UHD,pop"); handle_frame(&frame); + + if(process_extra_frame) + { + for (int loop = 0; loop < 5; ++loop) + { + etiLog.log(trace, "UHD,produce extra frame"); + etiLog.log(trace, "UHD,wait"); + uwd->frames.wait_and_pop(frame); + etiLog.log(trace, "UHD,pop"); + handle_frame(&frame); + } + } } } @@ -767,7 +781,19 @@ void UHDWorker::handle_frame(const struct UHDWorkerFrameData *frame) "%d underruns and %d late packets since last status.\n", usrp_time, num_underflows, num_late_packets); - boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + + //boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + process_extra_frame = true; + if(++num_consecutive_underflow_msgs > 10) + { + num_consecutive_underflow_msgs = 0; + //boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + throw std::runtime_error("Too many consecutive underruns. Indicates unspecified internal problem, exiting..."); + } + } + else + { + num_consecutive_underflow_msgs = 0; } num_underflows = 0; num_late_packets = 0; diff --git a/src/OutputUHD.h b/src/OutputUHD.h index 1c591364..6070bc67 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -52,6 +52,7 @@ #include #include #include +#include #include "Log.h" #include "ModOutput.h" @@ -154,6 +155,8 @@ class UHDWorker { // Asynchronous message statistics int num_underflows; int num_late_packets; + int num_consecutive_underflow_msgs; + std::atomic process_extra_frame; uhd::tx_metadata_t md; bool last_tx_time_initialised;