From 0935622984ac742738513d4108e31f8d142236e0 Mon Sep 17 00:00:00 2001
From: Mario Dominguez <mariodominguez@eprosima.com>
Date: Wed, 17 Apr 2024 15:56:35 +0200
Subject: [PATCH 1/3] Refs #20706: Add regression BB test

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
---
 .../common/DDSBlackboxTestsDataReader.cpp     | 106 ++++++++++++++++++
 1 file changed, 106 insertions(+)

diff --git a/test/blackbox/common/DDSBlackboxTestsDataReader.cpp b/test/blackbox/common/DDSBlackboxTestsDataReader.cpp
index d2bb301541b..c96fc2a8db6 100644
--- a/test/blackbox/common/DDSBlackboxTestsDataReader.cpp
+++ b/test/blackbox/common/DDSBlackboxTestsDataReader.cpp
@@ -16,6 +16,8 @@
 
 #include <gtest/gtest.h>
 
+#include <fastdds/dds/core/StackAllocatedSequence.hpp>
+#include <fastrtps/transport/test_UDPv4TransportDescriptor.h>
 #include <fastrtps/xmlparser/XMLProfileManager.h>
 
 #include "BlackboxTests.hpp"
@@ -230,6 +232,110 @@ TEST_P(DDSDataReader, ConsistentTotalUnreadAfterGetFirstUntakenInfo)
     ASSERT_EQ(result, ReturnCode_t::RETCODE_OK) << "Reader's unread count is: " << reader.get_unread_count();
 }
 
+//! Regression test for #20706
+//! get_first_untaken_info() returns the first valid change of an instance, not only the first
+//! cache change. This implies searching in all the cache changes of the instance.
+//! In the scenario of having multiple reliable writers and one reader with history size > 1 in the same topic,
+//! it can happen that get_first_untaken_info() returns OK (as it is not currently checking whether the change is in the future)
+//! but take() returns NO_DATA because it is waiting for a previous SequenceNumber from the writer.
+TEST(DDSDataReader, GetFirstUntakenInfoReturnsTheFirstValidChange)
+{
+    PubSubWriter<HelloWorldPubSubType> writer_1(TEST_TOPIC_NAME);
+    PubSubWriter<HelloWorldPubSubType> writer_2(TEST_TOPIC_NAME);
+    // The reader should not take nor read any sample in this test
+    PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME, false, false, false);
+
+    auto testTransport_1 = std::make_shared<test_UDPv4TransportDescriptor>();
+
+    EntityId_t writer1_id;
+    EntityId_t reader_id;
+
+    testTransport_1->drop_data_messages_filter_ =
+            [&writer1_id, &reader_id](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool
+            {
+                uint32_t old_pos = msg.pos;
+
+                // see RTPS DDS 9.4.5.3 Data Submessage
+                EntityId_t readerID;
+                EntityId_t writerID;
+                SequenceNumber_t sn;
+
+                msg.pos += 2; // flags
+                msg.pos += 2; // octets to inline quos
+                CDRMessage::readEntityId(&msg, &readerID);
+                CDRMessage::readEntityId(&msg, &writerID);
+                CDRMessage::readSequenceNumber(&msg, &sn);
+
+                // restore buffer pos
+                msg.pos = old_pos;
+
+                // Loose Seqnum 1
+                if (writerID == writer1_id &&
+                        readerID == reader_id &&
+                        (sn == SequenceNumber_t{0, 1}))
+                {
+                    return true;
+                }
+
+                return false;
+            };
+
+    writer_1.disable_builtin_transport()
+            .add_user_transport_to_pparams(testTransport_1)
+            .history_depth(3)
+            .init();
+
+    writer_2.history_depth(3)
+            .init();
+
+    reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
+            .history_depth(3)
+            .init();
+
+    ASSERT_TRUE(writer_1.isInitialized());
+    ASSERT_TRUE(writer_2.isInitialized());
+    ASSERT_TRUE(reader.isInitialized());
+
+    writer1_id = writer_1.datawriter_guid().entityId;
+    reader_id = reader.datareader_guid().entityId;
+
+    // Wait for discovery.
+    writer_1.wait_discovery();
+    writer_2.wait_discovery();
+    reader.wait_discovery(std::chrono::seconds::zero(), 2);
+
+    // Send writer_1 samples
+    auto data = default_helloworld_data_generator(3);
+
+    reader.startReception(data);
+    writer_1.send(data);
+
+    // The reader should have received samples 2,3 but not 1
+    // get_first_untaken_info() should never return OK since the received changes are all in the future.
+    // We try it several times in case the reader has not received the samples yet.
+    eprosima::fastdds::dds::SampleInfo info;
+    for (size_t i = 0; i < 3; i++)
+    {
+        ASSERT_NE(eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK, reader.get_native_reader().get_first_untaken_info(
+                    &info));
+        std::this_thread::sleep_for(std::chrono::milliseconds(500));
+    }
+
+    // Now we send data from writer_2 with no drops and all samples shall be received.
+    data = default_helloworld_data_generator(3);
+    writer_2.send(data);
+    reader.block_for_unread_count_of(3);
+
+    // get_first_untaken_info() must return OK now
+    ASSERT_EQ(eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK,
+            reader.get_native_reader().get_first_untaken_info(&info));
+    eprosima::fastdds::dds::StackAllocatedSequence<HelloWorld*, 1> data_values;
+    eprosima::fastdds::dds::SampleInfoSeq sample_infos{1};
+    // As get_first_untaken_info() returns OK, take() must return OK too
+    ASSERT_EQ(eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK,
+            reader.get_native_reader().take(data_values, sample_infos));
+}
+
 //! Regression test for Issues #3822 Github #3875
 //! This test needs to late join a reader in the same process.
 //! Not setting this test as parametrized since it only makes sense in intraprocess.

