diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java index 59c03d1551d..2a57d8cad1f 100644 --- a/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java @@ -9,6 +9,7 @@ import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.exceptions.CancellationException; import io.deephaven.engine.context.QueryScope; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.util.AbstractScriptSession; import io.deephaven.engine.util.PythonEvaluator; @@ -70,6 +71,7 @@ public class PythonDeephavenSession extends AbstractScriptSession scope) { - super(updateGraph, threadInitializationFactory, NoOp.INSTANCE, null); + public PythonDeephavenSession( + final UpdateGraph updateGraph, + final OperationInitializer operationInitializer, + final ThreadInitializationFactory threadInitializationFactory, + final PythonScope scope) { + super(updateGraph, operationInitializer, NoOp.INSTANCE, null); evaluator = null; this.scope = (PythonScope) scope; diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index e61f03bc624..da476d4752f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -103,6 +103,7 @@ public static Builder newBuilder(final String name) { private final long defaultTargetCycleDurationMillis; private volatile long targetCycleDurationMillis; private final ThreadInitializationFactory threadInitializationFactory; + private final OperationInitializer operationInitializer; /** @@ -122,12 +123,14 @@ public PeriodicUpdateGraph( final long targetCycleDurationMillis, final long minimumCycleDurationToLogNanos, final int numUpdateThreads, - final ThreadInitializationFactory threadInitializationFactory) { + final ThreadInitializationFactory threadInitializationFactory, + final OperationInitializer operationInitializer) { super(name, allowUnitTestMode, log, minimumCycleDurationToLogNanos); this.allowUnitTestMode = allowUnitTestMode; this.defaultTargetCycleDurationMillis = targetCycleDurationMillis; this.targetCycleDurationMillis = targetCycleDurationMillis; this.threadInitializationFactory = threadInitializationFactory; + this.operationInitializer = operationInitializer; if (numUpdateThreads <= 0) { this.updateThreads = Runtime.getRuntime().availableProcessors(); @@ -342,10 +345,8 @@ public Thread newThread(@NotNull final Runnable r) { notificationProcessor = makeNotificationProcessor(); } if (refreshThread == null) { - final OperationInitializer operationInitializer = - ExecutionContext.getContext().getOperationInitializer(); refreshThread = new Thread(threadInitializationFactory.createInitializer(() -> { - configureRefreshThread(operationInitializer); + configureRefreshThread(); while (running) { Assert.eqFalse(this.allowUnitTestMode, "allowUnitTestMode"); refreshTablesAndFlushNotifications(); @@ -1103,9 +1104,8 @@ private NotificationProcessorThreadFactory(@NotNull final ThreadGroup threadGrou @Override public Thread newThread(@NotNull final Runnable r) { - OperationInitializer captured = ExecutionContext.getContext().getOperationInitializer(); return super.newThread(threadInitializationFactory.createInitializer(() -> { - configureRefreshThread(captured); + configureRefreshThread(); r.run(); })); } @@ -1124,9 +1124,8 @@ private UnitTestThreadFactory() { @Override public Thread newThread(@NotNull final Runnable r) { - OperationInitializer captured = ExecutionContext.getContext().getOperationInitializer(); return super.newThread(() -> { - configureUnitTestRefreshThread(captured); + configureUnitTestRefreshThread(); r.run(); }); } @@ -1135,19 +1134,19 @@ public Thread newThread(@NotNull final Runnable r) { /** * Configure the primary UpdateGraph thread or one of the auxiliary notification processing threads. */ - private void configureRefreshThread(OperationInitializer captured) { + private void configureRefreshThread() { SystemicObjectTracker.markThreadSystemic(); MultiChunkPool.enableDedicatedPoolForThisThread(); isUpdateThread.set(true); // Install this UpdateGraph via ExecutionContext for refresh threads, share the same operation initializer // noinspection resource - ExecutionContext.newBuilder().setUpdateGraph(this).setOperationInitializer(captured).build().open(); + ExecutionContext.newBuilder().setUpdateGraph(this).setOperationInitializer(operationInitializer).build().open(); } /** * Configure threads to be used for unit test processing. */ - private void configureUnitTestRefreshThread(OperationInitializer captured) { + private void configureUnitTestRefreshThread() { final Thread currentThread = Thread.currentThread(); final Thread.UncaughtExceptionHandler existing = currentThread.getUncaughtExceptionHandler(); currentThread.setUncaughtExceptionHandler((final Thread errorThread, final Throwable throwable) -> { @@ -1157,7 +1156,7 @@ private void configureUnitTestRefreshThread(OperationInitializer captured) { isUpdateThread.set(true); // Install this UpdateGraph and share operation initializer pool via ExecutionContext for refresh threads // noinspection resource - ExecutionContext.newBuilder().setUpdateGraph(this).setOperationInitializer(captured).build().open(); + ExecutionContext.newBuilder().setUpdateGraph(this).setOperationInitializer(operationInitializer).build().open(); } public static PeriodicUpdateGraph getInstance(final String name) { @@ -1174,6 +1173,7 @@ public static final class Builder { private String name; private int numUpdateThreads = -1; private ThreadInitializationFactory threadInitializationFactory = runnable -> runnable; + private OperationInitializer operationInitializer = ExecutionContext.getContext().getOperationInitializer(); public Builder(String name) { this.name = name; @@ -1227,6 +1227,17 @@ public Builder threadInitializationFactory(ThreadInitializationFactory threadIni return this; } + /** + * Sets the {@link OperationInitializer} to use for threads started by this UpdateGraph. + * + * @param operationInitializer the operation initializer to use + * @return this builder + */ + public Builder operationInitializer(OperationInitializer operationInitializer) { + this.operationInitializer = operationInitializer; + return this; + } + /** * Constructs and returns a PeriodicUpdateGraph. It is an error to do so an instance already exists with the * name provided to this builder. @@ -1256,7 +1267,8 @@ private PeriodicUpdateGraph construct() { targetCycleDurationMillis, minimumCycleDurationToLogNanos, numUpdateThreads, - threadInitializationFactory); + threadInitializationFactory, + operationInitializer); } } } diff --git a/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java b/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java index 9bd1679b939..4e8e908610f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java @@ -18,12 +18,11 @@ import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.context.QueryScopeParam; import io.deephaven.engine.table.hierarchical.HierarchicalTable; -import io.deephaven.engine.table.impl.OperationInitializationThreadPool; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.plugin.type.ObjectType; import io.deephaven.plugin.type.ObjectTypeLookup; import io.deephaven.util.SafeCloseable; -import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -72,7 +71,7 @@ private static void createOrClearDirectory(final File directory) { protected AbstractScriptSession( UpdateGraph updateGraph, - final ThreadInitializationFactory threadInitializationFactory, + final OperationInitializer operationInitializer, ObjectTypeLookup objectTypeLookup, @Nullable Listener changeListener) { this.objectTypeLookup = objectTypeLookup; @@ -93,7 +92,7 @@ protected AbstractScriptSession( .setQueryScope(queryScope) .setQueryCompiler(compilerContext) .setUpdateGraph(updateGraph) - .setOperationInitializer(new OperationInitializationThreadPool(threadInitializationFactory)) + .setOperationInitializer(operationInitializer) .build(); } diff --git a/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java b/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java index c071ce0a9c7..a30d63298aa 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java @@ -32,6 +32,7 @@ import io.deephaven.engine.table.TableFactory; import io.deephaven.engine.table.impl.lang.QueryLanguageFunctionUtils; import io.deephaven.engine.table.impl.util.TableLoggers; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.util.GroovyDeephavenSession.GroovySnapshot; import io.deephaven.internal.log.LoggerFactory; @@ -41,7 +42,6 @@ import io.deephaven.time.DateTimeUtils; import io.deephaven.util.QueryConstants; import io.deephaven.util.annotations.VisibleForTesting; -import io.deephaven.util.thread.ThreadInitializationFactory; import io.deephaven.util.type.ArrayTypeUtils; import io.deephaven.util.type.TypeUtils; import io.github.classgraph.ClassGraph; @@ -146,20 +146,20 @@ private String getNextScriptClassName() { public GroovyDeephavenSession( final UpdateGraph updateGraph, - final ThreadInitializationFactory threadInitializationFactory, + final OperationInitializer operationInitializer, final ObjectTypeLookup objectTypeLookup, final RunScripts runScripts) throws IOException { - this(updateGraph, threadInitializationFactory, objectTypeLookup, null, runScripts); + this(updateGraph, operationInitializer, objectTypeLookup, null, runScripts); } public GroovyDeephavenSession( final UpdateGraph updateGraph, - final ThreadInitializationFactory threadInitializationFactory, + final OperationInitializer operationInitializer, ObjectTypeLookup objectTypeLookup, @Nullable final Listener changeListener, final RunScripts runScripts) throws IOException { - super(updateGraph, threadInitializationFactory, objectTypeLookup, changeListener); + super(updateGraph, operationInitializer, objectTypeLookup, changeListener); addDefaultImports(consoleImports); if (INCLUDE_DEFAULT_IMPORTS_IN_LOADED_GROOVY) { diff --git a/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java b/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java index 7c7a28c838e..ac524bb4075 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java @@ -4,8 +4,8 @@ package io.deephaven.engine.util; import io.deephaven.engine.context.QueryScope; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.engine.updategraph.UpdateGraph; -import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -13,7 +13,6 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; -import java.util.function.Supplier; /** * ScriptSession implementation that simply allows variables to be exported. This is not intended for use in user @@ -25,14 +24,17 @@ public class NoLanguageDeephavenSession extends AbstractScriptSession variables; - public NoLanguageDeephavenSession(final UpdateGraph updateGraph, - final ThreadInitializationFactory threadInitializationFactory) { - this(updateGraph, threadInitializationFactory, SCRIPT_TYPE); + public NoLanguageDeephavenSession( + final UpdateGraph updateGraph, + final OperationInitializer operationInitializer) { + this(updateGraph, operationInitializer, SCRIPT_TYPE); } - public NoLanguageDeephavenSession(final UpdateGraph updateGraph, - final ThreadInitializationFactory threadInitializationFactory, final String scriptType) { - super(updateGraph, threadInitializationFactory, null, null); + public NoLanguageDeephavenSession( + final UpdateGraph updateGraph, + final OperationInitializer operationInitializer, + final String scriptType) { + super(updateGraph, operationInitializer, null, null); this.scriptType = scriptType; variables = new LinkedHashMap<>(); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java index b179795b59d..a9160c0da2b 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java @@ -76,7 +76,7 @@ private GroovyDeephavenSession getGroovySession() throws IOException { private GroovyDeephavenSession getGroovySession(@Nullable Clock clock) throws IOException { final GroovyDeephavenSession session = new GroovyDeephavenSession( ExecutionContext.getContext().getUpdateGraph(), - ThreadInitializationFactory.NO_OP, + ExecutionContext.getContext().getOperationInitializer(), NoOp.INSTANCE, GroovyDeephavenSession.RunScripts.serviceLoader()); session.getExecutionContext().open(); diff --git a/engine/table/src/test/java/io/deephaven/engine/util/scripts/TestGroovyDeephavenSession.java b/engine/table/src/test/java/io/deephaven/engine/util/scripts/TestGroovyDeephavenSession.java index e1074176a6e..47926a61860 100644 --- a/engine/table/src/test/java/io/deephaven/engine/util/scripts/TestGroovyDeephavenSession.java +++ b/engine/table/src/test/java/io/deephaven/engine/util/scripts/TestGroovyDeephavenSession.java @@ -48,8 +48,9 @@ public class TestGroovyDeephavenSession { public void setup() throws IOException { livenessScope = new LivenessScope(); LivenessScopeStack.push(livenessScope); + final ExecutionContext context = ExecutionContext.getContext(); session = new GroovyDeephavenSession( - ExecutionContext.getContext().getUpdateGraph(), ThreadInitializationFactory.NO_OP, NoOp.INSTANCE, null, + context.getUpdateGraph(), context.getOperationInitializer(), NoOp.INSTANCE, null, GroovyDeephavenSession.RunScripts.none()); executionContext = session.getExecutionContext().open(); } diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java index a0ee07d8112..50f656117b4 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java @@ -6,11 +6,12 @@ import io.deephaven.util.thread.ThreadInitializationFactory; public class TestExecutionContext { - public static final ControlledUpdateGraph UPDATE_GRAPH = new ControlledUpdateGraph(); public static final OperationInitializationThreadPool OPERATION_INITIALIZATION = new OperationInitializationThreadPool(ThreadInitializationFactory.NO_OP); + public static final ControlledUpdateGraph UPDATE_GRAPH = new ControlledUpdateGraph(OPERATION_INITIALIZATION); + public static ExecutionContext createForUnitTests() { return new ExecutionContext.Builder(new AuthContext.SuperUser()) .markSystemic() diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java index 0ca0d815015..e9f91c02dea 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java @@ -1,11 +1,12 @@ package io.deephaven.engine.testutil; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; import io.deephaven.util.thread.ThreadInitializationFactory; // TODO (deephaven-core#3886): Extract test functionality from PeriodicUpdateGraph public class ControlledUpdateGraph extends PeriodicUpdateGraph { - public ControlledUpdateGraph() { - super("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP); + public ControlledUpdateGraph(final OperationInitializer operationInitializer) { + super("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP, operationInitializer); } } diff --git a/py/server/test_helper/__init__.py b/py/server/test_helper/__init__.py index 1dc3eaaca55..3db56d00eba 100644 --- a/py/server/test_helper/__init__.py +++ b/py/server/test_helper/__init__.py @@ -69,11 +69,18 @@ def start_jvm_for_tests(jvm_props: Dict[str, str] = None): # Set up a Deephaven Python session py_scope_jpy = jpy.get_type("io.deephaven.engine.util.PythonScopeJpyImpl").ofMainGlobals() global py_dh_session + + no_op_thread_factory = jpy.get_type("io.deephaven.util.thread.ThreadInitializationFactory").NO_OP + _JOperationInitializationThreadPool = jpy.get_type("io.deephaven.engine.table.impl.OperationInitializationThreadPool") + _j_operation_initializer = _JOperationInitializationThreadPool(no_op_thread_factory) + _JPeriodicUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph") - _j_test_update_graph = _JPeriodicUpdateGraph.newBuilder(_JPeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME).existingOrBuild() - no_op_operation_initializer = jpy.get_type("io.deephaven.util.thread.ThreadInitializationFactory").NO_OP + _j_test_update_graph = _JPeriodicUpdateGraph.newBuilder(_JPeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) \ + .operationInitializer(_j_operation_initializer) \ + .existingOrBuild() + _JPythonScriptSession = jpy.get_type("io.deephaven.integrations.python.PythonDeephavenSession") - py_dh_session = _JPythonScriptSession(_j_test_update_graph, no_op_operation_initializer, py_scope_jpy) + py_dh_session = _JPythonScriptSession(_j_test_update_graph, _j_operation_initializer, no_op_thread_factory, py_scope_jpy) def _expand_wildcards_in_list(elements): diff --git a/server/src/main/java/io/deephaven/server/console/NoConsoleSessionModule.java b/server/src/main/java/io/deephaven/server/console/NoConsoleSessionModule.java index a460bc6f1f7..3b561f124f7 100644 --- a/server/src/main/java/io/deephaven/server/console/NoConsoleSessionModule.java +++ b/server/src/main/java/io/deephaven/server/console/NoConsoleSessionModule.java @@ -7,12 +7,12 @@ import dagger.Provides; import dagger.multibindings.IntoMap; import dagger.multibindings.StringKey; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; import io.deephaven.engine.util.NoLanguageDeephavenSession; import io.deephaven.engine.util.ScriptSession; import io.deephaven.server.console.groovy.InitScriptsModule; -import io.deephaven.util.thread.ThreadInitializationFactory; import javax.inject.Named; @@ -28,7 +28,7 @@ ScriptSession bindScriptSession(NoLanguageDeephavenSession noLanguageSession) { @Provides NoLanguageDeephavenSession bindNoLanguageSession( @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph, - ThreadInitializationFactory threadInitializationFactory) { - return new NoLanguageDeephavenSession(updateGraph, threadInitializationFactory); + final OperationInitializer operationInitializer) { + return new NoLanguageDeephavenSession(updateGraph, operationInitializer); } } diff --git a/server/src/main/java/io/deephaven/server/console/groovy/GroovyConsoleSessionModule.java b/server/src/main/java/io/deephaven/server/console/groovy/GroovyConsoleSessionModule.java index c5e8158568d..c17b4b6fc11 100644 --- a/server/src/main/java/io/deephaven/server/console/groovy/GroovyConsoleSessionModule.java +++ b/server/src/main/java/io/deephaven/server/console/groovy/GroovyConsoleSessionModule.java @@ -7,13 +7,13 @@ import dagger.Provides; import dagger.multibindings.IntoMap; import dagger.multibindings.StringKey; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; import io.deephaven.engine.util.GroovyDeephavenSession; import io.deephaven.engine.util.GroovyDeephavenSession.RunScripts; import io.deephaven.engine.util.ScriptSession; import io.deephaven.plugin.type.ObjectTypeLookup; -import io.deephaven.util.thread.ThreadInitializationFactory; import javax.inject.Named; import java.io.IOException; @@ -31,12 +31,12 @@ ScriptSession bindScriptSession(final GroovyDeephavenSession groovySession) { @Provides GroovyDeephavenSession bindGroovySession( @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph, - ThreadInitializationFactory threadInitializationFactory, + final OperationInitializer operationInitializer, final ObjectTypeLookup lookup, final ScriptSession.Listener listener, final RunScripts runScripts) { try { - return new GroovyDeephavenSession(updateGraph, threadInitializationFactory, lookup, listener, runScripts); + return new GroovyDeephavenSession(updateGraph, operationInitializer, lookup, listener, runScripts); } catch (final IOException e) { throw new UncheckedIOException(e); } diff --git a/server/src/main/java/io/deephaven/server/console/python/PythonConsoleSessionModule.java b/server/src/main/java/io/deephaven/server/console/python/PythonConsoleSessionModule.java index d5f54618462..c702b74f9b8 100644 --- a/server/src/main/java/io/deephaven/server/console/python/PythonConsoleSessionModule.java +++ b/server/src/main/java/io/deephaven/server/console/python/PythonConsoleSessionModule.java @@ -7,6 +7,7 @@ import dagger.Provides; import dagger.multibindings.IntoMap; import dagger.multibindings.StringKey; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; import io.deephaven.engine.util.PythonEvaluatorJpy; @@ -32,12 +33,14 @@ ScriptSession bindScriptSession(PythonDeephavenSession pythonSession) { PythonDeephavenSession bindPythonSession( @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph, final ThreadInitializationFactory threadInitializationFactory, + final OperationInitializer operationInitializer, final ObjectTypeLookup lookup, final ScriptSession.Listener listener, final PythonEvaluatorJpy pythonEvaluator) { try { - return new PythonDeephavenSession(updateGraph, threadInitializationFactory, lookup, listener, true, - pythonEvaluator); + return new PythonDeephavenSession( + updateGraph, operationInitializer, threadInitializationFactory, lookup, listener, + true, pythonEvaluator); } catch (IOException e) { throw new UncheckedIOException("Unable to run python startup scripts", e); } diff --git a/server/src/main/java/io/deephaven/server/runner/updategraph/UpdateGraphModule.java b/server/src/main/java/io/deephaven/server/runner/updategraph/UpdateGraphModule.java index 322371f2ed7..2e01abff24c 100644 --- a/server/src/main/java/io/deephaven/server/runner/updategraph/UpdateGraphModule.java +++ b/server/src/main/java/io/deephaven/server/runner/updategraph/UpdateGraphModule.java @@ -2,8 +2,11 @@ import dagger.Module; import dagger.Provides; +import io.deephaven.engine.table.impl.OperationInitializationThreadPool; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; +import io.deephaven.util.thread.ThreadInitializationFactory; import javax.inject.Named; import javax.inject.Singleton; @@ -16,9 +19,20 @@ public class UpdateGraphModule { @Provides @Singleton @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) - public static UpdateGraph provideUpdateGraph() { + public static UpdateGraph provideUpdateGraph( + final ThreadInitializationFactory threadInitializationFactory, + final OperationInitializer operationInitializer) { return PeriodicUpdateGraph.newBuilder(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) .numUpdateThreads(PeriodicUpdateGraph.NUM_THREADS_DEFAULT_UPDATE_GRAPH) + .threadInitializationFactory(threadInitializationFactory) + .operationInitializer(operationInitializer) .existingOrBuild(); } + + @Provides + @Singleton + public static OperationInitializer provideOperationInitializer( + final ThreadInitializationFactory factory) { + return new OperationInitializationThreadPool(factory); + } } diff --git a/server/src/test/java/io/deephaven/server/appmode/ApplicationServiceGrpcImplTest.java b/server/src/test/java/io/deephaven/server/appmode/ApplicationServiceGrpcImplTest.java index b11c16fece8..204581417b6 100644 --- a/server/src/test/java/io/deephaven/server/appmode/ApplicationServiceGrpcImplTest.java +++ b/server/src/test/java/io/deephaven/server/appmode/ApplicationServiceGrpcImplTest.java @@ -17,7 +17,6 @@ import io.deephaven.server.session.SessionState; import io.deephaven.server.util.TestControlledScheduler; import io.deephaven.auth.AuthContext; -import io.deephaven.util.thread.ThreadInitializationFactory; import io.grpc.Context; import io.grpc.stub.StreamObserver; import org.junit.After; @@ -88,7 +87,8 @@ public void onListFieldsSubscribeFailedObserver() { // trigger a change ScriptSession scriptSession = new NoLanguageDeephavenSession( - ExecutionContext.getDefaultContext().getUpdateGraph(), ThreadInitializationFactory.NO_OP); + ExecutionContext.getContext().getUpdateGraph(), + ExecutionContext.getContext().getOperationInitializer()); scriptSession.setVariable("key", "hello world"); ScriptSession.Changes changes = new ScriptSession.Changes(); changes.created.put("key", "Object"); diff --git a/server/src/test/java/io/deephaven/server/appmode/ApplicationTest.java b/server/src/test/java/io/deephaven/server/appmode/ApplicationTest.java index 8214cd3e313..be57f02c551 100644 --- a/server/src/test/java/io/deephaven/server/appmode/ApplicationTest.java +++ b/server/src/test/java/io/deephaven/server/appmode/ApplicationTest.java @@ -53,7 +53,7 @@ public void app00() { public void app01() throws IOException { session = new GroovyDeephavenSession( ExecutionContext.getContext().getUpdateGraph(), - ThreadInitializationFactory.NO_OP, + ExecutionContext.getContext().getOperationInitializer(), NoOp.INSTANCE, null, GroovyDeephavenSession.RunScripts.none()); ApplicationState app = ApplicationFactory.create(ApplicationConfigs.testAppDir(), ApplicationConfigs.app01(), @@ -69,7 +69,7 @@ public void app01() throws IOException { public void app02() throws IOException, InterruptedException, TimeoutException { session = new PythonDeephavenSession( ExecutionContext.getDefaultContext().getUpdateGraph(), - ThreadInitializationFactory.NO_OP, + ExecutionContext.getContext().getOperationInitializer(), ThreadInitializationFactory.NO_OP, NoOp.INSTANCE, null, false, PythonEvaluatorJpy.withGlobalCopy()); ApplicationState app = ApplicationFactory.create(ApplicationConfigs.testAppDir(), ApplicationConfigs.app02(), diff --git a/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java b/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java index 860eed84cb5..f485cfb5d85 100644 --- a/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java +++ b/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java @@ -28,6 +28,7 @@ import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.table.Table; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.table.impl.DataAccessHelpers; import io.deephaven.engine.util.AbstractScriptSession; @@ -62,7 +63,6 @@ import io.deephaven.server.util.Scheduler; import io.deephaven.util.SafeCloseable; import io.deephaven.auth.AuthContext; -import io.deephaven.util.thread.ThreadInitializationFactory; import io.grpc.*; import io.grpc.CallOptions; import io.grpc.stub.ClientCalls; @@ -127,8 +127,11 @@ TicketResolver ticketResolver(ScopeTicketResolver resolver) { @Singleton @Provides - AbstractScriptSession provideAbstractScriptSession(final UpdateGraph updateGraph) { - return new NoLanguageDeephavenSession(updateGraph, ThreadInitializationFactory.NO_OP, "non-script-session"); + AbstractScriptSession provideAbstractScriptSession( + final UpdateGraph updateGraph, + final OperationInitializer operationInitializer) { + return new NoLanguageDeephavenSession( + updateGraph, operationInitializer, "non-script-session"); } @Provides @@ -184,6 +187,12 @@ TestAuthorizationProvider provideTestAuthorizationProvider() { static UpdateGraph provideUpdateGraph() { return ExecutionContext.getContext().getUpdateGraph(); } + + @Provides + @Singleton + static OperationInitializer provideOperationInitializer() { + return ExecutionContext.getContext().getOperationInitializer(); + } } public interface TestComponent { diff --git a/sphinx/source/conf.py b/sphinx/source/conf.py index 27accbdb119..26f13515214 100644 --- a/sphinx/source/conf.py +++ b/sphinx/source/conf.py @@ -105,13 +105,21 @@ import jpy py_scope_jpy = jpy.get_type("io.deephaven.engine.util.PythonScopeJpyImpl").ofMainGlobals() + +no_op_thread_factory = jpy.get_type("io.deephaven.util.thread.ThreadInitializationFactory").NO_OP +_JOperationInitializationThreadPool = jpy.get_type("io.deephaven.engine.table.impl.OperationInitializationThreadPool") +_j_operation_initializer = _JOperationInitializationThreadPool(no_op_thread_factory) + _JUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph") -docs_update_graph = _JUpdateGraph.newBuilder("PYTHON_DOCS").build() +docs_update_graph = _JUpdateGraph.newBuilder("PYTHON_DOCS") \ + .operationInitializer(_j_operation_initializer) \ + .build() + _JPythonScriptSession = jpy.get_type("io.deephaven.integrations.python.PythonDeephavenSession") -no_op_operation_initializer = jpy.get_type("io.deephaven.util.thread.ThreadInitializationFactory").NO_OP -py_dh_session = _JPythonScriptSession(docs_update_graph, no_op_operation_initializer, py_scope_jpy) +py_dh_session = _JPythonScriptSession(docs_update_graph, _j_operation_initializer, no_op_thread_factory, py_scope_jpy) py_dh_session.getExecutionContext().open() + pygments_style = 'sphinx' import deephaven