Skip to content

Commit

Permalink
Merge branch 'trinodb:master' into divya/truera-plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
reetika-roy authored Feb 13, 2023
2 parents 1a77659 + 7c79c4c commit 0a718f4
Show file tree
Hide file tree
Showing 69 changed files with 1,126 additions and 300 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -662,14 +662,20 @@ jobs:
AWS_REGION: us-east-2
S3_BUCKET: trino-ci-test
GCP_CREDENTIALS_KEY: ${{ secrets.GCP_CREDENTIALS_KEY }}
ABFS_CONTAINER: ${{ secrets.AZURE_ABFS_CONTAINER }}
ABFS_ACCOUNT: ${{ secrets.AZURE_ABFS_ACCOUNT }}
ABFS_ACCESS_KEY: ${{ secrets.AZURE_ABFS_ACCESSKEY }}
if: >-
contains(matrix.modules, 'trino-iceberg') && contains(matrix.profile, 'cloud-tests') &&
(env.AWS_ACCESS_KEY_ID != '' || env.AWS_SECRET_ACCESS_KEY != '' || env.GCP_CREDENTIALS_KEY != '')
run: |
$MAVEN test ${MAVEN_TEST} -pl :trino-iceberg ${{ format('-P {0}', matrix.profile) }} \
-Ds3.bucket=${S3_BUCKET} \
-Dtesting.gcp-storage-bucket="trino-ci-test-us-east" \
-Dtesting.gcp-credentials-key="${GCP_CREDENTIALS_KEY}"
-Dtesting.gcp-credentials-key="${GCP_CREDENTIALS_KEY}" \
-Dhive.hadoop2.azure-abfs-container="${ABFS_CONTAINER}" \
-Dhive.hadoop2.azure-abfs-account="${ABFS_ACCOUNT}" \
-Dhive.hadoop2.azure-abfs-access-key="${ABFS_ACCESS_KEY}"
- name: Sanitize artifact name
if: always()
run: |
Expand Down
89 changes: 58 additions & 31 deletions core/trino-main/src/main/java/io/trino/execution/SqlTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

import java.net.URI;
import java.util.List;
Expand Down Expand Up @@ -90,7 +91,9 @@ public class SqlTask
private final AtomicReference<DateTime> lastHeartbeat = new AtomicReference<>(DateTime.now());
private final AtomicLong taskStatusVersion = new AtomicLong(TaskStatus.STARTING_VERSION);
private final FutureStateChange<?> taskStatusVersionChange = new FutureStateChange<>();

// Must be synchronized when updating the current task holder reference, but not when only reading the current reference value
private final Object taskHolderLock = new Object();
@GuardedBy("taskHolderLock")
private final AtomicReference<TaskHolder> taskHolderReference = new AtomicReference<>(new TaskHolder());
private final AtomicBoolean needsPlan = new AtomicBoolean(true);
private final AtomicReference<String> traceToken = new AtomicReference<>();
Expand Down Expand Up @@ -167,19 +170,18 @@ private void initialize(Consumer<SqlTask> onDone, CounterStat failedTasks)
}

