Skip to content

Commit

Permalink
Rewire python<->java logging to be generally friendlier to python
Browse files Browse the repository at this point in the history
Fixes #2734
  • Loading branch information
niloc132 committed Aug 26, 2022
1 parent 8a5b494 commit 9b629d9
Show file tree
Hide file tree
Showing 15 changed files with 243 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.nio.ByteBuffer;
import java.util.Objects;

class LogBufferOutputStream extends OutputStream {
public class LogBufferOutputStream extends OutputStream {

private final LogBuffer sink;
private final LogLevel level;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.deephaven.integrations.python;

import org.jetbrains.annotations.NotNull;
import org.jpy.PyObject;

import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Supplier;

/**
 * An {@link OutputStream} whose writes, flushes and closes are all forwarded to a python io.TextIOBase type.
 *
 * <p>
 * The python object is obtained from the supplier on every call instead of being captured once, so whatever object
 * the supplier currently yields is the one that receives the call.
 */
public class PyLogOutputStream extends OutputStream {
    private final Supplier<PyObject> rawIoBaseSupplier;

    /**
     * Creates a stream that delegates to the python object produced by the given supplier.
     *
     * @param rawIoBaseSupplier source of the python stream; invoked once per write/flush/close
     */
    public PyLogOutputStream(Supplier<PyObject> rawIoBaseSupplier) {
        this.rawIoBaseSupplier = rawIoBaseSupplier;
    }

    /**
     * Looks up the python stream that should receive the current operation.
     */
    private PyObject pythonStream() {
        return rawIoBaseSupplier.get();
    }

    /**
     * Writes a single byte by wrapping it in a one-element array and delegating to {@link #write(byte[])}.
     */
    @Override
    public void write(int i) throws IOException {
        final byte[] single = {(byte) i};
        write(single);
    }

    /**
     * Decodes the whole array with the platform default charset and passes the resulting text to the python
     * stream's {@code write} method.
     */
    @Override
    public void write(@NotNull byte[] b) throws IOException {
        final PyObject stream = pythonStream();
        final String text = new String(b);
        //noinspection PrimitiveArrayArgumentToVarargsMethod
        stream.callMethod("write", text);
    }

    /**
     * Copies the requested region into a fresh array and delegates to {@link #write(byte[])}.
     */
    @Override
    public void write(@NotNull byte[] b, int off, int len) throws IOException {
        final byte[] slice = new byte[len];
        System.arraycopy(b, off, slice, 0, len);
        write(slice);
    }

    /**
     * Invokes {@code flush} on the python stream.
     */
    @Override
    public void flush() throws IOException {
        final PyObject stream = pythonStream();
        stream.callMethod("flush");
    }

