Support automatic resizing for variable-size page blobs #329

Open · wants to merge 3 commits into base: main
@@ -116,10 +116,18 @@ NetheriteOrchestrationServiceSettings GetNetheriteOrchestrationServiceSettings(s
this.options.MaxConcurrentActivityFunctions = this.options.MaxConcurrentActivityFunctions ?? maxConcurrentActivitiesDefault;
this.options.MaxEntityOperationBatchSize = this.options.MaxEntityOperationBatchSize ?? maxEntityOperationBatchSizeDefault;

// copy all applicable fields from both the options and the storageProvider options
JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.options), netheriteSettings);
JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.options.StorageProvider), netheriteSettings);

// collect settings from the two places that we want to import into the Netherite settings
string durableExtensionSettings = JsonConvert.SerializeObject(this.options);
string storageProviderSettings = JsonConvert.SerializeObject(this.options.StorageProvider);

// copy all applicable settings into the Netherite settings, based on matching the names
JsonConvert.PopulateObject(durableExtensionSettings, netheriteSettings);
JsonConvert.PopulateObject(storageProviderSettings, netheriteSettings);

// copy extension settings to FASTER tuning parameters, based on matching the names
netheriteSettings.FasterTuningParameters ??= new Faster.BlobManager.FasterTuningParameters();
JsonConvert.PopulateObject(storageProviderSettings, netheriteSettings.FasterTuningParameters);

// configure the cache size if not already configured
netheriteSettings.InstanceCacheSizeMB ??= (this.inConsumption ? 100 : 200 * Environment.ProcessorCount);
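As an aside on the technique above: JsonConvert.PopulateObject copies values onto an existing object, so serializing one settings object and populating another transfers exactly the members whose names match, and leaves everything else untouched. A minimal sketch, using hypothetical option types rather than the actual Netherite classes:

using Newtonsoft.Json;

class SourceOptions { public int? MaxConcurrentActivityFunctions = 10; public string TaskHubName = "hub"; }
class TargetSettings { public int? MaxConcurrentActivityFunctions; public int? PartitionCount = 12; }

static class NameMatchingCopyDemo
{
    static void Main()
    {
        var source = new SourceOptions();
        var target = new TargetSettings();

        // serialize the source, then populate the target: only members with matching names are overwritten
        JsonConvert.PopulateObject(JsonConvert.SerializeObject(source), target);

        System.Console.WriteLine(target.MaxConcurrentActivityFunctions); // 10 (copied from the source)
        System.Console.WriteLine(target.PartitionCount);                 // 12 (left untouched)
    }
}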

@@ -62,8 +62,6 @@ struct RemoveRequestInfo
const uint MAX_UPLOAD_SIZE = 1024 * 1024;
const uint MAX_DOWNLOAD_SIZE = 1024 * 1024;

const long MAX_PAGEBLOB_SIZE = 512L * 1024 * 1024 * 1024; // set this at 512 GB for now TODO consider implications

/// <summary>
/// Constructs a new AzureStorageDevice instance, backed by Azure Page Blobs
/// </summary>
@@ -424,8 +422,8 @@ public override void WriteAsync(IntPtr sourceAddress, int segmentId, ulong desti
{
var pageBlob = this.pageBlobDirectory.GetPageBlobClient(this.GetSegmentBlobName(segmentId));

// If segment size is -1 we use a default
var size = this.segmentSize == -1 ? AzureStorageDevice.MAX_PAGEBLOB_SIZE : this.segmentSize;
// If segment size is -1 we use the default starting size for auto-expanding page blobs.
var size = this.segmentSize == -1 ? this.BlobManager.StartingPageBlobSize : this.segmentSize;

// If no blob exists for the segment, we must first create the segment asynchronously. (Create call takes ~70 ms by measurement)
// After creation is done, we can call write.
@@ -469,15 +467,55 @@ await this.BlobManager.PerformWithRetriesAsync(
{
var client = numAttempts > 2 ? blobEntry.PageBlob.Default : blobEntry.PageBlob.Aggressive;

var response = await client.UploadPagesAsync(
content: stream,
offset: destinationAddress + offset,
transactionalContentHash: null,
conditions: this.underLease ? new PageBlobRequestConditions() { IfMatch = blobEntry.ETag } : null,
progressHandler: null,
cancellationToken: this.PartitionErrorHandler.Token).ConfigureAwait(false);

blobEntry.ETag = response.Value.ETag;
try
{
var response = await client.UploadPagesAsync(
content: stream,
offset: destinationAddress + offset,
transactionalContentHash: null,
conditions: this.underLease ? new PageBlobRequestConditions() { IfMatch = blobEntry.ETag } : null,
progressHandler: null,
cancellationToken: this.PartitionErrorHandler.Token).ConfigureAwait(false);

blobEntry.ETag = response.Value.ETag;
}
catch (Azure.RequestFailedException e) when (e.ErrorCode == "InvalidPageRange")
{
// this kind of error can indicate that the page blob is too small.
// from the perspective of FASTER, this storage device is infinite, so it may write past the end of the blob.
// To deal with this situation, we dynamically enlarge this device as needed.

// first, compute desired size to request
long currentSize = (await client.GetPropertiesAsync().ConfigureAwait(false)).Value.ContentLength;
long sizeToRequest = currentSize;
long sizeToAccommodate = destinationAddress + offset + length + 1;
while (sizeToAccommodate > sizeToRequest)
{
sizeToRequest <<= 1;
}

if (sizeToRequest <= currentSize)
{
throw e; // blob is already big enough, so this exception was thrown for some other reason
}
else
{
if (sizeToRequest > this.BlobManager.MaxPageBlobSize)
{
throw new InvalidOperationException($"cannot expand page blob {blobEntry.PageBlob.Default.Name} beyond maximum size {this.BlobManager.MaxPageBlobSize}");
}

// enlarge the blob to accommodate the size
await client.ResizeAsync(
sizeToRequest,
conditions: this.underLease ? new PageBlobRequestConditions() { IfMatch = blobEntry.ETag } : null,
cancellationToken: this.PartitionErrorHandler.Token).ConfigureAwait(false);

// force retry
// this also generates a warning in the traces, containing the information about what happened
throw new BlobUtils.ForceRetryException($"page blob was enlarged from {currentSize} to {sizeToRequest}", e);
}
}
}

return (long)length;
@@ -500,11 +538,17 @@ async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, BlobUtilsV12.PageBlob
{
using (stream)
{
// we use this to prevent reading past the end of the page blob
// but for performance reasons (Azure storage access required to determine current size of page blob)
// we set it lazily, i.e. only after a request failed
long? readCap = null;

Review comment (Member): I don't immediately understand why enlarging the blobs can suddenly cause reads to go past the end of the page blob. Can you please call out the connection?

Reply (Member, author): Because these blobs don't actually have a fixed size from the perspective of FASTER, the FASTER reads may easily read past the end of the blob. This is not new - we just never saw it because 512 GB was always way larger than needed.

Review comment (Member): I see. I think it would be good to write that in the inlined comment being referenced here. This also answers my question here: #329 (comment)

long offset = 0;
while (readLength > 0)
{
var length = Math.Min(readLength, MAX_DOWNLOAD_SIZE);

// determine how much we are going to try to read in this portion
var length = Math.Min(readLength, MAX_DOWNLOAD_SIZE);

await this.BlobManager.PerformWithRetriesAsync(
BlobManager.AsynchronousStorageReadMaxConcurrency,
true,
@@ -516,36 +560,94 @@ await this.BlobManager.PerformWithRetriesAsync(
true,
async (numAttempts) =>
{
if (numAttempts > 0)
{
stream.Seek(offset, SeekOrigin.Begin); // must go back to original position before retrying
}

if (length > 0)
{
var client = (numAttempts > 1 || length == MAX_DOWNLOAD_SIZE) ? blob.Default : blob.Aggressive;

var response = await client.DownloadStreamingAsync(
range: new Azure.HttpRange(sourceAddress + offset, length),
conditions: null,
rangeGetContentHash: false,
cancellationToken: this.PartitionErrorHandler.Token)
.ConfigureAwait(false);

using (var streamingResult = response.Value)
await streamingResult.Content.CopyToAsync(stream).ConfigureAwait(false);

if (stream.Position != offset + length)
throw new InvalidDataException($"wrong amount of data received from page blob, expected={length}, actual={stream.Position}");

stream.Seek(offset, SeekOrigin.Begin);

long requestedLength = length;

if (readCap.HasValue && sourceAddress + offset + requestedLength > readCap.Value)
{
requestedLength = readCap.Value - (sourceAddress + offset);

if (requestedLength <= 0)
{
requestedLength = 0;
}
}

if (requestedLength > 0)
{
var client = (numAttempts > 1 || requestedLength == MAX_DOWNLOAD_SIZE) ? blob.Default : blob.Aggressive;
Review comment (Member): Can you comment on what this controls? What is blob.Aggressive?

Reply (Member, author): They control the length of the timeout (with the newer storage SDK the timeout can no longer be passed as a parameter to the operation, since it is a constructor parameter for the client, so we need multiple clients to control the timeout length).

BTW this was not modified in this PR, the diff just does not show this well.
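To illustrate the reply above with a minimal sketch (not code from this PR; the helper name and timeout values are assumptions): in the v12 Azure Storage SDK the network timeout is a client-level option, so the same blob gets two PageBlobClient instances, a patient one ("Default") and one that fails fast ("Aggressive") so a retry can kick in sooner.

using System;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Specialized;

static class PageBlobClientPairDemo
{
    // the network timeout is fixed at client construction time in the v12 SDK,
    // so controlling it per call requires keeping several clients around
    public static PageBlobClient Create(Uri blobUri, TimeSpan networkTimeout)
    {
        var options = new BlobClientOptions();
        options.Retry.NetworkTimeout = networkTimeout;
        return new PageBlobClient(blobUri, options);
    }
}

// hypothetical usage mirroring blob.Default / blob.Aggressive:
//   var uri = new Uri("https://account.blob.core.windows.net/container/segment.0");
//   var patient  = PageBlobClientPairDemo.Create(uri, TimeSpan.FromSeconds(100)); // "Default"
//   var failFast = PageBlobClientPairDemo.Create(uri, TimeSpan.FromSeconds(10));  // "Aggressive"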


try
{
var response = await client.DownloadStreamingAsync(
range: new Azure.HttpRange(sourceAddress + offset, requestedLength),
conditions: null,
rangeGetContentHash: false,
cancellationToken: this.PartitionErrorHandler.Token)
.ConfigureAwait(false);

using (var streamingResult = response.Value)
{
await streamingResult.Content.CopyToAsync(stream).ConfigureAwait(false);
}

// We have observed that we may get 206 (Partial Response) codes where the actual length is less than the requested length
// The Azure storage client SDK handles the http codes transparently, but we may still observe that fewer bytes than
// requested were returned by the streamingResult.
long actualLength = (stream.Position - offset);

if (actualLength < requestedLength)
{
this.BlobManager.StorageTracer?.FasterStorageProgress($"PageBlob.DownloadStreamingAsync id={id} returned partial response range={response.Value.Details.ContentRange} requestedLength={requestedLength} actualLength={actualLength}");

if (actualLength == 0)
{
throw new InvalidDataException($"PageBlob.DownloadStreamingAsync returned empty response, range={response.Value.Details.ContentRange} requestedLength={requestedLength} ");
}
else if (actualLength % 512 != 0)
{
throw new InvalidDataException($"PageBlob.DownloadStreamingAsync returned unaligned response, range={response.Value.Details.ContentRange} requestedLength={requestedLength} actualLength={actualLength}");
}
else
{
length = (uint)actualLength; // adjust length to actual length read so the next read will start where this read ended
}
}
else if (actualLength > requestedLength)
{
throw new InvalidDataException($"PageBlob.DownloadStreamingAsync returned too much data, range={response.Value.Details.ContentRange} requestedLength={requestedLength} actualLength={actualLength}");
}
}
catch (Azure.RequestFailedException e) when (e.ErrorCode == "InvalidRange")
{
// from the perspective of FASTER, this storage device is infinite, so it may read past the end of the blob.
// But even though it requests more data than what it wrote, it will only actually use what it wrote before.
// so we can deal with this situation by just copying fewer bytes from the blob into the buffer.

// first, determine current page blob size.
var properties = await client.GetPropertiesAsync().ConfigureAwait(false);
readCap = properties.Value.ContentLength;

if (sourceAddress + offset + requestedLength <= readCap.Value)
{
// page blob is big enough, so this exception was thrown for some other reason
throw e;
}
else
{
// page blob was indeed too small; now that we have set a read cap, force a retry
// so we can read an adjusted portion
throw new BlobUtils.ForceRetryException($"reads now capped at {readCap}", e);
}
}
}

return length;
});

// adjust how much we have to read, and where to read from, in the next iteration
// based on how much was actually read in this iteration.
readLength -= length;
offset += length;
}
@@ -58,6 +58,9 @@ partial class BlobManager : ICheckpointManager, ILogCommitManager
public IDevice HybridLogDevice { get; private set; }
public IDevice ObjectLogDevice { get; private set; }

internal long StartingPageBlobSize { get; private set; }
internal long MaxPageBlobSize { get; private set; }

public DateTime IncarnationTimestamp { get; private set; }

public string ContainerName { get; }
@@ -90,6 +93,8 @@ public class FasterTuningParameters
public double? StoreLogMutableFraction;
public int? EstimatedAverageObjectSize;
public int? NumPagesToPreload;
public int? StartingPageBlobSizeBits;
public int? MaxPageBlobSizeBits;
}

public FasterLogSettings GetEventLogSettings(bool useSeparatePageBlobStorage, FasterTuningParameters tuningParameters)
@@ -333,6 +338,13 @@ public BlobManager(
this.CheckpointInfo = new CheckpointInfo();
this.CheckpointInfoETag = default;

// some page blobs have no specific size; for example, they are used for the object log and for checkpoints.
// by default, we start those at 512 GB - there is no cost incurred for empty pages, so why not.
// if that turns out to not be enough at some point, we enlarge them automatically,
// up to a maximum of 2 TB (by default). The maximum page blob size in Azure Storage is 8 TB.
this.StartingPageBlobSize = 1L << (this.settings.FasterTuningParameters?.StartingPageBlobSizeBits ?? 39 /* 512 GB */);
this.MaxPageBlobSize = 1L << (this.settings.FasterTuningParameters?.MaxPageBlobSizeBits ?? 41 /* 2 TB */);
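// for reference (not part of this diff): the *Bits values translate to byte sizes as powers of two:
//    1L << 13 =            8,192 bytes =   8 KB  (small value, handy for testing expansion)
//    1L << 39 =  549,755,813,888 bytes = 512 GB  (default starting size)
//    1L << 41 = 2,199,023,255,552 bytes =   2 TB  (default maximum)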

if (!string.IsNullOrEmpty(settings.UseLocalDirectoryForPartitionStorage))
{
this.UseLocalFiles = true;
26 changes: 26 additions & 0 deletions src/DurableTask.Netherite/Util/BlobUtils.cs
@@ -76,6 +76,11 @@ public static bool IsTransientStorageError(Exception exception)
return true;
}

if (exception is ForceRetryException)
{
return true;
}

// Empirically observed: timeouts on synchronous calls
if (exception.InnerException is TimeoutException)
{
@@ -126,6 +131,27 @@ static bool httpStatusIndicatesTransientError(int? statusCode) =>
|| statusCode == 504); //504 Gateway Timeout


/// <summary>
/// A custom exception class that we use to explicitly force a retry after a transient error.
/// By using an exception we ensure that we stay under the total retry count and generate the proper tracing.
/// </summary>
public class ForceRetryException : Exception
{
public ForceRetryException()
{
}

public ForceRetryException(string message)
: base(message)
{
}

public ForceRetryException(string message, Exception inner)
: base(message, inner)
{
}
}
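A minimal sketch of how a retry wrapper could consume this exception (a simplified stand-in, not the actual PerformWithRetriesAsync implementation; the namespace and backoff are assumptions): because IsTransientStorageError returns true for ForceRetryException, the wrapper treats it like any other transient failure and simply runs the operation again, now with the adjusted state (the enlarged blob, or the read cap).

using System;
using System.Threading.Tasks;
using DurableTask.Netherite; // namespace assumed from the file path

static class RetrySketch
{
    public static async Task<T> PerformWithRetriesSketchAsync<T>(Func<int, Task<T>> operation, int maxAttempts = 5)
    {
        for (int attempt = 0; ; attempt++)
        {
            try
            {
                return await operation(attempt).ConfigureAwait(false);
            }
            catch (Exception e) when (attempt + 1 < maxAttempts && BlobUtils.IsTransientStorageError(e))
            {
                // ForceRetryException lands here because IsTransientStorageError classifies it as transient;
                // its message (e.g. "page blob was enlarged from ... to ...") is what shows up in the warning trace
                await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
            }
        }
    }
}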

// Lease error codes are documented at https://docs.microsoft.com/en-us/rest/api/storageservices/lease-blob

public static bool LeaseConflictOrExpired(StorageException e)
4 changes: 4 additions & 0 deletions test/PerformanceTests/host.json
@@ -94,6 +94,10 @@
// or to "ClientOnly" to run only the client
"PartitionManagement": "EventProcessorHost",

// can use the following to test the auto-expansion of page blobs
//"StartingPageBlobSizeBits": 13, // 8 kB
//"MaxPageBlobSizeBits": 22, // 4 MB

// set this to "Local" to disable the global activity distribution algorithm
// options: "Local", "Static", "Locavore"
"ActivityScheduler": "Locavore",