// store final task info
while (true) {
synchronized (taskHolderLock) {
TaskHolder taskHolder = taskHolderReference.get();
if (taskHolder.isFinished()) {
// another concurrent worker already set the final state
return;
}

if (taskHolderReference.compareAndSet(taskHolder, new TaskHolder(
TaskHolder newHolder = new TaskHolder(
createTaskInfo(taskHolder),
taskHolder.getIoStats(),
taskHolder.getDynamicFilterDomains()))) {
break;
}
taskHolder.getDynamicFilterDomains());
checkState(taskHolderReference.compareAndSet(taskHolder, newHolder), "unsynchronized concurrent task holder update");
}

// make sure buffers are cleaned up
Expand Down Expand Up @@ -433,44 +435,69 @@ public TaskInfo updateTask(
// a VALUES query).
outputBuffer.setOutputBuffers(outputBuffers);

// assure the task execution is only created once
SqlTaskExecution taskExecution;
synchronized (this) {
// is task already complete?
TaskHolder taskHolder = taskHolderReference.get();
if (taskHolder.isFinished()) {
return taskHolder.getFinalTaskInfo();
}
taskExecution = taskHolder.getTaskExecution();
if (taskExecution == null) {
checkState(fragment.isPresent(), "fragment must be present");
taskExecution = sqlTaskExecutionFactory.create(
session,
queryContext,
taskStateMachine,
outputBuffer,
fragment.get(),
this::notifyStatusChanged);
taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution));
needsPlan.set(false);
taskExecution.start();
}
// is task already complete?
TaskHolder taskHolder = taskHolderReference.get();
if (taskHolder.isFinished()) {
return taskHolder.getFinalTaskInfo();
}

taskExecution.addSplitAssignments(splitAssignments);
taskExecution.getTaskContext().addDynamicFilter(dynamicFilterDomains);
SqlTaskExecution taskExecution = taskHolder.getTaskExecution();
if (taskExecution == null) {
checkState(fragment.isPresent(), "fragment must be present");
taskExecution = tryCreateSqlTaskExecution(session, fragment.get());
}
// taskExecution can still be null if the creation was skipped
if (taskExecution != null) {
taskExecution.addSplitAssignments(splitAssignments);
taskExecution.getTaskContext().addDynamicFilter(dynamicFilterDomains);
}
}
catch (Error e) {
failed(e);
throw e;
}
catch (RuntimeException e) {
failed(e);
return failed(e);
}

return getTaskInfo();
}

@Nullable
private SqlTaskExecution tryCreateSqlTaskExecution(Session session, PlanFragment fragment)
{
synchronized (taskHolderLock) {
// Recheck holder for task execution after acquiring the lock
TaskHolder taskHolder = taskHolderReference.get();
if (taskHolder.isFinished()) {
return null;
}
SqlTaskExecution execution = taskHolder.getTaskExecution();
if (execution != null) {
return execution;
}

// Don't create a new execution if the task is already done
if (taskStateMachine.getState().isDone()) {
return null;
}

execution = sqlTaskExecutionFactory.create(
session,
queryContext,
taskStateMachine,
outputBuffer,
fragment,
this::notifyStatusChanged);
needsPlan.set(false);
execution.start();
// this must happen after taskExecution.start(), otherwise it could become visible to a
// concurrent update without being fully initialized
checkState(taskHolderReference.compareAndSet(taskHolder, new TaskHolder(execution)), "unsynchronized concurrent task holder update");
return execution;
}
}

public ListenableFuture<BufferResult> getTaskResults(PipelinedOutputBuffers.OutputBufferId bufferId, long startingSequenceId, DataSize maxSize)
{
requireNonNull(bufferId, "bufferId is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,22 @@ public SqlTaskExecution(
else {
taskHandle = null;
}

outputBuffer.addStateChangeListener(new CheckTaskCompletionOnBufferFinish(SqlTaskExecution.this));
}
}

