Skip to content

Commit

Permalink
implement a crude way to stop program on CTRL-C or EOF
Browse files Browse the repository at this point in the history
  • Loading branch information
marenz2569 committed Jan 23, 2024
1 parent 38e9308 commit 31946c5
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 5 deletions.
3 changes: 2 additions & 1 deletion include/iq_stream_decoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <bit_stream_decoder.hpp>
#include <fixed_queue.hpp>
#include <l2/lower_mac.hpp>
#include <signal_handler.hpp>
#include <streaming_ordered_output_thread_pool_executor.hpp>

/**
Expand All @@ -27,7 +28,7 @@ class IQStreamDecoder {
public:
IQStreamDecoder(std::shared_ptr<LowerMac> lower_mac, std::shared_ptr<BitStreamDecoder> bit_stream_decoder,
bool is_uplink);
~IQStreamDecoder() = default;
~IQStreamDecoder();

void process_complex(std::complex<float> symbol) noexcept;

Expand Down
3 changes: 3 additions & 0 deletions include/signal_handler.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#pragma once

volatile extern bool stop;
4 changes: 3 additions & 1 deletion include/streaming_ordered_output_thread_pool_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
#include <type_traits>
#include <vector>

#include <signal_handler.hpp>

// thread pool executing work but outputting it the order of the input
template <typename ReturnType> class StreamingOrderedOutputThreadPoolExecutor {

public:
StreamingOrderedOutputThreadPoolExecutor(int numWorkers);
~StreamingOrderedOutputThreadPoolExecutor() = default;
~StreamingOrderedOutputThreadPoolExecutor();

// append work to the queue
void queueWork(std::function<ReturnType()> work);
Expand Down
3 changes: 3 additions & 0 deletions src/decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,19 @@ void Decoder::main_loop() {
auto bytes_read = read(input_fd_, rx_buffer, sizeof(rx_buffer));

if (errno == EINTR) {
stop = 1;
return;
} else if (bytes_read < 0) {
throw std::runtime_error("Read error");
} else if (bytes_read == 0) {
stop = 1;
return;
}

if (output_file_fd_.has_value()) {
if (write(*output_file_fd_, rx_buffer, bytes_read) != bytes_read) {
// unable to write to output TODO: possible log or fail hard
stop = 1;
return;
}
}
Expand Down
8 changes: 7 additions & 1 deletion src/iq_stream_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,14 @@ IQStreamDecoder::IQStreamDecoder(std::shared_ptr<LowerMac> lower_mac,
#endif
}

IQStreamDecoder::~IQStreamDecoder() {
// TODO: replace this crude hack that keeps the StreamingOrderedOutputThreadPoolExecutor<...> get function from blocking on programm stop
threadPool_->queueWork([]() { return std::vector<std::function<void()>>(); });
upperMacWorkerThread_.join();
}

void IQStreamDecoder::upperMacWorker() {
while (true) {
while (!stop) {
for (auto func : threadPool_->get()) {
func();
}
Expand Down
3 changes: 2 additions & 1 deletion src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
#include <stdio.h>

#include <decoder.hpp>
#include <signal_handler.hpp>

static bool stop = false;
volatile bool stop = false;

void sigint_handler(int s) {
(void)s;
Expand Down
8 changes: 7 additions & 1 deletion src/streaming_ordered_output_thread_pool_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ StreamingOrderedOutputThreadPoolExecutor<ReturnType>::StreamingOrderedOutputThre
}
}

template <typename ReturnType>
StreamingOrderedOutputThreadPoolExecutor<ReturnType>::~StreamingOrderedOutputThreadPoolExecutor() {
for (auto& t : workers)
t.join();
}

template <typename ReturnType> void StreamingOrderedOutputThreadPoolExecutor<ReturnType>::worker() {
while (true) {
while (!stop) {
std::optional<std::pair<uint64_t, std::function<ReturnType()>>> work{};

{
Expand Down

0 comments on commit 31946c5

Please sign in to comment.