Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fadvise:AUTO_RANDOM mode #1243

Open
wants to merge 5 commits into
base: branch-2.2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gcs/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Release Notes

## Next
1. Add AUTO_RANDOM as new fadvise mode.

## 2.2.25 - 2024-08-01
1. PR #1227 - Avoid registering subscriber class multiple times
Expand Down
16 changes: 16 additions & 0 deletions gcs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,22 @@ permissions (not authorized) to execute these requests.
streaming requests as soon as first backward read or forward read for
more than `fs.gs.inputstream.inplace.seek.limit` bytes was detected.

* `AUTO_RANDOM` - It is complementing `AUTO` mode which uses sequential
mode to start with and adapts to bounded range requests. `AUTO_RANDOM`
mode uses bounded channel initially and adapts to sequential requests if
consecutive requests are within `fs.gs.inputstream.min.range.request.size`.
gzip-encode object will bypass this adoption, it will always be a
streaming(unbounded) channel. This helps in cases where egress limits is
getting breached for customer because `AUTO` mode will always lead to
one unbounded channel for a file. `AUTO_RANDOM` will avoid such unwanted
unbounded channels.

* `fs.gs.fadvise.request.track.count` (default: `3`)
singhravidutt marked this conversation as resolved.
Show resolved Hide resolved
singhravidutt marked this conversation as resolved.
Show resolved Hide resolved

Self adaptive fadvise mode uses distance between the served requests to
decide the access pattern. This property controls how many such requests
need to be tracked. It is used when `AUTO_RANDOM` is selected.

* `fs.gs.inputstream.inplace.seek.limit` (default: `8388608`)

