From d06bcdaaf22924d5014a508fa62b5088130b7b4f Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Thu, 3 Nov 2016 14:17:36 +0100 Subject: [PATCH 01/29] Throw first 25 received frames to avoid old frames from source ZMQ output HWM buffer. --- src/InputZeroMQReader.cpp | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 820a4ff9..c51d64b9 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -134,20 +134,29 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) // zmq sockets are not thread safe. That's why // we create it here, and not at object creation. - const int hwm = 1; + const int hwm = 5; const int linger = 0; try { - subscriber.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); - subscriber.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); - subscriber.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + subscriber.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); + subscriber.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); + subscriber.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); subscriber.connect(workerdata->uri.c_str()); subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages + bool startup = true; + size_t throwFrames = 25; while (running) { zmq::message_t incoming; subscriber.recv(&incoming); + if(startup && (throwFrames > 0)) + { // Drop first 25 frames received. + // It is most likely old frames from senders HWM-buffer. + --throwFrames; + continue; + } + if (m_to_drop) { queue_size = workerdata->in_messages->size(); if (queue_size > 4) { From f2cf96dbe9f737833cb144033ff6dada8191c1a0 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Thu, 3 Nov 2016 15:39:40 +0100 Subject: [PATCH 02/29] removed underrun-fix for now. It makes it hard to get up and running --- src/OutputUHD.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 9ee93b49..a468bc57 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -767,7 +767,7 @@ void UHDWorker::handle_frame(const struct UHDWorkerFrameData *frame) "%d underruns and %d late packets since last status.\n", usrp_time, num_underflows, num_late_packets); - boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + //boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); } num_underflows = 0; num_late_packets = 0; From e8336314335395a8cbe687b99d511125dbd0d3f0 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Thu, 3 Nov 2016 19:20:50 +0100 Subject: [PATCH 03/29] re-added the underrun-fix with a little stricter requirement --- src/OutputUHD.cpp | 11 ++++++++++- src/OutputUHD.h | 1 + 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index a468bc57..9853eefe 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -767,8 +767,17 @@ void UHDWorker::handle_frame(const struct UHDWorkerFrameData *frame) "%d underruns and %d late packets since last status.\n", usrp_time, num_underflows, num_late_packets); - //boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + + if(++num_consecutive_underflow_msgs > 10) + { + num_consecutive_underflow_msgs = 0; + boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + } } + else + { + num_consecutive_underflow_msgs = 0; + } num_underflows = 0; num_late_packets = 0; diff --git a/src/OutputUHD.h b/src/OutputUHD.h index 1c591364..284cae0f 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -154,6 +154,7 @@ class UHDWorker { // Asynchronous message statistics int num_underflows; int num_late_packets; + int num_consecutive_underflow_msgs; uhd::tx_metadata_t md; bool last_tx_time_initialised; From a77ad0852b2a72c02bf39541f26e3ac1691aceca Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Thu, 3 Nov 2016 19:33:21 +0100 Subject: [PATCH 04/29] trying to fix underrun-fix... --- src/OutputUHD.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 9853eefe..71b92182 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -771,7 +771,8 @@ void UHDWorker::handle_frame(const struct UHDWorkerFrameData *frame) if(++num_consecutive_underflow_msgs > 10) { num_consecutive_underflow_msgs = 0; - boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + //boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + throw std::runtime_error("Too many consecutive underruns. Indicates unspecified internal problem, exiting..."); } } else From 8dd9f17d8d27bf7d414557fe32f6ca3af98e1a8e Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Thu, 3 Nov 2016 20:06:29 +0100 Subject: [PATCH 05/29] Allow more underruns before exit... --- src/OutputUHD.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 71b92182..e445bc09 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -619,6 +619,7 @@ void UHDWorker::process() num_underflows = 0; num_late_packets = 0; + num_consecutive_underflow_msgs = 0; while (uwd->running) { md.has_time_spec = false; @@ -768,7 +769,7 @@ void UHDWorker::handle_frame(const struct UHDWorkerFrameData *frame) usrp_time, num_underflows, num_late_packets); - if(++num_consecutive_underflow_msgs > 10) + if(++num_consecutive_underflow_msgs > 25) { num_consecutive_underflow_msgs = 0; //boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); From 0e8df26566014d0f1afc5ae428ba0185ac5ff26b Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 13:25:05 +0100 Subject: [PATCH 06/29] re-worked the logic to drop frames when buffer overflow. --- src/InputReader.h | 2 +- src/InputZeroMQReader.cpp | 29 ++++++++++++++++++----------- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/InputReader.h b/src/InputReader.h index daacc9eb..e1e6d97c 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -158,7 +158,7 @@ class InputZeroMQWorker InputZeroMQWorker() : running(false), zmqcontext(1), - m_to_drop(0) { } + m_to_drop(40) { } void Start(struct InputZeroMQThreadData* workerdata); void Stop(); diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index c51d64b9..3db00371 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -134,7 +134,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) // zmq sockets are not thread safe. That's why // we create it here, and not at object creation. - const int hwm = 5; + const int hwm = 25; const int linger = 0; try { subscriber.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); @@ -143,19 +143,18 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) subscriber.connect(workerdata->uri.c_str()); subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages - bool startup = true; - size_t throwFrames = 25; + // size_t throwFrames = 50; while (running) { zmq::message_t incoming; subscriber.recv(&incoming); - if(startup && (throwFrames > 0)) - { // Drop first 25 frames received. - // It is most likely old frames from senders HWM-buffer. - --throwFrames; - continue; - } + // if(throwFrames > 0) + // { // Drop first throwFrames number of frames received. + // // It is most likely old frames from frame source HWM-buffer. + // --throwFrames; + // continue; + // } if (m_to_drop) { queue_size = workerdata->in_messages->size(); @@ -212,7 +211,13 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) etiLog.level(warn) << "ZeroMQ buffer overfull !"; buffer_full = true; - throw std::runtime_error("ZMQ input full"); + + // while(workerdata->in_messages->size() > 14) + // { + // for() + // workerdata->in_messages->pop(); + // } + //throw std::runtime_error("ZMQ input full"); } queue_size = workerdata->in_messages->size(); @@ -222,7 +227,9 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) * that we keep transmission frame vs. ETI frame * phase. */ - m_to_drop = 3; + // m_to_drop = 3; + m_to_drop = workerdata->max_queued_frames - 8; + } if (queue_size < 5) { From c92500613f7b0af88610d95c6270e8afe4037f61 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 13:51:57 +0100 Subject: [PATCH 07/29] Increased lower limit on buffer restart --- src/InputZeroMQReader.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 3db00371..3ad88ebc 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -228,7 +228,8 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) * phase. */ // m_to_drop = 3; - m_to_drop = workerdata->max_queued_frames - 8; + // make sure max_queued_frames is at least 20 frames. + m_to_drop = workerdata->max_queued_frames - 20; } From 4000726315af295306e0883f06c9cbd8fc3dc9e2 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 14:44:53 +0100 Subject: [PATCH 08/29] reverse solution test... --- src/InputZeroMQReader.cpp | 4 ++-- src/OutputUHD.cpp | 26 +++++++++++++++++++------- src/OutputUHD.h | 3 +++ 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 3ad88ebc..285fda21 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -228,8 +228,8 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) * phase. */ // m_to_drop = 3; - // make sure max_queued_frames is at least 20 frames. - m_to_drop = workerdata->max_queued_frames - 20; + // make sure max_queued_frames is at least 16 ETI-frames. + m_to_drop = workerdata->max_queued_frames - (16 / NUM_FRAMES_PER_ZMQ_MESSAGE); } diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index e445bc09..29e0f8d4 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -124,7 +124,8 @@ OutputUHD::OutputUHD( first_run(true), gps_fix_verified(false), worker(&uwd), - myDelayBuf(0) + myDelayBuf(0), + process_extra_frame(false) { myConf.muting = true; // is remote-controllable, and reset by the GPS fix check myConf.staticDelayUs = 0; // is remote-controllable @@ -631,6 +632,15 @@ void UHDWorker::process() etiLog.log(trace, "UHD,pop"); handle_frame(&frame); + + if(process_extra_frame) + { + etiLog.log(trace, "UHD,produce extra frame"); + etiLog.log(trace, "UHD,wait"); + uwd->frames.wait_and_pop(frame); + etiLog.log(trace, "UHD,pop"); + handle_frame(&frame); + } } } @@ -769,12 +779,14 @@ void UHDWorker::handle_frame(const struct UHDWorkerFrameData *frame) usrp_time, num_underflows, num_late_packets); - if(++num_consecutive_underflow_msgs > 25) - { - num_consecutive_underflow_msgs = 0; - //boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); - throw std::runtime_error("Too many consecutive underruns. Indicates unspecified internal problem, exiting..."); - } + //boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + process_extra_frame = true; + if(++num_consecutive_underflow_msgs > 20) + { + num_consecutive_underflow_msgs = 0; + //boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + throw std::runtime_error("Too many consecutive underruns. Indicates unspecified internal problem, exiting..."); + } } else { diff --git a/src/OutputUHD.h b/src/OutputUHD.h index 284cae0f..0b6951ed 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -52,6 +52,7 @@ #include #include #include +#include #include "Log.h" #include "ModOutput.h" @@ -276,6 +277,8 @@ class OutputUHD: public ModOutput, public RemoteControllable { boost::unique_future gps_fix_future; boost::thread gps_fix_task; + std::atomic process_extra_frame; + // Wait time in seconds to get fix static const int initial_gps_fix_wait = 180; From e0e6a9a9eb6606d3bf5b96a0e637cdf5418f6568 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 14:52:11 +0100 Subject: [PATCH 09/29] moved process_extra_frame-flag to correct class --- src/OutputUHD.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/OutputUHD.h b/src/OutputUHD.h index 0b6951ed..6070bc67 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -156,6 +156,7 @@ class UHDWorker { int num_underflows; int num_late_packets; int num_consecutive_underflow_msgs; + std::atomic process_extra_frame; uhd::tx_metadata_t md; bool last_tx_time_initialised; @@ -277,8 +278,6 @@ class OutputUHD: public ModOutput, public RemoteControllable { boost::unique_future gps_fix_future; boost::thread gps_fix_task; - std::atomic process_extra_frame; - // Wait time in seconds to get fix static const int initial_gps_fix_wait = 180; From 9d88baafe1c3657ae3fb85eb5d9e2d71aea62750 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 15:03:13 +0100 Subject: [PATCH 10/29] Moved process_extra_frame-init to correct class too... --- src/OutputUHD.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 29e0f8d4..c52f4f16 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -124,8 +124,7 @@ OutputUHD::OutputUHD( first_run(true), gps_fix_verified(false), worker(&uwd), - myDelayBuf(0), - process_extra_frame(false) + myDelayBuf(0) { myConf.muting = true; // is remote-controllable, and reset by the GPS fix check myConf.staticDelayUs = 0; // is remote-controllable @@ -620,7 +619,8 @@ void UHDWorker::process() num_underflows = 0; num_late_packets = 0; - num_consecutive_underflow_msgs = 0; + num_consecutive_underflow_msgs = 0; + process_extra_frame = false; while (uwd->running) { md.has_time_spec = false; From b570b7c6e5b5285d5a27932424169ebf5fb3887c Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 15:11:42 +0100 Subject: [PATCH 11/29] produce more extra frames at underrun... --- src/OutputUHD.cpp | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index c52f4f16..b9731146 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -635,11 +635,14 @@ void UHDWorker::process() if(process_extra_frame) { - etiLog.log(trace, "UHD,produce extra frame"); - etiLog.log(trace, "UHD,wait"); - uwd->frames.wait_and_pop(frame); - etiLog.log(trace, "UHD,pop"); - handle_frame(&frame); + for (int loop = 0; loop < 5; ++loop) + { + etiLog.log(trace, "UHD,produce extra frame"); + etiLog.log(trace, "UHD,wait"); + uwd->frames.wait_and_pop(frame); + etiLog.log(trace, "UHD,pop"); + handle_frame(&frame); + } } } } From 63b7dfa212acf5ca8dc0e37080c66c081fe1f3b3 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 15:33:29 +0100 Subject: [PATCH 12/29] restored to no drop at start and simpler buffer reset. --- src/InputReader.h | 2 +- src/InputZeroMQReader.cpp | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/InputReader.h b/src/InputReader.h index e1e6d97c..daacc9eb 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -158,7 +158,7 @@ class InputZeroMQWorker InputZeroMQWorker() : running(false), zmqcontext(1), - m_to_drop(40) { } + m_to_drop(0) { } void Start(struct InputZeroMQThreadData* workerdata); void Stop(); diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 285fda21..f655caee 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -228,8 +228,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) * phase. */ // m_to_drop = 3; - // make sure max_queued_frames is at least 16 ETI-frames. - m_to_drop = workerdata->max_queued_frames - (16 / NUM_FRAMES_PER_ZMQ_MESSAGE); + m_to_drop = workerdata->max_queued_frames / 2; } From 165f103dfd87d66b8aa7babd985ac58101f91e29 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 15:47:39 +0100 Subject: [PATCH 13/29] returned to own start-drop-handling --- src/InputZeroMQReader.cpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index f655caee..f13ddc4f 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -143,18 +143,18 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) subscriber.connect(workerdata->uri.c_str()); subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages - // size_t throwFrames = 50; + size_t throwFrames = 25; while (running) { zmq::message_t incoming; subscriber.recv(&incoming); - // if(throwFrames > 0) - // { // Drop first throwFrames number of frames received. - // // It is most likely old frames from frame source HWM-buffer. - // --throwFrames; - // continue; - // } + if(throwFrames > 0) + { // Drop first throwFrames number of frames received. + // It is most likely old frames from frame source HWM-buffer. + --throwFrames; + continue; + } if (m_to_drop) { queue_size = workerdata->in_messages->size(); From 923053dc0d5f4d60eda89df83dd84a417bcc4999 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 15:55:14 +0100 Subject: [PATCH 14/29] changing start values... --- src/InputZeroMQReader.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index f13ddc4f..2fc39e10 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -134,7 +134,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) // zmq sockets are not thread safe. That's why // we create it here, and not at object creation. - const int hwm = 25; + const int hwm = 5; const int linger = 0; try { subscriber.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); @@ -143,7 +143,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) subscriber.connect(workerdata->uri.c_str()); subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages - size_t throwFrames = 25; + size_t throwFrames = 12; while (running) { zmq::message_t incoming; From 4dce06b4834cb302449f509b1957b6b514193b37 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 16:05:56 +0100 Subject: [PATCH 15/29] no frame throw at start... --- src/InputZeroMQReader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 2fc39e10..f039d4b8 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -143,7 +143,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) subscriber.connect(workerdata->uri.c_str()); subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages - size_t throwFrames = 12; + size_t throwFrames = 0; while (running) { zmq::message_t incoming; From fdf1bdf0a9b2157936603d378c68777fb3df0b18 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 16:17:58 +0100 Subject: [PATCH 16/29] changed HWM from 5 to 10 --- src/InputZeroMQReader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index f039d4b8..e919fe68 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -134,7 +134,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) // zmq sockets are not thread safe. That's why // we create it here, and not at object creation. - const int hwm = 5; + const int hwm = 10; const int linger = 0; try { subscriber.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); From f39488eb0f67339dc79415317314ce68a07d712d Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 16:32:02 +0100 Subject: [PATCH 17/29] changed HWM from 10 to 50 --- src/InputZeroMQReader.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index e919fe68..3cb3e05f 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -134,7 +134,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) // zmq sockets are not thread safe. That's why // we create it here, and not at object creation. - const int hwm = 10; + const int hwm = 50; const int linger = 0; try { subscriber.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); @@ -220,6 +220,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) //throw std::runtime_error("ZMQ input full"); } + workerdata->in_messages->clear(); queue_size = workerdata->in_messages->size(); /* Drop three more incoming ETI frames before @@ -228,7 +229,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) * phase. */ // m_to_drop = 3; - m_to_drop = workerdata->max_queued_frames / 2; + //m_to_drop = workerdata->max_queued_frames / 2; } From 959b0703ad7892d9ba7eb34441ef02f4ee143157 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 16:42:09 +0100 Subject: [PATCH 18/29] modified overflow behavior --- src/InputZeroMQReader.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 3cb3e05f..21163eb5 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -220,7 +220,6 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) //throw std::runtime_error("ZMQ input full"); } - workerdata->in_messages->clear(); queue_size = workerdata->in_messages->size(); /* Drop three more incoming ETI frames before @@ -228,8 +227,8 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) * that we keep transmission frame vs. ETI frame * phase. */ - // m_to_drop = 3; - //m_to_drop = workerdata->max_queued_frames / 2; + //m_to_drop = 3; + m_to_drop = workerdata->max_queued_frames / 2; } From 4164c4e9cdbfc611eb0b9bdbd30c8ba5f8f4b06c Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 17:03:19 +0100 Subject: [PATCH 19/29] modified to no extra drop on buffer full --- src/InputZeroMQReader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 21163eb5..a6e74909 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -228,7 +228,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) * phase. */ //m_to_drop = 3; - m_to_drop = workerdata->max_queued_frames / 2; + //m_to_drop = workerdata->max_queued_frames / 2; } From 33e51bfb6c49542661164aaeb51dec70626cf8cd Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 17:15:41 +0100 Subject: [PATCH 20/29] Re-added throwFrames=20 --- src/InputZeroMQReader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index a6e74909..3b456711 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -143,7 +143,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) subscriber.connect(workerdata->uri.c_str()); subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages - size_t throwFrames = 0; + size_t throwFrames = 20; while (running) { zmq::message_t incoming; From b911a8952fe36c95bb1d2e510e8f6d9531af6eb2 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 17:23:37 +0100 Subject: [PATCH 21/29] HWM=30, no-throw --- src/InputZeroMQReader.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 3b456711..fa50f306 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -134,7 +134,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) // zmq sockets are not thread safe. That's why // we create it here, and not at object creation. - const int hwm = 50; + const int hwm = 30; const int linger = 0; try { subscriber.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); @@ -143,7 +143,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) subscriber.connect(workerdata->uri.c_str()); subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages - size_t throwFrames = 20; + size_t throwFrames = 0; while (running) { zmq::message_t incoming; From 1199c6255ace14ba18fbfb5a36069bc8162f3399 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 18:22:48 +0100 Subject: [PATCH 22/29] re-add overflow code... --- src/InputZeroMQReader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index fa50f306..525bb1fd 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -228,7 +228,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) * phase. */ //m_to_drop = 3; - //m_to_drop = workerdata->max_queued_frames / 2; + m_to_drop = workerdata->max_queued_frames - 12; } From 244a25546a23ff40f72d9a927b7cde1aab103f0f Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 18:59:26 +0100 Subject: [PATCH 23/29] added restart-depth parameter --- src/DabMod.cpp | 9 +++++++-- src/InputReader.h | 3 ++- src/InputZeroMQReader.cpp | 18 ++++++++++-------- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 6f35e229..c6f2daea 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -71,6 +71,8 @@ #define ZMQ_INPUT_MAX_FRAME_QUEUE 500 +// Default makes it match with old hard-coded settings. +#define ZMQ_INPUT_QUEUE_RESTART_DEPTH ZMQ_INPUT_MAX_FRAME_QUEUE - 3 typedef std::complex complexf; @@ -121,6 +123,7 @@ int launch_modulator(int argc, char* argv[]) std::string inputName = ""; std::string inputTransport = "file"; unsigned inputMaxFramesQueued = ZMQ_INPUT_MAX_FRAME_QUEUE; + unsigned inputQueueRestartDepth = ZMQ_INPUT_QUEUE_RESTART_DEPTH; std::string outputName; int useZeroMQOutput = 0; @@ -370,6 +373,8 @@ int launch_modulator(int argc, char* argv[]) inputTransport = pt.get("input.transport", "file"); inputMaxFramesQueued = pt.get("input.max_frames_queued", ZMQ_INPUT_MAX_FRAME_QUEUE); + inputQueueRestartDepth = pt.get("input.restart_depth", + ZMQ_INPUT_QUEUE_RESTART_DEPTH); inputName = pt.get("input.source", "/dev/stdin"); @@ -701,7 +706,7 @@ int launch_modulator(int argc, char* argv[]) ret = -1; throw std::runtime_error("Unable to open input"); #else - inputZeroMQReader->Open(inputName, inputMaxFramesQueued); + inputZeroMQReader->Open(inputName, inputMaxFramesQueued, inputQueueRestartDepth); m.inputReader = inputZeroMQReader.get(); #endif } @@ -812,7 +817,7 @@ int launch_modulator(int argc, char* argv[]) run_again = true; // Create a new input reader inputZeroMQReader = make_shared(); - inputZeroMQReader->Open(inputName, inputMaxFramesQueued); + inputZeroMQReader->Open(inputName, inputMaxFramesQueued, inputQueueRestartDepth); m.inputReader = inputZeroMQReader.get(); #endif } diff --git a/src/InputReader.h b/src/InputReader.h index daacc9eb..8d184618 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -150,6 +150,7 @@ struct InputZeroMQThreadData ThreadsafeQueue > > *in_messages; std::string uri; size_t max_queued_frames; + size_t restart_queue_depth; }; class InputZeroMQWorker @@ -194,7 +195,7 @@ class InputZeroMQReader : public InputReader worker_.Stop(); } - int Open(const std::string& uri, size_t max_queued_frames); + int Open(const std::string& uri, size_t max_queued_frames, size_t restart_depth = 0); int GetNextFrame(void* buffer); diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 525bb1fd..2f1bf1c2 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -63,7 +63,7 @@ struct zmq_dab_message_t uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144]; }; -int InputZeroMQReader::Open(const std::string& uri, size_t max_queued_frames) +int InputZeroMQReader::Open(const std::string& uri, size_t max_queued_frames, size_t restart_depth) { // The URL might start with zmq+tcp:// if (uri.substr(0, 4) == "zmq+") { @@ -75,6 +75,7 @@ int InputZeroMQReader::Open(const std::string& uri, size_t max_queued_frames) workerdata_.uri = uri_; workerdata_.max_queued_frames = max_queued_frames; + workerdata_.restart_queue_depth = restart_depth; // launch receiver thread worker_.Start(&workerdata_); @@ -212,11 +213,6 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) buffer_full = true; - // while(workerdata->in_messages->size() > 14) - // { - // for() - // workerdata->in_messages->pop(); - // } //throw std::runtime_error("ZMQ input full"); } @@ -228,8 +224,14 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) * phase. */ //m_to_drop = 3; - m_to_drop = workerdata->max_queued_frames - 12; - + if(workerdata->restart_queue_depth != 0) + { + m_to_drop = workerdata->max_queued_frames - workerdata->restart_queue_depth; + } + else + { + m_to_drop = workerdata->max_queued_frames / 2; + } } if (queue_size < 5) { From 0dc9de3335035ec88065a3aa04d911b434e0f691 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Fri, 4 Nov 2016 20:05:15 +0100 Subject: [PATCH 24/29] added some logs --- src/InputZeroMQReader.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 2f1bf1c2..9fac9704 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -144,6 +144,9 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) subscriber.connect(workerdata->uri.c_str()); subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages + etiLog.level(trace) << "ZMQ,workerdata->max_queued_frames: " << workerdata->max_queued_frames; + etiLog.level(trace) << "ZMQ,workerdata->restart_queue_depth: " << workerdata->restart_queue_depth; + size_t throwFrames = 0; while (running) { @@ -226,10 +229,12 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) //m_to_drop = 3; if(workerdata->restart_queue_depth != 0) { + etiLog.level(warn) << "resetting ZeroMQ buffer to: " << workerdata->restart_queue_depth; m_to_drop = workerdata->max_queued_frames - workerdata->restart_queue_depth; } else { + etiLog.level(warn) << "resetting ZeroMQ buffer to: " << workerdata->max_queued_frames / 2; m_to_drop = workerdata->max_queued_frames / 2; } } From a0bc6188e63f7c8f2603e20a13cb148d1a16cea5 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Mon, 7 Nov 2016 10:26:04 +0100 Subject: [PATCH 25/29] Added 'ZMQ,' to some logs. Halved the number of frames to drop at buffer overflow --- src/InputZeroMQReader.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 9fac9704..ea04c545 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -212,7 +212,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) workerdata->in_messages->notify(); if (!buffer_full) { - etiLog.level(warn) << "ZeroMQ buffer overfull !"; + etiLog.level(warn) << "ZMQ,ZeroMQ buffer overfull !"; buffer_full = true; @@ -229,18 +229,18 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) //m_to_drop = 3; if(workerdata->restart_queue_depth != 0) { - etiLog.level(warn) << "resetting ZeroMQ buffer to: " << workerdata->restart_queue_depth; - m_to_drop = workerdata->max_queued_frames - workerdata->restart_queue_depth; + etiLog.level(warn) << "ZMQ,resetting ZeroMQ buffer to: " << workerdata->restart_queue_depth; + m_to_drop = (workerdata->max_queued_frames - workerdata->restart_queue_depth) / 2; } else { - etiLog.level(warn) << "resetting ZeroMQ buffer to: " << workerdata->max_queued_frames / 2; + etiLog.level(warn) << "ZMQ,resetting ZeroMQ buffer to: " << workerdata->max_queued_frames / 2; m_to_drop = workerdata->max_queued_frames / 2; } } if (queue_size < 5) { - etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !"; + etiLog.level(warn) << "ZMQ,ZeroMQ buffer low: " << queue_size << " elements !"; } } } From ed9315f4b736ef782510d58020d2e15cc423aae5 Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Mon, 7 Nov 2016 10:32:09 +0100 Subject: [PATCH 26/29] changed to drop a foutrh of the packages as they contain 4 frames... --- src/InputZeroMQReader.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index ea04c545..d858d430 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -230,12 +230,12 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) if(workerdata->restart_queue_depth != 0) { etiLog.level(warn) << "ZMQ,resetting ZeroMQ buffer to: " << workerdata->restart_queue_depth; - m_to_drop = (workerdata->max_queued_frames - workerdata->restart_queue_depth) / 2; + m_to_drop = (workerdata->max_queued_frames - workerdata->restart_queue_depth) / 4; } else { etiLog.level(warn) << "ZMQ,resetting ZeroMQ buffer to: " << workerdata->max_queued_frames / 2; - m_to_drop = workerdata->max_queued_frames / 2; + m_to_drop = workerdata->max_queued_frames / 4; } } From 53e13172cbd05da9923ca313ad0f5f399360eedb Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Mon, 7 Nov 2016 11:20:35 +0100 Subject: [PATCH 27/29] make sure framphase is intact when dropping frames. --- src/InputZeroMQReader.cpp | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index d858d430..42eb4dbb 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -212,7 +212,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) workerdata->in_messages->notify(); if (!buffer_full) { - etiLog.level(warn) << "ZMQ,ZeroMQ buffer overfull !"; + etiLog.level(warn) << "ZeroMQ buffer overflow!"; buffer_full = true; @@ -229,14 +229,29 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) //m_to_drop = 3; if(workerdata->restart_queue_depth != 0) { - etiLog.level(warn) << "ZMQ,resetting ZeroMQ buffer to: " << workerdata->restart_queue_depth; + etiLog.level(warn) << "resetting ZeroMQ buffer to: " << workerdata->restart_queue_depth; m_to_drop = (workerdata->max_queued_frames - workerdata->restart_queue_depth) / 4; } else { - etiLog.level(warn) << "ZMQ,resetting ZeroMQ buffer to: " << workerdata->max_queued_frames / 2; + etiLog.level(warn) << "resetting ZeroMQ buffer to: " << workerdata->max_queued_frames / 2; m_to_drop = workerdata->max_queued_frames / 4; } + if(m_to_drop != 0) + { + /* Make sure we drop a multiple of 8 frames. + * There are 4 in each ZMQ-msg, and we drop the + * current msg with 4 frames. + */ + if(m_to_drop % 2 == 0) + { + --m_to_drop; + if(m_to_drop < 0) + { + m_to_drop = 0; + } + } + } } if (queue_size < 5) { From bf87db1de73af73f804ed5e02c8f9bd0b44eb97c Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Mon, 7 Nov 2016 13:06:32 +0100 Subject: [PATCH 28/29] re-factored buffer overflow-fix to handle other than 4 frames per ZMQ-msg --- src/InputZeroMQReader.cpp | 44 +++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 42eb4dbb..cc474994 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -221,35 +221,35 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) queue_size = workerdata->in_messages->size(); - /* Drop three more incoming ETI frames before - * we start accepting them again, to guarantee - * that we keep transmission frame vs. ETI frame - * phase. - */ + // /* Drop three more incoming ETI frames before + // * we start accepting them again, to guarantee + // * that we keep transmission frame vs. ETI frame + // * phase. + // */ //m_to_drop = 3; - if(workerdata->restart_queue_depth != 0) + + // Make sure we drop a multiple of 8 frames. + // There are 4 in each ZMQ-msg, with NUM_FRAMES_PER_ZMQ_MESSAGE = 4 as default. + // We drop the current msg with 4 frames and need to drop another to keep phase. + // Plus the amount we need to reach wanted start depth. + m_to_drop = (8 / NUM_FRAMES_PER_ZMQ_MESSAGE) - 1; // To keep frame phase for all modes. + + // Add amount of frames to reach wanted depth in a multiple of 8. + if(workerdata->restart_queue_depth != 0) // We have a restart_queue_depth value defined. { - etiLog.level(warn) << "resetting ZeroMQ buffer to: " << workerdata->restart_queue_depth; - m_to_drop = (workerdata->max_queued_frames - workerdata->restart_queue_depth) / 4; + m_to_drop += ((workerdata->max_queued_frames - workerdata->restart_queue_depth) / 8) * (8 / NUM_FRAMES_PER_ZMQ_MESSAGE); } - else + else // We use half the max_queued_frames as default restart value when restart_queue_depth isn't defined. { - etiLog.level(warn) << "resetting ZeroMQ buffer to: " << workerdata->max_queued_frames / 2; - m_to_drop = workerdata->max_queued_frames / 4; + m_to_drop += ((workerdata->max_queued_frames / 2) / 8) * (8 / NUM_FRAMES_PER_ZMQ_MESSAGE); } - if(m_to_drop != 0) + + if(m_to_drop % (8 / NUM_FRAMES_PER_ZMQ_MESSAGE) == 0) { - /* Make sure we drop a multiple of 8 frames. - * There are 4 in each ZMQ-msg, and we drop the - * current msg with 4 frames. - */ - if(m_to_drop % 2 == 0) + --m_to_drop; + if(m_to_drop < 0) { - --m_to_drop; - if(m_to_drop < 0) - { - m_to_drop = 0; - } + m_to_drop = 0; } } } From 7c2d6f1500bb075a95768b760181c3ebe74b8a2f Mon Sep 17 00:00:00 2001 From: Fredrik Landberg Date: Mon, 7 Nov 2016 13:07:53 +0100 Subject: [PATCH 29/29] Changed to 10 consecutive underruns before exiting. --- src/OutputUHD.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index b9731146..573d79d3 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -784,17 +784,17 @@ void UHDWorker::handle_frame(const struct UHDWorkerFrameData *frame) //boost::this_thread::sleep(boost::posix_time::milliseconds(100)); process_extra_frame = true; - if(++num_consecutive_underflow_msgs > 20) + if(++num_consecutive_underflow_msgs > 10) { num_consecutive_underflow_msgs = 0; //boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); throw std::runtime_error("Too many consecutive underruns. Indicates unspecified internal problem, exiting..."); } } - else - { - num_consecutive_underflow_msgs = 0; - } + else + { + num_consecutive_underflow_msgs = 0; + } num_underflows = 0; num_late_packets = 0;