-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Explicitly cleanup SqlTask on worker when no longer needed #24729
base: master
Are you sure you want to change the base?
Conversation
@POST | ||
@Path("{taskId}/cleanup") | ||
public void cleanupTask( | ||
@PathParam("taskId") TaskId taskId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: wrapping not needed
core/trino-main/src/main/java/io/trino/server/remotetask/RemoteTaskCleaner.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/server/remotetask/RemoteTaskCleaner.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/server/remotetask/RemoteTaskCleaner.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/server/remotetask/DynamicFiltersFetcher.java
Outdated
Show resolved
Hide resolved
@@ -124,6 +128,7 @@ public synchronized void updateDynamicFiltersVersionAndFetchIfNecessary(long new | |||
private synchronized void stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method is not called normally. only in erronous situations. You cannot rely on it to be executed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The external entry point for DynamicFiltersFetcher
request is really updateDynamicFiltersVersionAndFetchIfNecessary
.
So the conditions for DynamicFiltersFetcher
being completed are:
- there is no pending query
- all places which could call
updateDynamicFiltersVersionAndFetchIfNecessary
are stopped (currenty it's continuous fetcher)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sopel39 @raunaqmorarka Yeah - you are right. Do you have quick clue if there is natural place where we know that DynFilFetcher finished its work. The way it is called is kinda convoluted
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes:
- there is no pending request from
DynamicFilterFetcher
- all places which could call
updateDynamicFiltersVersionAndFetchIfNecessary
are stopped (it's onlyContinuousTaskStatusFetcher
atm IIRC)
core/trino-main/src/main/java/io/trino/server/remotetask/RemoteTaskCleaner.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/server/remotetask/RemoteTaskCleaner.java
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/server/remotetask/RemoteTaskCleaner.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add testing (with DF) to TestHttpRemoteTask
import static java.util.Objects.requireNonNull; | ||
import static java.util.concurrent.TimeUnit.MILLISECONDS; | ||
|
||
public class RemoteTaskCleaner |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a separate service for this? Cannot HttpRemoteTask do cleanup itself?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can - just having separate class is cleaner I think. I can put that as inner class too but I do not like that too much.
5045fc2
to
362f673
Compare
Currently SqlTask objects are removed from SqlTaskManager.tasks map (cache) after timeout (15 minutes by default). Even though the object is not huge, we observed increased memory pressure up to OOM on busy clusters. With this PR entries are dropped form SqlTaskManager as soon as they are no longer needed, when coordinator will no longer query for the information
362f673
to
2b798b6
Compare
@sopel39 PTAL |
public void cleanupTask(TaskId taskId) | ||
{ | ||
requireNonNull(taskId, "taskId is null"); | ||
SqlTask sqlTask = tasks.getIfPresent(taskId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Could there ever be a race condition, e.g: task was not created yet?
@@ -121,6 +124,8 @@ public synchronized void stop() | |||
future.cancel(true); | |||
future = null; | |||
} | |||
remoteTaskCleaner.markTaskStatusFetcherStopped(taskStatus.get().getState()); | |||
dynamicFiltersFetcher.noMoreUpdateRequests(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could it be just DynamicFiltersFetcher#stop
method?
@@ -253,7 +258,11 @@ void updateTaskStatus(TaskStatus newValue) | |||
onFail.accept(new TrinoException(REMOTE_TASK_MISMATCH, format("%s (%s)", REMOTE_TASK_MISMATCH_ERROR, HostAddress.fromUri(getTaskStatus().getSelf())))); | |||
} | |||
|
|||
dynamicFiltersFetcher.updateDynamicFiltersVersionAndFetchIfNecessary(newValue.getDynamicFiltersVersion()); | |||
synchronized (this) { | |||
if (running) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can it happen in non-aborted query?
I think that's still wrong, because last DF update could be potentially received after ContinuousTaskStatusFetcher#stop
was called. We need that last request for DFs to happen, otherwise we loose filtering opportunity.
@@ -113,17 +118,38 @@ public synchronized void start() | |||
|
|||
public synchronized void updateDynamicFiltersVersionAndFetchIfNecessary(long newDynamicFiltersVersion) | |||
{ | |||
if (noMoreUpdateRequests) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
again, we need that last update to happen to fetch final DFs.
Just make dynamic filters fetcher report when it's finished.
@@ -121,6 +124,8 @@ public synchronized void stop() | |||
future.cancel(true); | |||
future = null; | |||
} | |||
remoteTaskCleaner.markTaskStatusFetcherStopped(taskStatus.get().getState()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would postpone that only after pending request is finished, so that last DF update can be processed
Currently SqlTask objects are removed from SqlTaskManager.tasks map (cache) after timeout (15 minutes by default). Even though the object is not huge, we observed increased memory pressure up to OOM on busy clusters.
With this PR entries are dropped form SqlTaskManager as soon as they are no longer needed, when coordinator will no longer query for the information
Note: I this code is prone to race condition if last worker to acknowlege task results retransmits acknowlegement. There is a chance that a this point task is already cleaned by cleanup request from coordinator, and entry will get resurected in tasks cache map. This should be extremly rare though, and not cause any problems.