From 51abaa763a07821181ff8b993910e10a7c2e855c Mon Sep 17 00:00:00 2001
From: Mario Dominguez <mariodominguez@eprosima.com>
Date: Wed, 17 Apr 2024 15:57:46 +0200
Subject: [PATCH 2/3] Refs #20706: Fix

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
---
 include/fastdds/dds/subscriber/DataReader.hpp     |  4 +++-
 src/cpp/fastdds/subscriber/DataReaderImpl.hpp     |  3 ++-
 .../subscriber/history/DataReaderHistory.cpp      | 15 +++++++++++++--
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/include/fastdds/dds/subscriber/DataReader.hpp b/include/fastdds/dds/subscriber/DataReader.hpp
index ad7eaee01ae..61846b6a91b 100644
--- a/include/fastdds/dds/subscriber/DataReader.hpp
+++ b/include/fastdds/dds/subscriber/DataReader.hpp
@@ -785,7 +785,9 @@ class DataReader : public DomainEntity
             const void* instance) const;
 
     /**
-     * @brief Returns information about the first untaken sample.
+     * @brief Returns information about the first untaken sample. This method is meant to be called prior to
+     * a read() or take() operation as it does not modify the status condition of the entity.
+     *
      *
      * @param [out] info Pointer to a SampleInfo_t structure to store first untaken sample information.
      *
diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp
index 4ba6f323862..6294dd3598c 100644
--- a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp
+++ b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp
@@ -199,7 +199,8 @@ class DataReaderImpl
             SampleInfoSeq& sample_infos);
 
     /**
-     * @brief Returns information about the first untaken sample.
+     * @brief Returns information about the first untaken sample. This method is meant to be called prior to
+     * a read() or take() operation as it does not modify the status condition of the entity.
      * @param [out] info Pointer to a SampleInfo structure to store first untaken sample information.
      * @return true if sample info was returned. false if there is no sample to take.
      */
diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
index 392c86f4864..cce8eb50e51 100644
--- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
+++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
@@ -356,9 +356,20 @@ bool DataReaderHistory::get_first_untaken_info(
     for (auto& it : data_available_instances_)
     {
         auto& instance_changes = it.second->cache_changes;
-        if (!instance_changes.empty())
+        for (auto& instance_change : instance_changes)
         {
-            ReadTakeCommand::generate_info(info, *(it.second), instance_changes.front());
+            WriterProxy* wp;
+            bool is_future_change = false;
+
+            {
+                std::lock_guard<RecursiveTimedMutex> _(mp_reader->getMutex());
+                if (mp_reader->begin_sample_access_nts(instance_change, wp, is_future_change) && is_future_change)
+                {
+                    continue;
+                }
+            }
+
+            ReadTakeCommand::generate_info(info, *(it.second), instance_change);
             return true;
         }
     }

From cf9078ca129a2e04cb9412b5c8ff862072c517b9 Mon Sep 17 00:00:00 2001
From: Mario Dominguez <mariodominguez@eprosima.com>
Date: Thu, 18 Apr 2024 07:47:33 +0200
Subject: [PATCH 3/3] Refs #20706: Apply review suggestions

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
---
 src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
index cce8eb50e51..d8455e497ee 100644
--- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
+++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
@@ -358,12 +358,13 @@ bool DataReaderHistory::get_first_untaken_info(
         auto& instance_changes = it.second->cache_changes;
         for (auto& instance_change : instance_changes)
         {
-            WriterProxy* wp;
+            WriterProxy* wp = nullptr;
             bool is_future_change = false;
 
+            if (mp_reader->begin_sample_access_nts(instance_change, wp, is_future_change))
             {
-                std::lock_guard<RecursiveTimedMutex> _(mp_reader->getMutex());
-                if (mp_reader->begin_sample_access_nts(instance_change, wp, is_future_change) && is_future_change)
+                mp_reader->end_sample_access_nts(instance_change, wp, false);
+                if (is_future_change)
                 {
                     continue;
                 }