public void start()
{
try (SetThreadName ignored = new SetThreadName("Task-%s", getTaskId())) {
// Task handle was not created because the task is already done, nothing to do
if (taskHandle == null) {
return;
}
// The scheduleDriversForTaskLifeCycle method calls enqueueDriverSplitRunner, which registers a callback with access to this object.
// The call back is accessed from another thread, so this code cannot be placed in the constructor.
// The call back is accessed from another thread, so this code cannot be placed in the constructor. This must also happen before outputBuffer
// callbacks are registered to prevent a task completion check before task lifecycle splits are created
scheduleDriversForTaskLifeCycle();
// Output buffer state change listener callback must not run in the constructor to avoid leaking a reference to "this" across to another thread
outputBuffer.addStateChangeListener(new CheckTaskCompletionOnBufferFinish(SqlTaskExecution.this));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,17 +401,6 @@ public ListenableFuture<TaskInfo> getTaskInfo(TaskId taskId, long currentVersion
return sqlTask.getTaskInfo(currentVersion);
}

/**
* Gets the unique instance id of a task. This can be used to detect a task
* that was destroyed and recreated.
*/
public String getTaskInstanceId(TaskId taskId)
{
SqlTask sqlTask = tasks.getUnchecked(taskId);
sqlTask.recordHeartbeat();
return sqlTask.getTaskInstanceId();
}

/**
* Gets future status for the task after the state changes from
* {@code current state}. If the task has not been created yet, an
Expand Down Expand Up @@ -508,14 +497,15 @@ private TaskInfo doUpdateTask(
* NOTE: this design assumes that only tasks and buffers that will
* eventually exist are queried.
*/
public ListenableFuture<BufferResult> getTaskResults(TaskId taskId, PipelinedOutputBuffers.OutputBufferId bufferId, long startingSequenceId, DataSize maxSize)
public SqlTaskWithResults getTaskResults(TaskId taskId, PipelinedOutputBuffers.OutputBufferId bufferId, long startingSequenceId, DataSize maxSize)
{
requireNonNull(taskId, "taskId is null");
requireNonNull(bufferId, "bufferId is null");
checkArgument(startingSequenceId >= 0, "startingSequenceId is negative");
requireNonNull(maxSize, "maxSize is null");

return tasks.getUnchecked(taskId).getTaskResults(bufferId, startingSequenceId, maxSize);
SqlTask task = tasks.getUnchecked(taskId);
return new SqlTaskWithResults(task, task.getTaskResults(bufferId, startingSequenceId, maxSize));
}

/**
Expand Down Expand Up @@ -778,4 +768,39 @@ private void failStuckSplitTasks()
}
}
}

public static final class SqlTaskWithResults
{
private final SqlTask task;
private final ListenableFuture<BufferResult> resultsFuture;

public SqlTaskWithResults(SqlTask task, ListenableFuture<BufferResult> resultsFuture)
{
this.task = requireNonNull(task, "task is null");
this.resultsFuture = requireNonNull(resultsFuture, "resultsFuture is null");
}

public void recordHeartbeat()
{
task.recordHeartbeat();
}

public String getTaskInstanceId()
{
return task.getTaskInstanceId();
}

public boolean isTaskFailed()
{
return switch (task.getTaskState()) {
case ABORTED, FAILED -> true;
default -> false;
};
}

public ListenableFuture<BufferResult> getResultsFuture()
{
return resultsFuture;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,10 @@ public ListenableFuture<BufferResult> get(OutputBufferId bufferId, long token, D
if (outputBuffer == null) {
synchronized (this) {
if (delegate == null) {
if (stateMachine.getState() == FINISHED) {
return immediateFuture(emptyResults(taskInstanceId, 0, true));
if (stateMachine.getState().isTerminal()) {
// only set complete when finished, otherwise
boolean complete = stateMachine.getState() == FINISHED;
return immediateFuture(emptyResults(taskInstanceId, 0, complete));
}

PendingRead pendingRead = new PendingRead(bufferId, token, maxSize);
Expand Down Expand Up @@ -310,19 +312,31 @@ public void destroy()
@Override
public void abort()
{
List<PendingRead> pendingReads = ImmutableList.of();
OutputBuffer outputBuffer = delegate;
if (outputBuffer == null) {
synchronized (this) {
if (delegate == null) {
// ignore abort if the buffer already in a terminal state.
stateMachine.abort();
if (!stateMachine.abort()) {
return;
}

// Do not free readers on fail
return;
pendingReads = ImmutableList.copyOf(this.pendingReads);
this.pendingReads.clear();
}
outputBuffer = delegate;
}
}

// if there is no output buffer, send an empty result without buffer completed signaled
if (outputBuffer == null) {
for (PendingRead pendingRead : pendingReads) {
pendingRead.getFutureResult().set(emptyResults(taskInstanceId, 0, false));
}
return;
}

outputBuffer.abort();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -979,8 +979,9 @@ private StateChangeListener<TaskStatus> createExchangeSinkInstanceHandleUpdateRe

private void loadMoreTaskDescriptorsIfNecessary()
{
if (schedulingQueue.getNonSpeculativeTaskCount() + nodeAcquisitions.size() < maxTasksWaitingForExecution) {
for (StageExecution stageExecution : stageExecutions.values()) {
boolean schedulingQueueIsFull = schedulingQueue.getNonSpeculativeTaskCount() >= maxTasksWaitingForExecution;
for (StageExecution stageExecution : stageExecutions.values()) {
if (!schedulingQueueIsFull || stageExecution.hasOpenTaskRunning()) {
stageExecution.loadMoreTaskDescriptors().ifPresent(future -> Futures.addCallback(future, new FutureCallback<>()
{
@Override
Expand Down Expand Up @@ -1147,6 +1148,7 @@ private static class StageExecution
private final Int2ObjectMap<StagePartition> partitions = new Int2ObjectOpenHashMap<>();
private boolean noMorePartitions;

private final IntSet runningPartitions = new IntOpenHashSet();
private final IntSet remainingPartitions = new IntOpenHashSet();

private ExchangeSourceOutputSelector.Builder sinkOutputSelectorBuilder;
Expand Down Expand Up @@ -1348,10 +1350,33 @@ public Optional<RemoteTask> schedule(int partitionId, ExchangeSinkInstanceHandle
splits,
noMoreSplits,
Optional.of(partition.getMemoryRequirements().getRequiredMemory()));
task.ifPresent(remoteTask -> partition.addTask(remoteTask, outputBuffers));
task.ifPresent(remoteTask -> {
partition.addTask(remoteTask, outputBuffers);
runningPartitions.add(partitionId);
});
return task;
}

public boolean hasOpenTaskRunning()
{
if (getState().isDone()) {
return false;
}

if (runningPartitions.isEmpty()) {
return false;
}

for (int partitionId : runningPartitions) {
StagePartition partition = getStagePartition(partitionId);
if (!partition.isSealed()) {
return true;
}
}

return false;
}

public Optional<ListenableFuture<AssignmentResult>> loadMoreTaskDescriptors()
{
if (getState().isDone() || taskDescriptorLoadingActive) {
Expand Down Expand Up @@ -1428,6 +1453,10 @@ public void taskFinished(TaskId taskId, TaskStatus taskStatus)
exchange.sinkFinished(partition.getExchangeSinkHandle(), taskId.getAttemptId());
SpoolingOutputStats.Snapshot outputStats = partition.taskFinished(taskId);

if (!partition.isRunning()) {
runningPartitions.remove(partitionId);
}

if (!remainingPartitions.remove(partitionId)) {
// a different task for the same partition finished before
return;
Expand Down Expand Up @@ -1476,6 +1505,10 @@ public List<PrioritizedScheduledTask> taskFailed(TaskId taskId, ExecutionFailure
StagePartition partition = getStagePartition(partitionId);
partition.taskFailed(taskId);

if (!partition.isRunning()) {
runningPartitions.remove(partitionId);
}

RuntimeException failure = failureInfo.toException();
ErrorCode errorCode = failureInfo.getErrorCode();
partitionMemoryEstimator.registerPartitionFinished(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,10 @@ public synchronized void schedulingComplete(PlanNodeId partitionedSource)
@Override
public synchronized void cancel()
{
stateMachine.transitionToCanceled();
getAllTasks().forEach(RemoteTask::cancel);
// Only send tasks a cancel command if the stage is successfully cancelled and not already failed
if (stateMachine.transitionToCanceled()) {
getAllTasks().forEach(RemoteTask::cancel);
}
}

@Override
Expand Down
Loading

0 comments on commit 0a718f4

Please sign in to comment.