From 26699c88276517037cf569d388a13c73e5d2e3ad Mon Sep 17 00:00:00 2001 From: Nicolas Le Manchet Date: Wed, 2 Nov 2022 18:55:09 +0100 Subject: [PATCH] WIP: Add signals to expose internal Waitress events In order to expose how loaded waitress is, this commit adds a set of signals. They act as an API allowing external code to react to events like the creation of a channel or the start of execution of a task. This allows third-parties to develop extensions to track the load of Waitress via their favorite monitoring solutions. Support for signals depends on the availability of the "blinker" library. When it is not available, signals act as a no-op and cannot be subscribed to. --- signals_example.py | 51 ++++++++++++++++++ src/waitress/channel.py | 4 ++ src/waitress/server.py | 3 ++ src/waitress/signals.py | 117 ++++++++++++++++++++++++++++++++++++++++ src/waitress/task.py | 4 ++ 5 files changed, 179 insertions(+) create mode 100644 signals_example.py create mode 100644 src/waitress/signals.py diff --git a/signals_example.py b/signals_example.py new file mode 100644 index 00000000..7fa8b159 --- /dev/null +++ b/signals_example.py @@ -0,0 +1,51 @@ +import threading +import time + +import waitress +from waitress.signals import signals + +total_threads_by_server = dict() +busy_threads_by_server = dict() + + +def app(env, start_response): + http_status = "200 OK" + response_headers = [("Content-Type", "text/plain")] + response_bytes = b"Hello World" + start_response(http_status, response_headers) + time.sleep(5) + return [response_bytes] + + +@signals.get("server_started").connect +def server_started(server, *args, **kwargs): + total_threads_by_server[server] = server.adj.threads + busy_threads_by_server[server] = 0 + print( + f"Started server listening on {server.addr} with {server.adj.threads} threads" + ) + + +@signals.get("server_finished").connect +def server_finished(server, *args, **kwargs): + print(f"Stopped server listening on {server.addr}") + + +@signals.get("task_started").connect +def task_started(server, *args, task, **kwargs): + busy_threads_by_server[server] += 1 + print( + f"Thread {threading.current_thread().name} started task, {busy_threads_by_server[server]}/{total_threads_by_server[server]}" + ) + + +@signals.get("task_finished").connect +def task_finished(server, *args, task, **kwargs): + busy_threads_by_server[server] -= 1 + print( + f"Thread {threading.current_thread().name} finished task, {busy_threads_by_server[server]}/{total_threads_by_server[server]}" + ) + + +if __name__ == "__main__": + waitress.serve(app, threads=2) diff --git a/src/waitress/channel.py b/src/waitress/channel.py index eb59dd3f..4902639a 100644 --- a/src/waitress/channel.py +++ b/src/waitress/channel.py @@ -20,6 +20,7 @@ from waitress.parser import HTTPRequestParser from waitress.task import ErrorTask, WSGITask from waitress.utilities import InternalServerError +from waitress.signals import signals from . import wasyncore @@ -323,6 +324,7 @@ def add_channel(self, map=None): """ wasyncore.dispatcher.add_channel(self, map) self.server.active_channels[self._fileno] = self + signals.send("channel_added", self.server, channel=self) def del_channel(self, map=None): """See wasyncore.dispatcher @@ -336,6 +338,8 @@ def del_channel(self, map=None): if fd in ac: del ac[fd] + signals.send("channel_deleted", self.server, channel=self) + # # SYNCHRONOUS METHODS # diff --git a/src/waitress/server.py b/src/waitress/server.py index 0378d48b..e75edff9 100644 --- a/src/waitress/server.py +++ b/src/waitress/server.py @@ -23,6 +23,7 @@ from waitress.compat import IPPROTO_IPV6, IPV6_V6ONLY from waitress.task import ThreadedTaskDispatcher from waitress.utilities import cleanup_unix_socket +from waitress.signals import signals from . import wasyncore from .proxy_headers import proxy_headers_middleware @@ -318,6 +319,7 @@ def handle_accept(self): self.channel_class(self, conn, addr, self.adj, map=self._map) def run(self): + signals.send("server_started", self) try: self.asyncore.loop( timeout=self.adj.asyncore_loop_timeout, @@ -326,6 +328,7 @@ def run(self): ) except (SystemExit, KeyboardInterrupt): self.task_dispatcher.shutdown() + signals.send("server_finished", self) def pull_trigger(self): self.trigger.pull_trigger() diff --git a/src/waitress/signals.py b/src/waitress/signals.py new file mode 100644 index 00000000..53f7b240 --- /dev/null +++ b/src/waitress/signals.py @@ -0,0 +1,117 @@ +from typing import Optional + + +class SignalsRegistry: + """Registry for signals used by waitress. + + The registry is mostly useful to gracefully switch to signals + being no-ops when the blinker library is not available. + """ + + def __init__(self): + try: + import blinker + except ImportError: + self._blinker = None + else: + self._blinker = blinker + + self._signals = dict() + + def create(self, name: str, doc: Optional[str] = None): + """Create a named signal.""" + if self._blinker is None: + return + + self._signals[name] = self._blinker.NamedSignal(name, doc=doc) + + def get(self, name: str): + """Retrieve a signal by its name.""" + if self._blinker is None: + raise RuntimeError( + "Signals cannot be used without the 'blinker' library installed" + ) + + if name not in self._signals: + raise ValueError(f"Signal named '{name}' does not exist") + + return self._signals[name] + + def send(self, name: str, *args, **kwargs): + if self._blinker is None: + return [] + + return self._signals[name].send(*args, **kwargs) + + +signals = SignalsRegistry() + +signals.create( + "channel_added", + doc="""\ +Sent by the event loop when a channel has been added. + +Signal handlers receive: + +- `server` :class:`BaseWSGIServer` that added the channel. +- `channel` :class:`Channel` being added. +""", +) + +signals.create( + "channel_deleted", + doc="""\ +Sent by the event loop when a channel has been deleted. + +Signal handlers receive: + +- `server` :class:`BaseWSGIServer` that deleted the channel. +- `channel` :class:`Channel` being deleted. +""", +) + +signals.create( + "server_started", + doc="""\ +Sent by the event loop when a server is started. + +Signal handlers receive: + +- `server` :class:`BaseWSGIServer` server being started. +""", +) + +signals.create( + "server_finished", + doc="""\ +Sent by the event loop when a server is finishing. + +Signal handlers receive: + +- `server` :class:`BaseWSGIServer` server stopping. +""", +) + +signals.create( + "task_started", + doc="""\ +Sent by a worker thread when a task is started. + +Signal handlers receive: + +- `server` :class:`BaseWSGIServer` that handles the task. +- `task` :class:`Task` being started. +""", +) + +signals.create( + "task_finished", + doc="""\ +Sent by a worker thread when a task is finished. + +Signal handlers receive: + +- `server` :class:`BaseWSGIServer` that handles the task. +- `task` :class:`Task` being finished. +""", +) diff --git a/src/waitress/task.py b/src/waitress/task.py index 574532fa..8521ea19 100644 --- a/src/waitress/task.py +++ b/src/waitress/task.py @@ -20,6 +20,7 @@ from .buffers import ReadOnlyFileBasedBuffer from .utilities import build_http_date, logger, queue_logger +from .signals import signals rename_headers = { # or keep them without the HTTP_ prefix added "CONTENT_LENGTH": "CONTENT_LENGTH", @@ -289,6 +290,7 @@ def remove_content_length_header(self): self.response_headers = response_headers def start(self): + signals.send("task_started", self.channel.server, task=self) self.start_time = time.time() def finish(self): @@ -298,6 +300,8 @@ def finish(self): # not self.write, it will chunk it! self.channel.write_soon(b"0\r\n\r\n") + signals.send("task_finished", self.channel.server, task=self) + def write(self, data): if not self.complete: raise RuntimeError("start_response was not called before body written")