Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add signals to expose internal Waitress events #388

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions signals_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import threading
import time

import waitress
from waitress.signals import signals

total_threads_by_server = dict()
busy_threads_by_server = dict()


def app(env, start_response):
http_status = "200 OK"
response_headers = [("Content-Type", "text/plain")]
response_bytes = b"Hello World"
start_response(http_status, response_headers)
time.sleep(5)
return [response_bytes]


@signals.get("server_started").connect
def server_started(server, *args, **kwargs):
total_threads_by_server[server] = server.adj.threads
busy_threads_by_server[server] = 0
print(
f"Started server listening on {server.addr} with {server.adj.threads} threads"
)


@signals.get("server_finished").connect
def server_finished(server, *args, **kwargs):
print(f"Stopped server listening on {server.addr}")


@signals.get("task_started").connect
def task_started(server, *args, task, **kwargs):
busy_threads_by_server[server] += 1
print(
f"Thread {threading.current_thread().name} started task, {busy_threads_by_server[server]}/{total_threads_by_server[server]}"
)


@signals.get("task_finished").connect
def task_finished(server, *args, task, **kwargs):
busy_threads_by_server[server] -= 1
print(
f"Thread {threading.current_thread().name} finished task, {busy_threads_by_server[server]}/{total_threads_by_server[server]}"
)


if __name__ == "__main__":
waitress.serve(app, threads=2)
4 changes: 4 additions & 0 deletions src/waitress/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from waitress.parser import HTTPRequestParser
from waitress.task import ErrorTask, WSGITask
from waitress.utilities import InternalServerError
from waitress.signals import signals

from . import wasyncore

Expand Down Expand Up @@ -323,6 +324,7 @@ def add_channel(self, map=None):
"""
wasyncore.dispatcher.add_channel(self, map)
self.server.active_channels[self._fileno] = self
signals.send("channel_added", self.server, channel=self)

def del_channel(self, map=None):
"""See wasyncore.dispatcher
Expand All @@ -336,6 +338,8 @@ def del_channel(self, map=None):
if fd in ac:
del ac[fd]

signals.send("channel_deleted", self.server, channel=self)

#
# SYNCHRONOUS METHODS
#
Expand Down
3 changes: 3 additions & 0 deletions src/waitress/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from waitress.compat import IPPROTO_IPV6, IPV6_V6ONLY
from waitress.task import ThreadedTaskDispatcher
from waitress.utilities import cleanup_unix_socket
from waitress.signals import signals

from . import wasyncore
from .proxy_headers import proxy_headers_middleware
Expand Down Expand Up @@ -318,6 +319,7 @@ def handle_accept(self):
self.channel_class(self, conn, addr, self.adj, map=self._map)

def run(self):
signals.send("server_started", self)
try:
self.asyncore.loop(
timeout=self.adj.asyncore_loop_timeout,
Expand All @@ -326,6 +328,7 @@ def run(self):
)
except (SystemExit, KeyboardInterrupt):
self.task_dispatcher.shutdown()
signals.send("server_finished", self)

def pull_trigger(self):
self.trigger.pull_trigger()
Expand Down
117 changes: 117 additions & 0 deletions src/waitress/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from typing import Optional


class SignalsRegistry:
"""Registry for signals used by waitress.

The registry is mostly useful to gracefully switch to signals
being no-ops when the blinker library is not available.
"""

def __init__(self):
try:
import blinker
except ImportError:
self._blinker = None
else:
self._blinker = blinker

self._signals = dict()

def create(self, name: str, doc: Optional[str] = None):
"""Create a named signal."""
if self._blinker is None:
return

self._signals[name] = self._blinker.NamedSignal(name, doc=doc)

def get(self, name: str):
"""Retrieve a signal by its name."""
if self._blinker is None:
raise RuntimeError(
"Signals cannot be used without the 'blinker' library installed"
)

if name not in self._signals:
raise ValueError(f"Signal named '{name}' does not exist")

return self._signals[name]

def send(self, name: str, *args, **kwargs):
if self._blinker is None:
return []

return self._signals[name].send(*args, **kwargs)


signals = SignalsRegistry()

signals.create(
"channel_added",
doc="""\
Sent by the event loop when a channel has been added.

Signal handlers receive:

- `server` :class:`BaseWSGIServer` that added the channel.
- `channel` :class:`Channel` being added.
""",
)

signals.create(
"channel_deleted",
doc="""\
Sent by the event loop when a channel has been deleted.

Signal handlers receive:

- `server` :class:`BaseWSGIServer` that deleted the channel.
- `channel` :class:`Channel` being deleted.
""",
)

signals.create(
"server_started",
doc="""\
Sent by the event loop when a server is started.

Signal handlers receive:

- `server` :class:`BaseWSGIServer` server being started.
""",
)

signals.create(
"server_finished",
doc="""\
Sent by the event loop when a server is finishing.

Signal handlers receive:

- `server` :class:`BaseWSGIServer` server stopping.
""",
)

signals.create(
"task_started",
doc="""\
Sent by a worker thread when a task is started.

Signal handlers receive:

- `server` :class:`BaseWSGIServer` that handles the task.
- `task` :class:`Task` being started.
""",
)

signals.create(
"task_finished",
doc="""\
Sent by a worker thread when a task is finished.

Signal handlers receive:

- `server` :class:`BaseWSGIServer` that handles the task.
- `task` :class:`Task` being finished.
""",
)
4 changes: 4 additions & 0 deletions src/waitress/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from .buffers import ReadOnlyFileBasedBuffer
from .utilities import build_http_date, logger, queue_logger
from .signals import signals

rename_headers = { # or keep them without the HTTP_ prefix added
"CONTENT_LENGTH": "CONTENT_LENGTH",
Expand Down Expand Up @@ -289,6 +290,7 @@ def remove_content_length_header(self):
self.response_headers = response_headers

def start(self):
signals.send("task_started", self.channel.server, task=self)
self.start_time = time.time()

def finish(self):
Expand All @@ -298,6 +300,8 @@ def finish(self):
# not self.write, it will chunk it!
self.channel.write_soon(b"0\r\n\r\n")

signals.send("task_finished", self.channel.server, task=self)

def write(self, data):
if not self.complete:
raise RuntimeError("start_response was not called before body written")
Expand Down