Skip to content

Commit

Permalink
signals to writing trace (#13763)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 24, 2025
1 parent 552d995 commit 3d22318
Show file tree
Hide file tree
Showing 18 changed files with 217 additions and 74 deletions.
25 changes: 12 additions & 13 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
TInsertedPortions writtenData = ev->Get()->DetachInsertedData();
if (ev->Get()->GetWriteStatus() == NKikimrProto::OK) {
const TMonotonic now = TMonotonic::Now();
for (auto&& i: writtenData.GetWriteResults()) {
for (auto&& i : writtenData.GetWriteResults()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", i.GetDataSize())("event", "data_write_finished")(
"writing_id", i.GetWriteMeta().GetId());
i.MutableWriteMeta().OnStage(NEvWrite::EWriteStage::Finished);
Counters.OnWritePutBlobsSuccess(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
}
Expand All @@ -111,6 +112,7 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", i.GetDataSize())("event", "data_write_error")(
"writing_id", i.GetWriteMeta().GetId());
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
i.MutableWriteMeta().OnStage(NEvWrite::EWriteStage::Finished);
}

Execute(new TTxBlobsWritingFailed(this, ev->Get()->GetWriteStatus(), std::move(writtenData)), ctx);
Expand All @@ -129,6 +131,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo

for (auto&& aggr : baseAggregations) {
const auto& writeMeta = aggr->GetWriteMeta();
aggr->MutableWriteMeta().OnStage(NEvWrite::EWriteStage::Finished);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "blobs_write_finished")("writing_size", aggr->GetSize())(
"writing_id", writeMeta.GetId())("status", putResult.GetPutStatus());
Counters.GetWritesMonitor()->OnFinishWrite(aggr->GetSize(), 1);
Expand Down Expand Up @@ -160,12 +163,6 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
} else {
const TMonotonic now = TMonotonic::Now();
Counters.OnWritePutBlobsSuccess(now - writeMeta.GetWriteStartInstant(), aggr->GetRows());
Counters.GetCSCounters().OnWriteMiddle1PutBlobsSuccess(now - writeMeta.GetWriteMiddle1StartInstant());
Counters.GetCSCounters().OnWriteMiddle2PutBlobsSuccess(now - writeMeta.GetWriteMiddle2StartInstant());
Counters.GetCSCounters().OnWriteMiddle3PutBlobsSuccess(now - writeMeta.GetWriteMiddle3StartInstant());
Counters.GetCSCounters().OnWriteMiddle4PutBlobsSuccess(now - writeMeta.GetWriteMiddle4StartInstant());
Counters.GetCSCounters().OnWriteMiddle5PutBlobsSuccess(now - writeMeta.GetWriteMiddle5StartInstant());
Counters.GetCSCounters().OnWriteMiddle6PutBlobsSuccess(now - writeMeta.GetWriteMiddle6StartInstant());
LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId()
<< (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "")
<< " at tablet " << TabletID());
Expand Down Expand Up @@ -195,7 +192,9 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
granuleShardingVersion = record.GetGranuleShardingVersion();
}

NEvWrite::TWriteMeta writeMeta(writeId, pathId, source, granuleShardingVersion, TGUID::CreateTimebased().AsGuidString());
auto writeMetaPtr = std::make_shared<NEvWrite::TWriteMeta>(writeId, pathId, source, granuleShardingVersion,
TGUID::CreateTimebased().AsGuidString(), Counters.GetCSCounters().WritingCounters->GetWriteFlowCounters());
auto& writeMeta = *writeMetaPtr;
if (record.HasModificationType()) {
writeMeta.SetModificationType(TEnumOperator<NEvWrite::EModificationType>::DeserializeFromProto(record.GetModificationType()));
}
Expand Down Expand Up @@ -243,7 +242,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::IncorrectSchema);
}

