Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support automatic resizing for variable-size page blobs #329

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ struct RemoveRequestInfo
const uint MAX_UPLOAD_SIZE = 1024 * 1024;
const uint MAX_DOWNLOAD_SIZE = 1024 * 1024;

const long MAX_PAGEBLOB_SIZE = 512L * 1024 * 1024 * 1024; // set this at 512 GB for now TODO consider implications
// some blobs have no specific size; for example, they are used for the object log, and for checkpoints.
// we start them at 512 GB - there is no cost incurred for empty pages so why not.
// if that turns out to not be enough at some point, we enlarge the page blob automatically.
const long STARTING_DEFAULT_PAGEBLOB_SIZE = 512L * 1024 * 1024 * 1024;

/// <summary>
/// Constructs a new AzureStorageDevice instance, backed by Azure Page Blobs
Expand Down Expand Up @@ -425,7 +428,7 @@ public override void WriteAsync(IntPtr sourceAddress, int segmentId, ulong desti
var pageBlob = this.pageBlobDirectory.GetPageBlobClient(this.GetSegmentBlobName(segmentId));

// If segment size is -1 we use a default
var size = this.segmentSize == -1 ? AzureStorageDevice.MAX_PAGEBLOB_SIZE : this.segmentSize;
var size = this.segmentSize == -1 ? AzureStorageDevice.STARTING_DEFAULT_PAGEBLOB_SIZE : this.segmentSize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to also have a configurable max blob size? Sometimes it's good to fail fast if state is getting too large and to ask the user to deliberately opt into increasing some config.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting a max size for page blobs in settings makes sense to me. Will add that.


// If no blob exists for the segment, we must first create the segment asynchronouly. (Create call takes ~70 ms by measurement)
// After creation is done, we can call write.
Expand Down Expand Up @@ -469,15 +472,46 @@ await this.BlobManager.PerformWithRetriesAsync(
{
var client = numAttempts > 2 ? blobEntry.PageBlob.Default : blobEntry.PageBlob.Aggressive;

var response = await client.UploadPagesAsync(
content: stream,
offset: destinationAddress + offset,
transactionalContentHash: null,
conditions: this.underLease ? new PageBlobRequestConditions() { IfMatch = blobEntry.ETag } : null,
progressHandler: null,
cancellationToken: this.PartitionErrorHandler.Token).ConfigureAwait(false);
try
{
var response = await client.UploadPagesAsync(
content: stream,
offset: destinationAddress + offset,
transactionalContentHash: null,
conditions: this.underLease ? new PageBlobRequestConditions() { IfMatch = blobEntry.ETag } : null,
progressHandler: null,
cancellationToken: this.PartitionErrorHandler.Token).ConfigureAwait(false);

blobEntry.ETag = response.Value.ETag;
}
catch (Azure.RequestFailedException e) when (e.ErrorCode == "InvalidPageRange")
{
// this kind of error can indicate that the page blob is too small.
// first, compute desired size to request
long currentSize = (await client.GetPropertiesAsync().ConfigureAwait(false)).Value.ContentLength;
long sizeToRequest = currentSize;
long sizeToAccommodate = destinationAddress + offset + length + 1;
while (sizeToAccommodate > sizeToRequest)
{
sizeToRequest <<= 1;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we log the current page size anywhere? Or at least that we're increasing the blog size? I can imagine this is a good signal of a potentially unhealthy app: if we have to continuously increase the size, something may be wrong.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The page size increase does already get logged as a warning, because the ForceRetryException that we throw.


blobEntry.ETag = response.Value.ETag;
if (sizeToRequest <= currentSize)
{
throw e; // blob is already big enough, so this exception was thrown for some other reason
}
else
{
// enlarge the blob to accommodate the size
await client.ResizeAsync(
sizeToRequest,
conditions: this.underLease ? new PageBlobRequestConditions() { IfMatch = blobEntry.ETag } : null,
cancellationToken: this.PartitionErrorHandler.Token).ConfigureAwait(false);

// force retry
throw new BlobUtils.ForceRetryException($"page blob was enlarged from {currentSize} to {sizeToRequest}", e);
}
}
}

return (long)length;
Expand All @@ -500,11 +534,17 @@ async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, BlobUtilsV12.PageBlob
{
using (stream)
{
// we use this to prevent reading past the end of the page blob
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't immediately understand why enlarging the blobs can suddenly cause reads to go past the end of the page blob. Can you please call out the connection?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because these blobs don't actually have a fixed size from the perspective of FASTER, the FASTER reads may easily read past the end of the blob. This is not new - we just never saw it because 512GB was always way larger than needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I think it would be good to write that in the inlined comment being referenced here. This also answers my question here: #329 (comment)

// but for performance reasons (Azure storage access required to determine current size of page blob)
// we set it lazily, i.e. only after a request failed
long? readCap = null;

long offset = 0;
while (readLength > 0)
{
var length = Math.Min(readLength, MAX_DOWNLOAD_SIZE);

// determine how much we are going to try to read in this portion
var length = Math.Min(readLength, MAX_DOWNLOAD_SIZE);

await this.BlobManager.PerformWithRetriesAsync(
BlobManager.AsynchronousStorageReadMaxConcurrency,
true,
Expand All @@ -516,31 +556,83 @@ await this.BlobManager.PerformWithRetriesAsync(
true,
async (numAttempts) =>
{
if (numAttempts > 0)
{
stream.Seek(offset, SeekOrigin.Begin); // must go back to original position before retrying
}
stream.Seek(offset, SeekOrigin.Begin);

if (length > 0)
{
var client = (numAttempts > 1 || length == MAX_DOWNLOAD_SIZE) ? blob.Default : blob.Aggressive;
long requestedLength = length;

var response = await client.DownloadStreamingAsync(
range: new Azure.HttpRange(sourceAddress + offset, length),
conditions: null,
rangeGetContentHash: false,
cancellationToken: this.PartitionErrorHandler.Token)
.ConfigureAwait(false);
if (readCap.HasValue && sourceAddress + offset + requestedLength > readCap.Value)
{
requestedLength = readCap.Value - (sourceAddress + offset);

using (var streamingResult = response.Value)
if (requestedLength <= 0)
{
await streamingResult.Content.CopyToAsync(stream).ConfigureAwait(false);
requestedLength = 0;
}
}

if (stream.Position != offset + length)
if (requestedLength > 0)
{
throw new InvalidDataException($"wrong amount of data received from page blob, expected={length}, actual={stream.Position}");
var client = (numAttempts > 1 || requestedLength == MAX_DOWNLOAD_SIZE) ? blob.Default : blob.Aggressive;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you comment on what this controls? What is blob.Aggressive?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they control the length of the timeout (with newer storage SDK the timeout can no longer be passed as a parameter to the operation, since it is a constructor parameter for the client, so we need multiple clients to control the timeout length).

BTW this was not modified in this PR, the diff just does not show this well.


try
{
var response = await client.DownloadStreamingAsync(
range: new Azure.HttpRange(sourceAddress + offset, requestedLength),
conditions: null,
rangeGetContentHash: false,
cancellationToken: this.PartitionErrorHandler.Token)
.ConfigureAwait(false);

using (var streamingResult = response.Value)
{
await streamingResult.Content.CopyToAsync(stream).ConfigureAwait(false);
}

// we have observed that we may get 206 responses where the actual length is less than the requested length
// this seems to only happen if blob was enlarged in the past
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So what are we doing if we get a 206? I actually don't see us checking the response status explicitly, should we?
nit: let's also add in parenthesis the meaning of 206: "partial response", right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SDK internally takes care of interpreting the status response when I access the streaming response (response.Value). From my perspective, what I see in this case is that the stream returned has fewer bytes than requested.

BTW, I have seen a few 206 during testing early on but later no longer saw them. I figured it makes sense to handle them just in case they pop up. It is pretty easy to handle them by just adjusting the next portion to start where the 206 ended.

long actualLength = (stream.Position - offset);

if (actualLength < requestedLength)
{
this.BlobManager.StorageTracer?.FasterStorageProgress($"PageBlob.DownloadStreamingAsync id={id} returned partial response range={response.Value.Details.ContentRange} requestedLength={requestedLength} actualLength={actualLength}");

if (actualLength == 0)
{
throw new InvalidDataException($"PageBlob.DownloadStreamingAsync returned empty response, range={response.Value.Details.ContentRange} requestedLength={requestedLength} ");
}
else if (actualLength % 512 != 0)
{
throw new InvalidDataException($"PageBlob.DownloadStreamingAsync returned unaligned response, range={response.Value.Details.ContentRange} requestedLength={requestedLength} actualLength=${actualLength}");
}
else
{
length = (uint)actualLength; // adjust length to actual length read so the next read will start where this read ended
}
}
else if (actualLength > requestedLength)
{
throw new InvalidDataException($"PageBlob.DownloadStreamingAsync returned too much data, range={response.Value.Details.ContentRange} requestedLength={requestedLength} actualLength=${actualLength}");
}
}
catch (Azure.RequestFailedException e) when (e.ErrorCode == "InvalidRange")
{
// this type of error can be caused by the page blob being smaller than where FASTER is reading from.
// to handle this, first determine current page blob size.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure I fully understand this comment. What does it mean to be smaller than where FASTER is reading from? Can we add more detail?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FASTER may be reading a range that extends past the end of the blob.

var properties = await client.GetPropertiesAsync().ConfigureAwait(false);
readCap = properties.Value.ContentLength;

if (sourceAddress + offset + requestedLength <= readCap.Value)
{
// page blob is big enough, so this exception was thrown for some other reason
throw e;
}
else
{
// page blob was indeed too small; now that we have set a read cap, force a retry
// so we can read an adjusted portion
throw new BlobUtils.ForceRetryException($"reads now capped at {readCap}", e);
}
}
}

return length;
Expand Down
26 changes: 26 additions & 0 deletions src/DurableTask.Netherite/Util/BlobUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public static bool IsTransientStorageError(Exception exception)
return true;
}

if (exception is ForceRetryException)
{
return true;
}

// Empirically observed: timeouts on synchronous calls
if (exception.InnerException is TimeoutException)
{
Expand Down Expand Up @@ -126,6 +131,27 @@ static bool httpStatusIndicatesTransientError(int? statusCode) =>
|| statusCode == 504); //504 Gateway Timeout


/// <summary>
/// A custom exception class that we use to explicitly force a retry after a transient error.
/// By using an exception we ensure that we stay under the total retry count and generate the proper tracing.
/// </summary>
public class ForceRetryException : Exception
{
public ForceRetryException()
{
}

public ForceRetryException(string message)
: base(message)
{
}

public ForceRetryException(string message, Exception inner)
: base(message, inner)
{
}
}

// Lease error codes are documented at https://docs.microsoft.com/en-us/rest/api/storageservices/lease-blob

public static bool LeaseConflictOrExpired(StorageException e)
Expand Down
Loading