Skip to content

Commit

Permalink
Merge pull request #333 from YunoHost/new-log-streaming-api
Browse files Browse the repository at this point in the history
POC for new log streaming API using a zero-mq broker
  • Loading branch information
alexAubin authored Jan 20, 2025
2 parents cdd65c2 + 080f831 commit 515d09d
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 139 deletions.
155 changes: 24 additions & 131 deletions moulinette/interfaces/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

import os
import sys
import re
import errno
Expand All @@ -28,10 +29,6 @@
from tempfile import mkdtemp
from shutil import rmtree

from gevent import sleep
from gevent.queue import Queue
from geventwebsocket import WebSocketError

from bottle import redirect, request, response, Bottle, HTTPResponse, FileUpload
from bottle import abort

Expand All @@ -47,7 +44,6 @@
ExtendedArgumentParser,
JSONExtendedEncoder,
)
from moulinette.utils import log

logger = logging.getLogger("moulinette.interface.api")

Expand Down Expand Up @@ -84,47 +80,6 @@ def wrapper(*args, **kwargs):
return wrapper


class LogQueues(dict):
"""Map of session ids to queue."""

pass


class APIQueueHandler(logging.Handler):
"""
A handler class which store logging records into a queue, to be used
and retrieved from the API.
"""

def __init__(self):
logging.Handler.__init__(self)
self.queues = LogQueues()
# actionsmap is actually set during the interface's init ...
self.actionsmap = None

def emit(self, record):
# Prevent triggering this function while moulinette
# is being initialized with --debug
if not self.actionsmap or len(request.cookies) == 0:
return

profile = request.params.get("profile", self.actionsmap.default_authentication)
authenticator = self.actionsmap.get_authenticator(profile)

s_id = authenticator.get_session_cookie(raise_if_no_session_exists=False)["id"]
try:
queue = self.queues[s_id]
except KeyError:
# Session is not initialized, abandon.
return
else:
# Put the message as a 2-tuple in the queue
queue.put_nowait((record.levelname.lower(), record.getMessage()))
# Put the current greenlet to sleep for 0 second in order to
# populate the new message in the queue
sleep(0)


class _HTTPArgumentParser:
"""Argument parser for HTTP requests
Expand Down Expand Up @@ -268,9 +223,8 @@ class _ActionsMapPlugin:
name = "actionsmap"
api = 2

def __init__(self, actionsmap, log_queues={}):
def __init__(self, actionsmap):
self.actionsmap = actionsmap
self.log_queues = log_queues

def setup(self, app):
"""Setup plugin on the application
Expand Down Expand Up @@ -299,11 +253,10 @@ def setup(self, app):
skip=["actionsmap"],
)

# Append messages route
app.route(
"/messages",
name="messages",
callback=self.messages,
"/sse",
name="sse",
callback=self.sse,
skip=["actionsmap"],
)

Expand Down Expand Up @@ -440,46 +393,24 @@ def logout(self):
else:
return m18n.g("logged_out")

def messages(self):
"""Listen to the messages WebSocket stream
Retrieve the WebSocket stream and send to it each messages displayed by
the display method. They are JSON encoded as a dict { style: message }.
"""
def sse(self):

profile = request.params.get("profile", self.actionsmap.default_authentication)
authenticator = self.actionsmap.get_authenticator(profile)

s_id = authenticator.get_session_cookie()["id"]
try:
queue = self.log_queues[s_id]
except KeyError:
# Create a new queue for the session
queue = Queue()
self.log_queues[s_id] = queue

wsock = request.environ.get("wsgi.websocket")
if not wsock:
raise HTTPResponse(m18n.g("websocket_request_expected"), 500)

while True:
item = queue.get()
# Hardcoded yunohost stuff for the SSE stream to not require authentication when postinstall isnt done yet...
if os.path.exists("/etc/yunohost/installed"):
try:
# Retrieve the message
style, message = item
except TypeError:
if item == StopIteration:
# Delete the current queue and break
del self.log_queues[s_id]
break
logger.exception("invalid item in the messages queue: %r", item)
else:
try:
# Send the message
wsock.send(json_encode({style: message}))
except WebSocketError:
break
sleep(0)
authenticator.get_session_cookie()
except MoulinetteAuthenticationError:
raise HTTPResponse(m18n.g("not_logged_in"), 401)

response.content_type = 'text/event-stream'
response.cache_control = 'no-cache'
response.headers["X-Accel-Buffering"] = "no"

from yunohost.utils.sse import sse_stream
yield from sse_stream()