    /**
     * Invokes {@code close} on the python stream.
     */
    @Override
    public void close() throws IOException {
        final PyObject stream = pythonStream();
        stream.callMethod("close");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.context.QueryLibrary;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.updategraph.UpdateGraphProcessor;
import io.deephaven.engine.util.AbstractScriptSession;
Expand Down Expand Up @@ -93,11 +92,6 @@ public PythonDeephavenSession(
}
this.scriptFinder = new ScriptFinder(DEFAULT_SCRIPT_PATH);

/*
* We redirect the standard Python sys.stdout and sys.stderr streams to our log object.
*/
PythonLogAdapter.interceptOutputStreams(evaluator);

publishInitial();

/*
Expand Down

This file was deleted.

9 changes: 8 additions & 1 deletion engine/table/python/core/deephaven_jpy_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@

import jpy
import os
import sys
from deephaven_internal.stream import TeeStream

# Point stdin at the null device so that functions which attempt to read from stdin (like help()) do not hang python
# execution from within Java.
_devnull_fd = os.open(os.devnull, os.O_RDONLY)
try:
    os.dup2(_devnull_fd, 0)
finally:
    # After dup2, fd 0 is its own duplicate of the null device, so the temporary descriptor is no longer needed.
    # Skip the close when os.open happened to hand back fd 0 itself (stdin was already closed), otherwise we would
    # undo the redirect we just made.
    if _devnull_fd != 0:
        os.close(_devnull_fd)
del _devnull_fd

jpy.VerboseExceptions.enabled = True
# If you want jpy to tell you about all that it is doing, change this
# jpy.diag.flags = jpy.diag.F_ALL

# Replace python's stdout/stderr with TeeStream wrappers built around Java's System.out / System.err.
j_sys = jpy.get_type('java.lang.System')
sys.stdout = TeeStream.redirect(sys.stdout, j_sys.out)
sys.stderr = TeeStream.redirect(sys.stderr, j_sys.err)
9 changes: 8 additions & 1 deletion py/embedded-server/deephaven_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

from typing import Dict, List, Optional
import sys

from .start_jvm import start_jvm

Expand Down Expand Up @@ -39,9 +40,15 @@ def __init__(self, host: Optional[str] = None, port: Optional[int] = None, jvm_a
# it is now safe to import jpy
import jpy

# Create a wrapped java server that we can reference to talk to the platform
# Create a python-wrapped java server that we can reference to talk to the platform
self.j_server = jpy.get_type('io.deephaven.python.server.EmbeddedServer')(host, port, dh_args)

# Obtain references to the deephaven logbuffer and redirect stdout/stderr to it. Note that we should not import
# this until after jpy has started.
from deephaven_internal.stream import TeeStream
sys.stdout = TeeStream.split(sys.stdout, self.j_server.getStdout())
sys.stderr = TeeStream.split(sys.stderr, self.j_server.getStderr())

# Keep a reference to the server so we know it is running
Server.instance = self

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.deephaven.python.server;

import dagger.Module;
import dagger.Provides;
import dagger.multibindings.ElementsIntoSet;
import io.deephaven.base.system.StandardStreamState;
import io.deephaven.internal.log.InitSink;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.log.LogSink;
import io.deephaven.io.logger.LogBuffer;
import io.deephaven.io.logger.LogBufferGlobal;
import io.deephaven.server.log.LogModule;

import javax.inject.Singleton;
import java.util.Collections;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
 * Dagger module that supplies the logging-related bindings ({@link LogBuffer}, {@link LogSink}, the set of
 * {@link InitSink}s and the {@link StandardStreamState}) used by the embedded python server component.
 */
@Module
public interface EmbeddedPyLogModule {

    /**
     * Provides the global {@link LogBuffer}.
     *
     * @throws RuntimeException if {@link LogBufferGlobal} has no instance
     */
    @Provides
    static LogBuffer providesLogBuffer() {
        return LogBufferGlobal.getInstance()
                .orElseThrow(() -> new RuntimeException("No global LogBuffer found"));
    }

    /**
     * Provides the {@link LogSink} taken from the logger registered for {@link LogModule}.
     */
    @Provides
    static LogSink providesLogSink() {
        // By contract, this should be a singleton - see LogInit#checkLogSinkIsSingleton()
        final LogSink sink = LoggerFactory.getLogger(LogModule.class).getSink();
        return sink;
    }

    /**
     * Provides every {@link InitSink} implementation discoverable through {@link ServiceLoader}.
     */
    @Provides
    @ElementsIntoSet
    static Set<InitSink> providesLoggerSinkSetups() {
        final ServiceLoader<InitSink> discovered = ServiceLoader.load(InitSink.class);
        return StreamSupport
                .stream(discovered.spliterator(), false)
                .collect(Collectors.toSet());
    }

    /**
     * Provides a {@link StandardStreamState} constructed with an empty set.
     */
    @Provides
    @Singleton
    static StandardStreamState providesStandardStreamState() {
        final StandardStreamState state = new StandardStreamState(Collections.emptySet());
        return state;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
import dagger.Component;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.util.ScriptSession;
import io.deephaven.integrations.python.PyLogOutputStream;
import io.deephaven.io.log.LogLevel;
import io.deephaven.io.logger.LogBuffer;
import io.deephaven.io.logger.LogBufferOutputStream;
import io.deephaven.server.console.groovy.GroovyConsoleSessionModule;
import io.deephaven.server.console.python.PythonConsoleSessionModule;
import io.deephaven.server.console.python.PythonGlobalScopeModule;
Expand All @@ -21,18 +25,22 @@
import io.deephaven.server.runner.DeephavenApiServerModule;
import io.deephaven.server.runner.Main;
import io.deephaven.server.util.Scheduler;
import org.jpy.PyModule;
import org.jpy.PyObject;

import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Provider;
import javax.inject.Singleton;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;

public class EmbeddedServer {
@Singleton
@Component(modules = {
DeephavenApiServerModule.class,
EmbeddedPyLogModule.class,
DeephavenApiConfigModule.class,
PythonGlobalScopeModule.class,
HealthCheckModule.class,
Expand Down Expand Up @@ -60,7 +68,19 @@ interface Builder extends DeephavenApiServerComponent.Builder<PythonServerCompon
@Inject
Provider<ScriptSession> scriptSession;

// // this is a nice idea, but won't work, since this is the same instance that we had to disable via sysprop
// @Inject
// StreamToLogBuffer logBuffer;

@Inject
LogBuffer logBuffer;

public EmbeddedServer(String host, Integer port, PyObject dict) throws IOException {
// Redirect System.out and System.err to their python equivalents, in case python has redirected (or will redirect) them.
PyModule sys = PyModule.importModule("sys");
System.setOut(new PrintStream(new PyLogOutputStream(() -> sys.getAttribute("stdout"))));
System.setErr(new PrintStream(new PyLogOutputStream(() -> sys.getAttribute("stderr"))));

final Configuration config = Main.init(new String[0], EmbeddedServer.class);
final Builder builder = JettyConfig.buildFromConfig(config);
if (host != null) {
Expand Down Expand Up @@ -113,4 +133,17 @@ private void checkGlobals(ScriptSession scriptSession, @Nullable ScriptSession.S
/**
 * Returns the port reported by the wrapped server.
 *
 * @return the value of {@code server.server().getPort()}
 */
public int getPort() {
    final int port = server.server().getPort();
    return port;
}

/**
 * Provide a way for Python to "tee" its stdout output into the Deephaven log buffers.
 *
 * @return a new stream, created on every call, that records what is written to it in the log buffer at
 *         {@link LogLevel#STDOUT}
 */
public OutputStream getStdout() {
    return newLogBufferStream(LogLevel.STDOUT);
}

/**
 * Provide a way for Python to "tee" its stderr output into the Deephaven log buffers.
 *
 * @return a new stream, created on every call, that records what is written to it in the log buffer at
 *         {@link LogLevel#STDERR}
 */
public OutputStream getStderr() {
    return newLogBufferStream(LogLevel.STDERR);
}

/**
 * Builds a stream over the injected log buffer for the given level, so stdout and stderr share one configuration.
 */
private OutputStream newLogBufferStream(LogLevel level) {
    // NOTE(review): 256 and 1 << 19 are presumably the initial and maximum buffer sizes - confirm against the
    // LogBufferOutputStream constructor.
    return new LogBufferOutputStream(logBuffer, level, 256, 1 << 19);
}
}
89 changes: 89 additions & 0 deletions py/server/deephaven_internal/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import io


class TeeStream(io.TextIOBase):
    """
    TextIOBase subclass that splits output between a delegate instance and a set of lambdas. Can delegate some calls to
    the provided stream but actually write only to the lambdas.
    Useful to adapt any existing file-like object (such as sys.stdout or sys.stderr), but actually let some other
    processing take place before writing.

    Use :meth:`split` to write to both the python stream and the java stream, and :meth:`redirect` to write only to
    the java stream while still reporting the python stream's encoding, isatty and fileno.
    """

    # Fallback used whenever the wrapped stream has no usable encoding of its own.
    encoding = 'UTF-8'
    # Plain attribute (shadowing the IOBase property) that close() flips to True on the instance.
    closed = False

    @staticmethod
    def _encoding_of(py_stream):
        """
        Returns the encoding advertised by py_stream, or the class default when the stream has no ``encoding``
        attribute or that attribute is None (as is the case for io.StringIO).
        """
        encoding = getattr(py_stream, 'encoding', None)
        return encoding if encoding is not None else TeeStream.encoding

    @staticmethod
    def split(py_stream, java_stream):
        """
        Creates a TeeStream that sends every write/flush/close to BOTH java_stream and py_stream.
        :param py_stream: the python stream to keep writing to
        :param java_stream: an object with write(bytes), flush() and close() methods that also receives the output
        :return: the new TeeStream
        """
        encoding = TeeStream._encoding_of(py_stream)
        return TeeStream(
            py_stream,
            True,
            lambda t: java_stream.write(bytes(t, encoding)),
            lambda: java_stream.flush(),
            lambda: java_stream.close()
        )

    @staticmethod
    def redirect(py_stream, java_stream):
        """
        Creates a TeeStream that sends every write/flush/close ONLY to java_stream. The python stream is kept purely
        as a prototype for encoding, isatty and fileno; it is never written to, flushed or closed.
        :param py_stream: the python stream being replaced
        :param java_stream: an object with write(bytes), flush() and close() methods that receives the output
        :return: the new TeeStream
        """
        encoding = TeeStream._encoding_of(py_stream)
        return TeeStream(
            py_stream,
            # This flag is what distinguishes redirect from split: output is diverted, not duplicated.
            False,
            lambda t: java_stream.write(bytes(t, encoding)),
            lambda: java_stream.flush(),
            lambda: java_stream.close()
        )

    def __init__(self, orig_stream, should_write_to_orig_stream, write_func, flush_func, close_func):
        """
        Creates a new TeeStream to let output be written from one place, but be sent to
        multiple places.
        Ideally, the stream would be passed as more funcs, but we have to manage correctly
        propagating certain non-java details like encoding and isatty.
        :param orig_stream: the underlying python stream to use as a prototype
        :param should_write_to_orig_stream: True to delegate writes to the original stream, False to only write to the
        lambdas that follow
        :param write_func: a function to call when data should be written
        :param flush_func: a function to call when data should be flushed
        :param close_func: a function to call when the stream should be closed
        """
        self._stream = orig_stream
        self.should_write_to_orig_stream = should_write_to_orig_stream
        self.write_func = write_func
        self.flush_func = flush_func
        self.close_func = close_func

        if hasattr(orig_stream, 'encoding') and orig_stream.encoding is not None:
            # Read this in the constructor, since encoding is read as an attr, not a method
            self.encoding = orig_stream.encoding

    def isatty(self):
        """Returns the original stream's isatty(), or False when it does not define one."""
        if not hasattr(self._stream, "isatty"):
            return False
        return self._stream.isatty()

    def fileno(self):
        """Returns the original stream's fileno(), or None when it does not define one."""
        if not hasattr(self._stream, "fileno"):
            return None
        return self._stream.fileno()

    def write(self, string):
        """
        Passes string to write_func and, when enabled, to the original stream.
        :param string: the text to write
        :return: the number of characters written, as the TextIOBase.write contract requires
        """
        self.write_func(string)
        if self.should_write_to_orig_stream:
            self._stream.write(string)
        return len(string)

    def flush(self):
        """Calls flush_func and, when enabled, flushes the original stream."""
        self.flush_func()
        if self.should_write_to_orig_stream:
            self._stream.flush()

    def close(self):
        """
        Calls close_func and, when enabled, closes the original stream. Calling close() again after it has completed
        is a no-op, so close_func runs at most once.
        """
        if self.closed:
            return
        self.close_func()
        self.closed = True
        if self.should_write_to_orig_stream:
            self._stream.close()

Loading

0 comments on commit 9b629d9

Please sign in to comment.