Skip to content

Commit

Permalink
rebuild fragmentation with state machine. add metrics for fragmentation.
Browse files Browse the repository at this point in the history
  • Loading branch information
marenz2569 committed Jun 15, 2024
1 parent a76dd8a commit d6bfd6e
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 99 deletions.
12 changes: 7 additions & 5 deletions include/l2/upper_mac.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "prometheus.h"
#include "reporter.hpp"
#include "streaming_ordered_output_thread_pool_executor.hpp"
#include <memory>
#include <thread>

/// The class to provide prometheus metrics to the upper mac.
Expand Down Expand Up @@ -215,20 +216,21 @@ class UpperMac {

/// process Upper MAC packets and perform fragment reconstruction and pass it to the upper layers
/// \param packets the packets that were parsed in the upper MAC layer
/// \param stealling_channel_fragmentation the fragmentation reconstructor fragmenting over two stealing channel in
/// the same burst
auto processPackets(UpperMacPackets&& packets,
std::optional<UpperMacFragmentation>& stealling_channel_fragmentation) -> void;
auto processPackets(UpperMacPackets&& packets) -> void;

/// The input queue
std::shared_ptr<StreamingOrderedOutputThreadPoolExecutor<LowerMac::return_type>> input_queue_;

/// The prometheus metrics
std::unique_ptr<UpperMacPrometheusCounters> metrics_;

/// The prometheus metrics for the fragmentation
std::shared_ptr<UpperMacFragmentsPrometheusCounters> fragmentation_metrics_continous_;
std::shared_ptr<UpperMacFragmentsPrometheusCounters> fragmentation_metrics_stealing_channel_;

std::unique_ptr<LogicalLinkControl> logical_link_control_;

UpperMacFragmentation fragmentation_;
std::unique_ptr<UpperMacFragmentation> fragmentation_;

/// The worker thread
std::thread worker_thread_;
Expand Down
204 changes: 132 additions & 72 deletions include/l2/upper_mac_fragments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,146 @@
#pragma once

#include "l2/upper_mac_packet.hpp"
#include "prometheus.h"
#include <cassert>
#include <memory>
#include <optional>
#include <set>
#include <stdexcept>
#include <vector>

/// hold the fragments of fragmented messages
struct UpperMacFragments {
/// the start fragment
std::optional<UpperMacCPlaneSignallingPacket> start_fragment_;
/// the optional continuation fragments
std::vector<UpperMacCPlaneSignallingPacket> continuation_fragments_;
/// the end fragment
std::optional<UpperMacCPlaneSignallingPacket> end_fragment_;
class UpperMacFragmentsPrometheusCounters {
private:
/// The prometheus exporter
std::shared_ptr<PrometheusExporter> prometheus_exporter_;

// NOLINTBEGIN(cppcoreguidelines-avoid-const-or-ref-data-members)

/// The family of counters for received fragments
prometheus::Family<prometheus::Counter>& fragment_count_family_;
/// The counter for total received fragments
prometheus::Counter& fragment_count_total_;
/// The counter for received fragments that could not be reassembled
prometheus::Counter& fragment_count_error_;

// NOLINTEND(cppcoreguidelines-avoid-const-or-ref-data-members)

public:
UpperMacFragmentsPrometheusCounters() = delete;
explicit UpperMacFragmentsPrometheusCounters(const std::shared_ptr<PrometheusExporter>& prometheus_exporter,
const std::string& type)
: prometheus_exporter_(prometheus_exporter)
, fragment_count_family_(prometheus_exporter_->upper_mac_fragment_count())
, fragment_count_total_(fragment_count_family_.Add({{"type", type}, {"counter_type", "All"}}))
, fragment_count_error_(
fragment_count_family_.Add({{"type", type}, {"counter_type", "Reconstuction Error"}})){};

/// This function is called for every fragment where no fitting previous fragment could be found.
auto increment_fragment_reconstruction_error() -> void { fragment_count_error_.Increment(); }
/// This function is called for every fragment.
auto increment_fragment_count() -> void { fragment_count_total_.Increment(); }
};

/// Class that provides the fragment reconstruction for uplink and downlink packets.
/// TODO: Uplink fragmentation may include reserved slots and is therefore harder to reconstruct. This is not handled
/// with this class.
/// TODO: Fragmentation over two slots of a stealing channel is also not handled.
class UpperMacFragmentation {
private:
/// the fragments for the downlink
UpperMacFragments downlink_fragments_;
/// the fragments for the uplink
UpperMacFragments uplink_fragments_;
/// Holds the internal state of the fragment rebuilder
enum class State {
kStart,
kStartFragmentReceived,
kContinuationFragmentReceived,
kEndFragmentReceived,
};
/// The vector that holds the accumulated fragments
std::vector<UpperMacCPlaneSignallingPacket> fragments_;
/// Are continuation allowed in the state machine?
std::map<State, std::set<State>> allowed_state_changes_;
/// The current state of the fragment reassembler
State state_;

/// the metrics for the fragmentation
std::shared_ptr<UpperMacFragmentsPrometheusCounters> metrics_;

/// Try the state transtition with a fragment. Increment the error metrics if there is an invalid state transition
/// attempted
/// \param new_state the new state into which the state machine would be transfered with this fragment
/// \param fragment the control plane signalling packet that is fragmented
/// \return an optional reconstructed control plane signalling packet when reconstuction was successful
auto change_state(State new_state, const UpperMacCPlaneSignallingPacket& fragment)
-> std::optional<UpperMacCPlaneSignallingPacket> {
const auto& valid_state_changes = allowed_state_changes_[state_];

// increment the total fragment counters
if (metrics_) {
metrics_->increment_fragment_count();
}

if (valid_state_changes.count(new_state)) {
// valid state change. perform and add fragment
fragments_.emplace_back(fragment);
state_ = new_state;
} else {
// increment the invalid state metrics
if (metrics_) {
metrics_->increment_fragment_reconstruction_error();
}

// always save the start segment
if (new_state == State::kStartFragmentReceived) {
fragments_ = {fragment};
state_ = State::kStartFragmentReceived;
} else {
fragments_.clear();
state_ = State::kStart;
}
}

// if we are in the end state reassmeble the packet.
if (state_ == State::kEndFragmentReceived) {
std::optional<UpperMacCPlaneSignallingPacket> packet;
for (const auto& fragment : fragments_) {
if (packet) {
packet->tm_sdu_->append(*fragment.tm_sdu_);
} else {
packet = fragment;
}
}
fragments_.clear();
state_ = State::kStart;

return packet;
}

return std::nullopt;
};

public:
UpperMacFragmentation() = default;
UpperMacFragmentation() = delete;

/// Constructor for the fragmentations. Optionally specify if an arbitraty numner of continuation fragments are
/// allowed
explicit UpperMacFragmentation(const std::shared_ptr<UpperMacFragmentsPrometheusCounters>& metrics,
bool continuation_fragments_allowed = true)
: state_(State::kStart)
, metrics_(metrics) {
if (continuation_fragments_allowed) {
allowed_state_changes_ = {
{State::kStart, {State::kStartFragmentReceived}},
{State::kStartFragmentReceived, {State::kContinuationFragmentReceived, State::kEndFragmentReceived}},
{State::kContinuationFragmentReceived,
{State::kContinuationFragmentReceived, State::kEndFragmentReceived}},
{State::kEndFragmentReceived, {State::kStart}}};
} else {
allowed_state_changes_ = {{State::kStart, {State::kStartFragmentReceived}},
{State::kStartFragmentReceived, {State::kEndFragmentReceived}},
{State::kEndFragmentReceived, {State::kStart}}};
}
};

/// Check if we are in the start state i.e., do no have any fragments.
auto is_in_start_state() -> bool { return state_ == State::kStart; }

/// Push a fragment for reconstruction.
/// \param fragment the control plane signalling packet that is fragmented
Expand All @@ -46,80 +158,28 @@ class UpperMacFragmentation {
switch (fragment.type_) {
case MacPacketType::kMacResource:
assert(fragment.fragmentation_);
downlink_fragments_ = UpperMacFragments{.start_fragment_ = fragment};
break;
return change_state(State::kStartFragmentReceived, fragment);
case MacPacketType::kMacFragmentDownlink:
if (downlink_fragments_.start_fragment_) {
downlink_fragments_.continuation_fragments_.push_back(fragment);
}
break;
return change_state(State::kContinuationFragmentReceived, fragment);
case MacPacketType::kMacEndDownlink:
if (downlink_fragments_.start_fragment_) {
downlink_fragments_.end_fragment_ = fragment;
}
break;
return change_state(State::kEndFragmentReceived, fragment);
case MacPacketType::kMacDBlck:
throw std::runtime_error("No fragmentation in MacDBlck");
case MacPacketType::kMacBroadcast:
throw std::runtime_error("No fragmentation in MacBroadcast");
case MacPacketType::kMacAccess:
case MacPacketType::kMacData:
assert(fragment.fragmentation_);
uplink_fragments_ = UpperMacFragments{.start_fragment_ = fragment};
break;
return change_state(State::kStartFragmentReceived, fragment);
case MacPacketType::kMacFragmentUplink:
if (uplink_fragments_.start_fragment_) {
uplink_fragments_.continuation_fragments_.push_back(fragment);
}
break;
return change_state(State::kContinuationFragmentReceived, fragment);
case MacPacketType::kMacEndHu:
case MacPacketType::kMacEndUplink:
if (uplink_fragments_.start_fragment_) {
uplink_fragments_.end_fragment_ = fragment;
}
break;
return change_state(State::kEndFragmentReceived, fragment);
case MacPacketType::kMacUBlck:
throw std::runtime_error("No fragmentation in MacUBlck");
case MacPacketType::kMacUSignal:
throw std::runtime_error("No fragmentation in MacUSignal");
}

// forward and clear on MacEndDownlink
if (downlink_fragments_.end_fragment_) {
UpperMacCPlaneSignallingPacket packet = *downlink_fragments_.start_fragment_;

for (const auto& fragment : downlink_fragments_.continuation_fragments_) {
if (fragment.tm_sdu_) {
packet.tm_sdu_->append(*fragment.tm_sdu_);
}
}
if (downlink_fragments_.end_fragment_->tm_sdu_) {
packet.tm_sdu_->append(*downlink_fragments_.end_fragment_->tm_sdu_);
}

downlink_fragments_ = UpperMacFragments{};

return packet;
}

// forward and clear on MacEndHu and MacEndUplink
if (uplink_fragments_.end_fragment_) {
UpperMacCPlaneSignallingPacket packet = *uplink_fragments_.start_fragment_;

for (const auto& fragment : uplink_fragments_.continuation_fragments_) {
if (fragment.tm_sdu_) {
packet.tm_sdu_->append(*fragment.tm_sdu_);
}
}
if (uplink_fragments_.end_fragment_->tm_sdu_) {
packet.tm_sdu_->append(*uplink_fragments_.end_fragment_->tm_sdu_);
}

uplink_fragments_ = UpperMacFragments{};

return packet;
}

return std::nullopt;
};
};
3 changes: 3 additions & 0 deletions include/prometheus.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class PrometheusExporter {

/// The family of counters for all received upper mac packets
auto upper_mac_packet_count() noexcept -> prometheus::Family<prometheus::Counter>&;

/// The family of counters for all received c-plane fragments
auto upper_mac_fragment_count() noexcept -> prometheus::Family<prometheus::Counter>&;
};

#endif // PROMETHEUS_H
49 changes: 27 additions & 22 deletions src/l2/upper_mac.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "l2/lower_mac.hpp"
#include "l2/upper_mac_fragments.hpp"
#include "streaming_ordered_output_thread_pool_executor.hpp"
#include <memory>
#include <optional>
#include <utility>
#include <variant>
Expand All @@ -26,7 +27,12 @@ UpperMac::UpperMac(const std::shared_ptr<StreamingOrderedOutputThreadPoolExecuto
std::make_unique<LogicalLinkControl>(reporter, std::make_shared<MobileLinkEntity>(reporter, is_downlink))) {
if (prometheus_exporter) {
metrics_ = std::make_unique<UpperMacPrometheusCounters>(prometheus_exporter);
fragmentation_metrics_continous_ =
std::make_shared<UpperMacFragmentsPrometheusCounters>(prometheus_exporter, "Continous");
fragmentation_metrics_stealing_channel_ =
std::make_shared<UpperMacFragmentsPrometheusCounters>(prometheus_exporter, "Stealing Channel");
}
fragmentation_ = std::make_unique<UpperMacFragmentation>(fragmentation_metrics_continous_);
worker_thread_ = std::thread(&UpperMac::worker, this);

#if defined(__linux__)
Expand All @@ -49,15 +55,13 @@ void UpperMac::worker() {
if (slots) {
this->process(*slots);
}
// TODO: Implement a way to send a stop signal until termination
}
}

auto UpperMac::process(const Slots& slots) -> void {
const auto concreate_slots = slots.get_concreate_slots();

/// Use this fragmentation reconstructor for the two stealing channels
std::optional<UpperMacFragmentation> stealing_channel_fragmentation;
UpperMacPackets packets;
for (const auto& slot : concreate_slots) {
UpperMacPackets packets;

Expand All @@ -67,7 +71,7 @@ auto UpperMac::process(const Slots& slots) -> void {
}

try {
packets = UpperMacPacketBuilder::parse_slot(slot);
packets.merge(UpperMacPacketBuilder::parse_slot(slot));
// if (packets.has_user_or_control_plane_data()) {
// std::cout << packets << std::endl;
// }
Expand All @@ -76,32 +80,28 @@ auto UpperMac::process(const Slots& slots) -> void {
metrics_->increment_decode_error(slot);
}

return;
continue;
}
}

try {
processPackets(std::move(packets), stealing_channel_fragmentation);
} catch (std::runtime_error& e) {
if (metrics_) {
metrics_->increment_decode_error(slot);
}
}
try {
processPackets(std::move(packets));
} catch (std::runtime_error& e) {
// TODO: increment other metrics
}
}

auto UpperMac::processPackets(UpperMacPackets&& packets,
std::optional<UpperMacFragmentation>& stealling_channel_fragmentation) -> void {
auto UpperMac::processPackets(UpperMacPackets&& packets) -> void {
// the fragmentation reconstructor for over two stealing channel in the same burst
auto& fragmentation = *fragmentation_;
auto stealling_channel_fragmentation =
UpperMacFragmentation(fragmentation_metrics_stealing_channel_, /*continuation_fragments_allowed=*/false);

for (const auto& packet : packets.c_plane_signalling_packets_) {
if (packet.is_downlink_fragment() || packet.is_uplink_fragment()) {
/// populate the fragmenter for stealing channel
if (packet.fragmentation_on_stealling_channel_ && !stealling_channel_fragmentation) {
stealling_channel_fragmentation = UpperMacFragmentation{};
}

// select the correct fragment reconstructor
auto& fragmentation = fragmentation_;
if (stealling_channel_fragmentation) {
fragmentation = *stealling_channel_fragmentation;
if (packet.fragmentation_on_stealling_channel_) {
fragmentation = stealling_channel_fragmentation;
}

auto reconstructed_fragment = fragmentation.push_fragment(packet);
Expand All @@ -115,6 +115,11 @@ auto UpperMac::processPackets(UpperMacPackets&& packets,
}
}

/// increment the reconstruction error counter if we could not complete the fragmentation over stealing channel
if (!stealling_channel_fragmentation.is_in_start_state() && fragmentation_metrics_stealing_channel_) {
fragmentation_metrics_stealing_channel_->increment_fragment_reconstruction_error();
}

/// increment the packet counter
if (metrics_) {
metrics_->increment_packet_counters(packets);
Expand Down
Loading

0 comments on commit d6bfd6e

Please sign in to comment.