From 9b629d97596f816d16ad3501ac478356d5112305 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Fri, 26 Aug 2022 13:45:19 -0500 Subject: [PATCH] Rewire python<->java logging to be generally friendlier to python Fixes #2734 --- .../io/logger/LogBufferOutputStream.java | 2 +- .../python/PyLogOutputStream.java | 47 ++++++++++ .../python/PythonDeephavenSession.java | 6 -- .../integrations/python/PythonLogAdapter.java | 66 -------------- .../table/python/core/deephaven_jpy_init.py | 9 +- py/embedded-server/deephaven_server/server.py | 9 +- .../python/server/EmbeddedPyLogModule.java | 49 ++++++++++ .../python/server/EmbeddedServer.java | 33 +++++++ py/server/deephaven_internal/stream.py | 89 +++++++++++++++++++ .../engine/util/WorkerPythonEnvironment.java | 3 - .../server/jetty/JettyServerComponent.java | 2 + .../server/netty/NettyServerComponent.java | 2 + ...avenApiServerInProcessGroovyComponent.java | 2 + ...avenApiServerInProcessPythonComponent.java | 2 + .../runner/DeephavenApiServerModule.java | 6 -- 15 files changed, 243 insertions(+), 84 deletions(-) create mode 100644 Integrations/src/main/java/io/deephaven/integrations/python/PyLogOutputStream.java delete mode 100644 Integrations/src/main/java/io/deephaven/integrations/python/PythonLogAdapter.java create mode 100644 py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedPyLogModule.java create mode 100644 py/server/deephaven_internal/stream.py diff --git a/IO/src/main/java/io/deephaven/io/logger/LogBufferOutputStream.java b/IO/src/main/java/io/deephaven/io/logger/LogBufferOutputStream.java index f7bdb455b8b..0829cca3306 100644 --- a/IO/src/main/java/io/deephaven/io/logger/LogBufferOutputStream.java +++ b/IO/src/main/java/io/deephaven/io/logger/LogBufferOutputStream.java @@ -10,7 +10,7 @@ import java.nio.ByteBuffer; import java.util.Objects; -class LogBufferOutputStream extends OutputStream { +public class LogBufferOutputStream extends OutputStream { private final LogBuffer sink; private final LogLevel level; diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PyLogOutputStream.java b/Integrations/src/main/java/io/deephaven/integrations/python/PyLogOutputStream.java new file mode 100644 index 00000000000..63136643e79 --- /dev/null +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PyLogOutputStream.java @@ -0,0 +1,47 @@ +package io.deephaven.integrations.python; + +import org.jetbrains.annotations.NotNull; +import org.jpy.PyObject; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.function.Supplier; + +/** + * Simple output stream that redirects all writes to a python io.TextIOBase type. + */ +public class PyLogOutputStream extends OutputStream { + private final Supplier rawIoBaseSupplier; + + public PyLogOutputStream(Supplier rawIoBaseSupplier) { + this.rawIoBaseSupplier = rawIoBaseSupplier; + } + + @Override + public void write(int i) throws IOException { + write(new byte[] {(byte) i}); + } + + @Override + public void write(@NotNull byte[] b) throws IOException { + //noinspection PrimitiveArrayArgumentToVarargsMethod + rawIoBaseSupplier.get().callMethod("write", new String(b)); + } + + @Override + public void write(@NotNull byte[] b, int off, int len) throws IOException { + byte[] buffer = new byte[len]; + System.arraycopy(b, off, buffer, 0, len); + write(buffer); + } + + @Override + public void flush() throws IOException { + rawIoBaseSupplier.get().callMethod("flush"); + } + + @Override + public void close() throws IOException { + rawIoBaseSupplier.get().callMethod("close"); + } +} diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java index 51797293178..a7198b6f330 100644 --- a/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java @@ -7,7 +7,6 @@ import io.deephaven.base.verify.Assert; import io.deephaven.configuration.Configuration; import io.deephaven.engine.exceptions.CancellationException; -import io.deephaven.engine.context.QueryLibrary; import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.updategraph.UpdateGraphProcessor; import io.deephaven.engine.util.AbstractScriptSession; @@ -93,11 +92,6 @@ public PythonDeephavenSession( } this.scriptFinder = new ScriptFinder(DEFAULT_SCRIPT_PATH); - /* - * We redirect the standard Python sys.stdout and sys.stderr streams to our log object. - */ - PythonLogAdapter.interceptOutputStreams(evaluator); - publishInitial(); /* diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonLogAdapter.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonLogAdapter.java deleted file mode 100644 index 78468defcc0..00000000000 --- a/Integrations/src/main/java/io/deephaven/integrations/python/PythonLogAdapter.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending - */ -package io.deephaven.integrations.python; - -import io.deephaven.engine.util.PythonEvaluator; - -import java.io.PrintStream; - -/** - * This class is stored in the sys.stdout and sys.stderr variables inside of a Python session, so that we can intercept - * the Python session's output, rather than having it all go to the system stdout/stderr streams, which are not - * accessible to the console. - */ -public class PythonLogAdapter { - private final PrintStream out; - - private PythonLogAdapter(PrintStream out) { - this.out = out; - } - - /** - * This method is used from Python so that we appear as a stream. - * - * We don't want to write the trailing newline, as the Logger implementation will do that for us. If there is no - * newline; we need to remember that we added one, so that we can suppress the next empty newline. If there was a - * newline, we shouldn't suppress it (e.g., when printing just a blank line to the output we need to preserve it). - * - * @param s the string to write - * @return the number of characters written - */ - public int write(String s) { - out.print(s); - return s.length(); - } - - /** - * https://docs.python.org/2/library/io.html#io.IOBase.flush - * https://github.com/python/cpython/blob/2.7/Modules/_io/iobase.c - */ - public void flush() { - out.flush(); - } - - // note: technically, we *should* be implementing the other methods present on stdout / stderr. - // but it's *very* unlikely that anybody will be calling most of the other methods. - // - // Maybe these? - // https://docs.python.org/2/library/io.html#io.RawIOBase.write - // https://docs.python.org/2/library/io.html#io.IOBase.close - - /** - * If we just allow python to print to it's regular STDOUT and STDERR, then it bypasses the Java System.out/err. - * - * We replace the stdout/stderr with a small log adapter so that console users still get their output. - * - * @param pythonHolder the PythonHolder object which we will insert our adapters into - */ - public static void interceptOutputStreams(PythonEvaluator pythonHolder) { - pythonHolder.set("_stdout", new PythonLogAdapter(System.out)); - pythonHolder.set("_stderr", new PythonLogAdapter(System.err)); - pythonHolder.evalStatement("import sys"); - pythonHolder.evalStatement("sys.stdout = _stdout"); - pythonHolder.evalStatement("sys.stderr = _stderr"); - } -} diff --git a/engine/table/python/core/deephaven_jpy_init.py b/engine/table/python/core/deephaven_jpy_init.py index e73e879c934..36158d942d0 100644 --- a/engine/table/python/core/deephaven_jpy_init.py +++ b/engine/table/python/core/deephaven_jpy_init.py @@ -2,10 +2,17 @@ import jpy import os +import sys +from deephaven_internal.stream import TeeStream -# Set stdin to /dev/null to prevent functions (like help()) that attempt to read from stdin from hanging the worker. +# Set stdin to /dev/null to prevent functions (like help()) that attempt to read from stdin from hanging python +# execution from within Java. os.dup2(os.open("/dev/null", os.O_RDONLY), 0) jpy.VerboseExceptions.enabled = True # If you want jpy to tell you about all that it is doing, change this # jpy.diag.flags = jpy.diag.F_ALL + +j_sys = jpy.get_type('java.lang.System') +sys.stdout = TeeStream.redirect(sys.stdout, j_sys.out) +sys.stderr = TeeStream.redirect(sys.stderr, j_sys.err) diff --git a/py/embedded-server/deephaven_server/server.py b/py/embedded-server/deephaven_server/server.py index 53fb3a53d07..58e5260a721 100644 --- a/py/embedded-server/deephaven_server/server.py +++ b/py/embedded-server/deephaven_server/server.py @@ -3,6 +3,7 @@ # from typing import Dict, List, Optional +import sys from .start_jvm import start_jvm @@ -39,9 +40,15 @@ def __init__(self, host: Optional[str] = None, port: Optional[int] = None, jvm_a # it is now safe to import jpy import jpy - # Create a wrapped java server that we can reference to talk to the platform + # Create a python-wrapped java server that we can reference to talk to the platform self.j_server = jpy.get_type('io.deephaven.python.server.EmbeddedServer')(host, port, dh_args) + # Obtain references to the deephaven logbuffer and redirect stdout/stderr to it. Note that we should not import + # this until after jpy has started. + from deephaven_internal.stream import TeeStream + sys.stdout = TeeStream.split(sys.stdout, self.j_server.getStdout()) + sys.stderr = TeeStream.split(sys.stderr, self.j_server.getStderr()) + # Keep a reference to the server so we know it is running Server.instance = self diff --git a/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedPyLogModule.java b/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedPyLogModule.java new file mode 100644 index 00000000000..903ae81fa27 --- /dev/null +++ b/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedPyLogModule.java @@ -0,0 +1,49 @@ +package io.deephaven.python.server; + +import dagger.Module; +import dagger.Provides; +import dagger.multibindings.ElementsIntoSet; +import io.deephaven.base.system.StandardStreamState; +import io.deephaven.internal.log.InitSink; +import io.deephaven.internal.log.LoggerFactory; +import io.deephaven.io.log.LogSink; +import io.deephaven.io.logger.LogBuffer; +import io.deephaven.io.logger.LogBufferGlobal; +import io.deephaven.server.log.LogModule; + +import javax.inject.Singleton; +import java.util.Collections; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +@Module +public interface EmbeddedPyLogModule { + + @Provides + static LogBuffer providesLogBuffer() { + return LogBufferGlobal.getInstance().orElseThrow(() -> new RuntimeException("No global LogBuffer found")); + } + + @Provides + static LogSink providesLogSink() { + // In contract, this should be a singleton - see LogInit#checkLogSinkIsSingleton() + return LoggerFactory.getLogger(LogModule.class).getSink(); + } + + @Provides + @ElementsIntoSet + static Set providesLoggerSinkSetups() { + return StreamSupport + .stream(ServiceLoader.load(InitSink.class).spliterator(), false) + .collect(Collectors.toSet()); + } + + @Provides + @Singleton + static StandardStreamState providesStandardStreamState() { + return new StandardStreamState(Collections.emptySet()); + } + +} diff --git a/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedServer.java b/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedServer.java index 5083c33d54b..c4580f7ea1c 100644 --- a/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedServer.java +++ b/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedServer.java @@ -7,6 +7,10 @@ import dagger.Component; import io.deephaven.configuration.Configuration; import io.deephaven.engine.util.ScriptSession; +import io.deephaven.integrations.python.PyLogOutputStream; +import io.deephaven.io.log.LogLevel; +import io.deephaven.io.logger.LogBuffer; +import io.deephaven.io.logger.LogBufferOutputStream; import io.deephaven.server.console.groovy.GroovyConsoleSessionModule; import io.deephaven.server.console.python.PythonConsoleSessionModule; import io.deephaven.server.console.python.PythonGlobalScopeModule; @@ -21,6 +25,7 @@ import io.deephaven.server.runner.DeephavenApiServerModule; import io.deephaven.server.runner.Main; import io.deephaven.server.util.Scheduler; +import org.jpy.PyModule; import org.jpy.PyObject; import javax.annotation.Nullable; @@ -28,11 +33,14 @@ import javax.inject.Provider; import javax.inject.Singleton; import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; public class EmbeddedServer { @Singleton @Component(modules = { DeephavenApiServerModule.class, + EmbeddedPyLogModule.class, DeephavenApiConfigModule.class, PythonGlobalScopeModule.class, HealthCheckModule.class, @@ -60,7 +68,19 @@ interface Builder extends DeephavenApiServerComponent.Builder scriptSession; +// // this is a nice idea, but won't work, since this is the same instance that we had to disable via sysprop +// @Inject +// StreamToLogBuffer logBuffer; + + @Inject + LogBuffer logBuffer; + public EmbeddedServer(String host, Integer port, PyObject dict) throws IOException { + // Redirect System.out and err to the python equivelents, in case python has (or will) redirected them. + PyModule sys = PyModule.importModule("sys"); + System.setOut(new PrintStream(new PyLogOutputStream(() -> sys.getAttribute("stdout")))); + System.setErr(new PrintStream(new PyLogOutputStream(() -> sys.getAttribute("stderr")))); + final Configuration config = Main.init(new String[0], EmbeddedServer.class); final Builder builder = JettyConfig.buildFromConfig(config); if (host != null) { @@ -113,4 +133,17 @@ private void checkGlobals(ScriptSession scriptSession, @Nullable ScriptSession.S public int getPort() { return server.server().getPort(); } + + /** + * Provide a way for Python to "tee" log output into the Deephaven log buffers. + */ + public OutputStream getStdout() { + return new LogBufferOutputStream(logBuffer, LogLevel.STDOUT, 256, 1 << 19); + } + /** + * Provide a way for Python to "tee" log output into the Deephaven log buffers. + */ + public OutputStream getStderr() { + return new LogBufferOutputStream(logBuffer, LogLevel.STDERR, 256, 1 << 19); + } } diff --git a/py/server/deephaven_internal/stream.py b/py/server/deephaven_internal/stream.py new file mode 100644 index 00000000000..b27feb79eb6 --- /dev/null +++ b/py/server/deephaven_internal/stream.py @@ -0,0 +1,89 @@ +import io + + +class TeeStream(io.TextIOBase): + """ + TextIOBase subclass that splits output between a delegate instance and a set of lambdas. Can delegate some calls to + the provided stream but actually write only to the lambdas. + + Useful to adapt any existing file-like object (such as sys.stdout or sys.stderr), but actually let some other + processing take place before writing. + """ + + encoding = 'UTF-8' + closed = False + + @staticmethod + def split(py_stream, java_stream): + return TeeStream( + py_stream, + True, + lambda t: java_stream.write(bytes(t, py_stream.encoding)), + lambda: java_stream.flush(), + lambda: java_stream.close() + ) + + @staticmethod + def redirect(py_stream, java_stream): + if hasattr(py_stream, "encoding"): + encoding = py_stream.encoding + else: + encoding = 'UTF-8' + return TeeStream( + py_stream, + True, + lambda t: java_stream.write(bytes(t, encoding)), + lambda: java_stream.flush(), + lambda: java_stream.close() + ) + + def __init__(self, orig_stream, should_write_to_orig_stream, write_func, flush_func, close_func): + """ + Creates a new TeeStream to let output be written from out place, but be sent to + multiple places. + + Ideally, the stream would be passed as more funcs, but we have to manage correctly + propagating certain non-java details like encoding and isatty. + :param orig_stream: the underlying python stream to use as a prototype + :param should_write_to_orig_stream: True to delegate writes to the original stream, False to only write to the + lambdas that follow + :param write_func: a function to call when data should be written + :param flush_func: a function to call when data should be flushed + :param close_func: a function to call when the stream should be closed + """ + self._stream = orig_stream + self.should_write_to_orig_stream = should_write_to_orig_stream + self.write_func = write_func + self.flush_func = flush_func + self.close_func = close_func + + if hasattr(orig_stream, 'encoding') and orig_stream.encoding is not None: + # Read this in the constructor, since encoding is read as an attr, not a method + self.encoding = orig_stream.encoding + + def isatty(self): + if not hasattr(self._stream, "isatty"): + return False + return self._stream.isatty() + + def fileno(self): + if not hasattr(self._stream, "fileno"): + return None + return self._stream.fileno() + + def write(self, string): + self.write_func(string) + if self.should_write_to_orig_stream: + self._stream.write(string) + + def flush(self): + self.flush_func() + if self.should_write_to_orig_stream: + self._stream.flush() + + def close(self): + self.close_func() + self.closed = True + if self.should_write_to_orig_stream: + self._stream.close() + diff --git a/python-engine-test/src/test/java/io/deephaven/engine/util/WorkerPythonEnvironment.java b/python-engine-test/src/test/java/io/deephaven/engine/util/WorkerPythonEnvironment.java index db506c45bb1..ee7232022d6 100644 --- a/python-engine-test/src/test/java/io/deephaven/engine/util/WorkerPythonEnvironment.java +++ b/python-engine-test/src/test/java/io/deephaven/engine/util/WorkerPythonEnvironment.java @@ -8,7 +8,6 @@ import io.deephaven.base.verify.Assert; import io.deephaven.configuration.Configuration; import io.deephaven.integrations.python.PythonDeephavenSession; -import io.deephaven.integrations.python.PythonLogAdapter; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; import io.deephaven.engine.context.QueryScope; @@ -65,8 +64,6 @@ public enum WorkerPythonEnvironment { .endl(); } - PythonLogAdapter.interceptOutputStreams(evaluator); - final String defaultScriptPath = Configuration.getInstance() .getProperty("WorkerPythonEnvironment.defaultScriptPath") .replace("", Configuration.getInstance().getDevRootPath()) diff --git a/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerComponent.java b/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerComponent.java index 8f69e50618b..3c052082e45 100644 --- a/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerComponent.java +++ b/server/jetty/src/main/java/io/deephaven/server/jetty/JettyServerComponent.java @@ -9,6 +9,7 @@ import io.deephaven.server.console.python.PythonConsoleSessionModule; import io.deephaven.server.console.python.PythonGlobalScopeCopyModule; import io.deephaven.server.healthcheck.HealthCheckModule; +import io.deephaven.server.log.LogModule; import io.deephaven.server.plugin.python.PythonPluginsRegistration; import io.deephaven.server.runner.DeephavenApiConfigModule; import io.deephaven.server.runner.DeephavenApiServerComponent; @@ -19,6 +20,7 @@ @Singleton @Component(modules = { DeephavenApiServerModule.class, + LogModule.class, DeephavenApiConfigModule.class, PythonGlobalScopeCopyModule.class, HealthCheckModule.class, diff --git a/server/netty/src/main/java/io/deephaven/server/netty/NettyServerComponent.java b/server/netty/src/main/java/io/deephaven/server/netty/NettyServerComponent.java index 392916cd630..3469794ac2b 100644 --- a/server/netty/src/main/java/io/deephaven/server/netty/NettyServerComponent.java +++ b/server/netty/src/main/java/io/deephaven/server/netty/NettyServerComponent.java @@ -9,6 +9,7 @@ import io.deephaven.server.console.python.PythonConsoleSessionModule; import io.deephaven.server.console.python.PythonGlobalScopeCopyModule; import io.deephaven.server.healthcheck.HealthCheckModule; +import io.deephaven.server.log.LogModule; import io.deephaven.server.plugin.python.PythonPluginsRegistration; import io.deephaven.server.runner.DeephavenApiConfigModule; import io.deephaven.server.runner.DeephavenApiServerComponent; @@ -19,6 +20,7 @@ @Singleton @Component(modules = { DeephavenApiServerModule.class, + LogModule.class, DeephavenApiConfigModule.class, PythonGlobalScopeCopyModule.class, HealthCheckModule.class, diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerInProcessGroovyComponent.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerInProcessGroovyComponent.java index 0f6e8b9eafd..4d61c51b3cf 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerInProcessGroovyComponent.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerInProcessGroovyComponent.java @@ -6,6 +6,7 @@ import dagger.BindsInstance; import dagger.Component; import io.deephaven.server.console.groovy.GroovyConsoleSessionModule; +import io.deephaven.server.log.LogModule; import io.grpc.ManagedChannelBuilder; import javax.inject.Named; @@ -15,6 +16,7 @@ @Singleton @Component(modules = { DeephavenApiServerModule.class, + LogModule.class, GroovyConsoleSessionModule.class, ServerBuilderInProcessModule.class }) diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerInProcessPythonComponent.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerInProcessPythonComponent.java index a8854915dc0..2cd576e51fe 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerInProcessPythonComponent.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerInProcessPythonComponent.java @@ -7,6 +7,7 @@ import dagger.Component; import io.deephaven.server.console.python.PythonConsoleSessionModule; import io.deephaven.server.console.python.PythonGlobalScopeCopyModule; +import io.deephaven.server.log.LogModule; import io.grpc.ManagedChannelBuilder; import javax.inject.Named; @@ -16,6 +17,7 @@ @Singleton @Component(modules = { DeephavenApiServerModule.class, + LogModule.class, PythonConsoleSessionModule.class, PythonGlobalScopeCopyModule.class, ServerBuilderInProcessModule.class diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java index f75d8b2f1f4..bfe87ef5503 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServerModule.java @@ -18,9 +18,6 @@ import io.deephaven.server.arrow.ArrowModule; import io.deephaven.server.auth.AuthContextModule; import io.deephaven.server.console.ConsoleModule; -import io.deephaven.server.console.groovy.GroovyConsoleSessionModule; -import io.deephaven.server.console.python.PythonConsoleSessionModule; -import io.deephaven.server.log.LogModule; import io.deephaven.server.session.SessionModule; import io.deephaven.server.table.TableModule; import io.deephaven.server.table.inputtables.InputTableModule; @@ -29,8 +26,6 @@ import io.deephaven.util.process.ProcessEnvironment; import io.deephaven.util.thread.NamingThreadFactory; import io.grpc.BindableService; -import io.grpc.Server; -import io.grpc.ServerBuilder; import io.grpc.ServerInterceptor; import org.jetbrains.annotations.NotNull; @@ -54,7 +49,6 @@ AppModeModule.class, ArrowModule.class, AuthContextModule.class, - LogModule.class, UriModule.class, SessionModule.class, TableModule.class,