def process(self, _route, arguments={}):
"""Process the relevant action for the route
Expand Down Expand Up @@ -517,37 +448,9 @@ def process(self, _route, arguments={}):
rmtree(UPLOAD_DIR, True)
UPLOAD_DIR = None

# Close opened WebSocket by putting StopIteration in the queue
profile = request.params.get(
"profile", self.actionsmap.default_authentication
)
authenticator = self.actionsmap.get_authenticator(profile)
try:
s_id = authenticator.get_session_cookie()["id"]
queue = self.log_queues[s_id]
except MoulinetteAuthenticationError:
pass
except KeyError:
pass
else:
queue.put(StopIteration)

def display(self, message, style="info"):
profile = request.params.get("profile", self.actionsmap.default_authentication)
authenticator = self.actionsmap.get_authenticator(profile)
s_id = authenticator.get_session_cookie(raise_if_no_session_exists=False)["id"]

try:
queue = self.log_queues[s_id]
except KeyError:
return

# Put the message as a 2-tuple in the queue
queue.put_nowait((style, message))

# Put the current greenlet to sleep for 0 second in order to
# populate the new message in the queue
sleep(0)
pass

def prompt(self, *args, **kwargs):
raise NotImplementedError("Prompt is not implemented for this interface")
Expand Down Expand Up @@ -744,9 +647,6 @@ class Interface:
Keyword arguments:
- routes -- A dict of additional routes to add in the form of
{(method, path): callback}
- log_queues -- A LogQueues object or None to retrieve it from
registered logging handlers
"""

type = "api"
Expand All @@ -756,12 +656,6 @@ def __init__(self, routes={}, actionsmap=None, allowed_cors_origins=[]):

self.allowed_cors_origins = allowed_cors_origins

# Attempt to retrieve log queues from an APIQueueHandler
handler = log.getHandlersByClass(APIQueueHandler, limit=1)
if handler:
log_queues = handler.queues
handler.actionsmap = actionsmap

# TODO: Return OK to 'OPTIONS' xhr requests (l173)
app = Bottle(autojson=True)

Expand Down Expand Up @@ -792,7 +686,7 @@ def wrapper(*args, **kwargs):
def api18n(callback):
def wrapper(*args, **kwargs):
try:
locale = request.params.pop("locale")
locale = request.get_header("locale")
except (KeyError, ValueError):
locale = m18n.default_locale
m18n.set_locale(locale)
Expand All @@ -804,7 +698,7 @@ def wrapper(*args, **kwargs):
app.install(filter_csrf)
app.install(cors)
app.install(api18n)
actionsmapplugin = _ActionsMapPlugin(actionsmap, log_queues)
actionsmapplugin = _ActionsMapPlugin(actionsmap)
app.install(actionsmapplugin)

self.authenticate = actionsmapplugin.authenticate
Expand Down Expand Up @@ -846,11 +740,10 @@ def run(self, host="localhost", port=80):
)

try:
from gevent.pywsgi import WSGIServer
from geventwebsocket.handler import WebSocketHandler
from gevent import monkey; monkey.patch_all()
from bottle import GeventServer

server = WSGIServer((host, port), self._app, handler_class=WebSocketHandler)
server.serve_forever()
GeventServer(host, port).run(self._app)
except IOError as e:
error_message = "unable to start the server instance on %s:%d: %s" % (
host,
Expand Down
21 changes: 17 additions & 4 deletions moulinette/utils/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import subprocess
import os
import threading
from threading import Thread
import queue
import logging

Expand Down Expand Up @@ -122,19 +122,32 @@ def call_async_output(args, callback, **kwargs):

return p.poll()

# cf https://stackoverflow.com/questions/9192539
# The API uses monkey.patch_all() and we have to switch to a proper greenlet
# thread for the LogPipe stuff to work properly (maybe we should also enable
# gevent on the CLI, idk...)
from gevent import monkey
if monkey.is_module_patched('threading'):
from gevent import Greenlet
from gevent.fileobject import FileObjectThread
Thread = Greenlet
else:
FileObjectThread = os.fdopen

class LogPipe(threading.Thread):


class LogPipe(Thread):
# Adapted from https://codereview.stackexchange.com/a/17959
def __init__(self, log_callback, queue):
"""Setup the object with a logger and a loglevel
and start the thread
"""
threading.Thread.__init__(self)
Thread.__init__(self)
self.daemon = False
self.log_callback = log_callback

self.fdRead, self.fdWrite = os.pipe()
self.pipeReader = os.fdopen(self.fdRead, "rb")
self.pipeReader = FileObjectThread(self.fdRead, "rb")

self.queue = queue

Expand Down
4 changes: 0 additions & 4 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ def logging_configuration(moulinette):
},
"filters": {},
"handlers": {
"api": {
"level": level,
"class": "moulinette.interfaces.api.APIQueueHandler",
},
"tty": {
"level": tty_level,
"class": "moulinette.interfaces.cli.TTYHandler",
Expand Down

0 comments on commit 515d09d

Please sign in to comment.