Skip to content

Commit

Permalink
Merge pull request #4 from paneda/release_fix_delay_test
Browse files Browse the repository at this point in the history
  • Loading branch information
Landberg authored Nov 7, 2016
2 parents f67a263 + 7c2d6f1 commit cbe22cf
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 18 deletions.
9 changes: 7 additions & 2 deletions src/DabMod.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@

#define ZMQ_INPUT_MAX_FRAME_QUEUE 500

// Default makes it match with old hard-coded settings.
#define ZMQ_INPUT_QUEUE_RESTART_DEPTH ZMQ_INPUT_MAX_FRAME_QUEUE - 3

typedef std::complex<float> complexf;

Expand Down Expand Up @@ -121,6 +123,7 @@ int launch_modulator(int argc, char* argv[])
std::string inputName = "";
std::string inputTransport = "file";
unsigned inputMaxFramesQueued = ZMQ_INPUT_MAX_FRAME_QUEUE;
unsigned inputQueueRestartDepth = ZMQ_INPUT_QUEUE_RESTART_DEPTH;

std::string outputName;
int useZeroMQOutput = 0;
Expand Down Expand Up @@ -370,6 +373,8 @@ int launch_modulator(int argc, char* argv[])
inputTransport = pt.get("input.transport", "file");
inputMaxFramesQueued = pt.get("input.max_frames_queued",
ZMQ_INPUT_MAX_FRAME_QUEUE);
inputQueueRestartDepth = pt.get("input.restart_depth",
ZMQ_INPUT_QUEUE_RESTART_DEPTH);

inputName = pt.get("input.source", "/dev/stdin");

Expand Down Expand Up @@ -701,7 +706,7 @@ int launch_modulator(int argc, char* argv[])
ret = -1;
throw std::runtime_error("Unable to open input");
#else
inputZeroMQReader->Open(inputName, inputMaxFramesQueued);
inputZeroMQReader->Open(inputName, inputMaxFramesQueued, inputQueueRestartDepth);
m.inputReader = inputZeroMQReader.get();
#endif
}
Expand Down Expand Up @@ -812,7 +817,7 @@ int launch_modulator(int argc, char* argv[])
run_again = true;
// Create a new input reader
inputZeroMQReader = make_shared<InputZeroMQReader>();
inputZeroMQReader->Open(inputName, inputMaxFramesQueued);
inputZeroMQReader->Open(inputName, inputMaxFramesQueued, inputQueueRestartDepth);
m.inputReader = inputZeroMQReader.get();
#endif
}
Expand Down
3 changes: 2 additions & 1 deletion src/InputReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ struct InputZeroMQThreadData
ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > *in_messages;
std::string uri;
size_t max_queued_frames;
size_t restart_queue_depth;
};

class InputZeroMQWorker
Expand Down Expand Up @@ -194,7 +195,7 @@ class InputZeroMQReader : public InputReader
worker_.Stop();
}

int Open(const std::string& uri, size_t max_queued_frames);
int Open(const std::string& uri, size_t max_queued_frames, size_t restart_depth = 0);

int GetNextFrame(void* buffer);

Expand Down
66 changes: 52 additions & 14 deletions src/InputZeroMQReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ struct zmq_dab_message_t
uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144];
};

