From 41a43e29efa298aef146a2e0f336769111f5284d Mon Sep 17 00:00:00 2001 From: Mirko K Date: Fri, 8 Jul 2022 21:10:33 +0200 Subject: [PATCH] Buffered packet processing (#97) * Simplify lidar scan assembly code As double-buffering is used for the scan assemnly (in contrast to the ouster example) we can simply zero out our buffer for the new frame. We then no longer need to special-case zeroing-out missing packets. * Initialize FullRotationAccumulator with batchReady == false * Implement buffered packet processing This commit splits the processing of lidar data into two seperate threads. Previously, the code receiving sensor data from the wire was blocked by the lidar or imu processors, causing dataloss on slower systems (Issue #96). * Modifies sensor interface so receiving functions get passed a pointer to a buffer to write their data to * Implements ringbuffers for lidar and imu data in ouster_driver * Splits processing into two seperate threads: * receiveData receives from the wire and fills ringbuffers * processData takes packets from the queue, assembles the lidar scan and runs the processors * Threads are implemented as pure threads instead of ros timers, as timing inconsistencies for the receiver thread were also leading to data loss * Revert "Simplify lidar scan assembly code" This reverts commit 581b35f14ef098041f03131deb18094ea92b98df. This modification to the client code is not necessary for the buffered packet processing, and conflicts with changes since made in the ouster_examples repository. * Lock against race on [lidar|imu]_packet_head Moves the modification of the head indices to before the mutex unlock. This is only necessary for the warning about buffer overruns to work correctly. We don't need to lock against the actual data in the buffer. * Move ringbuffer code to it's own header * Fix linting issues Co-authored-by: Mirko Kugelmeier --- .../ros2_ouster/full_rotation_accumulator.hpp | 2 +- .../interfaces/sensor_interface.hpp | 10 +- .../include/ros2_ouster/ouster_driver.hpp | 24 +++- .../include/ros2_ouster/ringbuffer.hpp | 114 +++++++++++++++ ros2_ouster/include/ros2_ouster/sensor.hpp | 12 +- .../include/ros2_ouster/sensor_tins.hpp | 14 +- ros2_ouster/src/ouster_driver.cpp | 131 ++++++++++++------ ros2_ouster/src/sensor.cpp | 20 ++- ros2_ouster/src/sensor_tins.cpp | 14 +- 9 files changed, 261 insertions(+), 80 deletions(-) create mode 100644 ros2_ouster/include/ros2_ouster/ringbuffer.hpp diff --git a/ros2_ouster/include/ros2_ouster/full_rotation_accumulator.hpp b/ros2_ouster/include/ros2_ouster/full_rotation_accumulator.hpp index 9a97bd4..695d99a 100644 --- a/ros2_ouster/include/ros2_ouster/full_rotation_accumulator.hpp +++ b/ros2_ouster/include/ros2_ouster/full_rotation_accumulator.hpp @@ -35,7 +35,7 @@ class FullRotationAccumulator FullRotationAccumulator( const ouster::sensor::sensor_info & mdata, const ouster::sensor::packet_format & pf) - : _pf(pf) + : _batchReady(false), _pf(pf), _packets_accumulated(0) { _batch = std::make_unique(mdata.format.columns_per_frame, _pf); _ls = std::make_shared( diff --git a/ros2_ouster/include/ros2_ouster/interfaces/sensor_interface.hpp b/ros2_ouster/include/ros2_ouster/interfaces/sensor_interface.hpp index 234ecbc..8f9620f 100644 --- a/ros2_ouster/include/ros2_ouster/interfaces/sensor_interface.hpp +++ b/ros2_ouster/include/ros2_ouster/interfaces/sensor_interface.hpp @@ -78,16 +78,18 @@ class SensorInterface /** * @brief reading a lidar packet * @param state of the sensor - * @return the packet of data + * @param buf pointer to a buffer to hold the packet data. Must hold getPacketFormat().lidar_packet_size bytes. + * @return true if a packet was recieved, false otherwise */ - virtual uint8_t * readLidarPacket(const ouster::sensor::client_state & state) = 0; + virtual bool readLidarPacket(const ouster::sensor::client_state & state, uint8_t * buf) = 0; /** * @brief reading an imu packet * @param state of the sensor - * @return the packet of data + * @param buf pointer to a buffer to hold the packet data. Must hold getPacketFormat().imu_packet_size bytes. + * @return true if a packet was recieved, false otherwise */ - virtual uint8_t * readImuPacket(const ouster::sensor::client_state & state) = 0; + virtual bool readImuPacket(const ouster::sensor::client_state & state, uint8_t * buf) = 0; /** * @brief Get lidar sensor's metadata diff --git a/ros2_ouster/include/ros2_ouster/ouster_driver.hpp b/ros2_ouster/include/ros2_ouster/ouster_driver.hpp index aa538cf..b57d404 100644 --- a/ros2_ouster/include/ros2_ouster/ouster_driver.hpp +++ b/ros2_ouster/include/ros2_ouster/ouster_driver.hpp @@ -36,6 +36,8 @@ #include "ros2_ouster/interfaces/data_processor_interface.hpp" #include "ros2_ouster/full_rotation_accumulator.hpp" +#include "ros2_ouster/ringbuffer.hpp" + namespace ros2_ouster { @@ -104,9 +106,9 @@ class OusterDriver : public lifecycle_interface::LifecycleInterface private: /** - * @brief Timer callback to process the UDP socket + * @brief Thread function to process data from the UDP socket */ - void processData(); + void receiveData(); /** * @brief Create TF2 frames for the lidar sensor @@ -135,13 +137,17 @@ class OusterDriver : public lifecycle_interface::LifecycleInterface const std::shared_ptr request, std::shared_ptr response); + /** + * @brief Thread function to process buffered packets that have been received from the sensor + */ + void processData(); + rclcpp::Service::SharedPtr _reset_srv; rclcpp::Service::SharedPtr _metadata_srv; std::unique_ptr _sensor; std::multimap> _data_processors; - rclcpp::TimerBase::SharedPtr _process_timer; std::shared_ptr _full_rotation_accumulator; @@ -153,8 +159,16 @@ class OusterDriver : public lifecycle_interface::LifecycleInterface std::uint32_t _proc_mask; - uint8_t * _lidar_packet_data; - uint8_t * _imu_packet_data; + // Ringbuffers for raw received lidar and imu packets + std::unique_ptr _lidar_packet_buf; + std::unique_ptr _imu_packet_buf; + + // Threads and synchronization primitives for receiving and processing data + std::thread _recv_thread; + std::thread _process_thread; + std::condition_variable _process_cond; + std::mutex _ringbuffer_mutex; + bool _processing_active; }; } // namespace ros2_ouster diff --git a/ros2_ouster/include/ros2_ouster/ringbuffer.hpp b/ros2_ouster/include/ros2_ouster/ringbuffer.hpp new file mode 100644 index 0000000..57458f6 --- /dev/null +++ b/ros2_ouster/include/ros2_ouster/ringbuffer.hpp @@ -0,0 +1,114 @@ +// Copyright 2022, Mirko Kugelmeier +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROS2_OUSTER__RINGBUFFER_HPP_ +#define ROS2_OUSTER__RINGBUFFER_HPP_ + +#include +#include + +namespace ros2_ouster +{ + +/** + * @class ros2_ouster::RingBuffer + * @brief A simple circular buffer implementation used for queued processing of incoming lidar / imu data + */ +class RingBuffer +{ +public: + /** + * Create a new RingBuffer instance + * @param element_size The size (in bytes) of each element + * @param num_elements Number of maximum elements in the buffer. + */ + RingBuffer(std::size_t element_size, std::size_t num_elements) + : _element_size(element_size), + _num_elements(num_elements), + _head(0), + _tail(0), + _buf(new uint8_t[element_size * _num_elements]) + {} + + /** + * @brief Check whether there is any data to be read from the buffer at head() + * @return true if there are no elements in the buffer + */ + bool empty() + { + return _head == _tail; + } + + /** + * @brief Check if the ringbuffer is full + * If the buffer is full, adding new data to it will overwrite or corrupt data at the head() position. + * This function is only safe to call in the producing thread, i.e. no calls to push() may occur simultaneously + * @return true if the buffer is full + */ + bool full() + { + // Make sure we have a consistent value for the head index here. + // This makes sure we do not return 'false positives' when the buffer is not actually + // full due to a race on _head between the conditions. + bool head = _head; + + // This ringbuffer implementation lets both head and tail loop twice around the actual number of + // elements in the buffer to encode the difference between the empty() and full() states. The + // buffer is full if head and tail refer to the same element but on different 'iterations' + // of the loop + return ((_tail * 2) % _num_elements) == head && head != _tail; + } + + /// @return A pointer to the current element to read from + uint8_t * head() + { + return &_buf[(_head % _num_elements) * _element_size]; + } + + /// @return A pointer to the current element to write to + uint8_t * tail() + { + return &_buf[(_tail % _num_elements) * _element_size]; + } + + /// @brief Remove the current head element from the queue + void pop() + { + _head = (_head + 1) % (_num_elements * 2); + } + + /// @brief Add a new element (with data already filled at the location pointed to by tail()) + /// to the buffer + void push() + { + _tail = (_tail + 1) % (_num_elements * 2); + } + +protected: + // The size of each individual element in bytes and the max number of elements in the buffer + const std::size_t _element_size; + const std::size_t _num_elements; + + // Since the ringbuffer is used across threads that produce / consume the data, we use + // std::atomic for head and tail to prevent re-ordering of assignments to these variables + std::atomic _head; + std::atomic _tail; + + // Defining the backing buffer as a unique_ptr gives us our dtor and correct copy/move + // semantics for free + std::unique_ptr _buf; +}; + +} // namespace ros2_ouster + +#endif // ROS2_OUSTER__RINGBUFFER_HPP_ diff --git a/ros2_ouster/include/ros2_ouster/sensor.hpp b/ros2_ouster/include/ros2_ouster/sensor.hpp index d503d07..94c1c35 100644 --- a/ros2_ouster/include/ros2_ouster/sensor.hpp +++ b/ros2_ouster/include/ros2_ouster/sensor.hpp @@ -68,16 +68,18 @@ class Sensor : public ros2_ouster::SensorInterface /** * @brief reading a lidar packet * @param state of the sensor - * @return the packet of data + * @param buf pointer to a buffer to hold the packet data. Must hold getPacketFormat().lidar_packet_size bytes. + * @return true if a packet was recieved, false otherwise */ - uint8_t * readLidarPacket(const ouster::sensor::client_state & state) override; + bool readLidarPacket(const ouster::sensor::client_state & state, uint8_t * buf) override; /** * @brief reading an imu packet * @param state of the sensor - * @return the packet of data + * @param buf pointer to a buffer to hold the packet data. Must hold getPacketFormat().imu_packet_size bytes. + * @return true if a packet was recieved, false otherwise */ - uint8_t * readImuPacket(const ouster::sensor::client_state & state) override; + bool readImuPacket(const ouster::sensor::client_state & state, uint8_t * buf) override; /** * @brief Sets the metadata class variable @@ -96,8 +98,6 @@ class Sensor : public ros2_ouster::SensorInterface private: std::shared_ptr _ouster_client; - std::vector _lidar_packet; - std::vector _imu_packet; ros2_ouster::Metadata _metadata{}; }; diff --git a/ros2_ouster/include/ros2_ouster/sensor_tins.hpp b/ros2_ouster/include/ros2_ouster/sensor_tins.hpp index 815f176..f5fb6d7 100644 --- a/ros2_ouster/include/ros2_ouster/sensor_tins.hpp +++ b/ros2_ouster/include/ros2_ouster/sensor_tins.hpp @@ -80,18 +80,20 @@ class SensorTins : public ros2_ouster::SensorInterface /** * @brief reading a lidar packet * @param state of the sensor - * @return the packet of data + * @param buf pointer to a buffer to hold the packet data. Must hold getPacketFormat().lidar_packet_size bytes. + * @return true if a packet was recieved, false otherwise */ - uint8_t * readLidarPacket( - const ouster::sensor::client_state & state) override; + bool readLidarPacket( + const ouster::sensor::client_state & state, uint8_t * buf) override; /** * @brief reading an imu packet * @param state of the sensor - * @return the packet of data + * @param buf pointer to a buffer to hold the packet data. Must hold getPacketFormat().imu_packet_size bytes. + * @return true if a packet was recieved, false otherwise */ - uint8_t * readImuPacket( - const ouster::sensor::client_state & state) override; + bool readImuPacket( + const ouster::sensor::client_state & state, uint8_t * buf) override; /** * @brief Sets the metadata class variable diff --git a/ros2_ouster/src/ouster_driver.cpp b/ros2_ouster/src/ouster_driver.cpp index 6ec9a5a..3fa61a0 100644 --- a/ros2_ouster/src/ouster_driver.cpp +++ b/ros2_ouster/src/ouster_driver.cpp @@ -145,11 +145,14 @@ void OusterDriver::onActivate() it->second->onActivate(); } - // Speed of the lidar is 1280 hz. We fire our timer event at 2x that rate to - // ensure we can process all of the incoming data in a timely manner. - // See: https://github.com/SteveMacenski/ros2_ouster_drivers/issues/55 - _process_timer = this->create_wall_timer( - 390625ns, std::bind(&OusterDriver::processData, this)); + _lidar_packet_buf = std::make_unique( + _sensor->getPacketFormat().lidar_packet_size, 1024); + _imu_packet_buf = std::make_unique( + _sensor->getPacketFormat().imu_packet_size, 1024); + + _processing_active = true; + _process_thread = std::thread(std::bind(&OusterDriver::processData, this)); + _recv_thread = std::thread(std::bind(&OusterDriver::receiveData, this)); } void OusterDriver::onError() @@ -158,8 +161,16 @@ void OusterDriver::onError() void OusterDriver::onDeactivate() { - _process_timer->cancel(); - _process_timer.reset(); + _processing_active = false; + + if (_recv_thread.joinable()) { + _recv_thread.join(); + } + + _process_cond.notify_all(); + if (_process_thread.joinable()) { + _process_thread.join(); + } DataProcessorMapIt it; for (it = _data_processors.begin(); it != _data_processors.end(); ++it) { @@ -177,8 +188,6 @@ void OusterDriver::onCleanup() void OusterDriver::onShutdown() { - _process_timer->cancel(); - _process_timer.reset(); _tf_b.reset(); DataProcessorMapIt it; @@ -202,44 +211,86 @@ void OusterDriver::broadcastStaticTransforms( } } -void OusterDriver::processData() -{ - try { - ouster::sensor::client_state state = _sensor->get(); - _lidar_packet_data = _sensor->readLidarPacket(state); - _imu_packet_data = _sensor->readImuPacket(state); - - RCLCPP_DEBUG( - this->get_logger(), - "Retrieved packet with state: %s", - ros2_ouster::toString(state).c_str()); - - std::pair key_its; - - uint64_t override_ts = - this->_use_ros_time ? this->now().nanoseconds() : 0; - - if (_lidar_packet_data) { - _full_rotation_accumulator->accumulate(_lidar_packet_data, override_ts); - - key_its = _data_processors.equal_range(ouster::sensor::client_state::LIDAR_DATA); +void OusterDriver::processData() { + std::pair key_lidar_its = + _data_processors.equal_range(ouster::sensor::client_state::LIDAR_DATA); + std::pair key_imu_its = + _data_processors.equal_range(ouster::sensor::client_state::IMU_DATA); + + std::unique_lock ringbuffer_guard(_ringbuffer_mutex); + while (_processing_active) { + // Wait for data in either the lidar or imu ringbuffer + _process_cond.wait( + ringbuffer_guard, [this] () { + return (!_lidar_packet_buf->empty() || + !_imu_packet_buf->empty() || + !_processing_active); + }); + ringbuffer_guard.unlock(); + + uint64_t override_ts = this->_use_ros_time ? this->now().nanoseconds() : 0; + + // If we have data in the lidar buffer, process it + if (!_lidar_packet_buf->empty() && _processing_active) { + _full_rotation_accumulator->accumulate(_lidar_packet_buf->head(), override_ts); + for (DataProcessorMapIt it = key_lidar_its.first; it != key_lidar_its.second; it++) { + it->second->process(_lidar_packet_buf->head(), override_ts); + } + _lidar_packet_buf->pop(); + } - for (DataProcessorMapIt it = key_its.first; it != key_its.second; it++) { - it->second->process(_lidar_packet_data, override_ts); + // If we have data in the imu buffer, process it + if (!_imu_packet_buf->empty() && _processing_active) { + for (DataProcessorMapIt it = key_imu_its.first; it != key_imu_its.second; it++) { + it->second->process(_imu_packet_buf->head(), override_ts); } + _imu_packet_buf->pop(); } - if (_imu_packet_data) { - key_its = _data_processors.equal_range(ouster::sensor::client_state::IMU_DATA); + ringbuffer_guard.lock(); + } +} - for (DataProcessorMapIt it = key_its.first; it != key_its.second; it++) { - it->second->process(_imu_packet_data, override_ts); +void OusterDriver::receiveData() +{ + while (_processing_active) { + try { + // Receive raw sensor data from the network. + // This blocks for some time until either data is received or timeout + ouster::sensor::client_state state = _sensor->get(); + bool got_lidar = _sensor->readLidarPacket(state, _lidar_packet_buf->tail()); + bool got_imu = _sensor->readImuPacket(state, _imu_packet_buf->tail()); + + // If we got some data, push to ringbuffer and signal processing thread + if (got_lidar || got_imu) { + if (got_lidar) { + // If the ringbuffer is full, this means the processing thread is running too slow to + // process all frames. Therefore, we push to it anyway, discarding all (old) data + // in the buffer and emit a warning. + if (_lidar_packet_buf->full()) { + RCLCPP_WARN(this->get_logger(), "Lidar buffer overrun!"); + } + _lidar_packet_buf->push(); + } + + if (got_imu) { + if (_imu_packet_buf->full()) { + RCLCPP_WARN(this->get_logger(), "IMU buffer overrun!"); + } + _imu_packet_buf->push(); + } + _process_cond.notify_all(); } + + RCLCPP_DEBUG( + this->get_logger(), + "Retrieved packet with state: %s", + ros2_ouster::toString(state).c_str()); + } catch (const OusterDriverException & e) { + RCLCPP_WARN( + this->get_logger(), + "Failed to process packet with exception %s.", e.what()); } - } catch (const OusterDriverException & e) { - RCLCPP_WARN( - this->get_logger(), - "Failed to process packet with exception %s.", e.what()); } } diff --git a/ros2_ouster/src/sensor.cpp b/ros2_ouster/src/sensor.cpp index 9073b07..c81c2ed 100644 --- a/ros2_ouster/src/sensor.cpp +++ b/ros2_ouster/src/sensor.cpp @@ -28,8 +28,6 @@ Sensor::Sensor() Sensor::~Sensor() { _ouster_client.reset(); - _lidar_packet.clear(); - _imu_packet.clear(); } void Sensor::reset( @@ -90,8 +88,6 @@ void Sensor::configure( } setMetadata(config.lidar_port, config.imu_port, config.timestamp_mode); - _lidar_packet.resize(this->getPacketFormat().lidar_packet_size + 1); - _imu_packet.resize(this->getPacketFormat().imu_packet_size + 1); } ouster::sensor::client_state Sensor::get() @@ -112,28 +108,28 @@ ouster::sensor::client_state Sensor::get() return state; } -uint8_t * Sensor::readLidarPacket(const ouster::sensor::client_state & state) +bool Sensor::readLidarPacket(const ouster::sensor::client_state & state, uint8_t * buf) { if (state & ouster::sensor::client_state::LIDAR_DATA && ouster::sensor::read_lidar_packet( - *_ouster_client, _lidar_packet.data(), + *_ouster_client, buf, this->getPacketFormat())) { - return _lidar_packet.data(); + return true; } - return nullptr; + return false; } -uint8_t * Sensor::readImuPacket(const ouster::sensor::client_state & state) +bool Sensor::readImuPacket(const ouster::sensor::client_state & state, uint8_t * buf) { if (state & ouster::sensor::client_state::IMU_DATA && ouster::sensor::read_imu_packet( - *_ouster_client, _imu_packet.data(), + *_ouster_client, buf, this->getPacketFormat())) { - return _imu_packet.data(); + return true; } - return nullptr; + return false; } void Sensor::setMetadata( diff --git a/ros2_ouster/src/sensor_tins.cpp b/ros2_ouster/src/sensor_tins.cpp index 5430509..1b31bd2 100644 --- a/ros2_ouster/src/sensor_tins.cpp +++ b/ros2_ouster/src/sensor_tins.cpp @@ -128,21 +128,23 @@ ouster::sensor::client_state SensorTins::get() return _inferred_state; } -uint8_t * SensorTins::readLidarPacket(const ouster::sensor::client_state & state) +bool SensorTins::readLidarPacket(const ouster::sensor::client_state & state, uint8_t * buf) { if (state == ouster::sensor::client_state::LIDAR_DATA) { - return _lidar_packet.data(); + std::memcpy(buf, _lidar_packet.data(), this->getPacketFormat().lidar_packet_size); + return true; } else { - return nullptr; + return false; } } -uint8_t * SensorTins::readImuPacket(const ouster::sensor::client_state & state) +bool SensorTins::readImuPacket(const ouster::sensor::client_state & state, uint8_t * buf) { if (state == ouster::sensor::client_state::IMU_DATA) { - return _imu_packet.data(); + std::memcpy(buf, _imu_packet.data(), this->getPacketFormat().imu_packet_size); + return true; } else { - return nullptr; + return false; } }