Skip to content

Commit

Permalink
Throw an error depending on the expiration date of TemporalConfigStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
TrsNium committed Dec 25, 2020
1 parent 432517d commit 626b0e0
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 5 deletions.
10 changes: 10 additions & 0 deletions digdag-spi/src/main/java/io/digdag/spi/Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,14 @@ default Optional<DirectUploadHandle> getDirectUploadHandle(String key)
{
return Optional.absent();
}

default Optional<Long> getDirectDownloadExpiration()
{
return Optional.absent();
}

default Optional<Long> getDirectUploadExpiration()
{
return Optional.absent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ CommandStatus runOnKubernetes(final CommandContext context,
nextStatus.set("cluster_name", FACTORY.textNode(client.getConfig().getName()));
nextStatus.set("pod_name", FACTORY.textNode(pod.getName()));
nextStatus.set("pod_creation_timestamp", FACTORY.numberNode(pod.getCreationTimestamp()));
nextStatus.set("in_temporal_config_storage_expiration", FACTORY.numberNode(inConfigStorage.getDirectDownloadExpiration().get()));
nextStatus.set("out_temporal_config_storage_expiration", FACTORY.numberNode(outConfigStorage.getDirectUploadExpiration().get()));
nextStatus.set("io_directory", FACTORY.textNode(ioDirectoryPath.toString()));
nextStatus.set("executor_state", FACTORY.objectNode());
return createCommandStatus(pod, false, nextStatus);
Expand Down Expand Up @@ -256,6 +258,21 @@ CommandStatus getCommandStatusFromKubernetes(final CommandContext context,
log(logMessage, clog);
nextExecutorState.set("log_offset", FACTORY.numberNode(offset + logMessage.length())); // update log_offset
}
else if(isLaunchingLongerThanInConfigStorageExpiration(previousStatusJson)) {
// Throw error because launching pod time is longer than inTemporalConfigStorage expires.
TaskRequest request = context.getTaskRequest();
long attemptId = request.getAttemptId();
long taskId = request.getTaskId();

final String message = s("Pod launch timeout: attempt=%d, task=%d", attemptId, taskId);
logger.warn(message);

logger.info(s("Delete pod %d", pod.getName()));
client.deletePod(pod.getName());

// Throw exception to stop the task as failure
throw new TaskExecutionException(message);
}
else { // 'waiting'
// Write pod status to the command logger to avoid users confusing. For example, the container
// waits starting if it will take long time to download container images.
Expand All @@ -276,7 +293,7 @@ CommandStatus getCommandStatusFromKubernetes(final CommandContext context,
final InputStream in = outConfigStorage.getContentInputStream(outputArchiveKey);
ProjectArchives.extractTarArchive(context.getLocalProjectPath(), in); // runtime exception
}
else if (defaultPodTTL.isPresent() && isRunningLongerThanTTL(previousStatusJson)) {
else if (isRunningLongerThanOutConfigStorageExpiration(previousStatusJson) || (defaultPodTTL.isPresent() && isRunningLongerThanTTL(previousStatusJson))) {
TaskRequest request = context.getTaskRequest();
long attemptId = request.getAttemptId();
long taskId = request.getTaskId();
Expand Down Expand Up @@ -307,6 +324,22 @@ protected List<String> setArgumentsAfterScriptCommandLine()
return ImmutableList.of();
}

private boolean isLaunchingLongerThanInConfigStorageExpiration(final ObjectNode previousStatusJson)
{
long creationTimestamp = previousStatusJson.get("pod_creation_timestamp").asLong();
long inTemporalConfigStorageExpiration = previousStatusJson.get("in_temporal_config_storage_expiration").asLong();
long currentTimestamp = Instant.now().getEpochSecond();
return currentTimestamp > creationTimestamp + inTemporalConfigStorageExpiration;
}

private boolean isRunningLongerThanOutConfigStorageExpiration(final ObjectNode previousStatusJson)
{
long creationTimestamp = previousStatusJson.get("pod_creation_timestamp").asLong();
long outTemporalConfigStorageExpiration = previousStatusJson.get("out_temporal_config_storage_expiration").asLong();
long currentTimestamp = Instant.now().getEpochSecond();
return currentTimestamp > creationTimestamp + outTemporalConfigStorageExpiration;
}

private boolean isRunningLongerThanTTL(final ObjectNode previousStatusJson)
{
long creationTimestamp = previousStatusJson.get("pod_creation_timestamp").asLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import com.google.common.base.Optional;

public class TemporalConfigStorage
{
Expand Down Expand Up @@ -75,4 +76,14 @@ public InputStream getContentInputStream(final String key)
throw Throwables.propagate(e);
}
}

public Optional<Long> getDirectDownloadExpiration()
{
return storage.getDirectDownloadExpiration();
}

public Optional<Long> getDirectUploadExpiration()
{
return storage.getDirectUploadExpiration();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void list(String objectPrefix, FileListing callback)
@Override
public Optional<DirectDownloadHandle> getDirectDownloadHandle(String object)
{
final long secondsToExpire = config.get("direct_download_expiration", Long.class, 10L*60);
final long secondsToExpire = getDirectDownloadExpiration().get();

BlobInfo blobInfo = BlobInfo.newBuilder(bucket, object).build();
URL signedUrl = this.storage.signUrl(blobInfo, secondsToExpire, TimeUnit.SECONDS, Storage.SignUrlOption.httpMethod(HttpMethod.GET), Storage.SignUrlOption.withV4Signature());
Expand All @@ -156,7 +156,7 @@ public Optional<DirectDownloadHandle> getDirectDownloadHandle(String object)
@Override
public Optional<DirectUploadHandle> getDirectUploadHandle(String object)
{
final long secondsToExpire = config.get("direct_upload_expiration", Long.class, 10L*60);
final long secondsToExpire = getDirectUploadExpiration().get();

BlobInfo blobInfo = BlobInfo.newBuilder(bucket, object).build();
URL signedUrl = this.storage.signUrl(blobInfo, secondsToExpire, TimeUnit.SECONDS, Storage.SignUrlOption.httpMethod(HttpMethod.PUT), Storage.SignUrlOption.withV4Signature());
Expand All @@ -165,6 +165,18 @@ public Optional<DirectUploadHandle> getDirectUploadHandle(String object)
return Optional.of(DirectUploadHandle.of(url));
}

@Override
public Optional<Long> getDirectDownloadExpiration()
{
return Optional.of(config.get("direct_download_expiration", Long.class, 10L*60));
}

@Override
public Optional<Long> getDirectUploadExpiration()
{
return Optional.of(config.get("direct_upload_expiration", Long.class, 10L*60));
}

private <T> T getWithRetry(String message, Callable<T> callable)
throws StorageException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void list(String keyPrefix, FileListing callback)
@Override
public Optional<DirectDownloadHandle> getDirectDownloadHandle(String key)
{
final long secondsToExpire = config.get("direct_download_expiration", Long.class, 10L*60);
final long secondsToExpire = getDirectDownloadExpiration().get();

GeneratePresignedUrlRequest req = new GeneratePresignedUrlRequest(bucket, key);
req.setExpiration(Date.from(Instant.now().plusSeconds(secondsToExpire)));
Expand All @@ -213,7 +213,8 @@ public Optional<DirectDownloadHandle> getDirectDownloadHandle(String key)
@Override
public Optional<DirectUploadHandle> getDirectUploadHandle(String key)
{
final long secondsToExpire = config.get("direct_upload_expiration", Long.class, 10L*60);
final long secondsToExpire = getDirectUploadExpiration().get();


GeneratePresignedUrlRequest req = new GeneratePresignedUrlRequest(bucket, key);
req.setMethod(HttpMethod.PUT);
Expand All @@ -224,6 +225,18 @@ public Optional<DirectUploadHandle> getDirectUploadHandle(String key)
return Optional.of(DirectUploadHandle.of(url));
}

@Override
public Optional<Long> getDirectDownloadExpiration()
{
return Optional.of(config.get("direct_download_expiration", Long.class, 10L*60));
}

@Override
public Optional<Long> getDirectUploadExpiration()
{
return Optional.of(config.get("direct_upload_expiration", Long.class, 10L*60));
}

private <T> T getWithRetry(String message, Callable<T> callable)
throws StorageFileNotFoundException
{
Expand Down

0 comments on commit 626b0e0

Please sign in to comment.