NEvWrite::TWriteData writeData(writeMeta, arrowData, snapshotSchema->GetIndexInfo().GetReplaceKey(),
NEvWrite::TWriteData writeData(writeMetaPtr, arrowData, snapshotSchema->GetIndexInfo().GetReplaceKey(),
StoragesManager->GetInsertOperator()->StartWritingAction(NOlap::NBlobOperations::EConsumer::WRITING), false);
auto overloadStatus = CheckOverloadedImmediate(pathId);
if (overloadStatus == EOverloadStatus::None) {
Expand Down Expand Up @@ -273,13 +272,11 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
LOG_S_DEBUG("Write (blob) " << writeData.GetSize() << " bytes into pathId " << writeMeta.GetTableId()
<< (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : " ")
<< Counters.GetWritesMonitor()->DebugString() << " at tablet " << TabletID());
writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now());

NOlap::TWritingContext context(TabletID(), SelfId(), snapshotSchema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
Counters.GetCSCounters().WritingCounters, GetLastTxSnapshot(), std::make_shared<TAtomicCounter>(1), true,
BufferizationInsertionWriteActorId, BufferizationPortionsWriteActorId);
std::shared_ptr<NConveyor::ITask> task =
std::make_shared<NOlap::TBuildBatchesTask>(std::move(writeData), context);
std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildBatchesTask>(std::move(writeData), context);
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
}
}
Expand Down Expand Up @@ -571,7 +568,9 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
if (overloadStatus != EOverloadStatus::None) {
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, pathId, source, {}, TGUID::CreateTimebased().AsGuidString()),
OverloadWriteFail(overloadStatus,
NEvWrite::TWriteMeta(0, pathId, source, {}, TGUID::CreateTimebased().AsGuidString(),
Counters.GetCSCounters().WritingCounters->GetWriteFlowCounters()),
arrowData->GetSize(), cookie, std::move(result), ctx);
return;
}
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/tx/columnshard/counters/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "common/owner.h"

#include <ydb/core/tx/columnshard/counters/tablet_counters.h>
#include <ydb/core/tx/data_events/common/signals_flow.h>

#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <util/generic/hash_set.h>
Expand All @@ -30,18 +31,19 @@ class TWriteCounters: public TCommonCountersOwner {
NMonitoring::THistogramPtr HistogramDurationQueueWait;
NMonitoring::THistogramPtr HistogramBatchDataCount;
NMonitoring::THistogramPtr HistogramBatchDataSize;
YDB_READONLY_DEF(std::shared_ptr<NEvWrite::TWriteFlowCounters>, WriteFlowCounters);

public:
const NMonitoring::TDynamicCounters::TCounterPtr QueueWaitSize;

void OnWritingTaskDequeue(const TDuration d){
void OnWritingTaskDequeue(const TDuration d) {
HistogramDurationQueueWait->Collect(d.MilliSeconds());
}

TWriteCounters(TCommonCountersOwner& owner)
: TBase(owner, "activity", "writing")
, QueueWaitSize(TBase::GetValue("Write/Queue/Size"))
{
, WriteFlowCounters(std::make_shared<NEvWrite::TWriteFlowCounters>())
, QueueWaitSize(TBase::GetValue("Write/Queue/Size")) {
VolumeWriteData = TBase::GetDeriviative("Write/Incoming/Bytes");
HistogramBytesWriteDataCount = TBase::GetHistogram("Write/Incoming/ByBytes/Count", NMonitoring::ExponentialHistogram(18, 2, 100));
HistogramBytesWriteDataBytes = TBase::GetHistogram("Write/Incoming/ByBytes/Bytes", NMonitoring::ExponentialHistogram(18, 2, 100));
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/counters/counters_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,4 @@ class TCountersManager {
}
};

} // namespace NKikimr::NColumnShard
} // namespace NKikimr::NColumnShard
24 changes: 12 additions & 12 deletions ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,22 @@ std::shared_ptr<NKikimr::NOlap::TUserData> TWideSerializedBatch::BuildInsertionU
return std::make_shared<NOlap::TUserData>(writeMeta.GetTableId(), blobRange, meta, tableSchema->GetVersion(), SplittedBlobs.GetData());
}