If forward seeks are within this many bytes of the current position, seeks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,13 @@ public class GoogleHadoopFileSystemConfiguration {
public static final HadoopConfigurationProperty<Integer> GCS_BATCH_THREADS =
new HadoopConfigurationProperty<>("fs.gs.batch.threads", 15);

/**
* Configuration key for number of request to track for adapting the access pattern i.e. fadvise:
* AUTO & AUTO_RANDOM.
*/
public static final HadoopConfigurationProperty<Integer> GCS_FADVISE_REQUEST_TRACK_COUNT =
new HadoopConfigurationProperty<>("fs.gs.fadvise.request.track.count", 3);

/**
* Configuration key for enabling the use of Rewrite requests for copy operations. Rewrite request
* has the same effect as Copy request, but it can handle moving large objects that may
Expand Down Expand Up @@ -673,6 +680,8 @@ private static GoogleCloudStorageReadOptions getReadChannelOptions(Configuration
.setTraceLogTimeThreshold(GCS_TRACE_LOG_TIME_THRESHOLD_MS.get(config, config::getLong))
.setTraceLogExcludeProperties(
ImmutableSet.copyOf(GCS_TRACE_LOG_EXCLUDE_PROPERTIES.getStringCollection(config)))
.setBlockSize(BLOCK_SIZE.get(config, config::getLong))
.setFadviseRequestTrackCount(GCS_FADVISE_REQUEST_TRACK_COUNT.get(config, config::getInt))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public class GoogleHadoopFileSystemConfigurationTest {
"fs.gs.write.parallel.composite.upload.part.file.cleanup.type",
PartFileCleanupType.ALWAYS);
put("fs.gs.write.parallel.composite.upload.part.file.name.prefix", "");
put("fs.gs.fadvise.request.track.count", 3);
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.hadoop.gcsio;

import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.flogger.GoogleLogger;

/**
* Manages the access pattern of object being read from cloud storage. For adaptive fadvise
* configurations it computes the access pattern based on previous requests.
*/
@VisibleForTesting
class FileAccessPatternManager {

private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private final StorageResourceId resourceId;
private final GoogleCloudStorageReadOptions readOptions;
private boolean isPatternOverriden;
private boolean randomAccess;
private long lastServedIndex = -1;
// Keeps track of distance between consecutive requests
private int consecutiveSequentialCount = 0;

public FileAccessPatternManager(
StorageResourceId resourceId, GoogleCloudStorageReadOptions readOptions) {
this.isPatternOverriden = false;
this.resourceId = resourceId;
this.readOptions = readOptions;
this.randomAccess =
readOptions.getFadvise() == Fadvise.AUTO_RANDOM
|| readOptions.getFadvise() == Fadvise.RANDOM;
}

public void updateLastServedIndex(long position) {
this.lastServedIndex = position;
}

public boolean isRandomAccessPattern() {
return randomAccess;
}

public void updateAccessPattern(long currentPosition) {
if (isPatternOverriden) {
logger.atFiner().log(
"Will bypass computing access pattern as it's overriden for resource :%s", resourceId);
return;
}
if (readOptions.getFadvise() == Fadvise.AUTO_RANDOM) {
if (isSequentialAccessPattern(currentPosition)) {
unsetRandomAccess();
}
} else if (readOptions.getFadvise() == Fadvise.AUTO) {
if (isRandomAccessPattern(currentPosition)) {
setRandomAccess();
}
}
}

/**
* This provides a way to override the access isRandomPattern, once overridden it will not be
* recomputed for adaptive fadvise types.
*
* @param isRandomPattern, true, to override with random access else false
*/
public void overrideAccessPattern(boolean isRandomPattern) {
this.isPatternOverriden = true;
this.randomAccess = isRandomPattern;
logger.atInfo().log(
"Overriding the random access pattern to %s for fadvise:%s for resource: %s ",
isRandomPattern, readOptions.getFadvise(), resourceId);
}

private boolean isSequentialAccessPattern(long currentPosition) {
if (lastServedIndex != -1) {
long distance = currentPosition - lastServedIndex;
if (distance < 0 || distance > readOptions.getInplaceSeekLimit()) {
consecutiveSequentialCount = 0;

Check warning on line 92 in gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java

View check run for this annotation

Codecov / codecov/patch

gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java#L92

Added line #L92 was not covered by tests
} else {
consecutiveSequentialCount++;
}
}

if (!shouldDetectSequentialAccess()) {
return false;

Check warning on line 99 in gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java

View check run for this annotation

Codecov / codecov/patch

gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java#L99

Added line #L99 was not covered by tests
}

if (consecutiveSequentialCount < readOptions.getFadviseRequestTrackCount()) {
return false;
}
logger.atInfo().log(
"Detected %d consecutive read request within distance threshold %d with fadvise: %s switching to sequential IO for '%s'",
consecutiveSequentialCount,
readOptions.getInplaceSeekLimit(),
readOptions.getFadvise(),
resourceId);
return true;
}

private boolean isRandomAccessPattern(long currentPosition) {
if (!shouldDetectRandomAccess()) {
return false;
}
if (lastServedIndex == -1) {
return false;
}

if (currentPosition < lastServedIndex) {
logger.atFine().log(
"Detected backward read from %s to %s position, switching to random IO for '%s'",
lastServedIndex, currentPosition, resourceId);
return true;
}
if (lastServedIndex >= 0
&& lastServedIndex + readOptions.getInplaceSeekLimit() < currentPosition) {
logger.atFine().log(
"Detected forward read from %s to %s position over %s threshold,"
+ " switching to random IO for '%s'",
lastServedIndex, currentPosition, readOptions.getInplaceSeekLimit(), resourceId);
return true;
}
return false;

Check warning on line 136 in gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java

View check run for this annotation

Codecov / codecov/patch

gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java#L136

Added line #L136 was not covered by tests
}

private boolean shouldDetectSequentialAccess() {
return randomAccess && readOptions.getFadvise() == Fadvise.AUTO_RANDOM;
}

private boolean shouldDetectRandomAccess() {
return !randomAccess && readOptions.getFadvise() == Fadvise.AUTO;
}

private void setRandomAccess() {
randomAccess = true;
}

private void unsetRandomAccess() {
randomAccess = false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,17 @@
// in-place seeks.
private byte[] skipBuffer = null;
private ReadableByteChannel byteChannel = null;
private boolean randomAccess;
private final FileAccessPatternManager fileAccessManager;

public ContentReadChannel(
GoogleCloudStorageReadOptions readOptions, StorageResourceId resourceId) {
this.blobId =
BlobId.of(
resourceId.getBucketName(), resourceId.getObjectName(), resourceId.getGenerationId());
this.randomAccess = readOptions.getFadvise() == Fadvise.RANDOM;
this.fileAccessManager = new FileAccessPatternManager(resourceId, readOptions);
if (gzipEncoded) {
fileAccessManager.overrideAccessPattern(false);

Check warning on line 222 in gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java

View check run for this annotation

Codecov / codecov/patch

gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java#L222

Added line #L222 was not covered by tests
}
}

public int readContent(ByteBuffer dst) throws IOException {
Expand Down Expand Up @@ -304,6 +307,7 @@
int partialBytes = partiallyReadBytes(remainingBeforeRead, dst);
totalBytesRead += partialBytes;
currentPosition += partialBytes;
contentChannelCurrentPosition += partialBytes;
logger.atFine().log(
"Closing contentChannel after %s exception for '%s'.", e.getMessage(), resourceId);
closeContentChannel();
Expand All @@ -321,14 +325,6 @@
return partialReadBytes;
}

private boolean shouldDetectRandomAccess() {
return !gzipEncoded && !randomAccess && readOptions.getFadvise() == Fadvise.AUTO;
}

private void setRandomAccess() {
randomAccess = true;
}

private ReadableByteChannel openByteChannel(long bytesToRead) throws IOException {
checkArgument(
bytesToRead > 0, "bytesToRead should be greater than 0, but was %s", bytesToRead);
Expand All @@ -341,6 +337,9 @@
return serveFooterContent();
}

// Should be updated only if content is not served from cached footer
fileAccessManager.updateAccessPattern(currentPosition);

setChannelBoundaries(bytesToRead);

ReadableByteChannel readableByteChannel =
Expand Down Expand Up @@ -426,12 +425,15 @@
if (gzipEncoded) {
return objectSize;
}

long endPosition = objectSize;
if (randomAccess) {
if (fileAccessManager.isRandomAccessPattern()) {
// opening a channel for whole object doesn't make sense as anyhow it will not be utilized
// for further reads.
endPosition = startPosition + max(bytesToRead, readOptions.getMinRangeRequestSize());
} else {
if (readOptions.getFadvise() == Fadvise.AUTO_RANDOM) {
endPosition = min(startPosition + readOptions.getBlockSize(), objectSize);
}
}
if (footerContent != null) {
// If footer is cached open just till footerStart.
Expand All @@ -451,6 +453,7 @@
"Got an exception on contentChannel.close() for '%s'; ignoring it.", resourceId);
} finally {
byteChannel = null;
fileAccessManager.updateLastServedIndex(contentChannelCurrentPosition);
reset();
}
}
Expand Down Expand Up @@ -521,39 +524,12 @@
if (isInRangeSeek()) {
skipInPlace();
} else {
if (isRandomAccessPattern()) {
setRandomAccess();
}
// close existing contentChannel as requested bytes can't be served from current
// contentChannel;
closeContentChannel();
}
}

private boolean isRandomAccessPattern() {
if (!shouldDetectRandomAccess()) {
return false;
}
if (currentPosition < contentChannelCurrentPosition) {
logger.atFine().log(
"Detected backward read from %s to %s position, switching to random IO for '%s'",
contentChannelCurrentPosition, currentPosition, resourceId);
return true;
}
if (contentChannelCurrentPosition >= 0
&& contentChannelCurrentPosition + readOptions.getInplaceSeekLimit() < currentPosition) {
logger.atFine().log(
"Detected forward read from %s to %s position over %s threshold,"
+ " switching to random IO for '%s'",
contentChannelCurrentPosition,
currentPosition,
readOptions.getInplaceSeekLimit(),
resourceId);
return true;
}
return false;
}

private ReadableByteChannel getStorageReadChannel(long seek, long limit) throws IOException {
ReadChannel readChannel = storage.reader(blobId, generateReadOptions(blobId));
try {
Expand Down Expand Up @@ -595,7 +571,7 @@

@VisibleForTesting
boolean randomAccessStatus() {
return contentReadChannel.randomAccess;
return contentReadChannel.fileAccessManager.isRandomAccessPattern();
}

private static void validate(GoogleCloudStorageItemInfo itemInfo) throws IOException {
Expand Down
Loading