Skip to content

Commit

Permalink
implement borzoi sender thread
Browse files Browse the repository at this point in the history
  • Loading branch information
marenz2569 committed Jul 2, 2024
1 parent 5863c98 commit a40daf3
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 29 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ add_executable(tetra-decoder
src/main.cpp
src/decoder.cpp
src/bit_stream_decoder.cpp
src/borzoi_sender.cpp
src/iq_stream_decoder.cpp
src/prometheus.cpp
src/l2/access_assignment_channel.cpp
Expand Down
43 changes: 43 additions & 0 deletions include/borzoi_sender.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (C) 2022-2024 Transit Live Mapping Solutions
* All rights reserved.
*
* Authors:
* Marenz Schmidl
*/

#pragma once

#include "l2/logical_link_control_packet.hpp"
#include "l2/slot.hpp"
#include "thread_safe_fifo.hpp"
#include <atomic>
#include <thread>
#include <variant>

class BorzoiSender {
public:
BorzoiSender() = delete;

/// This class sends the HTTPS Post requests to borzoi. https://github.com/tlm-solutions/borzoi
/// \param queue the queue holds either the parsed packets (std::unique_ptr<LogicalLinkControlPacket>) or Slots that
/// failed to decode
/// \param termination_flag this flag is set when the sender should terminate after finishing all work
BorzoiSender(ThreadSafeFifo<std::variant<std::unique_ptr<LogicalLinkControlPacket>, Slots>>& queue,
std::atomic_bool& termination_flag, unsigned borzoi_port);

~BorzoiSender();

private:
/// The thread function for continously process incomming parsed packets or failed slots.
auto worker() -> void;

/// The input queue
ThreadSafeFifo<std::variant<std::unique_ptr<LogicalLinkControlPacket>, Slots>>& queue_;

/// The flag that is set when terminating the program
std::atomic_bool& termination_flag_;

/// The worker thread
std::thread worker_thread_;
};
10 changes: 10 additions & 0 deletions include/decoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
#pragma once

#include "bit_stream_decoder.hpp"
#include "borzoi_sender.hpp"
#include "iq_stream_decoder.hpp"
#include "l2/lower_mac.hpp"
#include "l2/upper_mac.hpp"
#include "thread_safe_fifo.hpp"
#include <atomic>
#include <memory>
#include <optional>
Expand Down Expand Up @@ -54,12 +56,20 @@ class Decoder {
/// This flag is passed for the StreamingOrderedOutputThreadPoolExecutor to the upper mac.
std::atomic_bool upper_mac_termination_flag_ = false;

/// This flag is passed from the upper mac to the borzoi sender.
std::atomic_bool borzoi_sender_termination_flag_ = false;
/// This queue is used to pass data from the upper mac to the borzoi sender.
ThreadSafeFifo<std::variant<std::unique_ptr<LogicalLinkControlPacket>, Slots>> bozoi_queue_;

/// The worker queue for the lower mac
std::shared_ptr<StreamingOrderedOutputThreadPoolExecutor<LowerMac::return_type>> lower_mac_work_queue_;

/// The reference to the upper mac thread class
std::unique_ptr<UpperMac> upper_mac_;

/// The reference to the borzoi sender thread class
std::unique_ptr<BorzoiSender> borzoi_sender_;

std::shared_ptr<BitStreamDecoder> bit_stream_decoder_;
std::unique_ptr<IQStreamDecoder> iq_stream_decoder_;

Expand Down
14 changes: 12 additions & 2 deletions include/l2/upper_mac.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "l2/upper_mac_packet_builder.hpp"
#include "prometheus.h"
#include "streaming_ordered_output_thread_pool_executor.hpp"
#include "thread_safe_fifo.hpp"
#include <atomic>
#include <memory>
#include <thread>
Expand All @@ -25,13 +26,17 @@ class UpperMac {
public:
UpperMac() = delete;
///
/// \param queue the input queue from the lower mac
/// \param input_queue the input queue from the lower mac
/// \param output_queue the queue where successfully parsed packets or failed slots are inserted
/// \param termination_flag the flag that indicates that the worker thread should stop execution after all work is
/// finished
/// \param output_termination_flag this flag is set when all work is finished and pushed into the queue
/// \param prometheus_exporter the reference to the prometheus exporter that is used for the metrics in the upper
/// mac
UpperMac(const std::shared_ptr<StreamingOrderedOutputThreadPoolExecutor<LowerMac::return_type>>& input_queue,
std::atomic_bool& termination_flag, const std::shared_ptr<PrometheusExporter>& prometheus_exporter);
ThreadSafeFifo<std::variant<std::unique_ptr<LogicalLinkControlPacket>, Slots>>& output_queue,
std::atomic_bool& termination_flag, std::atomic_bool& output_termination_flag,
const std::shared_ptr<PrometheusExporter>& prometheus_exporter);
~UpperMac();

private:
Expand All @@ -51,6 +56,11 @@ class UpperMac {
std::shared_ptr<StreamingOrderedOutputThreadPoolExecutor<LowerMac::return_type>> input_queue_;
/// The termination flag
std::atomic_bool& termination_flag_;
/// the termination flag on the input for the next stage
std::atomic_bool& output_termination_flag_;

/// The output queue
ThreadSafeFifo<std::variant<std::unique_ptr<LogicalLinkControlPacket>, Slots>>& output_queue_;

/// The prometheus metrics
std::unique_ptr<UpperMacMetrics> metrics_;
Expand Down
62 changes: 62 additions & 0 deletions include/thread_safe_fifo.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (C) 2024 Transit Live Mapping Solutions
* All rights reserved.
*
* Authors:
* Marenz Schmidl
*/

#pragma once

#include <deque>
#include <mutex>
#include <optional>
#include <utility>

template <typename T> class ThreadSafeFifo {
public:
using OptionalT = std::optional<T>;

ThreadSafeFifo() = default;
~ThreadSafeFifo() = default;

ThreadSafeFifo(const ThreadSafeFifo&) = delete;
auto operator=(const ThreadSafeFifo&) -> ThreadSafeFifo& = delete;

ThreadSafeFifo(ThreadSafeFifo&&) = delete;
auto operator=(ThreadSafeFifo&&) -> ThreadSafeFifo& = delete;

// get a finished item of a nullopt
auto get_or_null() -> OptionalT {
using namespace std::chrono_literals;

OptionalT result;

{
std::lock_guard<std::mutex> lk(mutex_);
if (!queue_.empty()) {
result = std::forward<T>(queue_.front());
queue_.pop_front();
}
}

return result;
};

auto empty() -> bool {
std::lock_guard<std::mutex> lk(mutex_);
return queue_.empty();
};

auto push_back(T&& element) -> void {
std::lock_guard<std::mutex> lk(mutex_);
queue_.push_back(std::forward<T>(element));
};

private:
/// the mutex that is used to access the queue.
std::mutex mutex_;

/// the wrapped queue
std::deque<T> queue_;
};
73 changes: 73 additions & 0 deletions src/borzoi_sender.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (C) 2022-2024 Transit Live Mapping Solutions
* All rights reserved.
*
* Authors:
* Marenz Schmidl
*/

#include "borzoi_sender.hpp"
#include "l3/circuit_mode_control_entity_packet.hpp"
#include "l3/mobile_link_entity_packet.hpp"
#include "l3/short_data_service_packet.hpp"

#if defined(__linux__)
#include <pthread.h>
#endif

BorzoiSender::BorzoiSender(ThreadSafeFifo<std::variant<std::unique_ptr<LogicalLinkControlPacket>, Slots>>& queue,
std::atomic_bool& termination_flag, unsigned borzoi_port)
: queue_(queue)
, termination_flag_(termination_flag) {
worker_thread_ = std::thread(&BorzoiSender::worker, this);

#if defined(__linux__)
auto handle = worker_thread_.native_handle();
pthread_setname_np(handle, "BorzoiSender");
#endif
}

BorzoiSender::~BorzoiSender() { worker_thread_.join(); }

void BorzoiSender::worker() {
for (;;) {
const auto return_value = queue_.get_or_null();

if (!return_value) {
if (termination_flag_.load() && queue_.empty()) {
break;
}

continue;
}

std::visit(
[](auto&& arg) {
using T = std::decay_t<decltype(arg)>;
if constexpr (std::is_same_v<T, std::unique_ptr<LogicalLinkControlPacket>>) {
/// process the parsed packet
if (auto* llc = dynamic_cast<LogicalLinkControlPacket*>(arg.get())) {
if (llc->basic_link_information_ &&
(llc->basic_link_information_->basic_link_type_ == BasicLinkType::kBlAckWithoutFcs ||
llc->basic_link_information_->basic_link_type_ == BasicLinkType::kBlAckWithFcs)) {
return;
}
std::cout << *llc;
if (auto* mle = dynamic_cast<MobileLinkEntityPacket*>(llc)) {
std::cout << *mle;
if (auto* cmce = dynamic_cast<CircuitModeControlEntityPacket*>(llc)) {
std::cout << *cmce;
if (auto* sds = dynamic_cast<ShortDataServicePacket*>(llc)) {
std::cout << *sds;
}
}
std::cout << std::endl;
}
}
} else if constexpr (std::is_same_v<T, Slots>) {
/// send out the slots which had an error while parsing
}
},
*return_value);
}
}
5 changes: 4 additions & 1 deletion src/decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/

#include "decoder.hpp"
#include "borzoi_sender.hpp"
#include "l2/upper_mac.hpp"
#include <arpa/inet.h>
#include <cassert>
Expand All @@ -34,7 +35,9 @@ Decoder::Decoder(unsigned receive_port, unsigned send_port, bool packed, std::op
, iq_or_bit_stream_(iq_or_bit_stream) {
auto is_uplink = uplink_scrambling_code_.has_value();
auto lower_mac = std::make_shared<LowerMac>(prometheus_exporter, uplink_scrambling_code);
upper_mac_ = std::make_unique<UpperMac>(lower_mac_work_queue_, upper_mac_termination_flag_, prometheus_exporter);
upper_mac_ = std::make_unique<UpperMac>(lower_mac_work_queue_, bozoi_queue_, upper_mac_termination_flag_,
borzoi_sender_termination_flag_, prometheus_exporter);
borzoi_sender_ = std::make_unique<BorzoiSender>(bozoi_queue_, borzoi_sender_termination_flag_, send_port);
bit_stream_decoder_ =
std::make_shared<BitStreamDecoder>(lower_mac_work_queue_, lower_mac, uplink_scrambling_code_.has_value());
iq_stream_decoder_ =
Expand Down
39 changes: 13 additions & 26 deletions src/l2/upper_mac.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
#endif

UpperMac::UpperMac(const std::shared_ptr<StreamingOrderedOutputThreadPoolExecutor<LowerMac::return_type>>& input_queue,
std::atomic_bool& termination_flag, const std::shared_ptr<PrometheusExporter>& prometheus_exporter)
ThreadSafeFifo<std::variant<std::unique_ptr<LogicalLinkControlPacket>, Slots>>& output_queue,
std::atomic_bool& termination_flag, std::atomic_bool& output_termination_flag,
const std::shared_ptr<PrometheusExporter>& prometheus_exporter)
: input_queue_(input_queue)
, termination_flag_(termination_flag)
, output_termination_flag_(output_termination_flag)
, output_queue_(output_queue)
, logical_link_control_(prometheus_exporter) {
if (prometheus_exporter) {
metrics_ = std::make_unique<UpperMacMetrics>(prometheus_exporter);
Expand Down Expand Up @@ -52,11 +56,14 @@ void UpperMac::worker() {
continue;
}

const auto& slots = *return_value;
auto slots = *return_value;
if (slots) {
this->process(*slots);
}
}

// forward the termination to the next stage
output_termination_flag_.store(true);
}

auto UpperMac::process(const Slots& slots) -> void {
Expand Down Expand Up @@ -92,6 +99,8 @@ auto UpperMac::process(const Slots& slots) -> void {
for (const auto& slot : concreate_slots) {
metrics_->increment_decode_error(slot);
}
// send the broken slot to borzoi
output_queue_.push_back(slots);
}
}
}
Expand Down Expand Up @@ -136,30 +145,8 @@ auto UpperMac::processPackets(UpperMacPackets&& packets) -> void {
}

for (const auto& packet : c_plane_packets) {
auto parsed_packet = logical_link_control_.parse(packet);

if (auto* llc = dynamic_cast<LogicalLinkControlPacket*>(parsed_packet.get())) {
if (llc->basic_link_information_ &&
(llc->basic_link_information_->basic_link_type_ == BasicLinkType::kBlAckWithoutFcs ||
llc->basic_link_information_->basic_link_type_ == BasicLinkType::kBlAckWithFcs)) {
continue;
}
std::cout << *llc;
if (auto* mle = dynamic_cast<MobileLinkEntityPacket*>(llc)) {
std::cout << *mle;
if (auto* cmce = dynamic_cast<CircuitModeControlEntityPacket*>(llc)) {
std::cout << *cmce;
if (auto* sds = dynamic_cast<ShortDataServicePacket*>(llc)) {
std::cout << *sds;
}
}
std::cout << std::endl;
}
}
auto llc = logical_link_control_.parse(packet);

// if (!parsed_packet->is_null_pdu()) {
// std::cout << *parsed_packet << std::endl;
// }
/// TODO: send this packet to borzoi
output_queue_.push_back(std::move(llc));
}
}

0 comments on commit a40daf3

Please sign in to comment.