int InputZeroMQReader::Open(const std::string& uri, size_t max_queued_frames)
int InputZeroMQReader::Open(const std::string& uri, size_t max_queued_frames, size_t restart_depth)
{
// The URL might start with zmq+tcp://
if (uri.substr(0, 4) == "zmq+") {
Expand All @@ -75,6 +75,7 @@ int InputZeroMQReader::Open(const std::string& uri, size_t max_queued_frames)

workerdata_.uri = uri_;
workerdata_.max_queued_frames = max_queued_frames;
workerdata_.restart_queue_depth = restart_depth;
// launch receiver thread
worker_.Start(&workerdata_);

Expand Down Expand Up @@ -134,20 +135,31 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
// zmq sockets are not thread safe. That's why
// we create it here, and not at object creation.

const int hwm = 1;
const int hwm = 30;
const int linger = 0;
try {
subscriber.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
subscriber.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
subscriber.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
subscriber.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
subscriber.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
subscriber.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
subscriber.connect(workerdata->uri.c_str());
subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages

etiLog.level(trace) << "ZMQ,workerdata->max_queued_frames: " << workerdata->max_queued_frames;
etiLog.level(trace) << "ZMQ,workerdata->restart_queue_depth: " << workerdata->restart_queue_depth;

size_t throwFrames = 0;
while (running)
{
zmq::message_t incoming;
subscriber.recv(&incoming);

if(throwFrames > 0)
{ // Drop first throwFrames number of frames received.
// It is most likely old frames from frame source HWM-buffer.
--throwFrames;
continue;
}

if (m_to_drop) {
queue_size = workerdata->in_messages->size();
if (queue_size > 4) {
Expand Down Expand Up @@ -200,24 +212,50 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
workerdata->in_messages->notify();

if (!buffer_full) {
etiLog.level(warn) << "ZeroMQ buffer overfull !";
etiLog.level(warn) << "ZeroMQ buffer overflow!";

buffer_full = true;
throw std::runtime_error("ZMQ input full");

//throw std::runtime_error("ZMQ input full");
}

queue_size = workerdata->in_messages->size();

/* Drop three more incoming ETI frames before
* we start accepting them again, to guarantee
* that we keep transmission frame vs. ETI frame
* phase.
*/
m_to_drop = 3;
// /* Drop three more incoming ETI frames before
// * we start accepting them again, to guarantee
// * that we keep transmission frame vs. ETI frame
// * phase.
// */
//m_to_drop = 3;

// Make sure we drop a multiple of 8 frames.
// There are 4 in each ZMQ-msg, with NUM_FRAMES_PER_ZMQ_MESSAGE = 4 as default.
// We drop the current msg with 4 frames and need to drop another to keep phase.
// Plus the amount we need to reach wanted start depth.
m_to_drop = (8 / NUM_FRAMES_PER_ZMQ_MESSAGE) - 1; // To keep frame phase for all modes.

// Add amount of frames to reach wanted depth in a multiple of 8.
if(workerdata->restart_queue_depth != 0) // We have a restart_queue_depth value defined.
{
m_to_drop += ((workerdata->max_queued_frames - workerdata->restart_queue_depth) / 8) * (8 / NUM_FRAMES_PER_ZMQ_MESSAGE);
}
else // We use half the max_queued_frames as default restart value when restart_queue_depth isn't defined.
{
m_to_drop += ((workerdata->max_queued_frames / 2) / 8) * (8 / NUM_FRAMES_PER_ZMQ_MESSAGE);
}

if(m_to_drop % (8 / NUM_FRAMES_PER_ZMQ_MESSAGE) == 0)
{
--m_to_drop;
if(m_to_drop < 0)
{
m_to_drop = 0;
}
}
}

if (queue_size < 5) {
etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !";
etiLog.level(warn) << "ZMQ,ZeroMQ buffer low: " << queue_size << " elements !";
}
}
}
Expand Down
28 changes: 27 additions & 1 deletion src/OutputUHD.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,8 @@ void UHDWorker::process()

num_underflows = 0;
num_late_packets = 0;
num_consecutive_underflow_msgs = 0;
process_extra_frame = false;

while (uwd->running) {
md.has_time_spec = false;
Expand All @@ -630,6 +632,18 @@ void UHDWorker::process()
etiLog.log(trace, "UHD,pop");

handle_frame(&frame);

if(process_extra_frame)
{
for (int loop = 0; loop < 5; ++loop)
{
etiLog.log(trace, "UHD,produce extra frame");
etiLog.log(trace, "UHD,wait");
uwd->frames.wait_and_pop(frame);
etiLog.log(trace, "UHD,pop");
handle_frame(&frame);
}
}
}
}

Expand Down Expand Up @@ -767,7 +781,19 @@ void UHDWorker::handle_frame(const struct UHDWorkerFrameData *frame)
"%d underruns and %d late packets since last status.\n",
usrp_time,
num_underflows, num_late_packets);
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));

//boost::this_thread::sleep(boost::posix_time::milliseconds(100));
process_extra_frame = true;
if(++num_consecutive_underflow_msgs > 10)
{
num_consecutive_underflow_msgs = 0;
//boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
throw std::runtime_error("Too many consecutive underruns. Indicates unspecified internal problem, exiting...");
}
}
else
{
num_consecutive_underflow_msgs = 0;
}
num_underflows = 0;
num_late_packets = 0;
Expand Down
3 changes: 3 additions & 0 deletions src/OutputUHD.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include <chrono>
#include <memory>
#include <string>
#include <atomic>

#include "Log.h"
#include "ModOutput.h"
Expand Down Expand Up @@ -154,6 +155,8 @@ class UHDWorker {
// Asynchronous message statistics
int num_underflows;
int num_late_packets;
int num_consecutive_underflow_msgs;
std::atomic<bool> process_extra_frame;

uhd::tx_metadata_t md;
bool last_tx_time_initialised;
Expand Down

0 comments on commit cbe22cf

Please sign in to comment.