diff --git a/cloud/blockstore/libs/storage/volume/model/merge.cpp b/cloud/blockstore/libs/storage/volume/model/merge.cpp index f6a3beed449..c88b66b9f9f 100644 --- a/cloud/blockstore/libs/storage/volume/model/merge.cpp +++ b/cloud/blockstore/libs/storage/volume/model/merge.cpp @@ -44,4 +44,120 @@ void MergeStripedBitMask( } } +void MergeDescribeBlocksResponse( + NProto::TDescribeBlocksResponse& src, + NProto::TDescribeBlocksResponse& dst, + const ui32 blocksPerStripe, + const ui32 blockSize, + const ui32 partitionsCount, + const ui32 partitionId) +{ + for (const auto& freshBlockRange: src.GetFreshBlockRanges()) { + SplitFreshBlockRangeFromRelativeToGlobalIndices( + freshBlockRange, + blocksPerStripe, + blockSize, + partitionsCount, + partitionId, + &dst); + } + + const auto& srcBlobPieces = src.GetBlobPieces(); + + for (const auto& blobPiece: srcBlobPieces) { + NProto::TBlobPiece dstBlobPiece; + dstBlobPiece.MutableBlobId()->CopyFrom(blobPiece.GetBlobId()); + dstBlobPiece.SetBSGroupId(blobPiece.GetBSGroupId()); + + for (const auto& srcRange: blobPiece.GetRanges()) { + SplitBlobPieceRangeFromRelativeToGlobalIndices( + srcRange, + blocksPerStripe, + partitionsCount, + partitionId, + &dstBlobPiece); + } + dst.MutableBlobPieces()->Add(std::move(dstBlobPiece)); + } +} + +void SplitFreshBlockRangeFromRelativeToGlobalIndices( + const NProto::TFreshBlockRange& srcRange, + const ui32 blocksPerStripe, + const ui32 blockSize, + const ui32 partitionsCount, + const ui32 partitionId, + NProto::TDescribeBlocksResponse* dst) +{ + const ui32 startIndex = srcRange.GetStartIndex(); + ui32 blocksCount = 0; + + const char* srcRangePtr = srcRange.GetBlocksContent().Data(); + while (blocksCount < srcRange.GetBlocksCount()) { + const auto index = RelativeToGlobalIndex( + blocksPerStripe, + startIndex + blocksCount, + partitionsCount, + partitionId); + + const auto stripeRange = CalculateStripeRange(blocksPerStripe, index); + + const auto rangeBlocksCount = std::min( + static_cast(stripeRange.End) - index + 1, + static_cast(srcRange.GetBlocksCount()) - blocksCount); + + NProto::TFreshBlockRange dstRange; + dstRange.SetStartIndex(index); + dstRange.SetBlocksCount(rangeBlocksCount); + + const ui64 bytesCount = rangeBlocksCount * blockSize; + dstRange.MutableBlocksContent()->resize(bytesCount); + char* dstRangePtr = dstRange.MutableBlocksContent()->begin(); + std::memcpy( + dstRangePtr, + srcRangePtr, + bytesCount); + + srcRangePtr += bytesCount; + blocksCount += rangeBlocksCount; + + dst->MutableFreshBlockRanges()->Add(std::move(dstRange)); + } +} + +void SplitBlobPieceRangeFromRelativeToGlobalIndices( + const NProto::TRangeInBlob& srcRange, + const ui32 blocksPerStripe, + const ui32 partitionsCount, + const ui32 partitionId, + NProto::TBlobPiece* dstBlobPiece) +{ + const ui32 blobOffset = srcRange.GetBlobOffset(); + const ui32 blockIndex = srcRange.GetBlockIndex(); + ui32 blocksCount = 0; + + while (blocksCount < srcRange.GetBlocksCount()) { + const auto index = RelativeToGlobalIndex( + blocksPerStripe, + blockIndex + blocksCount, + partitionsCount, + partitionId); + + const auto stripeRange = CalculateStripeRange(blocksPerStripe, index); + + const auto rangeBlocksCount = std::min( + static_cast(stripeRange.End) - index + 1, + static_cast(srcRange.GetBlocksCount()) - blocksCount); + + NProto::TRangeInBlob dstRange; + dstRange.SetBlobOffset(blobOffset + blocksCount); + dstRange.SetBlockIndex(index); + dstRange.SetBlocksCount(rangeBlocksCount); + + blocksCount += rangeBlocksCount; + + dstBlobPiece->MutableRanges()->Add(std::move(dstRange)); + } +} + } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/model/merge.h b/cloud/blockstore/libs/storage/volume/model/merge.h index b00cb681de5..ecd833ab376 100644 --- a/cloud/blockstore/libs/storage/volume/model/merge.h +++ b/cloud/blockstore/libs/storage/volume/model/merge.h @@ -1,5 +1,7 @@ #pragma once +#include + #include namespace NCloud::NBlockStore::NStorage { @@ -15,4 +17,27 @@ void MergeStripedBitMask( const TString& srcMask, TString& dstMask); +void MergeDescribeBlocksResponse( + NProto::TDescribeBlocksResponse& src, + NProto::TDescribeBlocksResponse& dst, + const ui32 blocksPerStripe, + const ui32 blockSize, + const ui32 partitionsCount, + const ui32 partitionId); + +void SplitFreshBlockRangeFromRelativeToGlobalIndices( + const NProto::TFreshBlockRange& srcRange, + const ui32 blocksPerStripe, + const ui32 blockSize, + const ui32 partitionsCount, + const ui32 partitionId, + NProto::TDescribeBlocksResponse* dst); + +void SplitBlobPieceRangeFromRelativeToGlobalIndices( + const NProto::TRangeInBlob& srcRange, + const ui32 blocksPerStripe, + const ui32 partitionsCount, + const ui32 partitionId, + NProto::TBlobPiece* dstBlobPiece); + } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/model/merge_ut.cpp b/cloud/blockstore/libs/storage/volume/model/merge_ut.cpp index 70d3d9b1829..d6a8a665079 100644 --- a/cloud/blockstore/libs/storage/volume/model/merge_ut.cpp +++ b/cloud/blockstore/libs/storage/volume/model/merge_ut.cpp @@ -130,6 +130,71 @@ Y_UNIT_TEST_SUITE(TMergeTest) } } + Y_UNIT_TEST(ShouldSplitFreshBlockRangeFromRelativeToGlobalIndices) + { + NProto::TFreshBlockRange freshData; + freshData.SetStartIndex(5); + freshData.SetBlocksCount(8); + freshData.MutableBlocksContent()->append(TString(1024, 'X')); + + NProto::TDescribeBlocksResponse dst; + SplitFreshBlockRangeFromRelativeToGlobalIndices( + freshData, + 4, // blocksPerStripe + 128, // blockSize + 2, // partitionsCount + 0, // partitionId + &dst); + + UNIT_ASSERT_VALUES_EQUAL(3, dst.FreshBlockRangesSize()); + + const auto& range1 = dst.GetFreshBlockRanges(0); + UNIT_ASSERT_VALUES_EQUAL(9, range1.GetStartIndex()); + UNIT_ASSERT_VALUES_EQUAL(3, range1.GetBlocksCount()); + + const auto& range2 = dst.GetFreshBlockRanges(1); + UNIT_ASSERT_VALUES_EQUAL(16, range2.GetStartIndex()); + UNIT_ASSERT_VALUES_EQUAL(4, range2.GetBlocksCount()); + + TString actualContent; + for (size_t i = 0; i < dst.FreshBlockRangesSize(); ++i) { + const auto& freshRange = dst.GetFreshBlockRanges(i); + actualContent += freshRange.GetBlocksContent(); + } + + UNIT_ASSERT_VALUES_EQUAL(1024, actualContent.size()); + for (size_t i = 0; i < actualContent.size(); i++) { + UNIT_ASSERT_VALUES_EQUAL('X', actualContent[i]); + } + } + + Y_UNIT_TEST(ShouldSplitBlobPieceRangeFromRelativeToGlobalIndices) + { + NProto::TRangeInBlob rangeInBlob; + rangeInBlob.SetBlobOffset(10); + rangeInBlob.SetBlockIndex(13); + rangeInBlob.SetBlocksCount(1024); + + NProto::TBlobPiece dst; + SplitBlobPieceRangeFromRelativeToGlobalIndices( + rangeInBlob, + 4, // blocksPerStripe + 2, // partitionsCount + 0, // partitionId + &dst); + + UNIT_ASSERT_VALUES_EQUAL(257, dst.RangesSize()); + + const auto& range1 = dst.GetRanges(0); + UNIT_ASSERT_VALUES_EQUAL(10, range1.GetBlobOffset()); + UNIT_ASSERT_VALUES_EQUAL(25, range1.GetBlockIndex()); + UNIT_ASSERT_VALUES_EQUAL(3, range1.GetBlocksCount()); + + const auto& range2 = dst.GetRanges(1); + UNIT_ASSERT_VALUES_EQUAL(13, range2.GetBlobOffset()); + UNIT_ASSERT_VALUES_EQUAL(32, range2.GetBlockIndex()); + UNIT_ASSERT_VALUES_EQUAL(4, range2.GetBlocksCount()); + } } } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/model/stripe.cpp b/cloud/blockstore/libs/storage/volume/model/stripe.cpp index f2b514b590f..3b7b3b1cc52 100644 --- a/cloud/blockstore/libs/storage/volume/model/stripe.cpp +++ b/cloud/blockstore/libs/storage/volume/model/stripe.cpp @@ -49,6 +49,16 @@ ui64 RelativeToGlobalIndex( return stripe * blocksPerStripe + offsetInStripe; } +TBlockRange64 CalculateStripeRange( + const ui32 blocksPerStripe, + const ui64 globalIndex) +{ + const auto stripeInd = globalIndex / blocksPerStripe; + return TBlockRange64::WithLength( + stripeInd * blocksPerStripe, + blocksPerStripe); +} + ui32 CalculateRequestCount( const ui32 blocksPerStripe, const TBlockRange64& original, diff --git a/cloud/blockstore/libs/storage/volume/model/stripe.h b/cloud/blockstore/libs/storage/volume/model/stripe.h index 3cc7c6dfedf..2856c2c8b47 100644 --- a/cloud/blockstore/libs/storage/volume/model/stripe.h +++ b/cloud/blockstore/libs/storage/volume/model/stripe.h @@ -31,6 +31,10 @@ ui64 RelativeToGlobalIndex( const ui32 partitionCount, const ui32 partitionId); +TBlockRange64 CalculateStripeRange( + const ui32 blocksPerStripe, + const ui64 globalIndex); + ui32 CalculateRequestCount( const ui32 blocksPerStripe, const TBlockRange64& original, diff --git a/cloud/blockstore/libs/storage/volume/model/stripe_ut.cpp b/cloud/blockstore/libs/storage/volume/model/stripe_ut.cpp index 8fa779b74b1..276c14e4493 100644 --- a/cloud/blockstore/libs/storage/volume/model/stripe_ut.cpp +++ b/cloud/blockstore/libs/storage/volume/model/stripe_ut.cpp @@ -152,6 +152,25 @@ Y_UNIT_TEST_SUITE(TStripeTest) UNIT_ASSERT_VALUES_EQUAL(75, RelativeToGlobalIndex(10, 25, 3, 1)); } + Y_UNIT_TEST(ShouldCalculateStripeRange) + { + UNIT_ASSERT_VALUES_EQUAL( + DescribeRange(TBlockRange64::WithLength(0, 10)), + DescribeRange(CalculateStripeRange(10, 0))); + + UNIT_ASSERT_VALUES_EQUAL( + DescribeRange(TBlockRange64::WithLength(0, 10)), + DescribeRange(CalculateStripeRange(10, 5))); + + UNIT_ASSERT_VALUES_EQUAL( + DescribeRange(TBlockRange64::WithLength(10, 10)), + DescribeRange(CalculateStripeRange(10, 15))); + + UNIT_ASSERT_VALUES_EQUAL( + DescribeRange(TBlockRange64::WithLength(10, 10)), + DescribeRange(CalculateStripeRange(10, 19))); + } + Y_UNIT_TEST(ShouldCalculateRequestCount) { UNIT_ASSERT_VALUES_EQUAL( diff --git a/cloud/blockstore/libs/storage/volume/partition_requests.h b/cloud/blockstore/libs/storage/volume/partition_requests.h index a68d30c6088..fc4a153194a 100644 --- a/cloud/blockstore/libs/storage/volume/partition_requests.h +++ b/cloud/blockstore/libs/storage/volume/partition_requests.h @@ -678,6 +678,7 @@ class TPartitionRequestActor final const ui64 VolumeRequestId; const TBlockRange64 OriginalRange; const ui32 BlocksPerStripe; + const ui32 BlockSize; const ui32 PartitionsCount; TVector> PartitionRequests; const TRequestTraceInfo TraceInfo; @@ -695,6 +696,7 @@ class TPartitionRequestActor final ui64 volumeRequestId, TBlockRange64 originalRange, ui32 blocksPerStripe, + ui32 blockSize, ui32 partitionsCount, TVector> partitionRequests, TRequestTraceInfo traceInfo); @@ -865,6 +867,25 @@ class TPartitionRequestActor final } } + void Merge( + NProto::TDescribeBlocksResponse& src, + ui32 requestNo, + NProto::TDescribeBlocksResponse& dst) + { + if (FAILED(src.GetError().GetCode())) { + *dst.MutableError() = std::move(*src.MutableError()); + return; + } + + MergeDescribeBlocksResponse( + src, + dst, + BlocksPerStripe, + BlockSize, + PartitionsCount, + PartitionRequests[requestNo].PartitionId); + } + template void Merge( T& src, @@ -908,6 +929,7 @@ TPartitionRequestActor::TPartitionRequestActor( ui64 volumeRequestId, TBlockRange64 originalRange, ui32 blocksPerStripe, + ui32 blockSize, ui32 partitionsCount, TVector> partitionRequests, TRequestTraceInfo traceInfo) @@ -916,6 +938,7 @@ TPartitionRequestActor::TPartitionRequestActor( , VolumeRequestId(volumeRequestId) , OriginalRange(originalRange) , BlocksPerStripe(blocksPerStripe) + , BlockSize(blockSize) , PartitionsCount(partitionsCount) , PartitionRequests(std::move(partitionRequests)) , TraceInfo(std::move(traceInfo)) diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_forward.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_forward.cpp index cce3aab7c17..3e9f777a8ca 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_forward.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_forward.cpp @@ -27,6 +27,12 @@ namespace { //////////////////////////////////////////////////////////////////////////////// +template +constexpr bool IsDescribeBlocksMethod = + std::is_same_v; + +//////////////////////////////////////////////////////////////////////////////// + template bool CanForwardToPartition(ui32 partitionCount) { @@ -127,7 +133,11 @@ bool TVolumeActor::HandleRequest( return false; } - if (partitionRequests.size() == 1) { + // Should always forward request via TPartitionRequestActor for + // DesribeBlocks method and multi-partitioned volume. + if (State->GetPartitions().size() == 1 || + (partitionRequests.size() == 1 && !IsDescribeBlocksMethod)) + { ev->Get()->Record = std::move(partitionRequests.front().Event->Record); SendRequestToPartition( ctx, @@ -166,6 +176,7 @@ bool TVolumeActor::HandleRequest( volumeRequestId, blockRange, blocksPerStripe, + State->GetBlockSize(), State->GetPartitions().size(), std::move(partitionRequests), TRequestTraceInfo(isTraced, traceTs, TraceSerializer)); diff --git a/cloud/blockstore/libs/storage/volume/volume_ut.cpp b/cloud/blockstore/libs/storage/volume/volume_ut.cpp index 92c1395384f..ec7329f944d 100644 --- a/cloud/blockstore/libs/storage/volume/volume_ut.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_ut.cpp @@ -3632,6 +3632,115 @@ Y_UNIT_TEST_SUITE(TVolumeTest) } } + Y_UNIT_TEST(ShouldHandleDescribeBlocksRequestForMultipartitionVolume) + { + auto runtime = PrepareTestActorRuntime(); + + TVolumeClient volume(*runtime); + volume.UpdateVolumeConfig( + 0, // maxBandwidth + 0, // maxIops + 0, // burstPercentage + 0, // maxPostponedWeight + false, // throttlingEnabled + 1, // version + NCloud::NProto::EStorageMediaKind::STORAGE_MEDIA_HDD, + 8192, // block count per partition + "vol0", // diskId + "cloud", // cloudId + "folder", // folderId + 2, // partitions count + 2 // blocksPerStripe + ); + volume.WaitReady(); + + auto clientInfo = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_WRITE, + NProto::VOLUME_MOUNT_LOCAL, + 0); + + volume.AddClient(clientInfo); + + auto range = TBlockRange64::WithLength(1, 8192); + volume.WriteBlocks(range, clientInfo.GetClientId(), 'X'); + + auto request = volume.CreateDescribeBlocksRequest( + range, + clientInfo.GetClientId() + ); + + volume.SendToPipe(std::move(request)); + const auto response1 = volume.RecvResponse(); + const auto& message1 = response1->Record; + + UNIT_ASSERT(SUCCEEDED(response1->GetStatus())); + UNIT_ASSERT_VALUES_EQUAL(0, message1.FreshBlockRangesSize()); + UNIT_ASSERT_VALUES_EQUAL(8, message1.BlobPiecesSize()); + + const auto& blobPiece1 = message1.GetBlobPieces(0); + UNIT_ASSERT_VALUES_EQUAL(513, blobPiece1.RangesSize()); + const auto& range1 = blobPiece1.GetRanges(0); + UNIT_ASSERT_VALUES_EQUAL(0, range1.GetBlobOffset()); + UNIT_ASSERT_VALUES_EQUAL(1, range1.GetBlockIndex()); + UNIT_ASSERT_VALUES_EQUAL(1, range1.GetBlocksCount()); + + const auto& range2 = blobPiece1.GetRanges(1); + UNIT_ASSERT_VALUES_EQUAL(1, range2.GetBlobOffset()); + UNIT_ASSERT_VALUES_EQUAL(4, range2.GetBlockIndex()); + UNIT_ASSERT_VALUES_EQUAL(2, range2.GetBlocksCount()); + + range = TBlockRange64::WithLength(9000, 256); + volume.WriteBlocks(range, clientInfo.GetClientId(), 'Y'); + + request = volume.CreateDescribeBlocksRequest( + range, + clientInfo.GetClientId() + ); + + volume.SendToPipe(std::move(request)); + const auto response2 = volume.RecvResponse(); + const auto& message2 = response2->Record; + + UNIT_ASSERT(SUCCEEDED(response2->GetStatus())); + UNIT_ASSERT_VALUES_EQUAL(256, message2.FreshBlockRangesSize()); + UNIT_ASSERT_VALUES_EQUAL(0, message2.BlobPiecesSize()); + + const auto& freshBlockRange1 = message2.GetFreshBlockRanges(0); + UNIT_ASSERT_VALUES_EQUAL(9000, freshBlockRange1.GetStartIndex()); + UNIT_ASSERT_VALUES_EQUAL(1, freshBlockRange1.GetBlocksCount()); + + TString actualContent; + for (size_t i = 0; i < message2.FreshBlockRangesSize(); ++i) { + const auto& freshRange = message2.GetFreshBlockRanges(i); + actualContent += freshRange.GetBlocksContent(); + } + + UNIT_ASSERT_VALUES_EQUAL(range.Size() * DefaultBlockSize, actualContent.size()); + for (size_t i = 0; i < actualContent.size(); i++) { + UNIT_ASSERT_VALUES_EQUAL('Y', actualContent[i]); + } + + range = TBlockRange64::WithLength(10000, 1); + volume.WriteBlocks(range, clientInfo.GetClientId(), 'Z'); + + request = volume.CreateDescribeBlocksRequest( + range, + clientInfo.GetClientId() + ); + + volume.SendToPipe(std::move(request)); + const auto response3 = volume.RecvResponse(); + const auto& message3 = response3->Record; + + UNIT_ASSERT(SUCCEEDED(response3->GetStatus())); + UNIT_ASSERT_VALUES_EQUAL(1, message3.FreshBlockRangesSize()); + UNIT_ASSERT_VALUES_EQUAL(0, message3.BlobPiecesSize()); + + const auto& freshBlockRange2 = message3.GetFreshBlockRanges(0); + UNIT_ASSERT_VALUES_EQUAL(10000, freshBlockRange2.GetStartIndex()); + UNIT_ASSERT_VALUES_EQUAL(1, freshBlockRange2.GetBlocksCount()); + } + Y_UNIT_TEST(ShouldHandleGetUsedBlocksRequest) { auto runtime = PrepareTestActorRuntime();