Skip to content

Commit

Permalink
Rewire python<->java logging to be generally friendlier to python
Browse files Browse the repository at this point in the history
Fixes #2734
  • Loading branch information
niloc132 committed Aug 26, 2022
1 parent 8a5b494 commit 117c9ce
Show file tree
Hide file tree
Showing 15 changed files with 244 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.nio.ByteBuffer;
import java.util.Objects;

class LogBufferOutputStream extends OutputStream {
public class LogBufferOutputStream extends OutputStream {

private final LogBuffer sink;
private final LogLevel level;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.deephaven.integrations.python;

import org.jetbrains.annotations.NotNull;
import org.jpy.PyObject;

import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Supplier;

/**
* Simple output stream that redirects all writes to a python io.TextIOBase type.
*/
public class PyLogOutputStream extends OutputStream {
private final Supplier<PyObject> rawIoBaseSupplier;

public PyLogOutputStream(Supplier<PyObject> rawIoBaseSupplier) {
this.rawIoBaseSupplier = rawIoBaseSupplier;
}

@Override
public void write(int i) throws IOException {
write(new byte[] {(byte) i});
}

@Override
public void write(@NotNull byte[] b) throws IOException {
// noinspection PrimitiveArrayArgumentToVarargsMethod
rawIoBaseSupplier.get().callMethod("write", new String(b));
}

@Override
public void write(@NotNull byte[] b, int off, int len) throws IOException {
byte[] buffer = new byte[len];
System.arraycopy(b, off, buffer, 0, len);
write(buffer);
}

@Override
public void flush() throws IOException {
rawIoBaseSupplier.get().callMethod("flush");
}

@Override
public void close() throws IOException {
rawIoBaseSupplier.get().callMethod("close");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.context.QueryLibrary;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.updategraph.UpdateGraphProcessor;
import io.deephaven.engine.util.AbstractScriptSession;
Expand Down Expand Up @@ -93,11 +92,6 @@ public PythonDeephavenSession(
}
this.scriptFinder = new ScriptFinder(DEFAULT_SCRIPT_PATH);

/*
* We redirect the standard Python sys.stdout and sys.stderr streams to our log object.
*/
PythonLogAdapter.interceptOutputStreams(evaluator);

publishInitial();

/*
Expand Down

This file was deleted.

9 changes: 8 additions & 1 deletion engine/table/python/core/deephaven_jpy_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@

import jpy
import os
import sys
from deephaven_internal.stream import TeeStream

# Set stdin to /dev/null to prevent functions (like help()) that attempt to read from stdin from hanging the worker.
# Set stdin to /dev/null to prevent functions (like help()) that attempt to read from stdin from hanging python
# execution from within Java.
os.dup2(os.open("/dev/null", os.O_RDONLY), 0)

jpy.VerboseExceptions.enabled = True
# If you want jpy to tell you about all that it is doing, change this
# jpy.diag.flags = jpy.diag.F_ALL

j_sys = jpy.get_type('java.lang.System')
sys.stdout = TeeStream.redirect(sys.stdout, j_sys.out)
sys.stderr = TeeStream.redirect(sys.stderr, j_sys.err)
9 changes: 8 additions & 1 deletion py/embedded-server/deephaven_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

from typing import Dict, List, Optional
import sys

from .start_jvm import start_jvm

Expand Down Expand Up @@ -39,9 +40,15 @@ def __init__(self, host: Optional[str] = None, port: Optional[int] = None, jvm_a
# it is now safe to import jpy
import jpy

# Create a wrapped java server that we can reference to talk to the platform
# Create a python-wrapped java server that we can reference to talk to the platform
self.j_server = jpy.get_type('io.deephaven.python.server.EmbeddedServer')(host, port, dh_args)

# Obtain references to the deephaven logbuffer and redirect stdout/stderr to it. Note that we should not import
# this until after jpy has started.
from deephaven_internal.stream import TeeStream
sys.stdout = TeeStream.split(sys.stdout, self.j_server.getStdout())
sys.stderr = TeeStream.split(sys.stderr, self.j_server.getStderr())

# Keep a reference to the server so we know it is running
Server.instance = self

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.deephaven.python.server;

import dagger.Module;
import dagger.Provides;
import dagger.multibindings.ElementsIntoSet;
import io.deephaven.base.system.StandardStreamState;
import io.deephaven.internal.log.InitSink;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.log.LogSink;
import io.deephaven.io.logger.LogBuffer;
import io.deephaven.io.logger.LogBufferGlobal;
import io.deephaven.server.log.LogModule;

import javax.inject.Singleton;
import java.util.Collections;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@Module
public interface EmbeddedPyLogModule {

@Provides
static LogBuffer providesLogBuffer() {
return LogBufferGlobal.getInstance().orElseThrow(() -> new RuntimeException("No global LogBuffer found"));
}

@Provides
static LogSink providesLogSink() {
// In contract, this should be a singleton - see LogInit#checkLogSinkIsSingleton()
return LoggerFactory.getLogger(LogModule.class).getSink();
}

@Provides
@ElementsIntoSet
static Set<InitSink> providesLoggerSinkSetups() {
return StreamSupport
.stream(ServiceLoader.load(InitSink.class).spliterator(), false)
.collect(Collectors.toSet());
}

@Provides
@Singleton
static StandardStreamState providesStandardStreamState() {
return new StandardStreamState(Collections.emptySet());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
import dagger.Component;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.util.ScriptSession;
import io.deephaven.integrations.python.PyLogOutputStream;
import io.deephaven.io.log.LogLevel;
import io.deephaven.io.logger.LogBuffer;
import io.deephaven.io.logger.LogBufferOutputStream;
import io.deephaven.server.console.groovy.GroovyConsoleSessionModule;
import io.deephaven.server.console.python.PythonConsoleSessionModule;
import io.deephaven.server.console.python.PythonGlobalScopeModule;
Expand All @@ -21,18 +25,22 @@
import io.deephaven.server.runner.DeephavenApiServerModule;
import io.deephaven.server.runner.Main;
import io.deephaven.server.util.Scheduler;
import org.jpy.PyModule;
import org.jpy.PyObject;

import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Provider;
import javax.inject.Singleton;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;

public class EmbeddedServer {
@Singleton
@Component(modules = {
DeephavenApiServerModule.class,
EmbeddedPyLogModule.class,
DeephavenApiConfigModule.class,
PythonGlobalScopeModule.class,
HealthCheckModule.class,
Expand Down Expand Up @@ -60,7 +68,19 @@ interface Builder extends DeephavenApiServerComponent.Builder<PythonServerCompon
@Inject
Provider<ScriptSession> scriptSession;

// // this is a nice idea, but won't work, since this is the same instance that we had to disable via sysprop
// @Inject
// StreamToLogBuffer logBuffer;

@Inject
LogBuffer logBuffer;

public EmbeddedServer(String host, Integer port, PyObject dict) throws IOException {
// Redirect System.out and err to the python equivelents, in case python has (or will) redirected them.
PyModule sys = PyModule.importModule("sys");
System.setOut(new PrintStream(new PyLogOutputStream(() -> sys.getAttribute("stdout"))));
System.setErr(new PrintStream(new PyLogOutputStream(() -> sys.getAttribute("stderr"))));

final Configuration config = Main.init(new String[0], EmbeddedServer.class);
final Builder builder = JettyConfig.buildFromConfig(config);
if (host != null) {
Expand Down Expand Up @@ -113,4 +133,18 @@ private void checkGlobals(ScriptSession scriptSession, @Nullable ScriptSession.S
public int getPort() {
return server.server().getPort();
}

/**
* Provide a way for Python to "tee" log output into the Deephaven log buffers.
*/
public OutputStream getStdout() {
return new LogBufferOutputStream(logBuffer, LogLevel.STDOUT, 256, 1 << 19);
}

/**
* Provide a way for Python to "tee" log output into the Deephaven log buffers.
*/
public OutputStream getStderr() {
return new LogBufferOutputStream(logBuffer, LogLevel.STDERR, 256, 1 << 19);
}
}
Loading

0 comments on commit 117c9ce

Please sign in to comment.