Skip to content

Commit

Permalink
Buffered packet processing (#97)
Browse files Browse the repository at this point in the history
* Simplify lidar scan assembly code

As double-buffering is used for the scan assemnly (in contrast to the ouster example)
we can simply zero out our buffer for the new frame. We then no longer need to
special-case zeroing-out missing packets.

* Initialize FullRotationAccumulator with batchReady == false

* Implement buffered packet processing

This commit splits the processing of lidar data into two seperate
threads. Previously, the code receiving sensor data from the wire
was blocked by the lidar or imu processors, causing dataloss on
slower systems (Issue #96).

* Modifies sensor interface so receiving functions get passed a
  pointer to a buffer to write their data to
* Implements ringbuffers for lidar and imu data in ouster_driver
* Splits processing into two seperate threads:
  * receiveData receives from the wire and fills ringbuffers
  * processData takes packets from the queue, assembles the lidar
    scan and runs the processors
* Threads are implemented as pure threads instead of ros timers,
  as timing inconsistencies for the receiver thread were also
  leading to data loss

* Revert "Simplify lidar scan assembly code"

This reverts commit 581b35f.
This modification to the client code is not necessary for the
buffered packet processing, and conflicts with changes since
made in the ouster_examples repository.

* Lock against race on [lidar|imu]_packet_head

Moves the modification of the head indices to before the mutex
unlock. This is only necessary for the warning about buffer
overruns to work correctly. We don't need to lock against the
actual data in the buffer.

* Move ringbuffer code to it's own header

* Fix linting issues

Co-authored-by: Mirko Kugelmeier <[email protected]>
  • Loading branch information
Mirko K and Qgel authored Jul 8, 2022
1 parent 370269b commit 41a43e2
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class FullRotationAccumulator
FullRotationAccumulator(
const ouster::sensor::sensor_info & mdata,
const ouster::sensor::packet_format & pf)
: _pf(pf)
: _batchReady(false), _pf(pf), _packets_accumulated(0)
{
_batch = std::make_unique<ouster::ScanBatcher>(mdata.format.columns_per_frame, _pf);
_ls = std::make_shared<ouster::LidarScan>(
Expand Down
10 changes: 6 additions & 4 deletions ros2_ouster/include/ros2_ouster/interfaces/sensor_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,18 @@ class SensorInterface
/**
* @brief reading a lidar packet
* @param state of the sensor
* @return the packet of data
* @param buf pointer to a buffer to hold the packet data. Must hold getPacketFormat().lidar_packet_size bytes.
* @return true if a packet was recieved, false otherwise
*/
virtual uint8_t * readLidarPacket(const ouster::sensor::client_state & state) = 0;
virtual bool readLidarPacket(const ouster::sensor::client_state & state, uint8_t * buf) = 0;

/**
* @brief reading an imu packet
* @param state of the sensor
* @return the packet of data
* @param buf pointer to a buffer to hold the packet data. Must hold getPacketFormat().imu_packet_size bytes.
* @return true if a packet was recieved, false otherwise
*/
virtual uint8_t * readImuPacket(const ouster::sensor::client_state & state) = 0;
virtual bool readImuPacket(const ouster::sensor::client_state & state, uint8_t * buf) = 0;

/**
* @brief Get lidar sensor's metadata
Expand Down
24 changes: 19 additions & 5 deletions ros2_ouster/include/ros2_ouster/ouster_driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include "ros2_ouster/interfaces/data_processor_interface.hpp"
#include "ros2_ouster/full_rotation_accumulator.hpp"

#include "ros2_ouster/ringbuffer.hpp"

namespace ros2_ouster
{

Expand Down Expand Up @@ -104,9 +106,9 @@ class OusterDriver : public lifecycle_interface::LifecycleInterface

private:
/**
* @brief Timer callback to process the UDP socket
* @brief Thread function to process data from the UDP socket
*/
void processData();
void receiveData();

/**
* @brief Create TF2 frames for the lidar sensor
Expand Down Expand Up @@ -135,13 +137,17 @@ class OusterDriver : public lifecycle_interface::LifecycleInterface
const std::shared_ptr<ouster_msgs::srv::GetMetadata::Request> request,
std::shared_ptr<ouster_msgs::srv::GetMetadata::Response> response);

/**
* @brief Thread function to process buffered packets that have been received from the sensor
*/
void processData();

rclcpp::Service<std_srvs::srv::Empty>::SharedPtr _reset_srv;
rclcpp::Service<ouster_msgs::srv::GetMetadata>::SharedPtr _metadata_srv;

std::unique_ptr<SensorInterface> _sensor;
std::multimap<ouster::sensor::client_state,
std::unique_ptr<ros2_ouster::DataProcessorInterface>> _data_processors;
rclcpp::TimerBase::SharedPtr _process_timer;

std::shared_ptr<sensor::FullRotationAccumulator> _full_rotation_accumulator;

Expand All @@ -153,8 +159,16 @@ class OusterDriver : public lifecycle_interface::LifecycleInterface

std::uint32_t _proc_mask;

uint8_t * _lidar_packet_data;
uint8_t * _imu_packet_data;
// Ringbuffers for raw received lidar and imu packets
std::unique_ptr<RingBuffer> _lidar_packet_buf;
std::unique_ptr<RingBuffer> _imu_packet_buf;

// Threads and synchronization primitives for receiving and processing data
std::thread _recv_thread;
std::thread _process_thread;
std::condition_variable _process_cond;
std::mutex _ringbuffer_mutex;
bool _processing_active;
};

} // namespace ros2_ouster
Expand Down
114 changes: 114 additions & 0 deletions ros2_ouster/include/ros2_ouster/ringbuffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2022, Mirko Kugelmeier
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROS2_OUSTER__RINGBUFFER_HPP_
#define ROS2_OUSTER__RINGBUFFER_HPP_

#include <atomic>
#include <memory>

namespace ros2_ouster
{

/**
* @class ros2_ouster::RingBuffer
* @brief A simple circular buffer implementation used for queued processing of incoming lidar / imu data
*/
class RingBuffer
{
public:
/**
* Create a new RingBuffer instance
* @param element_size The size (in bytes) of each element
* @param num_elements Number of maximum elements in the buffer.
*/
RingBuffer(std::size_t element_size, std::size_t num_elements)
: _element_size(element_size),
_num_elements(num_elements),
_head(0),
_tail(0),
_buf(new uint8_t[element_size * _num_elements])
{}

/**
* @brief Check whether there is any data to be read from the buffer at head()
* @return true if there are no elements in the buffer
*/
bool empty()
{
return _head == _tail;
}

/**
* @brief Check if the ringbuffer is full
* If the buffer is full, adding new data to it will overwrite or corrupt data at the head() position.
* This function is only safe to call in the producing thread, i.e. no calls to push() may occur simultaneously
* @return true if the buffer is full
*/
bool full()
{
// Make sure we have a consistent value for the head index here.
// This makes sure we do not return 'false positives' when the buffer is not actually
// full due to a race on _head between the conditions.
bool head = _head;

// This ringbuffer implementation lets both head and tail loop twice around the actual number of
// elements in the buffer to encode the difference between the empty() and full() states. The
// buffer is full if head and tail refer to the same element but on different 'iterations'
// of the loop
return ((_tail * 2) % _num_elements) == head && head != _tail;
}

/// @return A pointer to the current element to read from
uint8_t * head()
{
return &_buf[(_head % _num_elements) * _element_size];
}

/// @return A pointer to the current element to write to
uint8_t * tail()
{
return &_buf[(_tail % _num_elements) * _element_size];
}

/// @brief Remove the current head element from the queue
void pop()
{
_head = (_head + 1) % (_num_elements * 2);
}

/// @brief Add a new element (with data already filled at the location pointed to by tail())
/// to the buffer
void push()
{
_tail = (_tail + 1) % (_num_elements * 2);
}

protected:
// The size of each individual element in bytes and the max number of elements in the buffer
const std::size_t _element_size;
const std::size_t _num_elements;

// Since the ringbuffer is used across threads that produce / consume the data, we use
// std::atomic for head and tail to prevent re-ordering of assignments to these variables
std::atomic<std::size_t> _head;
std::atomic<std::size_t> _tail;

// Defining the backing buffer as a unique_ptr gives us our dtor and correct copy/move
// semantics for free
std::unique_ptr<uint8_t[]> _buf;
};

} // namespace ros2_ouster

#endif // ROS2_OUSTER__RINGBUFFER_HPP_
12 changes: 6 additions & 6 deletions ros2_ouster/include/ros2_ouster/sensor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,18 @@ class Sensor : public ros2_ouster::SensorInterface
/**
* @brief reading a lidar packet
* @param state of the sensor
* @return the packet of data
* @param buf pointer to a buffer to hold the packet data. Must hold getPacketFormat().lidar_packet_size bytes.
* @return true if a packet was recieved, false otherwise
*/
uint8_t * readLidarPacket(const ouster::sensor::client_state & state) override;
bool readLidarPacket(const ouster::sensor::client_state & state, uint8_t * buf) override;

/**
* @brief reading an imu packet
* @param state of the sensor
* @return the packet of data
* @param buf pointer to a buffer to hold the packet data. Must hold getPacketFormat().imu_packet_size bytes.
* @return true if a packet was recieved, false otherwise
*/
uint8_t * readImuPacket(const ouster::sensor::client_state & state) override;
bool readImuPacket(const ouster::sensor::client_state & state, uint8_t * buf) override;

/**
* @brief Sets the metadata class variable
Expand All @@ -96,8 +98,6 @@ class Sensor : public ros2_ouster::SensorInterface

private:
std::shared_ptr<ouster::sensor::client> _ouster_client;
std::vector<uint8_t> _lidar_packet;
std::vector<uint8_t> _imu_packet;
ros2_ouster::Metadata _metadata{};
};

Expand Down
14 changes: 8 additions & 6 deletions ros2_ouster/include/ros2_ouster/sensor_tins.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,20 @@ class SensorTins : public ros2_ouster::SensorInterface
/**
* @brief reading a lidar packet
* @param state of the sensor
* @return the packet of data
* @param buf pointer to a buffer to hold the packet data. Must hold getPacketFormat().lidar_packet_size bytes.
* @return true if a packet was recieved, false otherwise
*/
uint8_t * readLidarPacket(
const ouster::sensor::client_state & state) override;
bool readLidarPacket(
const ouster::sensor::client_state & state, uint8_t * buf) override;

/**
* @brief reading an imu packet
* @param state of the sensor
* @return the packet of data
* @param buf pointer to a buffer to hold the packet data. Must hold getPacketFormat().imu_packet_size bytes.
* @return true if a packet was recieved, false otherwise
*/
uint8_t * readImuPacket(
const ouster::sensor::client_state & state) override;
bool readImuPacket(
const ouster::sensor::client_state & state, uint8_t * buf) override;

/**
* @brief Sets the metadata class variable
Expand Down
Loading

0 comments on commit 41a43e2

Please sign in to comment.