void TWritingBuffer::InitReadyInstant(const TMonotonic instant) {
for (auto&& aggr : Aggregations) {
aggr->MutableWriteMeta().SetWriteMiddle5StartInstant(instant);
}
void TWritingBuffer::InitReadyInstant(const TMonotonic /*instant*/) {
// for (auto&& aggr : Aggregations) {
// aggr->MutableWriteMeta().SetWriteMiddle5StartInstant(instant);
// }
}

void TWritingBuffer::InitStartSending(const TMonotonic instant) {
for (auto&& aggr : Aggregations) {
aggr->MutableWriteMeta().SetWriteMiddle4StartInstant(instant);
}
void TWritingBuffer::InitStartSending(const TMonotonic /*instant*/) {
// for (auto&& aggr : Aggregations) {
// aggr->MutableWriteMeta().SetWriteMiddle4StartInstant(instant);
// }
}

void TWritingBuffer::InitReplyReceived(const TMonotonic instant) {
for (auto&& aggr : Aggregations) {
aggr->MutableWriteMeta().SetWriteMiddle6StartInstant(instant);
}
void TWritingBuffer::InitReplyReceived(const TMonotonic /*instant*/) {
// for (auto&& aggr : Aggregations) {
// aggr->MutableWriteMeta().SetWriteMiddle6StartInstant(instant);
// }
}

std::vector<NKikimr::NOlap::TWritingBlob> TWritingBuffer::GroupIntoBlobs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class TWritingBlob {

class TWriteAggregation {
private:
NEvWrite::TWriteMeta WriteMeta;
std::shared_ptr<NEvWrite::TWriteMeta> WriteMeta;
YDB_READONLY(ui64, SchemaVersion, 0);
YDB_READONLY(ui64, Size, 0);
YDB_READONLY(ui64, Rows, 0);
Expand All @@ -109,11 +109,15 @@ class TWriteAggregation {
}

const NEvWrite::TWriteMeta& GetWriteMeta() const {
return *WriteMeta;
}

const std::shared_ptr<NEvWrite::TWriteMeta>& GetWriteMetaPtr() const {
return WriteMeta;
}

NEvWrite::TWriteMeta& MutableWriteMeta() {
return WriteMeta;
return *WriteMeta;
}

void AddInsertWriteId(const TInsertWriteId id) {
Expand All @@ -122,12 +126,13 @@ class TWriteAggregation {

TWriteAggregation(const NEvWrite::TWriteData& writeData, std::vector<NArrow::TSerializedBatch>&& splittedBlobs,
const std::shared_ptr<arrow::RecordBatch>& batch)
: WriteMeta(writeData.GetWriteMeta())
: WriteMeta(writeData.GetWriteMetaPtr())
, SchemaVersion(writeData.GetData()->GetSchemaVersion())
, Size(writeData.GetSize())
, BlobsAction(writeData.GetBlobsAction())
, SchemaSubset(writeData.GetSchemaSubsetVerified())
, RecordBatch(batch) {
AFL_VERIFY(WriteMeta);
for (auto&& s : splittedBlobs) {
SplittedBlobs.emplace_back(std::move(s), *this);
}
Expand All @@ -137,7 +142,7 @@ class TWriteAggregation {
}

TWriteAggregation(const NEvWrite::TWriteData& writeData)
: WriteMeta(writeData.GetWriteMeta())
: WriteMeta(writeData.GetWriteMetaPtr())
, SchemaVersion(writeData.GetData()->GetSchemaVersion())
, Size(writeData.GetSize())
, BlobsAction(writeData.GetBlobsAction()) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/operations/batch_builder/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class TBuildBatchesTask: public NConveyor::ITask, public NColumnShard::TMonitori
: WriteData(std::move(writeData))
, ActualSnapshot(context.GetApplyToSnapshot())
, Context(context) {
WriteData.MutableWriteMeta().OnStage(NEvWrite::EWriteStage::BuildBatch);
}
};
} // namespace NKikimr::NOlap
24 changes: 22 additions & 2 deletions ydb/core/tx/columnshard/operations/events.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "events.h"

#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>

namespace NKikimr::NColumnShard {

Expand All @@ -15,6 +15,26 @@ void TInsertedPortion::Finalize(TColumnShard* shard, NTabletFlatExecutor::TTrans
PortionInfoConstructor = nullptr;
}

TWriteResult::TWriteResult(const std::shared_ptr<NEvWrite::TWriteMeta>& writeMeta, const ui64 dataSize,
const std::shared_ptr<arrow::RecordBatch>& pkBatch,
const bool noDataToWrite, const ui32 recordsCount)
: WriteMeta(writeMeta)
, DataSize(dataSize)
, NoDataToWrite(noDataToWrite)
, PKBatch(pkBatch)
, RecordsCount(recordsCount) {
AFL_VERIFY(WriteMeta);
WriteMeta->OnStage(NEvWrite::EWriteStage::Result);
}

} // namespace NKikimr::NColumnShard

namespace NKikimr::NColumnShard::NPrivateEvents::NWrite {}
namespace NKikimr::NColumnShard::NPrivateEvents::NWrite {
TEvWritePortionResult::TEvWritePortionResult(const NKikimrProto::EReplyStatus writeStatus,
const std::shared_ptr<NOlap::IBlobsWritingAction>& writeAction, TInsertedPortions&& insertedData)
: WriteStatus(writeStatus)
, WriteAction(writeAction)
, InsertedData(std::move(insertedData)) {
}

} // namespace NKikimr::NColumnShard::NPrivateEvents::NWrite
28 changes: 13 additions & 15 deletions ydb/core/tx/columnshard/operations/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class TInsertedPortion {

class TWriteResult {
private:
NEvWrite::TWriteMeta WriteMeta;
std::shared_ptr<NEvWrite::TWriteMeta> WriteMeta;
YDB_READONLY(ui64, DataSize, 0);
YDB_READONLY(bool, NoDataToWrite, false);
std::shared_ptr<arrow::RecordBatch> PKBatch;
Expand All @@ -42,18 +42,19 @@ class TWriteResult {
}

const NEvWrite::TWriteMeta& GetWriteMeta() const {
return WriteMeta;
return *WriteMeta;
}

NEvWrite::TWriteMeta& MutableWriteMeta() const {
return *WriteMeta;
}

TWriteResult(const NEvWrite::TWriteMeta& writeMeta, const ui64 dataSize, const std::shared_ptr<arrow::RecordBatch>& pkBatch,
const bool noDataToWrite, const ui32 recordsCount)
: WriteMeta(writeMeta)
, DataSize(dataSize)
, NoDataToWrite(noDataToWrite)
, PKBatch(pkBatch)
, RecordsCount(recordsCount)
{
const std::shared_ptr<NEvWrite::TWriteMeta>& GetWriteMetaPtr() const {
return WriteMeta;
}

TWriteResult(const std::shared_ptr<NEvWrite::TWriteMeta>& writeMeta, const ui64 dataSize, const std::shared_ptr<arrow::RecordBatch>& pkBatch,
const bool noDataToWrite, const ui32 recordsCount);
};

class TInsertedPortions {
Expand All @@ -69,6 +70,7 @@ class TInsertedPortions {
AFL_VERIFY(WriteResults.size());
std::optional<ui64> pathId;
for (auto&& i : WriteResults) {
i.GetWriteMeta().OnStage(NEvWrite::EWriteStage::Finished);
AFL_VERIFY(!i.GetWriteMeta().HasLongTxId());
if (!pathId) {
pathId = i.GetWriteMeta().GetTableId();
Expand Down Expand Up @@ -100,11 +102,7 @@ class TEvWritePortionResult: public TEventLocal<TEvWritePortionResult, TEvPrivat
}

TEvWritePortionResult(const NKikimrProto::EReplyStatus writeStatus, const std::shared_ptr<NOlap::IBlobsWritingAction>& writeAction,
TInsertedPortions&& insertedData)
: WriteStatus(writeStatus)
, WriteAction(writeAction)
, InsertedData(std::move(insertedData)) {
}
TInsertedPortions&& insertedData);
};

} // namespace NKikimr::NColumnShard::NPrivateEvents::NWrite
11 changes: 5 additions & 6 deletions ydb/core/tx/columnshard/operations/slice_builder/builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ class TPortionWriteController: public NColumnShard::IWriteController,
};

TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) {
const NActors::TLogContextGuard g = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_WRITE)("tablet_id", TabletId)("parent_id",
Context.GetTabletActorId())("write_id", WriteData.GetWriteMeta().GetWriteId())("table_id", WriteData.GetWriteMeta().GetTableId());
const NActors::TLogContextGuard g = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_WRITE)("tablet_id", TabletId)(
"parent_id", Context.GetTabletActorId())("write_id", WriteData.GetWriteMeta().GetWriteId())(
"table_id", WriteData.GetWriteMeta().GetTableId());
if (!Context.IsActive()) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "abort_execution");
ReplyError("execution aborted", NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Internal);
Expand All @@ -113,7 +114,7 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr<ITask>& /*ta
}
if (WriteData.GetWritePortions()) {
if (OriginalBatch->num_rows() == 0) {
NColumnShard::TWriteResult wResult(WriteData.GetWriteMeta(), WriteData.GetSize(), nullptr, true, 0);
NColumnShard::TWriteResult wResult(WriteData.GetWriteMetaPtr(), WriteData.GetSize(), nullptr, true, 0);
NColumnShard::TInsertedPortions pack({ wResult }, {});
auto result = std::make_unique<NColumnShard::NPrivateEvents::NWrite::TEvWritePortionResult>(
NKikimrProto::EReplyStatus::OK, nullptr, std::move(pack));
Expand All @@ -123,7 +124,7 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr<ITask>& /*ta
NArrow::TColumnOperator().Extract(OriginalBatch, Context.GetActualSchema()->GetIndexInfo().GetPrimaryKey()->fields());
auto batches = NArrow::NMerger::TRWSortableBatchPosition::SplitByBordersInIntervalPositions(OriginalBatch,
Context.GetActualSchema()->GetIndexInfo().GetPrimaryKey()->field_names(), WriteData.GetData()->GetSeparationPoints());
NColumnShard::TWriteResult wResult(WriteData.GetWriteMeta(), WriteData.GetSize(), pkBatch, false, OriginalBatch->num_rows());
NColumnShard::TWriteResult wResult(WriteData.GetWriteMetaPtr(), WriteData.GetSize(), pkBatch, false, OriginalBatch->num_rows());
std::vector<TPortionWriteController::TInsertPortion> portions;
for (auto&& batch : batches) {
if (!batch) {
Expand Down Expand Up @@ -174,9 +175,7 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr<ITask>& /*ta
OriginalBatch = NArrow::ToBatch(normalized->BuildTableVerified(), true);
}
}
WriteData.MutableWriteMeta().SetWriteMiddle2StartInstant(TMonotonic::Now());
auto batches = BuildSlices();
WriteData.MutableWriteMeta().SetWriteMiddle3StartInstant(TMonotonic::Now());
if (batches) {
auto writeDataPtr = std::make_shared<NEvWrite::TWriteData>(std::move(WriteData));
writeDataPtr->SetSchemaSubset(std::move(subset));
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/operations/slice_builder/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class TBuildSlicesTask: public NConveyor::ITask, public NColumnShard::TMonitorin
, TabletId(WriteData.GetWriteMeta().GetTableId())
, OriginalBatch(batch)
, Context(context) {
WriteData.MutableWriteMeta().OnStage(NEvWrite::EWriteStage::BuildSlices);
}
};
} // namespace NKikimr::NOlap
Loading

0 comments on commit 3d22318

Please sign in to comment.