diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4a2c650a7b6d..d94fdeff9823 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -662,6 +662,9 @@ jobs: AWS_REGION: us-east-2 S3_BUCKET: trino-ci-test GCP_CREDENTIALS_KEY: ${{ secrets.GCP_CREDENTIALS_KEY }} + ABFS_CONTAINER: ${{ secrets.AZURE_ABFS_CONTAINER }} + ABFS_ACCOUNT: ${{ secrets.AZURE_ABFS_ACCOUNT }} + ABFS_ACCESS_KEY: ${{ secrets.AZURE_ABFS_ACCESSKEY }} if: >- contains(matrix.modules, 'trino-iceberg') && contains(matrix.profile, 'cloud-tests') && (env.AWS_ACCESS_KEY_ID != '' || env.AWS_SECRET_ACCESS_KEY != '' || env.GCP_CREDENTIALS_KEY != '') @@ -669,7 +672,10 @@ jobs: $MAVEN test ${MAVEN_TEST} -pl :trino-iceberg ${{ format('-P {0}', matrix.profile) }} \ -Ds3.bucket=${S3_BUCKET} \ -Dtesting.gcp-storage-bucket="trino-ci-test-us-east" \ - -Dtesting.gcp-credentials-key="${GCP_CREDENTIALS_KEY}" + -Dtesting.gcp-credentials-key="${GCP_CREDENTIALS_KEY}" \ + -Dhive.hadoop2.azure-abfs-container="${ABFS_CONTAINER}" \ + -Dhive.hadoop2.azure-abfs-account="${ABFS_ACCOUNT}" \ + -Dhive.hadoop2.azure-abfs-access-key="${ABFS_ACCESS_KEY}" - name: Sanitize artifact name if: always() run: | diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlTask.java b/core/trino-main/src/main/java/io/trino/execution/SqlTask.java index 975c6d310d8f..ffb4a1832472 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlTask.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlTask.java @@ -43,6 +43,7 @@ import org.joda.time.DateTime; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.net.URI; import java.util.List; @@ -90,7 +91,9 @@ public class SqlTask private final AtomicReference lastHeartbeat = new AtomicReference<>(DateTime.now()); private final AtomicLong taskStatusVersion = new AtomicLong(TaskStatus.STARTING_VERSION); private final FutureStateChange taskStatusVersionChange = new FutureStateChange<>(); - + // Must be synchronized when updating the current task holder reference, but not when only reading the current reference value + private final Object taskHolderLock = new Object(); + @GuardedBy("taskHolderLock") private final AtomicReference taskHolderReference = new AtomicReference<>(new TaskHolder()); private final AtomicBoolean needsPlan = new AtomicBoolean(true); private final AtomicReference traceToken = new AtomicReference<>(); @@ -167,19 +170,18 @@ private void initialize(Consumer onDone, CounterStat failedTasks) } // store final task info - while (true) { + synchronized (taskHolderLock) { TaskHolder taskHolder = taskHolderReference.get(); if (taskHolder.isFinished()) { // another concurrent worker already set the final state return; } - if (taskHolderReference.compareAndSet(taskHolder, new TaskHolder( + TaskHolder newHolder = new TaskHolder( createTaskInfo(taskHolder), taskHolder.getIoStats(), - taskHolder.getDynamicFilterDomains()))) { - break; - } + taskHolder.getDynamicFilterDomains()); + checkState(taskHolderReference.compareAndSet(taskHolder, newHolder), "unsynchronized concurrent task holder update"); } // make sure buffers are cleaned up @@ -433,44 +435,69 @@ public TaskInfo updateTask( // a VALUES query). outputBuffer.setOutputBuffers(outputBuffers); - // assure the task execution is only created once - SqlTaskExecution taskExecution; - synchronized (this) { - // is task already complete? 
- TaskHolder taskHolder = taskHolderReference.get(); - if (taskHolder.isFinished()) { - return taskHolder.getFinalTaskInfo(); - } - taskExecution = taskHolder.getTaskExecution(); - if (taskExecution == null) { - checkState(fragment.isPresent(), "fragment must be present"); - taskExecution = sqlTaskExecutionFactory.create( - session, - queryContext, - taskStateMachine, - outputBuffer, - fragment.get(), - this::notifyStatusChanged); - taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution)); - needsPlan.set(false); - taskExecution.start(); - } + // is task already complete? + TaskHolder taskHolder = taskHolderReference.get(); + if (taskHolder.isFinished()) { + return taskHolder.getFinalTaskInfo(); } - taskExecution.addSplitAssignments(splitAssignments); - taskExecution.getTaskContext().addDynamicFilter(dynamicFilterDomains); + SqlTaskExecution taskExecution = taskHolder.getTaskExecution(); + if (taskExecution == null) { + checkState(fragment.isPresent(), "fragment must be present"); + taskExecution = tryCreateSqlTaskExecution(session, fragment.get()); + } + // taskExecution can still be null if the creation was skipped + if (taskExecution != null) { + taskExecution.addSplitAssignments(splitAssignments); + taskExecution.getTaskContext().addDynamicFilter(dynamicFilterDomains); + } } catch (Error e) { failed(e); throw e; } catch (RuntimeException e) { - failed(e); + return failed(e); } return getTaskInfo(); } + @Nullable + private SqlTaskExecution tryCreateSqlTaskExecution(Session session, PlanFragment fragment) + { + synchronized (taskHolderLock) { + // Recheck holder for task execution after acquiring the lock + TaskHolder taskHolder = taskHolderReference.get(); + if (taskHolder.isFinished()) { + return null; + } + SqlTaskExecution execution = taskHolder.getTaskExecution(); + if (execution != null) { + return execution; + } + + // Don't create a new execution if the task is already done + if (taskStateMachine.getState().isDone()) { + return null; + } + + execution = sqlTaskExecutionFactory.create( + session, + queryContext, + taskStateMachine, + outputBuffer, + fragment, + this::notifyStatusChanged); + needsPlan.set(false); + execution.start(); + // this must happen after taskExecution.start(), otherwise it could become visible to a + // concurrent update without being fully initialized + checkState(taskHolderReference.compareAndSet(taskHolder, new TaskHolder(execution)), "unsynchronized concurrent task holder update"); + return execution; + } + } + public ListenableFuture getTaskResults(PipelinedOutputBuffers.OutputBufferId bufferId, long startingSequenceId, DataSize maxSize) { requireNonNull(bufferId, "bufferId is null"); diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java b/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java index b3a021c2d5ae..11992e742b0a 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java @@ -163,17 +163,22 @@ public SqlTaskExecution( else { taskHandle = null; } - - outputBuffer.addStateChangeListener(new CheckTaskCompletionOnBufferFinish(SqlTaskExecution.this)); } } public void start() { try (SetThreadName ignored = new SetThreadName("Task-%s", getTaskId())) { + // Task handle was not created because the task is already done, nothing to do + if (taskHandle == null) { + return; + } // The scheduleDriversForTaskLifeCycle method calls enqueueDriverSplitRunner, which registers a callback with 
access to this object.
-            // The call back is accessed from another thread, so this code cannot be placed in the constructor.
+            // The callback is accessed from another thread, so this code cannot be placed in the constructor. This must also happen before outputBuffer
+            // callbacks are registered to prevent a task completion check before task lifecycle splits are created.
             scheduleDriversForTaskLifeCycle();
+            // Output buffer state change listener callback must not run in the constructor to avoid leaking a reference to "this" across to another thread
+            outputBuffer.addStateChangeListener(new CheckTaskCompletionOnBufferFinish(SqlTaskExecution.this));
         }
     }
diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java b/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java
index 1d84e55b1dcd..b16c161549d0 100644
--- a/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java
+++ b/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java
@@ -401,17 +401,6 @@ public ListenableFuture getTaskInfo(TaskId taskId, long currentVersion
         return sqlTask.getTaskInfo(currentVersion);
     }
 
-    /**
-     * Gets the unique instance id of a task. This can be used to detect a task
-     * that was destroyed and recreated.
-     */
-    public String getTaskInstanceId(TaskId taskId)
-    {
-        SqlTask sqlTask = tasks.getUnchecked(taskId);
-        sqlTask.recordHeartbeat();
-        return sqlTask.getTaskInstanceId();
-    }
-
     /**
      * Gets future status for the task after the state changes from
     * {@code current state}. If the task has not been created yet, an
@@ -508,14 +497,15 @@ private TaskInfo doUpdateTask(
      * NOTE: this design assumes that only tasks and buffers that will
      * eventually exist are queried.
      */
-    public ListenableFuture<BufferResult> getTaskResults(TaskId taskId, PipelinedOutputBuffers.OutputBufferId bufferId, long startingSequenceId, DataSize maxSize)
+    public SqlTaskWithResults getTaskResults(TaskId taskId, PipelinedOutputBuffers.OutputBufferId bufferId, long startingSequenceId, DataSize maxSize)
     {
         requireNonNull(taskId, "taskId is null");
         requireNonNull(bufferId, "bufferId is null");
         checkArgument(startingSequenceId >= 0, "startingSequenceId is negative");
         requireNonNull(maxSize, "maxSize is null");
 
-        return tasks.getUnchecked(taskId).getTaskResults(bufferId, startingSequenceId, maxSize);
+        SqlTask task = tasks.getUnchecked(taskId);
+        return new SqlTaskWithResults(task, task.getTaskResults(bufferId, startingSequenceId, maxSize));
     }
 
     /**
@@ -778,4 +768,39 @@ private void failStuckSplitTasks()
             }
         }
     }
+
+    public static final class SqlTaskWithResults
+    {
+        private final SqlTask task;
+        private final ListenableFuture<BufferResult> resultsFuture;
+
+        public SqlTaskWithResults(SqlTask task, ListenableFuture<BufferResult> resultsFuture)
+        {
+            this.task = requireNonNull(task, "task is null");
+            this.resultsFuture = requireNonNull(resultsFuture, "resultsFuture is null");
+        }
+
+        public void recordHeartbeat()
+        {
+            task.recordHeartbeat();
+        }
+
+        public String getTaskInstanceId()
+        {
+            return task.getTaskInstanceId();
+        }
+
+        public boolean isTaskFailed()
+        {
+            return switch (task.getTaskState()) {
+                case ABORTED, FAILED -> true;
+                default -> false;
+            };
+        }
+
+        public ListenableFuture<BufferResult> getResultsFuture()
+        {
+            return resultsFuture;
+        }
+    }
 }
diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java
index 9dd4bd9fc532..173e76756dd3 100644
--- a/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java
+++ b/core/trino-main/src/main/java/io/trino/execution/buffer/LazyOutputBuffer.java
@@ -209,8 +209,10 @@ public ListenableFuture get(OutputBufferId bufferId, long token, D
         if (outputBuffer == null) {
             synchronized (this) {
                 if (delegate == null) {
-                    if (stateMachine.getState() == FINISHED) {
-                        return immediateFuture(emptyResults(taskInstanceId, 0, true));
+                    if (stateMachine.getState().isTerminal()) {
+                        // only set complete when finished; otherwise the buffer was aborted or failed and the results are not complete
+                        boolean complete = stateMachine.getState() == FINISHED;
+                        return immediateFuture(emptyResults(taskInstanceId, 0, complete));
                     }
 
                     PendingRead pendingRead = new PendingRead(bufferId, token, maxSize);
@@ -310,19 +312,31 @@ public void destroy()
     @Override
     public void abort()
     {
+        List<PendingRead> pendingReads = ImmutableList.of();
         OutputBuffer outputBuffer = delegate;
         if (outputBuffer == null) {
             synchronized (this) {
                 if (delegate == null) {
                     // ignore abort if the buffer already in a terminal state.
-                    stateMachine.abort();
+                    if (!stateMachine.abort()) {
+                        return;
+                    }
 
-                    // Do not free readers on fail
-                    return;
+                    pendingReads = ImmutableList.copyOf(this.pendingReads);
+                    this.pendingReads.clear();
                 }
                 outputBuffer = delegate;
             }
         }
+
+        // if there is no output buffer, send an empty result without signaling buffer completion
+        if (outputBuffer == null) {
+            for (PendingRead pendingRead : pendingReads) {
+                pendingRead.getFutureResult().set(emptyResults(taskInstanceId, 0, false));
+            }
+            return;
+        }
+
         outputBuffer.abort();
     }
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java
index 9913528031b2..4dc73071d94f 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenFaultTolerantQueryScheduler.java
@@ -979,8 +979,9 @@ private StateChangeListener createExchangeSinkInstanceHandleUpdateRe
         private void loadMoreTaskDescriptorsIfNecessary()
         {
-            if (schedulingQueue.getNonSpeculativeTaskCount() + nodeAcquisitions.size() < maxTasksWaitingForExecution) {
-                for (StageExecution stageExecution : stageExecutions.values()) {
+            boolean schedulingQueueIsFull = schedulingQueue.getNonSpeculativeTaskCount() >= maxTasksWaitingForExecution;
+            for (StageExecution stageExecution : stageExecutions.values()) {
+                if (!schedulingQueueIsFull || stageExecution.hasOpenTaskRunning()) {
                     stageExecution.loadMoreTaskDescriptors().ifPresent(future -> Futures.addCallback(future, new FutureCallback<>() {
                         @Override
@@ -1147,6 +1148,7 @@ private static class StageExecution
         private final Int2ObjectMap<StagePartition> partitions = new Int2ObjectOpenHashMap<>();
         private boolean noMorePartitions;
 
+        private final IntSet runningPartitions = new IntOpenHashSet();
         private final IntSet remainingPartitions = new IntOpenHashSet();
 
         private ExchangeSourceOutputSelector.Builder sinkOutputSelectorBuilder;
@@ -1348,10 +1350,33 @@ public Optional schedule(int partitionId, ExchangeSinkInstanceHandle
                     splits,
                     noMoreSplits,
                     Optional.of(partition.getMemoryRequirements().getRequiredMemory()));
-            task.ifPresent(remoteTask -> partition.addTask(remoteTask, outputBuffers));
+            task.ifPresent(remoteTask -> {
+                partition.addTask(remoteTask, outputBuffers);
+                runningPartitions.add(partitionId);
+            });
             return task;
         }
 
+        public boolean hasOpenTaskRunning()
+        {
+            if (getState().isDone()) {
+                return false;
+            }
+
+            if (runningPartitions.isEmpty()) {
+                return false;
+            }
+
+            for
(int partitionId : runningPartitions) { + StagePartition partition = getStagePartition(partitionId); + if (!partition.isSealed()) { + return true; + } + } + + return false; + } + public Optional> loadMoreTaskDescriptors() { if (getState().isDone() || taskDescriptorLoadingActive) { @@ -1428,6 +1453,10 @@ public void taskFinished(TaskId taskId, TaskStatus taskStatus) exchange.sinkFinished(partition.getExchangeSinkHandle(), taskId.getAttemptId()); SpoolingOutputStats.Snapshot outputStats = partition.taskFinished(taskId); + if (!partition.isRunning()) { + runningPartitions.remove(partitionId); + } + if (!remainingPartitions.remove(partitionId)) { // a different task for the same partition finished before return; @@ -1476,6 +1505,10 @@ public List taskFailed(TaskId taskId, ExecutionFailure StagePartition partition = getStagePartition(partitionId); partition.taskFailed(taskId); + if (!partition.isRunning()) { + runningPartitions.remove(partitionId); + } + RuntimeException failure = failureInfo.toException(); ErrorCode errorCode = failureInfo.getErrorCode(); partitionMemoryEstimator.registerPartitionFinished( diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java index 203b5dcab5bf..927efb0aa7e2 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java @@ -248,8 +248,10 @@ public synchronized void schedulingComplete(PlanNodeId partitionedSource) @Override public synchronized void cancel() { - stateMachine.transitionToCanceled(); - getAllTasks().forEach(RemoteTask::cancel); + // Only send tasks a cancel command if the stage is successfully cancelled and not already failed + if (stateMachine.transitionToCanceled()) { + getAllTasks().forEach(RemoteTask::cancel); + } } @Override diff --git a/core/trino-main/src/main/java/io/trino/operator/StreamingDirectExchangeBuffer.java b/core/trino-main/src/main/java/io/trino/operator/StreamingDirectExchangeBuffer.java index ff937ad6d08b..770bb306f569 100644 --- a/core/trino-main/src/main/java/io/trino/operator/StreamingDirectExchangeBuffer.java +++ b/core/trino-main/src/main/java/io/trino/operator/StreamingDirectExchangeBuffer.java @@ -32,6 +32,7 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Throwables.throwIfUnchecked; +import static com.google.common.util.concurrent.Futures.immediateVoidFuture; import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; import static io.trino.spi.StandardErrorCode.REMOTE_TASK_FAILED; import static java.lang.Math.max; @@ -52,7 +53,7 @@ public class StreamingDirectExchangeBuffer @GuardedBy("this") private volatile long maxBufferRetainedSizeInBytes; @GuardedBy("this") - private volatile SettableFuture blocked = SettableFuture.create(); + private Queue> blocked = new ArrayDeque<>(); @GuardedBy("this") private final Set activeTasks = new HashSet<>(); @GuardedBy("this") @@ -69,9 +70,15 @@ public StreamingDirectExchangeBuffer(Executor executor, DataSize bufferCapacity) } @Override - public ListenableFuture isBlocked() + public synchronized ListenableFuture isBlocked() { - return nonCancellationPropagating(blocked); + if (!bufferedPages.isEmpty() || isFailed() || (noMoreTasks && activeTasks.isEmpty())) { + return immediateVoidFuture(); + } + SettableFuture callback = 
SettableFuture.create(); + blocked.add(callback); + + return nonCancellationPropagating(callback); } @Override @@ -87,10 +94,6 @@ public synchronized Slice pollPage() bufferRetainedSizeInBytes -= page.getRetainedSize(); checkState(bufferRetainedSizeInBytes >= 0, "unexpected bufferRetainedSizeInBytes: %s", bufferRetainedSizeInBytes); } - // if buffer is empty block future calls - if (bufferedPages.isEmpty() && !isFinished() && blocked.isDone()) { - blocked = SettableFuture.create(); - } return page; } @@ -119,7 +122,8 @@ public void addPages(TaskId taskId, List pages) bufferedPages.addAll(pages); bufferRetainedSizeInBytes += pagesRetainedSizeInBytes; maxBufferRetainedSizeInBytes = max(maxBufferRetainedSizeInBytes, bufferRetainedSizeInBytes); - unblockIfNecessary(blocked); + // Unblock the same number of consumers as pages to reduce the possibility of a thread waking up with an empty pull from the buffer. + unblock(pages.size()); } } @@ -131,8 +135,8 @@ public synchronized void taskFinished(TaskId taskId) } checkState(activeTasks.contains(taskId), "taskId not registered: %s", taskId); activeTasks.remove(taskId); - if (noMoreTasks && activeTasks.isEmpty() && !blocked.isDone()) { - unblockIfNecessary(blocked); + if (noMoreTasks && activeTasks.isEmpty()) { + unblockAll(); } } @@ -153,15 +157,15 @@ public synchronized void taskFailed(TaskId taskId, Throwable t) failure = t; activeTasks.remove(taskId); - unblockIfNecessary(blocked); + unblockAll(); } @Override public synchronized void noMoreTasks() { noMoreTasks = true; - if (activeTasks.isEmpty() && !blocked.isDone()) { - unblockIfNecessary(blocked); + if (activeTasks.isEmpty()) { + unblockAll(); } } @@ -224,16 +228,26 @@ public synchronized void close() activeTasks.clear(); noMoreTasks = true; closed = true; - unblockIfNecessary(blocked); + unblockAll(); } - private void unblockIfNecessary(SettableFuture blocked) + private synchronized void unblock(int unblock) { - if (!blocked.isDone()) { - executor.execute(() -> blocked.set(null)); + for (int i = 0; i < unblock; i++) { + SettableFuture callback = blocked.poll(); + if (callback == null) { + break; + } + executor.execute(() -> callback.set(null)); } } + private synchronized void unblockAll() + { + unblock(blocked.size()); + checkState(blocked.isEmpty(), "blocked callbacks is not empty"); + } + private synchronized void throwIfFailed() { if (failure != null) { diff --git a/core/trino-main/src/main/java/io/trino/server/TaskResource.java b/core/trino-main/src/main/java/io/trino/server/TaskResource.java index 146253243196..f1c60c641a28 100644 --- a/core/trino-main/src/main/java/io/trino/server/TaskResource.java +++ b/core/trino-main/src/main/java/io/trino/server/TaskResource.java @@ -27,9 +27,9 @@ import io.trino.execution.FailureInjector; import io.trino.execution.FailureInjector.InjectedFailure; import io.trino.execution.SqlTaskManager; +import io.trino.execution.SqlTaskManager.SqlTaskWithResults; import io.trino.execution.TaskId; import io.trino.execution.TaskInfo; -import io.trino.execution.TaskState; import io.trino.execution.TaskStatus; import io.trino.execution.buffer.BufferResult; import io.trino.execution.buffer.PipelinedOutputBuffers; @@ -70,6 +70,7 @@ import static io.airlift.concurrent.MoreFutures.addTimeout; import static io.airlift.jaxrs.AsyncResponseHandler.bindAsyncResponse; import static io.trino.TrinoMediaTypes.TRINO_PAGES; +import static io.trino.execution.buffer.BufferResult.emptyResults; import static io.trino.server.InternalHeaders.TRINO_BUFFER_COMPLETE; import static 
io.trino.server.InternalHeaders.TRINO_CURRENT_VERSION; import static io.trino.server.InternalHeaders.TRINO_MAX_SIZE; @@ -189,11 +190,14 @@ public void getTaskInfo( } Duration waitTime = randomizeWaitTime(maxWait); - ListenableFuture futureTaskInfo = addTimeout( - taskManager.getTaskInfo(taskId, currentVersion), - () -> taskManager.getTaskInfo(taskId), - waitTime, - timeoutExecutor); + ListenableFuture futureTaskInfo = taskManager.getTaskInfo(taskId, currentVersion); + if (!futureTaskInfo.isDone()) { + futureTaskInfo = addTimeout( + futureTaskInfo, + () -> taskManager.getTaskInfo(taskId), + waitTime, + timeoutExecutor); + } if (shouldSummarize(uriInfo)) { futureTaskInfo = Futures.transform(futureTaskInfo, TaskInfo::summarize, directExecutor()); @@ -232,11 +236,14 @@ public void getTaskStatus( // TODO: With current implementation, a newly completed driver group won't trigger immediate HTTP response, // leading to a slight delay of approx 1 second, which is not a major issue for any query that are heavy weight enough // to justify group-by-group execution. In order to fix this, REST endpoint /v1/{task}/status will need change. - ListenableFuture futureTaskStatus = addTimeout( - taskManager.getTaskStatus(taskId, currentVersion), - () -> taskManager.getTaskStatus(taskId), - waitTime, - timeoutExecutor); + ListenableFuture futureTaskStatus = taskManager.getTaskStatus(taskId, currentVersion); + if (!futureTaskStatus.isDone()) { + futureTaskStatus = addTimeout( + futureTaskStatus, + () -> taskManager.getTaskStatus(taskId), + waitTime, + timeoutExecutor); + } // For hard timeout, add an additional time to max wait for thread scheduling contention and GC Duration timeout = new Duration(waitTime.toMillis() + ADDITIONAL_WAIT_TIME.toMillis(), MILLISECONDS); @@ -322,52 +329,26 @@ public void getResults( return; } - TaskState state = taskManager.getTaskStatus(taskId).getState(); - boolean taskFailed = state == TaskState.ABORTED || state == TaskState.FAILED; - long start = System.nanoTime(); - ListenableFuture bufferResultFuture = taskManager.getTaskResults(taskId, bufferId, token, maxSize); + SqlTaskWithResults taskWithResults = taskManager.getTaskResults(taskId, bufferId, token, maxSize); + ListenableFuture bufferResultFuture = taskWithResults.getResultsFuture(); + BufferResult emptyBufferResults = emptyResults(taskWithResults.getTaskInstanceId(), token, false); + Duration waitTime = randomizeWaitTime(DEFAULT_MAX_WAIT_TIME); - bufferResultFuture = addTimeout( - bufferResultFuture, - () -> BufferResult.emptyResults(taskManager.getTaskInstanceId(taskId), token, false), - waitTime, - timeoutExecutor); - - ListenableFuture responseFuture = Futures.transform(bufferResultFuture, result -> { - List serializedPages = result.getSerializedPages(); - - GenericEntity entity = null; - Status status; - if (serializedPages.isEmpty()) { - status = Status.NO_CONTENT; - } - else { - entity = new GenericEntity<>(serializedPages, new TypeToken>() {}.getType()); - status = Status.OK; - } + if (!bufferResultFuture.isDone()) { + bufferResultFuture = addTimeout( + bufferResultFuture, + () -> emptyBufferResults, + waitTime, + timeoutExecutor); + } - return Response.status(status) - .entity(entity) - .header(TRINO_TASK_INSTANCE_ID, result.getTaskInstanceId()) - .header(TRINO_PAGE_TOKEN, result.getToken()) - .header(TRINO_PAGE_NEXT_TOKEN, result.getNextToken()) - .header(TRINO_BUFFER_COMPLETE, result.isBufferComplete()) - .header(TRINO_TASK_FAILED, taskFailed) - .build(); - }, directExecutor()); + ListenableFuture responseFuture 
= Futures.transform(bufferResultFuture, results -> createBufferResultResponse(taskWithResults, results), directExecutor()); // For hard timeout, add an additional time to max wait for thread scheduling contention and GC Duration timeout = new Duration(waitTime.toMillis() + ADDITIONAL_WAIT_TIME.toMillis(), MILLISECONDS); bindAsyncResponse(asyncResponse, responseFuture, responseExecutor) - .withTimeout(timeout, - Response.status(Status.NO_CONTENT) - .header(TRINO_TASK_INSTANCE_ID, taskManager.getTaskInstanceId(taskId)) - .header(TRINO_PAGE_TOKEN, token) - .header(TRINO_PAGE_NEXT_TOKEN, token) - .header(TRINO_BUFFER_COMPLETE, false) - .header(TRINO_TASK_FAILED, taskFailed) - .build()); + .withTimeout(timeout, () -> createBufferResultResponse(taskWithResults, emptyBufferResults)); responseFuture.addListener(() -> readFromOutputBufferTime.add(Duration.nanosSince(start)), directExecutor()); asyncResponse.register((CompletionCallback) throwable -> resultsRequestTime.add(Duration.nanosSince(start))); @@ -516,4 +497,32 @@ private static Duration randomizeWaitTime(Duration waitTime) long halfWaitMillis = waitTime.toMillis() / 2; return new Duration(halfWaitMillis + ThreadLocalRandom.current().nextLong(halfWaitMillis), MILLISECONDS); } + + private static Response createBufferResultResponse(SqlTaskWithResults taskWithResults, BufferResult result) + { + // This response may have been created as the result of a timeout, so refresh the task heartbeat + taskWithResults.recordHeartbeat(); + + List serializedPages = result.getSerializedPages(); + + GenericEntity entity = null; + Status status; + if (serializedPages.isEmpty()) { + status = Status.NO_CONTENT; + } + else { + entity = new GenericEntity<>(serializedPages, new TypeToken>() {}.getType()); + status = Status.OK; + } + + return Response.status(status) + .entity(entity) + .header(TRINO_TASK_INSTANCE_ID, result.getTaskInstanceId()) + .header(TRINO_PAGE_TOKEN, result.getToken()) + .header(TRINO_PAGE_NEXT_TOKEN, result.getNextToken()) + .header(TRINO_BUFFER_COMPLETE, result.isBufferComplete()) + // check for task failure after getting the result to ensure it's consistent with isBufferComplete() + .header(TRINO_TASK_FAILED, taskWithResults.isTaskFailed()) + .build(); + } } diff --git a/core/trino-main/src/main/java/io/trino/server/security/oauth2/NimbusOAuth2Client.java b/core/trino-main/src/main/java/io/trino/server/security/oauth2/NimbusOAuth2Client.java index 9860703bdf59..fcb985d60223 100644 --- a/core/trino-main/src/main/java/io/trino/server/security/oauth2/NimbusOAuth2Client.java +++ b/core/trino-main/src/main/java/io/trino/server/security/oauth2/NimbusOAuth2Client.java @@ -378,7 +378,7 @@ private Optional queryUserInfo(String accessToken) try { UserInfoResponse response = httpClient.execute(new UserInfoRequest(userinfoUrl.get(), new BearerAccessToken(accessToken)), UserInfoResponse::parse); if (!response.indicatesSuccess()) { - LOG.error("Received bad response from userinfo endpoint: " + response.toErrorResponse().getErrorObject()); + LOG.error("Received bad response from userinfo endpoint: %s", response.toErrorResponse().getErrorObject()); return Optional.empty(); } return Optional.of(response.toSuccessResponse().getUserInfo().toJWTClaimsSet()); diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java index 6cb9cce5f858..299f19ce5dc5 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java +++ 
b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java @@ -1580,6 +1580,16 @@ protected Scope visitTableFunctionInvocation(TableFunctionInvocation node, Optio // so the function's analyze() method should not return the proper columns descriptor. throw semanticException(AMBIGUOUS_RETURN_TYPE, node, "Returned relation type for table function %s is ambiguous", node.getName()); } + if (function.getArguments().stream() + .filter(TableArgumentSpecification.class::isInstance) + .map(TableArgumentSpecification.class::cast) + .noneMatch(TableArgumentSpecification::isPassThroughColumns)) { + // According to SQL standard ISO/IEC 9075-2, 10.4 , p. 764, + // if there is no generic table parameter that specifies PASS THROUGH, then number of proper columns shall be positive. + // For GENERIC_TABLE and DescribedTable returned types, this is enforced by the Descriptor constructor, which requires positive number of fields. + // Here we enforce it for the remaining returned type specification: ONLY_PASS_THROUGH. + throw new TrinoException(FUNCTION_IMPLEMENTATION_ERROR, "A table function with ONLY_PASS_THROUGH return type must have a table argument with pass-through columns."); + } properColumnsDescriptor = null; } else if (returnTypeSpecification == GENERIC_TABLE) { diff --git a/core/trino-main/src/test/java/io/trino/connector/TestingTableFunctions.java b/core/trino-main/src/test/java/io/trino/connector/TestingTableFunctions.java index 3a94bacd9509..a2c849a59dcd 100644 --- a/core/trino-main/src/test/java/io/trino/connector/TestingTableFunctions.java +++ b/core/trino-main/src/test/java/io/trino/connector/TestingTableFunctions.java @@ -267,6 +267,7 @@ public OnlyPassThroughFunction() ImmutableList.of( TableArgumentSpecification.builder() .name("INPUT") + .passThroughColumns() .keepWhenEmpty() .build()), ONLY_PASS_THROUGH); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java index ad1f09186f5b..61aa5671e9ce 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java @@ -145,13 +145,13 @@ public void testSimpleQuery() TaskInfo taskInfo = sqlTaskManager.getTaskInfo(taskId, TaskStatus.STARTING_VERSION).get(); assertEquals(taskInfo.getTaskStatus().getState(), TaskState.FLUSHING); - BufferResult results = sqlTaskManager.getTaskResults(taskId, OUT, 0, DataSize.of(1, Unit.MEGABYTE)).get(); + BufferResult results = sqlTaskManager.getTaskResults(taskId, OUT, 0, DataSize.of(1, Unit.MEGABYTE)).getResultsFuture().get(); assertFalse(results.isBufferComplete()); assertEquals(results.getSerializedPages().size(), 1); assertEquals(getSerializedPagePositionCount(results.getSerializedPages().get(0)), 1); for (boolean moreResults = true; moreResults; moreResults = !results.isBufferComplete()) { - results = sqlTaskManager.getTaskResults(taskId, OUT, results.getToken() + results.getSerializedPages().size(), DataSize.of(1, Unit.MEGABYTE)).get(); + results = sqlTaskManager.getTaskResults(taskId, OUT, results.getToken() + results.getSerializedPages().size(), DataSize.of(1, Unit.MEGABYTE)).getResultsFuture().get(); } assertTrue(results.isBufferComplete()); assertEquals(results.getSerializedPages().size(), 0); diff --git a/core/trino-main/src/test/java/io/trino/operator/TestStreamingDirectExchangeBuffer.java b/core/trino-main/src/test/java/io/trino/operator/TestStreamingDirectExchangeBuffer.java index 
2b5d86291874..eec605a4da6d 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestStreamingDirectExchangeBuffer.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestStreamingDirectExchangeBuffer.java @@ -47,22 +47,23 @@ public void testHappyPath() { try (StreamingDirectExchangeBuffer buffer = new StreamingDirectExchangeBuffer(directExecutor(), DataSize.of(1, KILOBYTE))) { assertFalse(buffer.isFinished()); - assertFalse(buffer.isBlocked().isDone()); + ListenableFuture blocked = buffer.isBlocked(); + assertFalse(blocked.isDone()); assertNull(buffer.pollPage()); buffer.addTask(TASK_0); assertFalse(buffer.isFinished()); - assertFalse(buffer.isBlocked().isDone()); + assertFalse(blocked.isDone()); assertNull(buffer.pollPage()); buffer.addTask(TASK_1); assertFalse(buffer.isFinished()); - assertFalse(buffer.isBlocked().isDone()); + assertFalse(blocked.isDone()); assertNull(buffer.pollPage()); buffer.noMoreTasks(); assertFalse(buffer.isFinished()); - assertFalse(buffer.isBlocked().isDone()); + assertFalse(blocked.isDone()); assertNull(buffer.pollPage()); buffer.addPages(TASK_0, ImmutableList.of(PAGE_0)); @@ -71,13 +72,14 @@ public void testHappyPath() assertEquals(buffer.getMaxRetainedSizeInBytes(), PAGE_0.getRetainedSize()); assertEquals(buffer.getRemainingCapacityInBytes(), DataSize.of(1, KILOBYTE).toBytes() - PAGE_0.getRetainedSize()); assertFalse(buffer.isFinished()); - assertTrue(buffer.isBlocked().isDone()); + assertTrue(blocked.isDone()); assertEquals(buffer.pollPage(), PAGE_0); + blocked = buffer.isBlocked(); assertEquals(buffer.getRetainedSizeInBytes(), 0); assertEquals(buffer.getMaxRetainedSizeInBytes(), PAGE_0.getRetainedSize()); assertEquals(buffer.getRemainingCapacityInBytes(), DataSize.of(1, KILOBYTE).toBytes()); assertFalse(buffer.isFinished()); - assertFalse(buffer.isBlocked().isDone()); + assertFalse(blocked.isDone()); buffer.taskFinished(TASK_0); assertFalse(buffer.isFinished()); @@ -100,7 +102,7 @@ public void testHappyPath() buffer.taskFinished(TASK_1); assertTrue(buffer.isFinished()); - assertTrue(buffer.isBlocked().isDone()); + assertTrue(blocked.isDone()); } } @@ -128,47 +130,50 @@ public void testIsFinished() // 0 tasks try (StreamingDirectExchangeBuffer buffer = new StreamingDirectExchangeBuffer(directExecutor(), DataSize.of(1, KILOBYTE))) { assertFalse(buffer.isFinished()); - assertFalse(buffer.isBlocked().isDone()); + ListenableFuture blocked = buffer.isBlocked(); + assertFalse(blocked.isDone()); buffer.noMoreTasks(); assertTrue(buffer.isFinished()); - assertTrue(buffer.isBlocked().isDone()); + assertTrue(blocked.isDone()); } // single task try (StreamingDirectExchangeBuffer buffer = new StreamingDirectExchangeBuffer(directExecutor(), DataSize.of(1, KILOBYTE))) { assertFalse(buffer.isFinished()); - assertFalse(buffer.isBlocked().isDone()); + ListenableFuture blocked = buffer.isBlocked(); + assertFalse(blocked.isDone()); buffer.addTask(TASK_0); buffer.noMoreTasks(); assertFalse(buffer.isFinished()); - assertFalse(buffer.isBlocked().isDone()); + assertFalse(blocked.isDone()); buffer.taskFinished(TASK_0); assertTrue(buffer.isFinished()); - assertTrue(buffer.isBlocked().isDone()); + assertTrue(blocked.isDone()); } // single failed task try (StreamingDirectExchangeBuffer buffer = new StreamingDirectExchangeBuffer(directExecutor(), DataSize.of(1, KILOBYTE))) { assertFalse(buffer.isFinished()); - assertFalse(buffer.isBlocked().isDone()); + ListenableFuture blocked = buffer.isBlocked(); + assertFalse(blocked.isDone()); buffer.addTask(TASK_0); 
assertFalse(buffer.isFinished()); - assertFalse(buffer.isBlocked().isDone()); + assertFalse(blocked.isDone()); RuntimeException error = new RuntimeException(); buffer.taskFailed(TASK_0, error); assertFalse(buffer.isFinished()); assertTrue(buffer.isFailed()); - assertTrue(buffer.isBlocked().isDone()); + assertTrue(blocked.isDone()); assertThatThrownBy(buffer::pollPage).isEqualTo(error); } } @@ -224,4 +229,29 @@ public void testRemoteTaskFailedError() assertNull(buffer.pollPage()); } } + + @Test + public void testSingleWakeUp() + { + try (StreamingDirectExchangeBuffer buffer = new StreamingDirectExchangeBuffer(directExecutor(), DataSize.of(1, KILOBYTE))) { + assertFalse(buffer.isFinished()); + ListenableFuture blocked1 = buffer.isBlocked(); + ListenableFuture blocked2 = buffer.isBlocked(); + assertFalse(blocked1.isDone()); + assertFalse(blocked2.isDone()); + + buffer.addTask(TASK_0); + + buffer.addPages(TASK_0, ImmutableList.of(PAGE_0)); + buffer.pollPage(); + + assertTrue(blocked1.isDone()); + assertFalse(blocked2.isDone()); + + buffer.addPages(TASK_0, ImmutableList.of(PAGE_0)); + buffer.pollPage(); + + assertTrue(blocked2.isDone()); + } + } } diff --git a/core/trino-spi/src/main/java/io/trino/spi/PageBuilder.java b/core/trino-spi/src/main/java/io/trino/spi/PageBuilder.java index dc00e76f472b..b62fc0630385 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/PageBuilder.java +++ b/core/trino-spi/src/main/java/io/trino/spi/PageBuilder.java @@ -45,7 +45,7 @@ public class PageBuilder * therefore it can resize frequently while appending new rows. *
<p>
* This constructor should only be used to get the initial PageBuilder. - * Once the PageBuilder is full use reset() or createPageBuilderLike() to create a new + * Once the PageBuilder is full use reset() or newPageBuilderLike() to create a new * PageBuilder instance with its size estimated based on previous data. */ public PageBuilder(List types) diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/VariableWidthBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/VariableWidthBlockEncoding.java index dcb137eacbf7..eded7b4585bf 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/VariableWidthBlockEncoding.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/VariableWidthBlockEncoding.java @@ -18,9 +18,11 @@ import io.airlift.slice.SliceOutput; import io.airlift.slice.Slices; -import static io.airlift.slice.SizeOf.SIZE_OF_INT; +import java.util.Arrays; + import static io.trino.spi.block.EncoderUtil.decodeNullBits; import static io.trino.spi.block.EncoderUtil.encodeNullsAsBits; +import static java.lang.String.format; public class VariableWidthBlockEncoding implements BlockEncoding @@ -71,26 +73,59 @@ public Block readBlock(BlockEncodingSerde blockEncodingSerde, SliceInput sliceIn int positionCount = sliceInput.readInt(); int nonNullsCount = sliceInput.readInt(); + if (nonNullsCount > positionCount) { + throw new IllegalArgumentException(format("nonNullsCount must be <= positionCount, found: %s > %s", nonNullsCount, positionCount)); + } + int[] offsets = new int[positionCount + 1]; - int[] lengths = new int[nonNullsCount]; - sliceInput.readBytes(Slices.wrappedIntArray(lengths), 0, nonNullsCount * SIZE_OF_INT); + // Read the lengths array into the end of the offsets array, since nonNullsCount <= positionCount + int lengthIndex = offsets.length - nonNullsCount; + sliceInput.readBytes(Slices.wrappedIntArray(offsets, lengthIndex, nonNullsCount)); boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null); + // Transform lengths back to offsets + if (valueIsNull == null) { + if (positionCount != nonNullsCount || lengthIndex != 1) { + throw new IllegalArgumentException(format("nonNullsCount must equal positionCount, found: %s <> %s", nonNullsCount, positionCount)); + } + // Simplified loop for no nulls present + for (int i = 1; i < offsets.length; i++) { + offsets[i] += offsets[i - 1]; + } + } + else { + computeOffsetsFromLengths(offsets, valueIsNull, lengthIndex); + } int blockSize = sliceInput.readInt(); Slice slice = Slices.allocate(blockSize); sliceInput.readBytes(slice); - int nonNullPosition = 0; - int offset = 0; + return new VariableWidthBlock(0, positionCount, slice, offsets, valueIsNull); + } - for (int i = 0; i < positionCount; i++) { - if (valueIsNull == null || !valueIsNull[i]) { - offset += lengths[nonNullPosition]; - nonNullPosition++; + private static void computeOffsetsFromLengths(int[] offsets, boolean[] valueIsNull, int lengthIndex) + { + if (lengthIndex < 0 || lengthIndex > offsets.length) { + throw new IllegalArgumentException(format("Invalid lengthIndex %s for offsets %s", lengthIndex, offsets.length)); + } + int currentOffset = 0; + for (int i = 1; i < offsets.length; i++) { + if (lengthIndex == offsets.length) { + // Populate remaining null elements + Arrays.fill(offsets, i, offsets.length, currentOffset); + break; } - offsets[i + 1] = offset; + boolean isNull = valueIsNull[i - 1]; + // must be accessed unconditionally, otherwise CMOV optimization isn't applied due to + // ArrayIndexOutOfBoundsException checks + int 
length = offsets[lengthIndex]; + lengthIndex += isNull ? 0 : 1; + currentOffset += isNull ? 0 : length; + offsets[i] = currentOffset; + } + if (lengthIndex != offsets.length) { + throw new IllegalArgumentException(format("Failed to consume all length entries, found %s <> %s", lengthIndex, offsets.length)); } - return new VariableWidthBlock(0, positionCount, slice, offsets, valueIsNull); } } diff --git a/docs/src/main/sphinx/connector/iceberg.rst b/docs/src/main/sphinx/connector/iceberg.rst index 3499f70057cc..85b39e5f8cd3 100644 --- a/docs/src/main/sphinx/connector/iceberg.rst +++ b/docs/src/main/sphinx/connector/iceberg.rst @@ -139,7 +139,9 @@ When using any database besides PostgreSQL, a JDBC driver jar file must be place connector.name=iceberg iceberg.catalog.type=jdbc iceberg.jdbc-catalog.catalog-name=test - iceberg.jdbc-catalog.connection-url=jdbc:postgresql://example.net:5432/database?user=admin&password=test + iceberg.jdbc-catalog.connection-url=jdbc:postgresql://example.net:5432/database + iceberg.jdbc-catalog.connection-user=admin + iceberg.jdbc-catalog.connection-password=test iceberg.jdbc-catalog.default-warehouse-dir=s3://bucket diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3/TrinoS3FileSystem.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3/TrinoS3FileSystem.java index d07446ff6920..9855ccb5a7da 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3/TrinoS3FileSystem.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3/TrinoS3FileSystem.java @@ -1134,7 +1134,7 @@ private AWSCredentialsProvider createAwsCredentialsProvider(URI uri, Configurati region = regionProviderChain.getRegion(); } catch (SdkClientException ex) { - log.warn("Falling back to default AWS region " + US_EAST_1); + log.warn("Falling back to default AWS region %s", US_EAST_1); region = US_EAST_1.getName(); } } diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index ff027db2f0fd..731b1282172c 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -549,6 +549,7 @@ **/TestIcebergGlueTableOperationsInsertFailure.java **/TestIcebergGlueCatalogSkipArchive.java **/TestIcebergGcsConnectorSmokeTest.java + **/TestIcebergAbfsConnectorSmokeTest.java io/trino/plugin/iceberg/catalog/jdbc/Test*.java io/trino/plugin/iceberg/catalog/rest/Test*.java **/Test*FailureRecoveryTest.java @@ -611,6 +612,7 @@ **/TestIcebergGlueTableOperationsInsertFailure.java **/TestIcebergGlueCatalogSkipArchive.java **/TestIcebergGcsConnectorSmokeTest.java + **/TestIcebergAbfsConnectorSmokeTest.java diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 3dadd09d8f29..b8abb23e421c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -76,6 +76,7 @@ import io.trino.spi.connector.ProjectionApplicationResult; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.RowChangeParadigm; +import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; import io.trino.spi.connector.SystemTable; @@ -700,6 +701,10 @@ public Optional getNewTableLayout(ConnectorSession session public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, 
RetryMode retryMode) { verify(transaction == null, "transaction already set"); + String schemaName = tableMetadata.getTable().getSchemaName(); + if (!schemaExists(session, schemaName)) { + throw new SchemaNotFoundException(schemaName); + } transaction = newCreateTableTransaction(catalog, tableMetadata, session); String location = transaction.table().location(); TrinoFileSystem fileSystem = fileSystemFactory.create(session); @@ -1310,7 +1315,7 @@ public void executeRemoveOrphanFiles(ConnectorSession session, IcebergTableExecu IcebergSessionProperties.REMOVE_ORPHAN_FILES_MIN_RETENTION); if (table.currentSnapshot() == null) { - log.debug("Skipping remove_orphan_files procedure for empty table " + table); + log.debug("Skipping remove_orphan_files procedure for empty table %s", table); return; } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 734ec767ab57..d7b9904c8d18 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -195,6 +195,18 @@ public List listNamespaces(ConnectorSession session) } } + private List listNamespaces(ConnectorSession session, Optional namespace) + { + if (namespace.isPresent()) { + if (isHiveSystemSchema(namespace.get())) { + // TODO https://github.com/trinodb/trino/issues/1559 information_schema should be handled by the engine fully + return ImmutableList.of(); + } + return ImmutableList.of(namespace.get()); + } + return listNamespaces(session); + } + @Override public void dropNamespace(ConnectorSession session, String namespace) { @@ -289,7 +301,7 @@ public List listTables(ConnectorSession session, Optional tables = ImmutableList.builder(); try { - List namespaces = namespace.map(List::of).orElseGet(() -> listNamespaces(session)); + List namespaces = listNamespaces(session, namespace); for (String glueNamespace : namespaces) { try { // Add all tables from a namespace together, in case it is removed while fetching paginated results @@ -663,7 +675,7 @@ public List listViews(ConnectorSession session, Optional views = ImmutableList.builder(); try { - List namespaces = namespace.map(List::of).orElseGet(() -> listNamespaces(session)); + List namespaces = listNamespaces(session, namespace); for (String glueNamespace : namespaces) { try { views.addAll(getPaginatedResults( @@ -775,7 +787,7 @@ public List listMaterializedViews(ConnectorSession session, Opt { ImmutableList.Builder materializedViews = ImmutableList.builder(); try { - List namespaces = namespace.map(List::of).orElseGet(() -> listNamespaces(session)); + List namespaces = listNamespaces(session, namespace); for (String glueNamespace : namespaces) { try { materializedViews.addAll(getPaginatedResults( diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index ca1cb44fea47..d1faf40cba0f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -602,6 +602,7 @@ private List listNamespaces(ConnectorSession session, Optional n { if (namespace.isPresent()) { if (isHiveSystemSchema(namespace.get())) { + // TODO 
https://github.com/trinodb/trino/issues/1559 information_schema should be handled by the engine fully return ImmutableList.of(); } return ImmutableList.of(namespace.get()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcCatalogConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcCatalogConfig.java index 8f1a203f94c6..1c3bae1cadd2 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcCatalogConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcCatalogConfig.java @@ -18,10 +18,15 @@ import io.airlift.configuration.ConfigSecuritySensitive; import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; + +import java.util.Optional; public class IcebergJdbcCatalogConfig { private String connectionUrl; + private String connectionUser; + private String connectionPassword; private String catalogName; private String defaultWarehouseDir; @@ -39,6 +44,35 @@ public IcebergJdbcCatalogConfig setConnectionUrl(String connectionUrl) return this; } + @NotNull + public Optional getConnectionUser() + { + return Optional.ofNullable(connectionUser); + } + + @Config("iceberg.jdbc-catalog.connection-user") + @ConfigDescription("User name for JDBC client") + public IcebergJdbcCatalogConfig setConnectionUser(String connectionUser) + { + this.connectionUser = connectionUser; + return this; + } + + @NotNull + public Optional getConnectionPassword() + { + return Optional.ofNullable(connectionPassword); + } + + @Config("iceberg.jdbc-catalog.connection-password") + @ConfigDescription("Password for JDBC client") + @ConfigSecuritySensitive + public IcebergJdbcCatalogConfig setConnectionPassword(String connectionPassword) + { + this.connectionPassword = connectionPassword; + return this; + } + @NotEmpty public String getCatalogName() { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcCatalogModule.java index f89441e02bad..213dc3c0cb22 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcCatalogModule.java @@ -43,7 +43,7 @@ protected void setup(Binder binder) public static IcebergJdbcClient createIcebergJdbcClient(IcebergJdbcCatalogConfig config) { return new IcebergJdbcClient( - new IcebergJdbcConnectionFactory(config.getConnectionUrl()), + new IcebergJdbcConnectionFactory(config.getConnectionUrl(), config.getConnectionUser(), config.getConnectionPassword()), config.getCatalogName()); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcConnectionFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcConnectionFactory.java index b59ce22f3740..fc4b5d4babe5 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcConnectionFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcConnectionFactory.java @@ -18,6 +18,7 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.util.Optional; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; @@ 
-26,17 +27,21 @@ public class IcebergJdbcConnectionFactory implements ConnectionFactory { private final String connectionUrl; + private final Optional user; + private final Optional password; - public IcebergJdbcConnectionFactory(String connectionUrl) + public IcebergJdbcConnectionFactory(String connectionUrl, Optional user, Optional password) { this.connectionUrl = requireNonNull(connectionUrl, "connectionUrl is null"); + this.user = requireNonNull(user, "user is null"); + this.password = requireNonNull(password, "password is null"); } @Override public Connection openConnection() throws SQLException { - Connection connection = DriverManager.getConnection(connectionUrl); + Connection connection = DriverManager.getConnection(connectionUrl, user.orElse(null), password.orElse(null)); checkState(connection != null, "Driver returned null connection, make sure the connection URL is valid"); return connection; } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalogFactory.java index ea8bbaa9a5ac..2821e3ea6bc8 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalogFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalogFactory.java @@ -27,9 +27,12 @@ import javax.annotation.concurrent.GuardedBy; import javax.inject.Inject; +import java.util.Optional; + import static java.util.Objects.requireNonNull; import static org.apache.iceberg.CatalogProperties.URI; import static org.apache.iceberg.CatalogProperties.WAREHOUSE_LOCATION; +import static org.apache.iceberg.jdbc.JdbcCatalog.PROPERTY_PREFIX; public class TrinoJdbcCatalogFactory implements TrinoCatalogFactory @@ -40,6 +43,8 @@ public class TrinoJdbcCatalogFactory private final TrinoFileSystemFactory fileSystemFactory; private final String jdbcCatalogName; private final String connectionUrl; + private final Optional connectionUser; + private final Optional connectionPassword; private final String defaultWarehouseDir; private final boolean isUniqueTableLocation; @@ -62,6 +67,8 @@ public TrinoJdbcCatalogFactory( this.isUniqueTableLocation = requireNonNull(icebergConfig, "icebergConfig is null").isUniqueTableLocation(); this.jdbcCatalogName = jdbcConfig.getCatalogName(); this.connectionUrl = jdbcConfig.getConnectionUrl(); + this.connectionUser = jdbcConfig.getConnectionUser(); + this.connectionPassword = jdbcConfig.getConnectionPassword(); this.defaultWarehouseDir = jdbcConfig.getDefaultWarehouseDir(); } @@ -85,10 +92,12 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity) private JdbcCatalog createJdbcCatalog() { JdbcCatalog jdbcCatalog = new JdbcCatalog(); - jdbcCatalog.initialize(jdbcCatalogName, ImmutableMap.builder() - .put(URI, connectionUrl) - .put(WAREHOUSE_LOCATION, defaultWarehouseDir) - .buildOrThrow()); + ImmutableMap.Builder properties = ImmutableMap.builder(); + properties.put(URI, connectionUrl); + properties.put(WAREHOUSE_LOCATION, defaultWarehouseDir); + connectionUser.ifPresent(user -> properties.put(PROPERTY_PREFIX + "user", user)); + connectionPassword.ifPresent(password -> properties.put(PROPERTY_PREFIX + "password", password)); + jdbcCatalog.initialize(jdbcCatalogName, properties.buildOrThrow()); return jdbcCatalog; } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java index 447e2706bbaf..0667c161d743 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java @@ -395,6 +395,25 @@ public void testUnregisterTableAccessControl() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testCreateTableWithNonExistingSchemaVerifyLocation() + { + String schemaName = "non_existing_schema_" + randomNameSuffix(); + String tableName = "test_create_table_in_non_existent_schema_" + randomNameSuffix(); + String tableLocation = schemaPath() + "/" + tableName; + assertQueryFails( + "CREATE TABLE " + schemaName + "." + tableName + " (a int, b int) WITH (location = '" + tableLocation + "')", + "Schema (.*) not found"); + assertThat(locationExists(tableLocation)) + .as("location should not exist").isFalse(); + + assertQueryFails( + "CREATE TABLE " + schemaName + "." + tableName + " (a, b) WITH (location = '" + tableLocation + "') AS VALUES (1, 2), (3, 4)", + "Schema (.*) not found"); + assertThat(locationExists(tableLocation)) + .as("location should not exist").isFalse(); + } + private String getTableLocation(String tableName) { return (String) computeScalar("SELECT DISTINCT regexp_replace(\"$path\", '/[^/]*/[^/]*$', '') FROM " + tableName); @@ -413,4 +432,8 @@ protected String getColumnComment(String tableName, String columnName) protected abstract void dropTableFromMetastore(String tableName); protected abstract String getMetadataLocation(String tableName); + + protected abstract String schemaPath(); + + protected abstract boolean locationExists(String location); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java index d2a1d432edf7..3dde19c49bc5 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java @@ -235,6 +235,19 @@ protected String getMetadataLocation(String tableName) .getParameters().get("metadata_location"); } + @Override + protected String schemaPath() + { + return format("s3://%s/%s", bucketName, schemaName); + } + + @Override + protected boolean locationExists(String location) + { + String prefix = "s3://" + bucketName + "/"; + return !hiveMinioDataLake.listFiles(location.substring(prefix.length())).isEmpty(); + } + @Override protected void deleteDirectory(String location) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java index 5f8a8c351d52..1a5e57b4aceb 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java @@ -15,8 +15,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.io.Resources; import io.airlift.http.server.testing.TestingHttpServer; import io.airlift.log.Logger; +import io.trino.plugin.hive.containers.HiveHadoop; import io.trino.plugin.hive.containers.HiveMinioDataLake; import io.trino.plugin.tpch.TpchPlugin; import 
io.trino.testing.DistributedQueryRunner; @@ -27,9 +29,13 @@ import java.io.File; import java.nio.file.Path; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.Set; import static com.google.common.base.Preconditions.checkState; import static io.airlift.testing.Closeables.closeAllSuppress; @@ -37,12 +43,11 @@ import static io.trino.testing.TestingSession.testSessionBuilder; import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY; import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; public final class IcebergQueryRunner { - private static final Logger log = Logger.get(IcebergQueryRunner.class); - public static final String ICEBERG_CATALOG = "iceberg"; private IcebergQueryRunner() {} @@ -130,9 +135,6 @@ public DistributedQueryRunner build() icebergProperties.put("iceberg.catalog.type", "TESTING_FILE_METASTORE"); icebergProperties.put("hive.metastore.catalog.dir", dataDir.toString()); } - if ("jdbc".equalsIgnoreCase(catalogType) && !icebergProperties.containsKey("iceberg.jdbc-catalog.default-warehouse-dir")) { - icebergProperties.put("iceberg.jdbc-catalog.default-warehouse-dir", dataDir.toString()); - } queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties); schemaInitializer.orElseGet(() -> SchemaInitializer.builder().build()).accept(queryRunner); @@ -174,7 +176,7 @@ public static void main(String[] args) .setInitialTables(TpchTable.getTables()) .build(); - Logger log = Logger.get(IcebergQueryRunner.class); + Logger log = Logger.get(IcebergRestQueryRunnerMain.class); log.info("======== SERVER STARTED ========"); log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl()); } @@ -194,7 +196,7 @@ public static void main(String[] args) .setIcebergProperties(ImmutableMap.of("iceberg.catalog.type", "glue")) .build(); - Logger log = Logger.get(IcebergQueryRunner.class); + Logger log = Logger.get(IcebergGlueQueryRunnerMain.class); log.info("======== SERVER STARTED ========"); log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl()); } @@ -232,7 +234,63 @@ public static void main(String[] args) .build(); Thread.sleep(10); - Logger log = Logger.get(IcebergQueryRunner.class); + Logger log = Logger.get(IcebergMinIoHiveMetastoreQueryRunnerMain.class); + log.info("======== SERVER STARTED ========"); + log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl()); + } + } + + public static final class IcebergAzureQueryRunnerMain + { + private IcebergAzureQueryRunnerMain() {} + + public static void main(String[] args) + throws Exception + { + String azureContainer = requireNonNull( + System.getProperty("hive.hadoop2.azure-abfs-container"), + "System property hive.hadoop2.azure-abfs-container must be provided"); + String azureAccount = requireNonNull( + System.getProperty("hive.hadoop2.azure-abfs-account"), + "System property hive.hadoop2.azure-abfs-account must be provided"); + String azureAccessKey = requireNonNull( + System.getProperty("hive.hadoop2.azure-abfs-access-key"), + "System property hive.hadoop2.azure-abfs-access-key must be provided"); + + String abfsSpecificCoreSiteXmlContent = Resources.toString(Resources.getResource("hdp3.1-core-site.xml.abfs-template"), UTF_8) + .replace("%ABFS_ACCESS_KEY%", azureAccessKey) + 
.replace("%ABFS_ACCOUNT%", azureAccount); + + FileAttribute> posixFilePermissions = PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--")); + Path hadoopCoreSiteXmlTempFile = java.nio.file.Files.createTempFile("core-site", ".xml", posixFilePermissions); + hadoopCoreSiteXmlTempFile.toFile().deleteOnExit(); + java.nio.file.Files.writeString(hadoopCoreSiteXmlTempFile, abfsSpecificCoreSiteXmlContent); + + @SuppressWarnings("resource") + HiveHadoop hiveHadoop = HiveHadoop.builder() + .withImage(HiveHadoop.HIVE3_IMAGE) + .withFilesToMount(ImmutableMap.of("/etc/hadoop/conf/core-site.xml", hadoopCoreSiteXmlTempFile.normalize().toAbsolutePath().toString())) + .build(); + hiveHadoop.start(); + + DistributedQueryRunner queryRunner = IcebergQueryRunner.builder() + .setCoordinatorProperties(Map.of( + "http-server.http.port", "8080")) + .setIcebergProperties(Map.of( + "iceberg.catalog.type", "HIVE_METASTORE", + "hive.metastore.uri", "thrift://" + hiveHadoop.getHiveMetastoreEndpoint(), + "hive.azure.abfs-storage-account", azureAccount, + "hive.azure.abfs-access-key", azureAccessKey)) + .setSchemaInitializer( + SchemaInitializer.builder() + .withSchemaName("tpch") + .withClonedTpchTables(TpchTable.getTables()) + .withSchemaProperties(Map.of("location", "'abfs://%s@%s.dfs.core.windows.net/test-bucket/'".formatted(azureContainer, azureAccount))) + .build()) + .build(); + + Thread.sleep(10); + Logger log = Logger.get(IcebergAzureQueryRunnerMain.class); log.info("======== SERVER STARTED ========"); log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl()); } @@ -245,6 +303,7 @@ private DefaultIcebergQueryRunnerMain() {} public static void main(String[] args) throws Exception { + Logger log = Logger.get(DefaultIcebergQueryRunnerMain.class); DistributedQueryRunner queryRunner = null; try { queryRunner = IcebergQueryRunner.builder() @@ -257,7 +316,6 @@ public static void main(String[] args) System.exit(1); } Thread.sleep(10); - Logger log = Logger.get(IcebergQueryRunner.class); log.info("======== SERVER STARTED ========"); log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl()); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAbfsConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAbfsConnectorSmokeTest.java new file mode 100644 index 000000000000..8b27ba3a2695 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAbfsConnectorSmokeTest.java @@ -0,0 +1,165 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Resources; +import io.trino.plugin.hive.containers.HiveHadoop; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore; +import io.trino.testing.QueryRunner; +import org.testng.annotations.Parameters; +import org.testng.annotations.Test; + +import java.nio.file.Path; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.Map; +import java.util.Set; + +import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Locale.ENGLISH; +import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.FileFormat.ORC; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestIcebergAbfsConnectorSmokeTest + extends BaseIcebergConnectorSmokeTest +{ + private final String container; + private final String account; + private final String accessKey; + private final String schemaName; + private final String bucketName; + + private HiveHadoop hiveHadoop; + + @Parameters({ + "hive.hadoop2.azure-abfs-container", + "hive.hadoop2.azure-abfs-account", + "hive.hadoop2.azure-abfs-access-key"}) + public TestIcebergAbfsConnectorSmokeTest(String container, String account, String accessKey) + { + super(ORC); + this.container = requireNonNull(container, "container is null"); + this.account = requireNonNull(account, "account is null"); + this.accessKey = requireNonNull(accessKey, "accessKey is null"); + this.schemaName = "tpch_" + format.name().toLowerCase(ENGLISH); + this.bucketName = "test-iceberg-smoke-test-" + randomNameSuffix(); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + String abfsSpecificCoreSiteXmlContent = Resources.toString(Resources.getResource("hdp3.1-core-site.xml.abfs-template"), UTF_8) + .replace("%ABFS_ACCESS_KEY%", accessKey) + .replace("%ABFS_ACCOUNT%", account); + + FileAttribute> posixFilePermissions = PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--")); + Path hadoopCoreSiteXmlTempFile = java.nio.file.Files.createTempFile("core-site", ".xml", posixFilePermissions); + hadoopCoreSiteXmlTempFile.toFile().deleteOnExit(); + java.nio.file.Files.writeString(hadoopCoreSiteXmlTempFile, abfsSpecificCoreSiteXmlContent); + + this.hiveHadoop = closeAfterClass(HiveHadoop.builder() + .withImage(HiveHadoop.HIVE3_IMAGE) + .withFilesToMount(ImmutableMap.of("/etc/hadoop/conf/core-site.xml", hadoopCoreSiteXmlTempFile.normalize().toAbsolutePath().toString())) + .build()); + this.hiveHadoop.start(); + + return IcebergQueryRunner.builder() + .setIcebergProperties( + ImmutableMap.builder() + .put("iceberg.file-format", format.name()) + .put("iceberg.catalog.type", "HIVE_METASTORE") + .put("hive.metastore.uri", "thrift://" + hiveHadoop.getHiveMetastoreEndpoint()) + .put("hive.metastore-timeout", "1m") // read timed out sometimes happens with the default timeout + .put("hive.azure.abfs-storage-account", account) + .put("hive.azure.abfs-access-key", accessKey) + .put("iceberg.register-table-procedure.enabled", "true") + .buildOrThrow()) + .setSchemaInitializer( + SchemaInitializer.builder() + 
.withSchemaName(schemaName) + .withClonedTpchTables(REQUIRED_TPCH_TABLES) + .withSchemaProperties(Map.of("location", "'" + formatAbfsUrl(container, account, bucketName) + schemaName + "'")) + .build()) + .build(); + } + + @Override + protected String createSchemaSql(String schemaName) + { + return "CREATE SCHEMA IF NOT EXISTS " + schemaName + " WITH (location = '" + formatAbfsUrl(container, account, bucketName) + schemaName + "')"; + } + + @Test + @Override + public void testRenameSchema() + { + assertQueryFails( + format("ALTER SCHEMA %s RENAME TO %s", schemaName, schemaName + randomNameSuffix()), + "Hive metastore does not support renaming schemas"); + } + + @Override + protected void dropTableFromMetastore(String tableName) + { + HiveMetastore metastore = new BridgingHiveMetastore( + testingThriftHiveMetastoreBuilder() + .metastoreClient(hiveHadoop.getHiveMetastoreEndpoint()) + .build()); + metastore.dropTable(schemaName, tableName, false); + assertThat(metastore.getTable(schemaName, tableName)).isEmpty(); + } + + @Override + protected String getMetadataLocation(String tableName) + { + HiveMetastore metastore = new BridgingHiveMetastore( + testingThriftHiveMetastoreBuilder() + .metastoreClient(hiveHadoop.getHiveMetastoreEndpoint()) + .build()); + return metastore + .getTable(schemaName, tableName).orElseThrow() + .getParameters().get("metadata_location"); + } + + @Override + protected String schemaPath() + { + return formatAbfsUrl(container, account, bucketName) + schemaName; + } + + @Override + protected boolean locationExists(String location) + { + return hiveHadoop.executeInContainer("hadoop", "fs", "-test", "-d", location).getExitCode() == 0; + } + + @Override + protected void deleteDirectory(String location) + { + hiveHadoop.executeInContainerFailOnError("hadoop", "fs", "-rm", "-f", "-r", location); + } + + private static String formatAbfsUrl(String container, String account, String bucketName) + { + return format("abfs://%s@%s.dfs.core.windows.net/%s/", container, account, bucketName); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConnectorSmokeTest.java index 54bf31d7151b..5458471b01a3 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConnectorSmokeTest.java @@ -27,6 +27,7 @@ import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore; +import static java.lang.String.format; import static org.apache.iceberg.FileFormat.ORC; import static org.assertj.core.api.Assertions.assertThat; @@ -79,6 +80,18 @@ protected String getMetadataLocation(String tableName) .getParameters().get("metadata_location"); } + @Override + protected String schemaPath() + { + return format("%s/%s", metastoreDir, getSession().getSchema().orElseThrow()); + } + + @Override + protected boolean locationExists(String location) + { + return Files.exists(Path.of(location)); + } + @Override protected void deleteDirectory(String location) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGcsConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGcsConnectorSmokeTest.java index e59e8a28d9e6..aa8b9aa0fe52 
100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGcsConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGcsConnectorSmokeTest.java @@ -103,7 +103,7 @@ protected QueryRunner createQueryRunner() Configuration configuration = ConfigurationInstantiator.newEmptyConfiguration(); new GoogleGcsConfigurationInitializer(gcsConfig).initializeConfiguration(configuration); - this.fileSystem = FileSystem.newInstance(new URI(schemaUrl()), configuration); + this.fileSystem = FileSystem.newInstance(new URI(schemaPath()), configuration); } catch (IOException e) { throw new UncheckedIOException(e); @@ -124,7 +124,7 @@ protected QueryRunner createQueryRunner() SchemaInitializer.builder() .withClonedTpchTables(REQUIRED_TPCH_TABLES) .withSchemaName(schema) - .withSchemaProperties(ImmutableMap.of("location", "'" + schemaUrl() + "'")) + .withSchemaProperties(ImmutableMap.of("location", "'" + schemaPath() + "'")) .build()) .build(); } @@ -134,11 +134,11 @@ public void removeTestData() { if (fileSystem != null) { try { - fileSystem.delete(new org.apache.hadoop.fs.Path(schemaUrl()), true); + fileSystem.delete(new org.apache.hadoop.fs.Path(schemaPath()), true); } catch (IOException e) { // The GCS bucket should be configured to expire objects automatically. Clean up issues do not need to fail the test. - LOG.warn(e, "Failed to clean up GCS test directory: %s", schemaUrl()); + LOG.warn(e, "Failed to clean up GCS test directory: %s", schemaPath()); } fileSystem = null; } @@ -160,14 +160,26 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) @Override protected String createSchemaSql(String schema) { - return format("CREATE SCHEMA %1$s WITH (location = '%2$s%1$s')", schema, schemaUrl()); + return format("CREATE SCHEMA %1$s WITH (location = '%2$s%1$s')", schema, schemaPath()); } - private String schemaUrl() + @Override + protected String schemaPath() { return format("gs://%s/%s/", gcpStorageBucket, schema); } + @Override + protected boolean locationExists(String location) + { + try { + return fileSystem.exists(new org.apache.hadoop.fs.Path(location)); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + @Test @Override public void testRenameSchema() diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java index 3d78d6916061..9cbf6e6e098d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java @@ -311,7 +311,7 @@ public void testView() catalog.dropNamespace(SESSION, namespace); } catch (Exception e) { - LOG.warn("Failed to clean up namespace: " + namespace); + LOG.warn("Failed to clean up namespace: %s", namespace); } } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java index 41b6eb737b0f..3104324db393 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java @@ -144,6 +144,7 @@ public void testCreateTable() 
ImmutableMultiset.builder() .add(CREATE_TABLE) .add(GET_DATABASE) + .add(GET_DATABASE) .add(GET_TABLE) .build()); } @@ -158,6 +159,7 @@ public void testCreateTableAsSelect() try { assertGlueMetastoreApiInvocations("CREATE TABLE test_ctas AS SELECT 1 AS age", ImmutableMultiset.builder() + .add(GET_DATABASE) .add(GET_DATABASE) .add(CREATE_TABLE) .add(GET_TABLE) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java index 9a4bca3d38dc..e1d799b5f47e 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java @@ -224,8 +224,22 @@ protected void deleteDirectory(String location) assertThat(s3.listObjects(bucketName, location).getObjectSummaries()).isEmpty(); } - private String schemaPath() + @Override + protected String schemaPath() { return format("s3://%s/%s", bucketName, schemaName); } + + @Override + protected boolean locationExists(String location) + { + String prefix = "s3://" + bucketName + "/"; + AmazonS3 s3 = AmazonS3ClientBuilder.standard().build(); + ListObjectsV2Request request = new ListObjectsV2Request() + .withBucketName(bucketName) + .withPrefix(location.substring(prefix.length())) + .withMaxKeys(1); + return !s3.listObjectsV2(request) + .getObjectSummaries().isEmpty(); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConfig.java index a028559989a2..b25ec6f6986e 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConfig.java @@ -29,6 +29,8 @@ public void testDefaults() { assertRecordedDefaults(recordDefaults(IcebergJdbcCatalogConfig.class) .setConnectionUrl(null) + .setConnectionUser(null) + .setConnectionPassword(null) .setCatalogName(null) .setDefaultWarehouseDir(null)); } @@ -38,12 +40,16 @@ public void testExplicitPropertyMappings() { Map properties = ImmutableMap.builder() .put("iceberg.jdbc-catalog.connection-url", "jdbc:postgresql://localhost:5432/test") + .put("iceberg.jdbc-catalog.connection-user", "foo") + .put("iceberg.jdbc-catalog.connection-password", "bar") .put("iceberg.jdbc-catalog.catalog-name", "test") .put("iceberg.jdbc-catalog.default-warehouse-dir", "s3://bucket") .buildOrThrow(); IcebergJdbcCatalogConfig expected = new IcebergJdbcCatalogConfig() .setConnectionUrl("jdbc:postgresql://localhost:5432/test") + .setConnectionUser("foo") + .setConnectionPassword("bar") .setCatalogName("test") .setDefaultWarehouseDir("s3://bucket"); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java index 73009d603ff7..1c2a35c8e88d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java +++ 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java @@ -20,11 +20,22 @@ import io.trino.testing.QueryRunner; import io.trino.testing.TestingConnectorBehavior; +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.PASSWORD; +import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.USER; +import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThatThrownBy; public class TestIcebergJdbcCatalogConnectorSmokeTest extends BaseIcebergConnectorSmokeTest { + private File warehouseLocation; + public TestIcebergJdbcCatalogConnectorSmokeTest() { super(new IcebergConfig().getFileFormat().toIceberg()); @@ -46,6 +57,8 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) protected QueryRunner createQueryRunner() throws Exception { + warehouseLocation = Files.createTempDirectory("test_iceberg_jdbc_catalog_smoke_test").toFile(); + closeAfterClass(() -> deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE)); TestingIcebergJdbcServer server = closeAfterClass(new TestingIcebergJdbcServer()); return IcebergQueryRunner.builder() .setIcebergProperties( @@ -53,8 +66,11 @@ protected QueryRunner createQueryRunner() .put("iceberg.file-format", format.name()) .put("iceberg.catalog.type", "jdbc") .put("iceberg.jdbc-catalog.connection-url", server.getJdbcUrl()) + .put("iceberg.jdbc-catalog.connection-user", USER) + .put("iceberg.jdbc-catalog.connection-password", PASSWORD) .put("iceberg.jdbc-catalog.catalog-name", "tpch") .put("iceberg.register-table-procedure.enabled", "true") + .put("iceberg.jdbc-catalog.default-warehouse-dir", warehouseLocation.getAbsolutePath()) .buildOrThrow()) .setInitialTables(REQUIRED_TPCH_TABLES) .build(); @@ -94,6 +110,18 @@ protected String getMetadataLocation(String tableName) throw new UnsupportedOperationException("metadata location for register_table is not supported"); } + @Override + protected String schemaPath() + { + return format("%s/%s", warehouseLocation, getSession().getSchema().orElseThrow()); + } + + @Override + protected boolean locationExists(String location) + { + return Files.exists(Path.of(location)); + } + @Override public void testRegisterTableWithTableLocation() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java index fa5abd31ea25..f802a99f54ce 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java @@ -23,8 +23,15 @@ import io.trino.tpch.TpchTable; import org.testng.annotations.Test; +import java.io.File; +import java.nio.file.Files; +import java.util.Optional; import java.util.OptionalInt; +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.PASSWORD; +import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.USER; import static 
io.trino.tpch.TpchTable.LINE_ITEM; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -32,6 +39,8 @@ public class TestIcebergJdbcConnectorTest extends BaseIcebergConnectorTest { + private File warehouseLocation; + public TestIcebergJdbcConnectorTest() { super(new IcebergConfig().getFileFormat()); @@ -41,14 +50,20 @@ protected QueryRunner createQueryRunner() throws Exception { + warehouseLocation = Files.createTempDirectory("test_iceberg_jdbc_connector_test").toFile(); + closeAfterClass(() -> deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE)); TestingIcebergJdbcServer server = closeAfterClass(new TestingIcebergJdbcServer()); return IcebergQueryRunner.builder() + .setBaseDataDir(Optional.of(warehouseLocation.toPath())) .setIcebergProperties( ImmutableMap.&lt;String, String&gt;builder() .put("iceberg.file-format", format.name()) .put("iceberg.catalog.type", "jdbc") .put("iceberg.jdbc-catalog.connection-url", server.getJdbcUrl()) + .put("iceberg.jdbc-catalog.connection-user", USER) + .put("iceberg.jdbc-catalog.connection-password", PASSWORD) .put("iceberg.jdbc-catalog.catalog-name", "tpch") + .put("iceberg.jdbc-catalog.default-warehouse-dir", warehouseLocation.toPath().resolve("iceberg_data").toFile().getAbsolutePath()) .buildOrThrow()) .setInitialTables(ImmutableList.&lt;TpchTable&lt;?&gt;&gt;builder() .addAll(REQUIRED_TPCH_TABLES) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestingIcebergJdbcServer.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestingIcebergJdbcServer.java index e5c39a3c39b1..6e797a672034 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestingIcebergJdbcServer.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestingIcebergJdbcServer.java @@ -29,8 +29,8 @@ public class TestingIcebergJdbcServer implements Closeable { - private static final String USER = "test"; - private static final String PASSWORD = "test"; + public static final String USER = "test"; + public static final String PASSWORD = "test"; private static final String DATABASE = "tpch"; private final PostgreSQLContainer&lt;?&gt; dockerContainer; @@ -52,7 +52,7 @@ public TestingIcebergJdbcServer() public void execute(@Language("SQL") String sql) { - try (Connection connection = DriverManager.getConnection(getJdbcUrl()); + try (Connection connection = DriverManager.getConnection(getJdbcUrl(), USER, PASSWORD); Statement statement = connection.createStatement()) { statement.execute(sql); } @@ -64,12 +64,10 @@ public void execute(@Language("SQL") String sql) public String getJdbcUrl() { return format( - "jdbc:postgresql://%s:%s/%s?user=%s&password=%s&currentSchema=tpch,public", + "jdbc:postgresql://%s:%s/%s", dockerContainer.getHost(), dockerContainer.getMappedPort(POSTGRESQL_PORT), - DATABASE, - USER, - PASSWORD); + DATABASE); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConnectorSmokeTest.java index 42d2e3296adc..a290e565147d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConnectorSmokeTest.java @@ -25,16 +25,20 @@ import
org.assertj.core.util.Files; import java.io.File; +import java.nio.file.Path; import java.util.Optional; import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.trino.plugin.iceberg.catalog.rest.RestCatalogTestUtils.backendCatalog; +import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThatThrownBy; public class TestIcebergRestCatalogConnectorSmokeTest extends BaseIcebergConnectorSmokeTest { + private File warehouseLocation; + public TestIcebergRestCatalogConnectorSmokeTest() { super(new IcebergConfig().getFileFormat().toIceberg()); @@ -56,7 +60,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) protected QueryRunner createQueryRunner() throws Exception { - File warehouseLocation = Files.newTemporaryFolder(); + warehouseLocation = Files.newTemporaryFolder(); closeAfterClass(() -> deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE)); Catalog backend = backendCatalog(warehouseLocation); @@ -116,6 +120,18 @@ protected String getMetadataLocation(String tableName) throw new UnsupportedOperationException("metadata location for register_table is not supported"); } + @Override + protected String schemaPath() + { + return format("%s/%s", warehouseLocation, getSession().getSchema()); + } + + @Override + protected boolean locationExists(String location) + { + return java.nio.file.Files.exists(Path.of(location)); + } + @Override public void testRegisterTableWithTableLocation() { diff --git a/plugin/trino-iceberg/src/test/resources/hdp3.1-core-site.xml.abfs-template b/plugin/trino-iceberg/src/test/resources/hdp3.1-core-site.xml.abfs-template new file mode 100644 index 000000000000..66d5347271fb --- /dev/null +++ b/plugin/trino-iceberg/src/test/resources/hdp3.1-core-site.xml.abfs-template @@ -0,0 +1,99 @@ +&lt;?xml version="1.0"?&gt; +&lt;configuration&gt; + &lt;property&gt; + &lt;name&gt;fs.defaultFS&lt;/name&gt; + &lt;value&gt;hdfs://hadoop-master:9000&lt;/value&gt; + &lt;/property&gt; + &lt;property&gt; + &lt;name&gt;fs.azure.account.key.%ABFS_ACCOUNT%.dfs.core.windows.net&lt;/name&gt; + &lt;value&gt;%ABFS_ACCESS_KEY%&lt;/value&gt; + &lt;/property&gt; + &lt;property&gt; + &lt;name&gt;hadoop.proxyuser.oozie.hosts&lt;/name&gt; + &lt;value&gt;*&lt;/value&gt; + &lt;/property&gt; + &lt;property&gt; + &lt;name&gt;hadoop.proxyuser.oozie.groups&lt;/name&gt; + &lt;value&gt;*&lt;/value&gt; + &lt;/property&gt; + &lt;property&gt; + &lt;name&gt;hadoop.proxyuser.httpfs.hosts&lt;/name&gt; + &lt;value&gt;*&lt;/value&gt; + &lt;/property&gt; + &lt;property&gt; + &lt;name&gt;hadoop.proxyuser.httpfs.groups&lt;/name&gt; + &lt;value&gt;*&lt;/value&gt; + &lt;/property&gt; + &lt;property&gt; + &lt;name&gt;hadoop.proxyuser.llama.hosts&lt;/name&gt; + &lt;value&gt;*&lt;/value&gt; + &lt;/property&gt; + &lt;property&gt; + &lt;name&gt;hadoop.proxyuser.llama.groups&lt;/name&gt; + &lt;value&gt;*&lt;/value&gt; + &lt;/property&gt; + &lt;property&gt; + &lt;name&gt;hadoop.proxyuser.hue.hosts&lt;/name&gt; + &lt;value&gt;*&lt;/value&gt; + &lt;/property&gt; + &lt;property&gt; + &lt;name&gt;hadoop.proxyuser.hue.groups&lt;/name&gt; + &lt;value&gt;*&lt;/value&gt; + &lt;/property&gt; + &lt;property&gt; + &lt;name&gt;hadoop.proxyuser.mapred.hosts&lt;/name&gt; + &lt;value&gt;*&lt;/value&gt; + &lt;/property&gt; + &lt;property&gt; + &lt;name&gt;hadoop.proxyuser.mapred.groups&lt;/name&gt; + &lt;value&gt;*&lt;/value&gt; + &lt;/property&gt; + &lt;property&gt; + &lt;name&gt;hadoop.proxyuser.hive.hosts&lt;/name&gt; + &lt;value&gt;*&lt;/value&gt; + &lt;/property&gt; + &lt;property&gt; + &lt;name&gt;hadoop.proxyuser.hive.groups&lt;/name&gt; + &lt;value&gt;*&lt;/value&gt; + &lt;/property&gt; + &lt;property&gt; + &lt;name&gt;hadoop.proxyuser.hdfs.groups&lt;/name&gt; + &lt;value&gt;*&lt;/value&gt; + &lt;/property&gt; + &lt;property&gt; + &lt;name&gt;hadoop.proxyuser.hdfs.hosts&lt;/name&gt; + &lt;value&gt;*&lt;/value&gt; + &lt;/property&gt; +&lt;/configuration&gt; diff --git a/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryMetadata.java b/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryMetadata.java index 539626db2b5a..2a2a1e91ad10 100644 --- a/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryMetadata.java +++ b/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryMetadata.java @@ -203,10 +203,13 @@ public Map&lt;SchemaTableName, List&lt;ColumnMetadata&gt;&gt; listTableColumns(ConnectorSess @Override public synchronized Iterator&lt;TableColumnsMetadata&gt; streamTableColumns(ConnectorSession session, SchemaTablePrefix prefix) { - return tables.values().stream() + // This list must be materialized before returning, otherwise the iterator could throw a ConcurrentModificationException + // if another thread modifies the tables map before the iterator is fully consumed + List&lt;TableColumnsMetadata&gt; columnsMetadata = tables.values().stream() .filter(table -> prefix.matches(table.getSchemaTableName())) .map(tableInfo ->
TableColumnsMetadata.forTable(tableInfo.getSchemaTableName(), tableInfo.getMetadata().getColumns())) - .iterator(); + .collect(toImmutableList()); + return columnsMetadata.iterator(); } @Override diff --git a/plugin/trino-mysql/src/main/java/io/trino/plugin/mysql/MySqlClient.java b/plugin/trino-mysql/src/main/java/io/trino/plugin/mysql/MySqlClient.java index 2f14125eed2f..7423209b93b6 100644 --- a/plugin/trino-mysql/src/main/java/io/trino/plugin/mysql/MySqlClient.java +++ b/plugin/trino-mysql/src/main/java/io/trino/plugin/mysql/MySqlClient.java @@ -89,6 +89,7 @@ import java.sql.SQLSyntaxErrorException; import java.sql.Types; import java.time.LocalDate; +import java.time.LocalDateTime; import java.util.AbstractMap.SimpleEntry; import java.util.Collection; import java.util.List; @@ -496,7 +497,7 @@ public boolean isNull(ResultSet resultSet, int columnIndex) throws SQLException { // super calls ResultSet#getObject(), which for TIMESTAMP type returns java.sql.Timestamp, for which the conversion can fail if the value isn't a valid instant in server's time zone. - resultSet.getObject(columnIndex, String.class); + resultSet.getObject(columnIndex, LocalDateTime.class); return resultSet.wasNull(); } diff --git a/plugin/trino-mysql/src/test/java/io/trino/plugin/mysql/TestMySqlTypeMapping.java b/plugin/trino-mysql/src/test/java/io/trino/plugin/mysql/TestMySqlTypeMapping.java index 00a41b0efbac..0f1b41b2b2f5 100644 --- a/plugin/trino-mysql/src/test/java/io/trino/plugin/mysql/TestMySqlTypeMapping.java +++ b/plugin/trino-mysql/src/test/java/io/trino/plugin/mysql/TestMySqlTypeMapping.java @@ -19,6 +19,7 @@ import io.trino.plugin.jdbc.UnsupportedTypeHandling; import io.trino.spi.type.TimeZoneKey; import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; import io.trino.testing.QueryRunner; import io.trino.testing.TestingSession; import io.trino.testing.datatype.CreateAndInsertDataSetup; @@ -34,8 +35,12 @@ import org.testng.annotations.Test; import java.math.RoundingMode; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; import java.time.LocalDate; import java.time.ZoneId; +import java.util.Map; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; @@ -68,6 +73,7 @@ import static java.math.RoundingMode.UNNECESSARY; import static java.time.ZoneOffset.UTC; import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; public class TestMySqlTypeMapping @@ -930,6 +936,34 @@ public Object[][] sessionZonesDataProvider() }; } + @Test + public void testZeroTimestamp() + throws Exception + { + String connectionUrl = mySqlServer.getJdbcUrl() + "&zeroDateTimeBehavior=convertToNull"; + + DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(getSession()).build(); + queryRunner.installPlugin(new MySqlPlugin()); + Map properties = ImmutableMap.builder() + .put("connection-url", connectionUrl) + .put("connection-user", mySqlServer.getUsername()) + .put("connection-password", mySqlServer.getPassword()) + .buildOrThrow(); + queryRunner.createCatalog("mysql", "mysql", properties); + + try (Connection connection = DriverManager.getConnection(connectionUrl, mySqlServer.getUsername(), mySqlServer.getPassword()); + Statement statement = connection.createStatement()) { + statement.execute("CREATE TABLE tpch.test_zero_ts(col_dt datetime, col_ts timestamp)"); + 
statement.execute("SET sql_mode=''"); + statement.execute("INSERT INTO tpch.test_zero_ts(col_dt, col_ts) VALUES ('0000-00-00 00:00:00', '0000-00-00 00:00:00')"); + + assertThat(queryRunner.execute("SELECT col_dt FROM test_zero_ts").getOnlyValue()).isNull(); + assertThat(queryRunner.execute("SELECT col_ts FROM test_zero_ts").getOnlyValue()).isNull(); + + statement.execute("DROP TABLE tpch.test_zero_ts"); + } + } + @Test public void testJson() { diff --git a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/hive/TestHiveFaultTolerantExecutionTest.java b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/hive/TestHiveFaultTolerantExecutionTest.java index f042544bd998..643c2334ac9a 100644 --- a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/hive/TestHiveFaultTolerantExecutionTest.java +++ b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/hive/TestHiveFaultTolerantExecutionTest.java @@ -20,6 +20,7 @@ import io.trino.plugin.hive.HiveQueryRunner; import io.trino.testing.FaultTolerantExecutionConnectorTestHelper; import io.trino.testing.QueryRunner; +import org.testng.annotations.Test; import static io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties; import static io.trino.testing.TestingNames.randomNameSuffix; @@ -56,4 +57,35 @@ protected Session getSession() .setCatalogSessionProperty(session.getCatalog().orElseThrow(), "non_transactional_optimize_enabled", "true") .build(); } + + @Test(timeOut = 120_000) + public void testPotentialDeadlocks() + { + // create a highly granular table to ensure the number of splits is high + assertUpdate(""" + CREATE TABLE lineitem_bucketed_partitioned + WITH (format = 'TEXTFILE', partitioned_by = ARRAY['p'], bucketed_by=array['b'], bucket_count=3) + AS + SELECT *, partkey b, orderkey % 100 p + FROM tpch.tiny.lineitem + """, + 60175); + // execute a query that schedules many concurrent stages in parallel to detect potential scheduler deadlocks + try { + assertQuery( + """ + SELECT + (SELECT count(orderkey) FROM lineitem_bucketed_partitioned) + + (SELECT count(linenumber) FROM lineitem_bucketed_partitioned) + + (SELECT count(quantity) FROM lineitem_bucketed_partitioned) + + (SELECT count(extendedprice) FROM lineitem_bucketed_partitioned) + + (SELECT count(DISTINCT partkey) FROM lineitem_bucketed_partitioned) + + (SELECT count(DISTINCT suppkey) FROM lineitem_bucketed_partitioned) c + """, + "SELECT 242800"); + } + finally { + assertUpdate("DROP TABLE lineitem_bucketed_partitioned"); + } + } } diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeCassandra.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeCassandra.java similarity index 83% rename from testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeCassandra.java rename to testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeCassandra.java index 808e4db943a4..cd3786c87d01 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeCassandra.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeCassandra.java @@ -18,7 +18,7 @@ import io.trino.tests.product.launcher.env.DockerContainer; import 
io.trino.tests.product.launcher.env.Environment; import io.trino.tests.product.launcher.env.EnvironmentProvider; -import io.trino.tests.product.launcher.env.common.Standard; +import io.trino.tests.product.launcher.env.common.StandardMultinode; import io.trino.tests.product.launcher.env.common.TestsEnvironment; import io.trino.tests.product.launcher.testcontainers.PortBinder; import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy; @@ -28,24 +28,21 @@ import java.time.Duration; import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts; -import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TRINO_ETC; import static java.util.Objects.requireNonNull; import static org.testcontainers.utility.MountableFile.forHostPath; @TestsEnvironment -public final class EnvSinglenodeCassandra +public final class EnvMultinodeCassandra extends EnvironmentProvider { private final DockerFiles dockerFiles; private final PortBinder portBinder; - - public static final String CONTAINER_TRINO_CASSANDRA_PROPERTIES = CONTAINER_TRINO_ETC + "/catalog/cassandra.properties"; public static final int CASSANDRA_PORT = 9042; @Inject - protected EnvSinglenodeCassandra(DockerFiles dockerFiles, PortBinder portBinder, Standard standard) + protected EnvMultinodeCassandra(DockerFiles dockerFiles, PortBinder portBinder, StandardMultinode standardMultinode) { - super(ImmutableList.of(standard)); + super(ImmutableList.of(standardMultinode)); this.dockerFiles = requireNonNull(dockerFiles, "dockerFiles is null"); this.portBinder = requireNonNull(portBinder, "portBinder is null"); } @@ -54,7 +51,7 @@ protected EnvSinglenodeCassandra(DockerFiles dockerFiles, PortBinder portBinder, public void extendEnvironment(Environment.Builder builder) { builder.addContainer(createCassandra()); - builder.addConnector("cassandra", forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/singlenode-cassandra/cassandra.properties"))); + builder.addConnector("cassandra", forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/multinode-cassandra/cassandra.properties"))); } private DockerContainer createCassandra() diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeMysql.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeMysql.java similarity index 88% rename from testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeMysql.java rename to testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeMysql.java index b880336019cb..2f78771d7d48 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeMysql.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeMysql.java @@ -18,7 +18,7 @@ import io.trino.tests.product.launcher.env.DockerContainer; import io.trino.tests.product.launcher.env.Environment; import io.trino.tests.product.launcher.env.EnvironmentProvider; -import io.trino.tests.product.launcher.env.common.Standard; +import io.trino.tests.product.launcher.env.common.StandardMultinode; import io.trino.tests.product.launcher.env.common.TestsEnvironment; import io.trino.tests.product.launcher.testcontainers.PortBinder; import 
org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy; @@ -30,7 +30,7 @@ import static org.testcontainers.utility.MountableFile.forHostPath; @TestsEnvironment -public final class EnvSinglenodeMysql +public final class EnvMultinodeMysql extends EnvironmentProvider { // Use non-default MySQL port to avoid conflicts with locally installed MySQL if any. @@ -40,9 +40,9 @@ public final class EnvSinglenodeMysql private final PortBinder portBinder; @Inject - public EnvSinglenodeMysql(Standard standard, DockerFiles dockerFiles, PortBinder portBinder) + public EnvMultinodeMysql(StandardMultinode standardMultinode, DockerFiles dockerFiles, PortBinder portBinder) { - super(ImmutableList.of(standard)); + super(ImmutableList.of(standardMultinode)); this.dockerFiles = requireNonNull(dockerFiles, "dockerFiles is null"); this.portBinder = requireNonNull(portBinder, "portBinder is null"); } @@ -50,7 +50,7 @@ public EnvSinglenodeMysql(Standard standard, DockerFiles dockerFiles, PortBinder @Override public void extendEnvironment(Environment.Builder builder) { - builder.addConnector("mysql", forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/singlenode-mysql/mysql.properties"))); + builder.addConnector("mysql", forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/multinode-mysql/mysql.properties"))); builder.addContainer(createMySql()); } diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodePostgresql.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodePostgresql.java similarity index 88% rename from testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodePostgresql.java rename to testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodePostgresql.java index b557d1ccbec4..b3bac48eeb03 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodePostgresql.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodePostgresql.java @@ -19,7 +19,7 @@ import io.trino.tests.product.launcher.env.DockerContainer; import io.trino.tests.product.launcher.env.Environment; import io.trino.tests.product.launcher.env.EnvironmentProvider; -import io.trino.tests.product.launcher.env.common.Standard; +import io.trino.tests.product.launcher.env.common.StandardMultinode; import io.trino.tests.product.launcher.env.common.TestsEnvironment; import io.trino.tests.product.launcher.testcontainers.PortBinder; import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy; @@ -31,7 +31,7 @@ import static org.testcontainers.utility.MountableFile.forHostPath; @TestsEnvironment -public final class EnvSinglenodePostgresql +public final class EnvMultinodePostgresql extends EnvironmentProvider { // Use non-default PostgreSQL port to avoid conflicts with locally installed PostgreSQL if any. 
@@ -41,9 +41,9 @@ public final class EnvSinglenodePostgresql private final PortBinder portBinder; @Inject - public EnvSinglenodePostgresql(Standard standard, DockerFiles dockerFiles, PortBinder portBinder) + public EnvMultinodePostgresql(StandardMultinode standardMultinode, DockerFiles dockerFiles, PortBinder portBinder) { - super(ImmutableList.of(standard)); + super(ImmutableList.of(standardMultinode)); this.dockerFiles = requireNonNull(dockerFiles, "dockerFiles is null"); this.portBinder = requireNonNull(portBinder, "portBinder is null"); } @@ -53,7 +53,7 @@ public void extendEnvironment(Environment.Builder builder) { builder.addConnector( "postgresql", - forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/singlenode-postgresql/postgresql.properties"))); + forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/multinode-postgresql/postgresql.properties"))); builder.addContainer(createPostgreSql()); } diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSqlserver.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeSqlserver.java similarity index 86% rename from testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSqlserver.java rename to testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeSqlserver.java index 98563b79114a..a8f392486b9b 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSqlserver.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeSqlserver.java @@ -19,7 +19,7 @@ import io.trino.tests.product.launcher.env.DockerContainer; import io.trino.tests.product.launcher.env.Environment; import io.trino.tests.product.launcher.env.EnvironmentProvider; -import io.trino.tests.product.launcher.env.common.Standard; +import io.trino.tests.product.launcher.env.common.StandardMultinode; import io.trino.tests.product.launcher.env.common.TestsEnvironment; import io.trino.tests.product.launcher.testcontainers.PortBinder; import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy; @@ -31,7 +31,7 @@ import static org.testcontainers.utility.MountableFile.forHostPath; @TestsEnvironment -public final class EnvSinglenodeSqlserver +public final class EnvMultinodeSqlserver extends EnvironmentProvider { public static final int SQLSERVER_PORT = 1433; @@ -40,9 +40,9 @@ public final class EnvSinglenodeSqlserver private final PortBinder portBinder; @Inject - public EnvSinglenodeSqlserver(Standard standard, DockerFiles dockerFiles, PortBinder portBinder) + public EnvMultinodeSqlserver(StandardMultinode standardMultinode, DockerFiles dockerFiles, PortBinder portBinder) { - super(ImmutableList.of(standard)); + super(ImmutableList.of(standardMultinode)); this.dockerFiles = requireNonNull(dockerFiles, "dockerFiles is null"); this.portBinder = requireNonNull(portBinder, "portBinder is null"); } @@ -50,7 +50,7 @@ public EnvSinglenodeSqlserver(Standard standard, DockerFiles dockerFiles, PortBi @Override public void extendEnvironment(Environment.Builder builder) { - builder.addConnector("sqlserver", forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/singlenode-sqlserver/sqlserver.properties"))); + builder.addConnector("sqlserver", 
forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/multinode-sqlserver/sqlserver.properties"))); builder.addContainer(createSqlServer()); } diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenode.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenode.java deleted file mode 100644 index 114d4ed1e2ea..000000000000 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenode.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.tests.product.launcher.env.environment; - -import com.google.common.collect.ImmutableList; -import io.trino.tests.product.launcher.env.Environment; -import io.trino.tests.product.launcher.env.EnvironmentProvider; -import io.trino.tests.product.launcher.env.common.Hadoop; -import io.trino.tests.product.launcher.env.common.Standard; -import io.trino.tests.product.launcher.env.common.TestsEnvironment; - -import javax.inject.Inject; - -@TestsEnvironment -public final class EnvSinglenode - extends EnvironmentProvider -{ - @Inject - public EnvSinglenode(Standard standard, Hadoop hadoop) - { - super(ImmutableList.of(standard, hadoop)); - } - - @Override - public void extendEnvironment(Environment.Builder builder) {} -} diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite2.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite2.java index 3ab0c9322156..54570fe3d20a 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite2.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite2.java @@ -15,7 +15,7 @@ import com.google.common.collect.ImmutableList; import io.trino.tests.product.launcher.env.EnvironmentConfig; -import io.trino.tests.product.launcher.env.environment.EnvSinglenode; +import io.trino.tests.product.launcher.env.environment.EnvMultinode; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeHdfsImpersonation; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeKerberosHdfsImpersonation; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeKerberosHdfsNoImpersonation; @@ -34,7 +34,7 @@ public class Suite2 public List getTestRuns(EnvironmentConfig config) { return ImmutableList.of( - testOnEnvironment(EnvSinglenode.class) + testOnEnvironment(EnvMultinode.class) .withGroups("configured_features", "hdfs_no_impersonation") .build(), testOnEnvironment(EnvSinglenodeKerberosHdfsNoImpersonation.class) diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java 
b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java index 153fd21b53e2..c8b81e9d62ec 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java @@ -17,10 +17,10 @@ import io.trino.tests.product.launcher.env.EnvironmentConfig; import io.trino.tests.product.launcher.env.EnvironmentDefaults; import io.trino.tests.product.launcher.env.environment.EnvMultinodeKerberosKudu; +import io.trino.tests.product.launcher.env.environment.EnvMultinodePostgresql; +import io.trino.tests.product.launcher.env.environment.EnvMultinodeSqlserver; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeKerberosHdfsImpersonationCrossRealm; -import io.trino.tests.product.launcher.env.environment.EnvSinglenodePostgresql; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSparkHive; -import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSqlserver; import io.trino.tests.product.launcher.env.environment.EnvTwoKerberosHives; import io.trino.tests.product.launcher.env.environment.EnvTwoMixedHives; import io.trino.tests.product.launcher.suite.Suite; @@ -40,10 +40,10 @@ public List getTestRuns(EnvironmentConfig config) verify(config.getHadoopBaseImage().equals(EnvironmentDefaults.HADOOP_BASE_IMAGE), "The suite should be run with default HADOOP_BASE_IMAGE. Leave HADOOP_BASE_IMAGE unset."); return ImmutableList.of( - testOnEnvironment(EnvSinglenodePostgresql.class) + testOnEnvironment(EnvMultinodePostgresql.class) .withGroups("configured_features", "postgresql") .build(), - testOnEnvironment(EnvSinglenodeSqlserver.class) + testOnEnvironment(EnvMultinodeSqlserver.class) .withGroups("configured_features", "sqlserver") .build(), testOnEnvironment(EnvSinglenodeSparkHive.class) diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteCassandra.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteCassandra.java index d0017e696ae5..99445b266a97 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteCassandra.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteCassandra.java @@ -15,7 +15,7 @@ import com.google.common.collect.ImmutableList; import io.trino.tests.product.launcher.env.EnvironmentConfig; -import io.trino.tests.product.launcher.env.environment.EnvSinglenodeCassandra; +import io.trino.tests.product.launcher.env.environment.EnvMultinodeCassandra; import io.trino.tests.product.launcher.suite.Suite; import io.trino.tests.product.launcher.suite.SuiteTestRun; @@ -30,7 +30,7 @@ public class SuiteCassandra public List getTestRuns(EnvironmentConfig config) { return ImmutableList.of( - testOnEnvironment(EnvSinglenodeCassandra.class) + testOnEnvironment(EnvMultinodeCassandra.class) .withGroups("configured_features", "cassandra") .build()); } diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteHmsOnly.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteHmsOnly.java index e9f2d5f1b49f..163c0f6336f9 100644 --- 
a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteHmsOnly.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteHmsOnly.java @@ -15,7 +15,7 @@ import com.google.common.collect.ImmutableList; import io.trino.tests.product.launcher.env.EnvironmentConfig; -import io.trino.tests.product.launcher.env.environment.EnvSinglenode; +import io.trino.tests.product.launcher.env.environment.EnvMultinode; import io.trino.tests.product.launcher.suite.Suite; import io.trino.tests.product.launcher.suite.SuiteTestRun; @@ -30,7 +30,7 @@ public class SuiteHmsOnly public List getTestRuns(EnvironmentConfig config) { return ImmutableList.of( - testOnEnvironment(EnvSinglenode.class) + testOnEnvironment(EnvMultinode.class) .withGroups("configured_features", "hms_only") .build()); } diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteMysql.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteMysql.java index 38a7e07514ab..b0133df11827 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteMysql.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteMysql.java @@ -16,7 +16,7 @@ import com.google.common.collect.ImmutableList; import io.trino.tests.product.launcher.env.EnvironmentConfig; import io.trino.tests.product.launcher.env.environment.EnvMultinodeMariadb; -import io.trino.tests.product.launcher.env.environment.EnvSinglenodeMysql; +import io.trino.tests.product.launcher.env.environment.EnvMultinodeMysql; import io.trino.tests.product.launcher.suite.Suite; import io.trino.tests.product.launcher.suite.SuiteTestRun; @@ -31,7 +31,7 @@ public class SuiteMysql public List getTestRuns(EnvironmentConfig config) { return ImmutableList.of( - testOnEnvironment(EnvSinglenodeMysql.class) + testOnEnvironment(EnvMultinodeMysql.class) .withGroups("configured_features", "mysql") .build(), testOnEnvironment(EnvMultinodeMariadb.class) diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteStorageFormatsDetailed.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteStorageFormatsDetailed.java index 5a2b6b6da75c..664e558c8068 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteStorageFormatsDetailed.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteStorageFormatsDetailed.java @@ -15,7 +15,7 @@ import com.google.common.collect.ImmutableList; import io.trino.tests.product.launcher.env.EnvironmentConfig; -import io.trino.tests.product.launcher.env.environment.EnvSinglenode; +import io.trino.tests.product.launcher.env.environment.EnvMultinode; import io.trino.tests.product.launcher.suite.Suite; import io.trino.tests.product.launcher.suite.SuiteTestRun; @@ -30,7 +30,7 @@ public class SuiteStorageFormatsDetailed public List getTestRuns(EnvironmentConfig config) { return ImmutableList.of( - testOnEnvironment(EnvSinglenode.class) + testOnEnvironment(EnvMultinode.class) .withGroups("configured_features", "storage_formats_detailed", "hive_compression") .build()); } diff --git 
diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-cassandra/cassandra.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-cassandra/cassandra.properties
similarity index 100%
rename from testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-cassandra/cassandra.properties
rename to testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-cassandra/cassandra.properties
diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-mysql/mysql.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-mysql/mysql.properties
similarity index 100%
rename from testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-mysql/mysql.properties
rename to testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-mysql/mysql.properties
diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-postgresql/postgresql.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-postgresql/postgresql.properties
similarity index 100%
rename from testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-postgresql/postgresql.properties
rename to testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-postgresql/postgresql.properties
diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-sqlserver/sqlserver.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-sqlserver/sqlserver.properties
similarity index 100%
rename from testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-sqlserver/sqlserver.properties
rename to testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-sqlserver/sqlserver.properties
diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/iceberg.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/iceberg.properties
index b4ec149f6e6d..7c21315aef20 100644
--- a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/iceberg.properties
+++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-jdbc-catalog/iceberg.properties
@@ -1,6 +1,8 @@
 connector.name=iceberg
 iceberg.catalog.type=jdbc
-iceberg.jdbc-catalog.connection-url=jdbc:postgresql://postgresql:25432/test?user=test&password=test
+iceberg.jdbc-catalog.connection-url=jdbc:postgresql://postgresql:25432/test
+iceberg.jdbc-catalog.connection-user=test
+iceberg.jdbc-catalog.connection-password=test
 iceberg.jdbc-catalog.catalog-name=iceberg_test
 iceberg.jdbc-catalog.default-warehouse-dir=hdfs://hadoop-master:9000/user/hive/warehouse
 hive.hdfs.socks-proxy=hadoop-master:1180
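The iceberg.properties change above moves the catalog's PostgreSQL credentials out of the connection URL and into dedicated connection-user and connection-password properties, which keeps the secrets out of any place the URL gets echoed or logged. In plain JDBC terms the two styles are equivalent, as this small sketch shows; the class name is illustrative only, the host, port and credentials simply mirror the test properties above, and the PostgreSQL driver is assumed to be on the classpath at run time.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public final class JdbcCredentialStyles
{
    private JdbcCredentialStyles() {}

    public static Connection connectWithCredentialsInUrl()
            throws SQLException
    {
        // Old style: user and password are embedded in the URL, so they appear
        // wherever the URL is printed or logged.
        return DriverManager.getConnection("jdbc:postgresql://postgresql:25432/test?user=test&password=test");
    }

    public static Connection connectWithSeparateCredentials()
            throws SQLException
    {
        // New style: credentials are passed separately, matching the split
        // connection-user / connection-password properties in the catalog file.
        return DriverManager.getConnection("jdbc:postgresql://postgresql:25432/test", "test", "test");
    }
}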
diff --git a/testing/trino-product-tests-launcher/src/test/java/io/trino/tests/product/launcher/env/TestConfigurations.java b/testing/trino-product-tests-launcher/src/test/java/io/trino/tests/product/launcher/env/TestConfigurations.java
index 0a19db56f05f..2f5a744d0153 100644
--- a/testing/trino-product-tests-launcher/src/test/java/io/trino/tests/product/launcher/env/TestConfigurations.java
+++ b/testing/trino-product-tests-launcher/src/test/java/io/trino/tests/product/launcher/env/TestConfigurations.java
@@ -13,7 +13,7 @@
  */
 package io.trino.tests.product.launcher.env;
 
-import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSqlserver;
+import io.trino.tests.product.launcher.env.environment.EnvMultinodeSqlserver;
 import io.trino.tests.product.launcher.suite.suites.Suite1;
 import io.trino.tests.product.launcher.suite.suites.Suite6NonGeneric;
 import io.trino.tests.product.launcher.suite.suites.SuiteTpcds;
@@ -38,7 +38,7 @@ public void testCanonicalEnvironmentName()
         assertThat(canonicalEnvironmentName("DuzaAla")).isEqualTo("duza-ala");
         assertThat(canonicalEnvironmentName("EnvDuzaAla")).isEqualTo("duza-ala");
         // real life example
-        assertThat(canonicalEnvironmentName(EnvSinglenodeSqlserver.class.getSimpleName())).isEqualTo("singlenode-sqlserver");
+        assertThat(canonicalEnvironmentName(EnvMultinodeSqlserver.class.getSimpleName())).isEqualTo("multinode-sqlserver");
 
         // document current state; this behavior is neither intentional or (currently) forbidden
         assertThat(canonicalEnvironmentName("duza----Ala")).isEqualTo("duza-ala");
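TestConfigurations keeps asserting the class-name-to-environment-name mapping, now against EnvMultinodeSqlserver. The sketch below is an illustrative approximation of that mapping, written only so that it satisfies the assertions shown above; it is not the launcher's actual implementation and may differ from it in details.

import java.util.Locale;

// Rough approximation of the canonicalization exercised by testCanonicalEnvironmentName():
// EnvMultinodeSqlserver -> multinode-sqlserver, EnvDuzaAla -> duza-ala, duza----Ala -> duza-ala.
public final class CanonicalNames
{
    private CanonicalNames() {}

    public static String canonicalEnvironmentName(String className)
    {
        // Drop the optional "Env" prefix, e.g. EnvMultinodeSqlserver -> MultinodeSqlserver
        String name = className.startsWith("Env") ? className.substring(3) : className;
        // Insert '-' at lower-to-upper case boundaries and lower-case the result
        String hyphenated = name.replaceAll("([a-z0-9])([A-Z])", "$1-$2").toLowerCase(Locale.ENGLISH);
        // Collapse runs of '-', so "duza----Ala" also becomes "duza-ala"
        return hyphenated.replaceAll("-+", "-");
    }
}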
diff --git a/testing/trino-product-tests/README.md b/testing/trino-product-tests/README.md
index df097789f7c2..f8c30a763ffd 100644
--- a/testing/trino-product-tests/README.md
+++ b/testing/trino-product-tests/README.md
@@ -78,9 +78,6 @@ testing/bin/ptl test run --environment \
   connections on the HTTPS port (7778), and both coordinator and worker traffic
   is encrypted and kerberized. For multinode-tls-kerberos, the default
   configuration is 1 coordinator and 2 workers.
-- **singlenode** - pseudo-distributed Hadoop installation running on a
-  single Docker container and a single node installation of Trino also running
-  on a single Docker container.
 - **singlenode-hdfs-impersonation** - HDFS impersonation enabled on top of the
   environment in singlenode profile. Trino impersonates the user who is running
   the query when accessing HDFS.
@@ -169,12 +166,12 @@ Following table describes the profile specific test categories, the corresponding
 test group names and the profile(s) which must be used to run tests in those
 test groups.
 
-| Tests                 | Test Group                 | Profiles                                                                           |
-| ----------------------|---------------------------| -------------------------------------------------------------------------------- |
-| Authorization         | ``authorization``          | ``singlenode-kerberos-hdfs-impersonation``                                         |
-| HDFS impersonation    | ``hdfs_impersonation``     | ``singlenode-hdfs-impersonation``, ``singlenode-kerberos-hdfs-impersonation``     |
-| No HDFS impersonation | ``hdfs_no_impersonation``  | ``singlenode``, ``singlenode-kerberos-hdfs-no_impersonation``                      |
-| LDAP                  | ``ldap``                   | ``singlenode-ldap``                                                                |
+| Tests                 | Test Group                | Profiles                                                                        |
+| ----------------------|---------------------------|-------------------------------------------------------------------------------|
+| Authorization         | ``authorization``         | ``singlenode-kerberos-hdfs-impersonation``                                      |
+| HDFS impersonation    | ``hdfs_impersonation``    | ``singlenode-hdfs-impersonation``, ``singlenode-kerberos-hdfs-impersonation``   |
+| No HDFS impersonation | ``hdfs_no_impersonation`` | ``multinode``, ``singlenode-kerberos-hdfs-no_impersonation``                    |
+| LDAP                  | ``ldap``                  | ``singlenode-ldap``                                                             |
 
 Below is a list of commands that explain how to run these profile specific tests
 and also the entire test suite:
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/mysql/TestJdbcDynamicFilteringJmx.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/mysql/TestJdbcDynamicFilteringJmx.java
index 3d357d00f3b3..8239918a612c 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/mysql/TestJdbcDynamicFilteringJmx.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/mysql/TestJdbcDynamicFilteringJmx.java
@@ -50,9 +50,9 @@ public void testDynamicFilteringStats()
         assertThat(onTrino().executeQuery(format("SELECT COUNT(*) FROM mysql.%s a JOIN tpch.tiny.nation b ON a.nationkey = b.nationkey AND b.name = 'INDIA'", TABLE_NAME)))
                 .containsOnly(row(1));
 
-        assertThat(onTrino().executeQuery("SELECT \"completeddynamicfilters.totalcount\" FROM jmx.current.\"io.trino.plugin.jdbc:name=mysql,type=dynamicfilteringstats\""))
+        assertThat(onTrino().executeQuery("SELECT \"completeddynamicfilters.totalcount\" FROM jmx.current.\"io.trino.plugin.jdbc:name=mysql,type=dynamicfilteringstats\" WHERE node = 'presto-master'"))
                 .containsOnly(row(1));
-        assertThat(onTrino().executeQuery("SELECT \"totaldynamicfilters.totalcount\" FROM jmx.current.\"io.trino.plugin.jdbc:name=mysql,type=dynamicfilteringstats\""))
+        assertThat(onTrino().executeQuery("SELECT \"totaldynamicfilters.totalcount\" FROM jmx.current.\"io.trino.plugin.jdbc:name=mysql,type=dynamicfilteringstats\" WHERE node = 'presto-master'"))
                 .containsOnly(row(1));
     }
 }
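The added WHERE node = 'presto-master' predicate is what keeps these assertions stable on the renamed multinode environment: the jmx.current tables expose one row per node, so without the filter the single-row expectation would fail. A hedged alternative is to aggregate over all nodes instead, as in the sketch below; the helper and class names are hypothetical, the import locations follow the usual product-test conventions, and it assumes workers report zero for this counter so the sum still equals 1.

import static io.trino.tempto.assertions.QueryAssert.Row.row;
import static io.trino.tempto.assertions.QueryAssert.assertThat;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;

// Hypothetical alternative check: sum the per-node JMX rows rather than pinning
// the query to the coordinator node as the test above does.
final class DynamicFilteringJmxChecks
{
    private DynamicFilteringJmxChecks() {}

    static void assertCompletedDynamicFiltersAcrossNodes()
    {
        assertThat(onTrino().executeQuery(
                "SELECT SUM(\"completeddynamicfilters.totalcount\") " +
                        "FROM jmx.current.\"io.trino.plugin.jdbc:name=mysql,type=dynamicfilteringstats\""))
                .containsOnly(row(1L));
    }
}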
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/QueryExecutors.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/QueryExecutors.java
index a90f8f20e6b5..b9028155877b 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/QueryExecutors.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/QueryExecutors.java
@@ -40,12 +40,6 @@ public static QueryExecutor onCompatibilityTestServer()
         return connectToTrino("compatibility-test-server");
     }
 
-    @Deprecated
-    public static QueryExecutor connectToPresto(String prestoConfig)
-    {
-        return connectToTrino(prestoConfig);
-    }
-
     public static QueryExecutor connectToTrino(String trinoConfig)
     {
         return new QueryExecutor()
diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java
index 8f94a52d80d5..258183489d7a 100644
--- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java
+++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java
@@ -26,6 +26,7 @@
 import io.trino.execution.SqlTaskManager;
 import io.trino.execution.TaskId;
 import io.trino.execution.TaskInfo;
+import io.trino.execution.TaskState;
 import io.trino.execution.warnings.WarningCollector;
 import io.trino.memory.LocalMemoryManager;
 import io.trino.memory.MemoryPool;
@@ -66,12 +67,14 @@
 import java.util.OptionalLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
 import static io.trino.SystemSessionProperties.JOIN_REORDERING_STRATEGY;
+import static io.trino.execution.StageInfo.getAllStages;
 import static io.trino.sql.ParsingUtil.createParsingOptions;
 import static io.trino.sql.SqlFormatter.formatSql;
 import static io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy;
@@ -192,11 +195,11 @@ private void checkQueryInfosFinal()
         for (BasicQueryInfo basicQueryInfo : queryManager.getQueries()) {
             QueryId queryId = basicQueryInfo.getQueryId();
             if (!basicQueryInfo.getState().isDone()) {
-                fail("query is expected to be in done state: " + basicQueryInfo.getQuery());
+                fail("query is expected to be in a done state\n\n" + createQueryDebuggingSummary(basicQueryInfo, queryManager.getFullQueryInfo(queryId)));
             }
             QueryInfo queryInfo = queryManager.getFullQueryInfo(queryId);
             if (!queryInfo.isFinalQueryInfo()) {
-                fail("QueryInfo is expected to be final: " + basicQueryInfo.getQuery());
+                fail("QueryInfo is expected to be final\n\n" + createQueryDebuggingSummary(basicQueryInfo, queryInfo));
             }
         }
     }));
@@ -216,20 +219,53 @@ private void checkTasksDone()
         for (TaskInfo taskInfo : taskInfos) {
             TaskId taskId = taskInfo.getTaskStatus().getTaskId();
             QueryId queryId = taskId.getQueryId();
-            String query = "unknown";
-            try {
-                query = queryManager.getQueryInfo(queryId).getQuery();
-            }
-            catch (NoSuchElementException ignored) {
-            }
-            if (!taskInfo.getTaskStatus().getState().isDone()) {
-                fail("Task is expected to be in done state. TaskId: %s, QueryId: %s, Query: %s ".formatted(taskId, queryId, query));
+            TaskState taskState = taskInfo.getTaskStatus().getState();
+            if (!taskState.isDone()) {
+                try {
+                    BasicQueryInfo basicQueryInfo = queryManager.getQueryInfo(queryId);
+                    QueryInfo queryInfo = queryManager.getFullQueryInfo(queryId);
+                    String querySummary = createQueryDebuggingSummary(basicQueryInfo, queryInfo);
+                    fail("Task is expected to be in done state, found: %s - TaskId: %s, QueryId: %s".formatted(taskState, taskId, queryId) + "\n\n" + querySummary);
+                }
+                catch (NoSuchElementException ignored) {
+                }
+                fail("Task is expected to be in done state, found: %s - TaskId: %s, QueryId: %s, Query: unknown".formatted(taskState, taskId, queryId));
             }
         }
     }
 }));
 }
 
+    private static String createQueryDebuggingSummary(BasicQueryInfo basicQueryInfo, QueryInfo queryInfo)
+    {
+        String queryDetails = format("Query %s [%s]: %s", basicQueryInfo.getQueryId(), basicQueryInfo.getState(), basicQueryInfo.getQuery());
+        if (queryInfo.getOutputStage().isEmpty()) {
+            return queryDetails + " -- ";
+        }
+        else {
+            return queryDetails + getAllStages(queryInfo.getOutputStage()).stream()
+                    .map(stageInfo -> {
+                        String stageDetail = format("Stage %s [%s]", stageInfo.getStageId(), stageInfo.getState());
+                        if (stageInfo.getTasks().isEmpty()) {
+                            return stageDetail;
+                        }
+                        return stageDetail + stageInfo.getTasks().stream()
+                                .map(TaskInfo::getTaskStatus)
+                                .map(task -> {
+                                    String taskDetail = format("Task %s [%s]", task.getTaskId(), task.getState());
+                                    if (task.getFailures().isEmpty()) {
+                                        return taskDetail;
+                                    }
+                                    return taskDetail + " -- Failures: " + task.getFailures().stream()
+                                            .map(failure -> format("%s %s: %s", failure.getErrorCode(), failure.getType(), failure.getMessage()))
+                                            .collect(Collectors.joining(", ", "[", "]"));
+                                })
+                                .collect(Collectors.joining("\n\t\t", ":\n\t\t", ""));
+                    })
+                    .collect(Collectors.joining("\n\n\t", "\nStages:\n\t", ""));
+        }
+    }
+
     @Test
     public void ensureTestNamingConvention()
     {
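For reference, the format strings in createQueryDebuggingSummary produce a failure message roughly like the following for a hypothetical stuck query; the identifiers, states, and failure text here are invented purely to illustrate the shape of the output, not taken from a real run.

Query 20230101_000000_00000_abcde [RUNNING]: SELECT * FROM nation
Stages:
    Stage 20230101_000000_00000_abcde.0 [RUNNING]:
        Task 20230101_000000_00000_abcde.0.0.0 [RUNNING]
        Task 20230101_000000_00000_abcde.0.1.0 [FAILED] -- Failures: [<error code> <failure type>: <failure message>]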
diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorSmokeTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorSmokeTest.java
index 7a8278d6c002..a30ef33c5e8b 100644
--- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorSmokeTest.java
+++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorSmokeTest.java
@@ -437,6 +437,19 @@ public void testRenameTableAcrossSchemas()
         assertUpdate("DROP SCHEMA " + schemaName);
     }
 
+    /**
+     * This seemingly duplicate test of {@link BaseConnectorTest#testShowInformationSchemaTables()}
+     * is used in the context of this class in order to be able to test
+     * against a wider range of connector configurations.
+     */
+    @Test
+    public void testShowInformationSchemaTables()
+    {
+        assertThat(query("SHOW TABLES FROM information_schema"))
+                .skippingTypesCheck()
+                .containsAll("VALUES 'applicable_roles', 'columns', 'enabled_roles', 'roles', 'schemata', 'table_privileges', 'tables', 'views'");
+    }
+
     @Test
     public void testSelectInformationSchemaTables()
     {
diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java
index 15bf008cd24c..26e215fc9686 100644
--- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java
+++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java
@@ -775,6 +775,14 @@ public void testDescribeTable()
         assertEquals(actualColumns, expectedColumns);
     }
 
+    @Test
+    public void testShowInformationSchemaTables()
+    {
+        assertThat(query("SHOW TABLES FROM information_schema"))
+                .skippingTypesCheck()
+                .containsAll("VALUES 'applicable_roles', 'columns', 'enabled_roles', 'roles', 'schemata', 'table_privileges', 'tables', 'views'");
+    }
+
     @Test
     public void testView()
     {
diff --git a/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java b/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java
index d2a3cc20c1b3..48c1af0e5d7e 100644
--- a/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java
+++ b/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java
@@ -37,6 +37,9 @@ public static Map getExtraProperties()
                 .put("query.executor-pool-size", "10")
                 // enable exchange compression to follow production deployment recommendations
                 .put("exchange.compression-enabled", "true")
+                .put("max-tasks-waiting-for-execution-per-query", "2")
+                .put("max-tasks-waiting-for-node-per-stage", "2")
+                .put("query.schedule-split-batch-size", "2")
                 .buildOrThrow();
     }
 }
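The three new coordinator properties cap how many tasks may sit waiting for execution or for a node and shrink the split scheduling batch, so fault-tolerant-execution connector tests actually exercise those queueing paths rather than scheduling everything at once. A rough sketch of where the helper's properties typically end up when a test builds its runner is below; the DistributedQueryRunner builder usage and the memory catalog session defaults are assumptions for illustration, not part of this change, and a real fault-tolerant runner would also need an exchange manager configured.

import io.trino.Session;
import io.trino.testing.DistributedQueryRunner;

import static io.trino.testing.FaultTolerantExecutionConnectorTestHelper.getExtraProperties;
import static io.trino.testing.TestingSession.testSessionBuilder;

// Hypothetical wiring, shown only to illustrate where the helper's extra properties are applied.
public final class FaultTolerantQueryRunnerSketch
{
    private FaultTolerantQueryRunnerSketch() {}

    public static DistributedQueryRunner create()
            throws Exception
    {
        Session session = testSessionBuilder()
                .setCatalog("memory")
                .setSchema("default")
                .build();
        return DistributedQueryRunner.builder(session)
                // Coordinator properties from the helper, including the new scheduling limits
                .setExtraProperties(getExtraProperties())
                .build();
    }
}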