diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 0e0536fd2605..148dfee173f9 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -310,6 +310,7 @@ class TTopicSession : public TActorBootstrapped { void SendStatisticToRowDispatcher(); void SendSessionError(TActorId readActorId, TStatus status); bool CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev); + void RestartSessionIfOldestClient(const TClientsInfo& info); private: @@ -750,10 +751,12 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) { auto it = Clients.find(ev->Sender); if (it == Clients.end()) { - LOG_ROW_DISPATCHER_DEBUG("Wrong ClientSettings"); + LOG_ROW_DISPATCHER_WARN("Ignore TEvStopSession from " << ev->Sender << ", no client"); return; } auto& info = *it->second; + RestartSessionIfOldestClient(info); + UnreadBytes -= info.UnreadBytes; Metrics.UnreadBytes->Sub(info.UnreadBytes); if (const auto formatIt = FormatHandlers.find(info.HandlerSettings); formatIt != FormatHandlers.end()) { @@ -769,6 +772,41 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) { SubscribeOnNextEvent(); } +void TTopicSession::RestartSessionIfOldestClient(const TClientsInfo& info) { + // if we read historical data (because of this client), then we restart the session. + + if (!ReadSession || !info.NextMessageOffset) { + return; + } + TMaybe minMessageOffset; + for (auto& [readActorId, clientPtr] : Clients) { + if (info.ReadActorId == readActorId || !clientPtr->NextMessageOffset) { + continue; + } + if (!minMessageOffset) { + minMessageOffset = clientPtr->NextMessageOffset; + continue; + } + minMessageOffset = std::min(minMessageOffset, clientPtr->NextMessageOffset); + } + if (!minMessageOffset) { + return; + } + + if (info.NextMessageOffset >= minMessageOffset) { + return; + } + LOG_ROW_DISPATCHER_INFO("Client (on StopSession) has less offset (" << info.NextMessageOffset << ") than others clients (" << minMessageOffset << "), stop (restart) topic session"); + Metrics.RestartSessionByOffsets->Inc(); + ++RestartSessionByOffsets; + info.RestartSessionByOffsetsByQuery->Inc(); + StopReadSession(); + + if (!ReadSession) { + Schedule(TDuration::Seconds(Config.GetTimeoutBeforeStartSessionSec()), new NFq::TEvPrivate::TEvCreateSession()); + } +} + void TTopicSession::FatalError(TStatus status) { LOG_ROW_DISPATCHER_ERROR("FatalError: " << status.GetErrorMessage()); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 48524b7ebf52..379fe1528fbf 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -27,6 +28,7 @@ constexpr ui64 TimeoutBeforeStartSessionSec = 3; constexpr ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec; static_assert(GrabTimeoutSec <= WAIT_TIMEOUT.Seconds()); +template class TFixture : public NTests::TBaseFixture { public: using TBase = NTests::TBaseFixture; @@ -42,6 +44,7 @@ class TFixture : public NTests::TBaseFixture { } void Init(const TString& topicPath, ui64 maxSessionUsedMemory = std::numeric_limits::max()) { + TopicPath = topicPath; Config.SetTimeoutBeforeStartSessionSec(TimeoutBeforeStartSessionSec); Config.SetMaxSessionUsedMemory(maxSessionUsedMemory); Config.SetSendStatusPeriodSec(2); @@ -60,6 +63,11 @@ class TFixture : public NTests::TBaseFixture { CompileNotifier = Runtime.AllocateEdgeActor(); const auto compileServiceActorId = Runtime.Register(CreatePurecalcCompileServiceMock(CompileNotifier)); + if (MockTopicSession) { + PqGatewayNotifier = Runtime.AllocateEdgeActor(); + MockPqGateway = CreateMockPqGateway(Runtime, PqGatewayNotifier); + } + TopicSession = Runtime.Register(NewTopicSession( "read_group", topicPath, @@ -73,7 +81,7 @@ class TFixture : public NTests::TBaseFixture { CredentialsProviderFactory, MakeIntrusive(), MakeIntrusive(), - CreatePqNativeGateway(pqServices), + !MockTopicSession ? CreatePqNativeGateway(pqServices) : MockPqGateway, 16000000 ).release()); Runtime.EnableScheduleForActor(TopicSession); @@ -95,12 +103,17 @@ class TFixture : public NTests::TBaseFixture { const auto ping = Runtime.GrabEdgeEvent(CompileNotifier); UNIT_ASSERT_C(ping, "Compilation is not performed for predicate: " << predicate); } + + if (MockTopicSession) { + Runtime.GrabEdgeEvent(PqGatewayNotifier, TDuration::Seconds(GrabTimeoutSec)); + MockPqGateway->AddEvent(TopicPath, NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent(nullptr, 0, 0), 0); + } } - NYql::NPq::NProto::TDqPqTopicSource BuildSource(TString topic, bool emptyPredicate = false, const TString& consumer = DefaultPqConsumer) { + NYql::NPq::NProto::TDqPqTopicSource BuildSource(bool emptyPredicate = false, const TString& consumer = DefaultPqConsumer) { NYql::NPq::NProto::TDqPqTopicSource settings; settings.SetEndpoint(GetDefaultPqEndpoint()); - settings.SetTopicPath(topic); + settings.SetTopicPath(TopicPath); settings.SetConsumerName(consumer); settings.SetFormat("json_each_row"); settings.MutableToken()->SetName("token"); @@ -185,10 +198,53 @@ class TFixture : public NTests::TBaseFixture { return TRow().AddUint64(100 * index).AddString(TStringBuilder() << "value" << index); } + using TMessageInformation = NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation; + using TMessage = NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage; + + TMessageInformation MakeNextMessageInformation(size_t offset, size_t uncompressedSize) { + auto now = TInstant::Now(); + TMessageInformation msgInfo( + offset, + "ProducerId", + 0, + now, + now, + MakeIntrusive(), + MakeIntrusive(), + uncompressedSize, + "messageGroupId" + ); + return msgInfo; + } + + void PQWrite( + const std::vector& sequence, + ui64 firstMessageOffset = 0) { + if (!MockTopicSession) { + NYql::NDq::PQWrite(sequence, TopicPath, GetDefaultPqEndpoint()); + } else { + ui64 offset = firstMessageOffset; + TVector msgs; + size_t size = 0; + for (const auto& s : sequence) { + TMessage msg(s, nullptr, MakeNextMessageInformation(offset++, s.size()), CreatePartitionSession()); + msgs.emplace_back(msg); + size += s.size(); + } + MockPqGateway->AddEvent(TopicPath, NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent(msgs, {}, CreatePartitionSession()), size); + } + } + + void PassAway() { + Runtime.Send(new IEventHandle(TopicSession, RowDispatcherActorId, new NActors::TEvents::TEvPoisonPill)); + } + public: + TString TopicPath; NActors::TActorId TopicSession; NActors::TActorId RowDispatcherActorId; NActors::TActorId CompileNotifier; + NActors::TActorId PqGatewayNotifier; NYdb::TDriver Driver = NYdb::TDriver(NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr"))); std::shared_ptr CredentialsProviderFactory; NActors::TActorId ReadActorId1; @@ -196,6 +252,7 @@ class TFixture : public NTests::TBaseFixture { NActors::TActorId ReadActorId3; ui64 PartitionId = 0; NConfig::TRowDispatcherConfig Config; + TIntrusivePtr MockPqGateway; const TString Json1 = "{\"dt\":100,\"value\":\"value1\"}"; const TString Json2 = "{\"dt\":200,\"value\":\"value2\"}"; @@ -203,33 +260,37 @@ class TFixture : public NTests::TBaseFixture { const TString Json4 = "{\"dt\":400,\"value\":\"value4\"}"; }; +using TRealTopicFixture = TFixture; +using TMockTopicFixture = TFixture; + } // anonymous namespace Y_UNIT_TEST_SUITE(TopicSessionTests) { - Y_UNIT_TEST_F(TwoSessionsWithoutOffsets, TFixture) { + + Y_UNIT_TEST_F(TwoSessionsWithoutOffsets, TRealTopicFixture) { const TString topicName = "topic1"; PQCreateStream(topicName); Init(topicName); - auto source = BuildSource(topicName); + auto source = BuildSource(); StartSession(ReadActorId1, source); StartSession(ReadActorId2, source); std::vector data = { Json1 }; - PQWrite(data, topicName); + PQWrite(data); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }); ExpectMessageBatch(ReadActorId2, { JsonMessage(1) }); ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 1); data = { Json2 }; - PQWrite(data, topicName); + PQWrite(data); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 1); ExpectMessageBatch(ReadActorId1, { JsonMessage(2) }); ExpectMessageBatch(ReadActorId2, { JsonMessage(2) }); ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 2); - auto source2 = BuildSource(topicName, false, "OtherConsumer"); + auto source2 = BuildSource(false, "OtherConsumer"); StartSession(ReadActorId3, source2, Nothing(), true); ExpectSessionError(ReadActorId3, EStatusId::PRECONDITION_FAILED, "Use the same consumer"); @@ -237,17 +298,17 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StopSession(ReadActorId2, source); } - Y_UNIT_TEST_F(TwoSessionWithoutPredicate, TFixture) { + Y_UNIT_TEST_F(TwoSessionWithoutPredicate, TRealTopicFixture) { const TString topicName = "twowithoutpredicate"; PQCreateStream(topicName); Init(topicName); - auto source1 = BuildSource(topicName, true); - auto source2 = BuildSource(topicName, true); + auto source1 = BuildSource(true); + auto source2 = BuildSource(true); StartSession(ReadActorId1, source1); StartSession(ReadActorId2, source2); const std::vector data = { Json1 }; - PQWrite(data, topicName); + PQWrite(data); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); Runtime.Send(new IEventHandle(TopicSession, ReadActorId1, new TEvRowDispatcher::TEvGetNextBatch())); Runtime.Send(new IEventHandle(TopicSession, ReadActorId2, new TEvRowDispatcher::TEvGetNextBatch())); @@ -258,17 +319,17 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StopSession(ReadActorId2, source2); } - Y_UNIT_TEST_F(SessionWithPredicateAndSessionWithoutPredicate, TFixture) { + Y_UNIT_TEST_F(SessionWithPredicateAndSessionWithoutPredicate, TRealTopicFixture) { const TString topicName = "topic2"; PQCreateStream(topicName); Init(topicName); - auto source1 = BuildSource(topicName, false); - auto source2 = BuildSource(topicName, true); + auto source1 = BuildSource(false); + auto source2 = BuildSource(true); StartSession(ReadActorId1, source1); StartSession(ReadActorId2, source2); const std::vector data = { Json1 }; - PQWrite(data, topicName); + PQWrite(data); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }); ExpectMessageBatch(ReadActorId2, { JsonMessage(1) }); @@ -277,22 +338,22 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StopSession(ReadActorId2, source2); } - Y_UNIT_TEST_F(SecondSessionWithoutOffsetsAfterSessionConnected, TFixture) { + Y_UNIT_TEST_F(SecondSessionWithoutOffsetsAfterSessionConnected, TRealTopicFixture) { const TString topicName = "topic3"; PQCreateStream(topicName); Init(topicName); - auto source = BuildSource(topicName); + auto source = BuildSource(); StartSession(ReadActorId1, source); const std::vector data = { Json1 }; - PQWrite(data, topicName); + PQWrite(data); ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }); StartSession(ReadActorId2, source); const std::vector data2 = { Json2 }; - PQWrite(data2, topicName); + PQWrite(data2); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); ExpectMessageBatch(ReadActorId1, { JsonMessage(2) }); @@ -302,13 +363,13 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StopSession(ReadActorId2, source); } - Y_UNIT_TEST_F(TwoSessionsWithOffsets, TFixture) { + Y_UNIT_TEST_F(TwoSessionsWithOffsets, TRealTopicFixture) { const TString topicName = "topic4"; PQCreateStream(topicName); Init(topicName); - auto source = BuildSource(topicName); + auto source = BuildSource(); const std::vector data = { Json1, Json2, Json3}; - PQWrite(data, topicName); + PQWrite(data); StartSession(ReadActorId1, source, 1); StartSession(ReadActorId2, source, 2); @@ -321,7 +382,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { ExpectMessageBatch(ReadActorId2, expected2); const std::vector data2 = { Json4 }; - PQWrite(data2, topicName); + PQWrite(data2); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); ExpectMessageBatch(ReadActorId1, { JsonMessage(4) }); ExpectMessageBatch(ReadActorId2, { JsonMessage(4) }); @@ -330,38 +391,38 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StopSession(ReadActorId2, source); } - Y_UNIT_TEST_F(BadDataSessionError, TFixture) { + Y_UNIT_TEST_F(BadDataSessionError, TRealTopicFixture) { const TString topicName = "topic5"; PQCreateStream(topicName); Init(topicName); - auto source = BuildSource(topicName); + auto source = BuildSource(); StartSession(ReadActorId1, source); const std::vector data = { "not json", "noch einmal / nicht json" }; - PQWrite(data, topicName); + PQWrite(data); ExpectSessionError(ReadActorId1, EStatusId::BAD_REQUEST, "INCORRECT_TYPE: The JSON element does not have the requested type."); StopSession(ReadActorId1, source); } - Y_UNIT_TEST_F(WrongFieldType, TFixture) { + Y_UNIT_TEST_F(WrongFieldType, TRealTopicFixture) { const TString topicName = "wrong_field"; PQCreateStream(topicName); Init(topicName); - auto source = BuildSource(topicName); + auto source = BuildSource(); StartSession(ReadActorId1, source); source.AddColumns("field1"); source.AddColumnTypes("[DataType; String]"); StartSession(ReadActorId2, source); - PQWrite({ Json1 }, topicName); + PQWrite({ Json1 }); ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }); ExpectSessionError(ReadActorId2, EStatusId::PRECONDITION_FAILED, "Failed to parse json messages, found 1 missing values"); - PQWrite({ Json2 }, topicName); + PQWrite({ Json2 }); ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, { JsonMessage(2) }); ExpectSessionError(ReadActorId2, EStatusId::PRECONDITION_FAILED, "Failed to parse json messages, found 1 missing values"); @@ -370,15 +431,15 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StopSession(ReadActorId2, source); } - Y_UNIT_TEST_F(RestartSessionIfNewClientWithOffset, TFixture) { + Y_UNIT_TEST_F(RestartSessionIfNewClientWithOffset, TRealTopicFixture) { const TString topicName = "topic6"; PQCreateStream(topicName); Init(topicName); - auto source = BuildSource(topicName); + auto source = BuildSource(); StartSession(ReadActorId1, source); const std::vector data = { Json1, Json2, Json3 }; // offset 0, 1, 2 - PQWrite(data, topicName); + PQWrite(data); ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, { JsonMessage(1), JsonMessage(2), JsonMessage(3) }); @@ -386,7 +447,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StartSession(ReadActorId2, source, 1); ExpectNewDataArrived({ReadActorId2}); - PQWrite({ Json4 }, topicName); + PQWrite({ Json4 }); ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, { JsonMessage(4) }); @@ -396,20 +457,20 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StopSession(ReadActorId2, source); } - Y_UNIT_TEST_F(ReadNonExistentTopic, TFixture) { + Y_UNIT_TEST_F(ReadNonExistentTopic, TRealTopicFixture) { const TString topicName = "topic7"; Init(topicName); - auto source = BuildSource(topicName); + auto source = BuildSource(); StartSession(ReadActorId1, source); ExpectSessionError(ReadActorId1, EStatusId::SCHEME_ERROR, "no path"); StopSession(ReadActorId1, source); } - Y_UNIT_TEST_F(SlowSession, TFixture) { + Y_UNIT_TEST_F(SlowSession, TRealTopicFixture) { const TString topicName = "topic8"; PQCreateStream(topicName); Init(topicName, 40); - auto source = BuildSource(topicName); + auto source = BuildSource(); StartSession(ReadActorId1, source); StartSession(ReadActorId2, source); // slow session @@ -417,7 +478,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { auto writeMessages = [&]() { for (size_t i = 0; i < messagesSize; ++i) { const std::vector data = { Json1 }; - PQWrite(data, topicName); + PQWrite(data); } Sleep(TDuration::MilliSeconds(100)); }; @@ -450,12 +511,12 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StopSession(ReadActorId1, source); } - Y_UNIT_TEST_F(TwoSessionsWithDifferentSchemes, TFixture) { + Y_UNIT_TEST_F(TwoSessionsWithDifferentSchemes, TRealTopicFixture) { const TString topicName = "dif_schemes"; PQCreateStream(topicName); Init(topicName); - auto source1 = BuildSource(topicName); - auto source2 = BuildSource(topicName); + auto source1 = BuildSource(); + auto source2 = BuildSource(); source2.AddColumns("field1"); source2.AddColumnTypes("[DataType; String]"); @@ -465,19 +526,19 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { TString json1 = "{\"dt\":100,\"value\":\"value1\", \"field1\":\"field1\"}"; TString json2 = "{\"dt\":200,\"value\":\"value2\", \"field1\":\"field2\"}"; - PQWrite({ json1, json2 }, topicName); + PQWrite({ json1, json2 }); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); ExpectMessageBatch(ReadActorId1, { JsonMessage(1), JsonMessage(2) }); ExpectMessageBatch(ReadActorId2, { JsonMessage(1).AddString("field1"), JsonMessage(2).AddString("field2") }); - auto source3 = BuildSource(topicName); + auto source3 = BuildSource(); source3.AddColumns("field2"); source3.AddColumnTypes("[DataType; String]"); auto readActorId3 = Runtime.AllocateEdgeActor(); StartSession(readActorId3, source3); TString json3 = "{\"dt\":300,\"value\":\"value3\", \"field1\":\"value1_field1\", \"field2\":\"value1_field2\"}"; - PQWrite({ json3 }, topicName); + PQWrite({ json3 }); ExpectNewDataArrived({ReadActorId1, ReadActorId2, readActorId3}); ExpectMessageBatch(ReadActorId1, { JsonMessage(3) }); ExpectMessageBatch(ReadActorId2, { JsonMessage(3).AddString("value1_field1") }); @@ -488,7 +549,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { TString json4 = "{\"dt\":400,\"value\":\"value4\", \"field1\":\"value2_field1\", \"field2\":\"value2_field2\"}"; TString json5 = "{\"dt\":500,\"value\":\"value5\", \"field1\":\"value3_field1\", \"field2\":\"value3_field2\"}"; - PQWrite({ json4, json5 }, topicName); + PQWrite({ json4, json5 }); ExpectNewDataArrived({ReadActorId2}); ExpectMessageBatch(ReadActorId2, { JsonMessage(4).AddString("value2_field1"), JsonMessage(5).AddString("value3_field1") }); @@ -496,27 +557,55 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StopSession(ReadActorId2, source2); } - Y_UNIT_TEST_F(TwoSessionsWithDifferentColumnTypes, TFixture) { + Y_UNIT_TEST_F(TwoSessionsWithDifferentColumnTypes, TRealTopicFixture) { const TString topicName = "dif_types"; PQCreateStream(topicName); Init(topicName); - auto source1 = BuildSource(topicName); + auto source1 = BuildSource(); source1.AddColumns("field1"); source1.AddColumnTypes("[OptionalType; [DataType; String]]"); StartSession(ReadActorId1, source1); TString json1 = "{\"dt\":100,\"field1\":\"str\",\"value\":\"value1\"}"; - PQWrite({ json1 }, topicName); + PQWrite({ json1 }); ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, { JsonMessage(1).AddString("str", true) }); - auto source2 = BuildSource(topicName); + auto source2 = BuildSource(); source2.AddColumns("field1"); source2.AddColumnTypes("[DataType; String]"); StartSession(ReadActorId2, source2, Nothing(), true); ExpectSessionError(ReadActorId2, EStatusId::SCHEME_ERROR, "Use the same column type in all queries via RD, current type for column `field1` is [OptionalType; [DataType; String]] (requested type is [DataType; String])"); } + + Y_UNIT_TEST_F(RestartSessionIfQueryStopped, TMockTopicFixture) { + Init("fake_topic", 1000); + auto source = BuildSource(); + + StartSession(ReadActorId1, source); + std::vector data = { Json1, Json2, Json3 }; + PQWrite(data, 1); + ExpectNewDataArrived({ReadActorId1}); + ExpectMessageBatch(ReadActorId1, { JsonMessage(1), JsonMessage(2), JsonMessage(3) }); + + StartSession(ReadActorId2, source, 1); + std::vector data2 = { Json1 }; + PQWrite(data2, 1); + ExpectNewDataArrived({ReadActorId2}); + ExpectMessageBatch(ReadActorId2, { JsonMessage(1)}); + + StopSession(ReadActorId2, source); + Runtime.GrabEdgeEvent(PqGatewayNotifier, TDuration::Seconds(GrabTimeoutSec)); + MockPqGateway->AddEvent(TopicPath, NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent(nullptr, 0, 0), 0); + + std::vector data3 = { Json4 }; + PQWrite(data3, 4); + ExpectNewDataArrived({ReadActorId1}); + ExpectMessageBatch(ReadActorId1, { JsonMessage(4) }); + + PassAway(); + } } } // namespace NFq::NRowDispatcher::NTests diff --git a/ydb/core/fq/libs/row_dispatcher/ut/ya.make b/ydb/core/fq/libs/row_dispatcher/ut/ya.make index 17812aa343d2..782b4498a291 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/ut/ya.make @@ -17,6 +17,7 @@ PEERDIR( ydb/core/testlib/actors ydb/tests/fq/pq_async_io yql/essentials/sql/pg_dummy + ydb/library/yql/providers/pq/gateway/dummy ) SIZE(MEDIUM) diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_blocking_queue.h b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_blocking_queue.h new file mode 100644 index 000000000000..7a8db40dc554 --- /dev/null +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_blocking_queue.h @@ -0,0 +1,85 @@ +#include + +#include +#include + +namespace NYql { + +template +class TBlockingEQueue { +public: + explicit TBlockingEQueue(size_t maxSize) + : MaxSize_(maxSize) + { + } + void Push(TEvent&& e, size_t size = 0) { + with_lock(Mutex_) { + CanPush_.WaitI(Mutex_, [this] () {return CanPushPredicate();}); + Events_.emplace_back(std::move(e), size ); + Size_ += size; + } + CanPop_.BroadCast(); + } + + void BlockUntilEvent() { + with_lock(Mutex_) { + CanPop_.WaitI(Mutex_, [this] () {return CanPopPredicate();}); + } + } + + TMaybe Pop(bool block) { + with_lock(Mutex_) { + if (block) { + CanPop_.WaitI(Mutex_, [this] () {return CanPopPredicate();}); + } else { + if (!CanPopPredicate()) { + return {}; + } + } + if (Events_.empty()) { + return {}; + } + + auto [front, size] = std::move(Events_.front()); + Events_.pop_front(); + Size_ -= size; + if (Size_ < MaxSize_) { + CanPush_.BroadCast(); + } + return std::move(front); // cast to TMaybe<> + } + } + + void Stop() { + with_lock(Mutex_) { + Stopped_ = true; + CanPop_.BroadCast(); + CanPush_.BroadCast(); + } + } + + bool IsStopped() { + with_lock(Mutex_) { + return Stopped_; + } + } + +private: + bool CanPopPredicate() const { + return !Events_.empty() || Stopped_; + } + + bool CanPushPredicate() const { + return Size_ < MaxSize_ || Stopped_; + } + + size_t MaxSize_; + size_t Size_ = 0; + TDeque> Events_; + bool Stopped_ = false; + TMutex Mutex_; + TCondVar CanPop_; + TCondVar CanPush_; +}; + +} diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp index 71f73876222f..36914e27f455 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp @@ -3,90 +3,13 @@ #include -#include #include #include +#include "yql_pq_blocking_queue.h" namespace NYql { -template -class TBlockingEQueue { -public: - explicit TBlockingEQueue(size_t maxSize) - : MaxSize_(maxSize) - { - } - void Push(TEvent&& e, size_t size = 0) { - with_lock(Mutex_) { - CanPush_.WaitI(Mutex_, [this] () {return CanPushPredicate();}); - Events_.emplace_back(std::move(e), size ); - Size_ += size; - } - CanPop_.BroadCast(); - } - - void BlockUntilEvent() { - with_lock(Mutex_) { - CanPop_.WaitI(Mutex_, [this] () {return CanPopPredicate();}); - } - } - - TMaybe Pop(bool block) { - with_lock(Mutex_) { - if (block) { - CanPop_.WaitI(Mutex_, [this] () {return CanPopPredicate();}); - } else { - if (!CanPopPredicate()) { - return {}; - } - } - if (Events_.empty()) { - return {}; - } - - auto [front, size] = std::move(Events_.front()); - Events_.pop_front(); - Size_ -= size; - if (Size_ < MaxSize_) { - CanPush_.BroadCast(); - } - return std::move(front); // cast to TMaybe<> - } - } - - void Stop() { - with_lock(Mutex_) { - Stopped_ = true; - CanPop_.BroadCast(); - CanPush_.BroadCast(); - } - } - - bool IsStopped() { - with_lock(Mutex_) { - return Stopped_; - } - } - -private: - bool CanPopPredicate() const { - return !Events_.empty() || Stopped_; - } - - bool CanPushPredicate() const { - return Size_ < MaxSize_ || Stopped_; - } - - size_t MaxSize_; - size_t Size_ = 0; - TDeque> Events_; - bool Stopped_ = false; - TMutex Mutex_; - TCondVar CanPop_; - TCondVar CanPush_; -}; - class TFileTopicReadSession : public NYdb::NTopic::IReadSession { constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5); diff --git a/ydb/tests/fq/pq_async_io/mock_pq_gateway.cpp b/ydb/tests/fq/pq_async_io/mock_pq_gateway.cpp new file mode 100644 index 000000000000..ff4dba4de4f3 --- /dev/null +++ b/ydb/tests/fq/pq_async_io/mock_pq_gateway.cpp @@ -0,0 +1,184 @@ +#include "mock_pq_gateway.h" + +#include +#include + +namespace NYql::NDq { + +namespace { + +using TQueue = NYql::TBlockingEQueue; + +class TMockTopicReadSession : public NYdb::NTopic::IReadSession { +public: + TMockTopicReadSession(NYdb::NTopic::TPartitionSession::TPtr session, std::shared_ptr queue) + : Session(std::move(session)) + , Queue(queue) { + if (Queue->IsStopped()) { + Queue->~TBlockingEQueue(); + new (Queue.get()) TQueue(4_MB); + } + ThreadPool.Start(1); + } + + NThreading::TFuture WaitEvent() override { + return NThreading::Async([&] () { + Queue->BlockUntilEvent(); + return NThreading::MakeFuture(); + }, ThreadPool); + } + + TVector GetEvents(bool block, TMaybe maxEventsCount, size_t /*maxByteSize*/) override { + TVector res; + for (auto event = Queue->Pop(block); !event.Empty() && res.size() <= maxEventsCount.GetOrElse(std::numeric_limits::max()); event = Queue->Pop(/*block=*/ false)) { + res.push_back(*event); + } + return res; + } + + TVector GetEvents(const NYdb::NTopic::TReadSessionGetEventSettings& settings) override { + return GetEvents(settings.Block_, settings.MaxEventsCount_, settings.MaxByteSize_); + } + + TMaybe GetEvent(bool block, size_t /*maxByteSize*/) override { + return Queue->Pop(block); + } + + TMaybe GetEvent(const NYdb::NTopic::TReadSessionGetEventSettings& settings) override { + return GetEvent(settings.Block_, settings.MaxByteSize_); + } + + bool Close(TDuration timeout = TDuration::Max()) override { + Y_UNUSED(timeout); + Queue->Stop(); + ThreadPool.Stop(); + return true; + } + + NYdb::NTopic::TReaderCounters::TPtr GetCounters() const override {return nullptr;} + TString GetSessionId() const override {return "fake";} +private: + TThreadPool ThreadPool; + NYdb::NTopic::TPartitionSession::TPtr Session; + std::shared_ptr Queue; +}; + +struct TDummyPartitionSession: public NYdb::NTopic::TPartitionSession { + TDummyPartitionSession() {} + void RequestStatus() override {} +}; + +class TMockPqGateway : public IMockPqGateway { + + struct TMockTopicClient : public NYql::ITopicClient { + + TMockTopicClient(TMockPqGateway* self): Self(self) { } + + NYdb::TAsyncStatus CreateTopic(const TString& /*path*/, const NYdb::NTopic::TCreateTopicSettings& /*settings*/ = {}) override {return NYdb::TAsyncStatus{};} + NYdb::TAsyncStatus AlterTopic(const TString& /*path*/, const NYdb::NTopic::TAlterTopicSettings& /*settings*/ = {}) override {return NYdb::TAsyncStatus{};} + NYdb::TAsyncStatus DropTopic(const TString& /*path*/, const NYdb::NTopic::TDropTopicSettings& /*settings*/ = {}) override {return NYdb::TAsyncStatus{};} + NYdb::NTopic::TAsyncDescribeTopicResult DescribeTopic(const TString& /*path*/, + const NYdb::NTopic::TDescribeTopicSettings& /*settings*/ = {}) override {return NYdb::NTopic::TAsyncDescribeTopicResult{};} + + NYdb::NTopic::TAsyncDescribeConsumerResult DescribeConsumer(const TString& /*path*/, const TString& /*consumer*/, + const NYdb::NTopic::TDescribeConsumerSettings& /*settings*/ = {}) override {return NYdb::NTopic::TAsyncDescribeConsumerResult{};} + + NYdb::NTopic::TAsyncDescribePartitionResult DescribePartition(const TString& /*path*/, i64 /*partitionId*/, + const NYdb::NTopic::TDescribePartitionSettings& /*settings*/ = {}) override {return NYdb::NTopic::TAsyncDescribePartitionResult{};} + + std::shared_ptr CreateReadSession(const NYdb::NTopic::TReadSessionSettings& settings) override { + Y_ENSURE(!settings.Topics_.empty()); + TString topic = settings.Topics_.front().Path_; + Self->Runtime.Send(new NActors::IEventHandle(Self->Notifier, NActors::TActorId(), new NYql::NDq::TEvMockPqEvents::TEvCreateSession())); + return std::make_shared(MakeIntrusive(), Self->GetEventQueue(topic)); + } + + std::shared_ptr CreateSimpleBlockingWriteSession( + const NYdb::NTopic::TWriteSessionSettings& /*settings*/) override { + return nullptr; + } + std::shared_ptr CreateWriteSession(const NYdb::NTopic::TWriteSessionSettings& /*settings*/) override { + return nullptr; + } + + NYdb::TAsyncStatus CommitOffset(const TString& /*path*/, ui64 /*partitionId*/, const TString& /*consumerName*/, ui64 /*offset*/, + const NYdb::NTopic::TCommitOffsetSettings& /*settings*/ = {}) override {return NYdb::TAsyncStatus{};} + + TMockPqGateway* Self; + }; +public: + + TMockPqGateway( + NActors::TTestActorRuntime& runtime, + NActors::TActorId notifier) + : Runtime(runtime) + , Notifier(notifier) {} + + ~TMockPqGateway() {} + + NThreading::TFuture OpenSession(const TString& /*sessionId*/, const TString& /*username*/) override { + return NThreading::MakeFuture(); + } + NThreading::TFuture CloseSession(const TString& /*sessionId*/) override { + return NThreading::MakeFuture(); + } + + ::NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath( + const TString& /*sessionId*/, + const TString& /*cluster*/, + const TString& /*database*/, + const TString& /*path*/, + const TString& /*token*/) override { + return ::NPq::NConfigurationManager::TAsyncDescribePathResult{}; + } + + NThreading::TFuture ListStreams( + const TString& /*sessionId*/, + const TString& /*cluster*/, + const TString& /*database*/, + const TString& /*token*/, + ui32 /*limit*/, + const TString& /*exclusiveStartStreamName*/ = {}) override { + return NThreading::TFuture{}; + } + + void UpdateClusterConfigs( + const TString& /*clusterName*/, + const TString& /*endpoint*/, + const TString& /*database*/, + bool /*secure*/) override {} + + NYql::ITopicClient::TPtr GetTopicClient(const NYdb::TDriver& /*driver*/, const NYdb::NTopic::TTopicClientSettings& /*settings*/) override { + return MakeIntrusive(this); + } + + std::shared_ptr GetEventQueue(const TString& topic) { + if (!Queues.contains(topic)) { + Queues[topic] = std::make_shared(4_MB); + } + return Queues[topic]; + } + + void AddEvent(const TString& topic, NYdb::NTopic::TReadSessionEvent::TEvent&& e, size_t size) override { + GetEventQueue(topic)->Push(std::move(e), size); + } + +private: + std::unordered_map> Queues; + NActors::TTestActorRuntime& Runtime; + NActors::TActorId Notifier; +}; + +} + +NYdb::NTopic::TPartitionSession::TPtr CreatePartitionSession() { + return MakeIntrusive(); +} + +TIntrusivePtr CreateMockPqGateway( + NActors::TTestActorRuntime& runtime, + NActors::TActorId notifier) { + return MakeIntrusive(runtime, notifier); +} + +} diff --git a/ydb/tests/fq/pq_async_io/mock_pq_gateway.h b/ydb/tests/fq/pq_async_io/mock_pq_gateway.h new file mode 100644 index 000000000000..b6a05e170b7e --- /dev/null +++ b/ydb/tests/fq/pq_async_io/mock_pq_gateway.h @@ -0,0 +1,42 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include + +namespace NYql::NDq { + +struct TEvMockPqEvents { + enum EEv : ui32 { + EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvCreateSession = EvBegin, + EvEnd + }; + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); + struct TEvCreateSession : public NActors::TEventLocal {}; +}; + +class IMockPqGateway : public NYql::IPqGateway { +public: + virtual void AddEvent(const TString& topic, NYdb::NTopic::TReadSessionEvent::TEvent&& e, size_t size) = 0; +}; + +NYdb::NTopic::TPartitionSession::TPtr CreatePartitionSession(); + +TIntrusivePtr CreateMockPqGateway( + NActors::TTestActorRuntime& runtime, + NActors::TActorId notifier); + +} diff --git a/ydb/tests/fq/pq_async_io/ya.make b/ydb/tests/fq/pq_async_io/ya.make index 487500ce4bab..75efb71a2b4f 100644 --- a/ydb/tests/fq/pq_async_io/ya.make +++ b/ydb/tests/fq/pq_async_io/ya.make @@ -1,12 +1,14 @@ LIBRARY() SRCS( + mock_pq_gateway.cpp ut_helpers.cpp ) PEERDIR( yql/essentials/minikql/computation/llvm14 ydb/library/yql/providers/common/ut_helpers + ydb/library/yql/providers/pq/gateway/dummy ydb/public/sdk/cpp/client/ydb_topic )