Skip to content

Commit

Permalink
improved handling of abort (ctrl + c) (#349)
Browse files Browse the repository at this point in the history
* no double mypy in devcontainer

removed ms-python extension and using dmypy which checks the whole workspace (specified targets)

* HaltUser thrown on AsyncMessageAbort

So that scenario stops directly, without waiting for tasks to finish.

* let router do the cleanup instead of worker

* kill task greenlet when aborting a run

better exception logging when something goes wrong in a task.

better "request" statistics for SCENARIO.

* a test project that can be used to test in grizzly

* updates for handling abort (ctrl + c) in async-messaged

* update jinja to patch cve-2024-22195

* fixed linting errors

* fix pyright zmq type warnings in tests

increase timeout for `*no_pymqi*` tests.

* handling more error cases during abort

* exclude unnecessary stuff when doing local install builds with grizzly-cli

* additional handling of error paths when async-messaged is terminated

two new messages sent from worker to master:
- return_code, the workers return code. the highest reported return code will be used by the master when exiting
- sig_trap, message to tell master that a signal has been received and that the test is aborted. this is needed since compose services with healthchecks does not trap signals correctly, which means that `grizzly-cli` wouldn't get correct return code

* let router close the integration when closing down

* improved logging

* correct key for cleaning `client_worker_map`

* if there still is sender and/or receivers, change response action to "DISCONNECTING" so that the worker doesn't stop before all entitites has been disconnected

* fixed unit tests
  • Loading branch information
mgor authored Oct 1, 2024
1 parent 854886b commit e1a01ce
Show file tree
Hide file tree
Showing 39 changed files with 629 additions and 334 deletions.
3 changes: 0 additions & 3 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
"settings": {
"python.defaultInterpreterPath": "/usr/local/bin/python",
"python.pythonPath": "/usr/local/bin/python",
"mypy-type-checker.path": ["mypy"],
"python.languageServer": "Pylance",
"pythonTestExplorer.testFramework": "pytest",
"files.associations": {
"*.j2.json": "jinja-json",
"*.j2.xml": "jinja-xml"
},
"mypy-type-checker.ignorePatterns": ["tests/project/**/*.py"],
"mypy.targets": [
"grizzly/",
"grizzly_extras/",
Expand All @@ -43,7 +41,6 @@
"extensions": [
"ms-python.python",
"ms-python.vscode-pylance",
"ms-python.mypy-type-checker",
"editorconfig.editorconfig",
"eamodio.gitlens",
"samuelcolvin.jinjahtml",
Expand Down
5 changes: 5 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
./tests/
./build/
./**/*.egg-info
.*/
!.git
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ docs/_build*
grizzly/__version__.py
.pytest_tmp/
tests/project
tests/test-project/**/_*.feature
8 changes: 2 additions & 6 deletions grizzly/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@

from locust.exception import StopUser

from grizzly_extras.async_message import AsyncMessageAbort
from grizzly_extras.exceptions import StopScenario

if TYPE_CHECKING: # pragma: no cover
from grizzly.types.behave import Scenario, Step


__all__ = [
'StopScenario',
'StopUser',
'AsyncMessageAbort',
]


Expand All @@ -27,10 +27,6 @@ def __init__(self, message: Optional[str] = None) -> None:
class RestartScenario(Exception): # noqa: N818
pass


class StopScenario(Exception): # noqa: N818
pass

class AssertionErrors(Exception): # noqa: N818
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
Expand Down
60 changes: 42 additions & 18 deletions grizzly/gevent.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,84 @@
"""Additional custom gevent functionality."""
from __future__ import annotations

from contextlib import contextmanager
from functools import wraps
from typing import Any, Callable
from typing import TYPE_CHECKING, Any, Callable

from gevent import Greenlet, getcurrent

if TYPE_CHECKING:
import logging
from collections.abc import Generator

class GreenletWithExceptionCatching(Greenlet):

class GreenletFactory:
"""Catch exceptions thrown by a function executing in a greenlet."""

started_from: Greenlet
logger: logging.Logger
ignore_exceptions: list[type[Exception]]
index: int
total: int
description: str

def __init__(self, *args: Any, **kwargs: Any) -> None:
def __init__(self, *args: Any, logger: logging.Logger, ignore_exceptions: list[type[Exception]] | None = None, **kwargs: Any) -> None:
"""Initialize Greenlet object, with custom property from which greenlet this greenlet was started."""
super().__init__(*args, **kwargs)
self.started_from = getcurrent()
self.logger = logger
self.ignore_exceptions = ignore_exceptions if ignore_exceptions is not None else []
self.index = -1
self.total = -1
self.description = ''

def handle_exception(self, error: Exception) -> None:
def handle_exception(self, exception: Exception) -> None:
"""Handle exception thrown, by throwing it from the greenlet that started this greenlet."""
self.started_from.throw(error)
if exception.__class__ not in self.ignore_exceptions and self.total > 0:
message = f'task {self.index} of {self.total} failed: {self.description}'
self.logger.exception(message)

self.started_from.throw(exception)

def wrap_exceptions(self, func: Callable) -> Callable:
"""Make sure exceptions is thrown from the correct place, so it can be handled."""
@wraps(func)
def exception_handler(*args: Any, **kwargs: Any) -> Any:
try:
return func(*args, **kwargs)
result = func(*args, **kwargs)

if self.total > 0:
message = f'task {self.index} of {self.total} executed: {self.description}'
self.logger.debug(message)
except Exception as exception:
self.wrap_exceptions(self.handle_exception)(exception)
return exception # pragma: no cover
return exception
else:
return result

return exception_handler

def spawn(self, func: Callable, *args: Any, **kwargs: Any) -> Greenlet:
"""Spawn a greenlet executing the function, in a way that any exceptions can be handled where it was spawned."""
return super().spawn(
return Greenlet.spawn(
self.wrap_exceptions(func),
*args,
**kwargs,
)

def spawn_blocking(self, func: Callable, *args: Any, **kwargs: Any) -> None:
@contextmanager
def spawn_task(self, func: Callable, index: int, total: int, description: str, *args: Any, **kwargs: Any) -> Generator[Greenlet, None, None]:
"""Spawn a greenlet executing the function and wait for the function to finish.
Get the result of the executed function, if there was an exception raised, it will be
re-raised by `get`.
"""
self.index = index
self.total = total
self.description = description

greenlet = self.spawn(func, *args, **kwargs)

yield greenlet

greenlet.join()
greenlet.get()

def spawn_later(self, seconds: int, func: Callable, *args: Any, **kwargs: Any) -> Greenlet:
"""Spawn a greenlet `seconds` in the future, in a way that any exceptions can be handled where it was spawned."""
return super().spawn_later(
seconds,
self.wrap_exceptions(func),
*args,
**kwargs,
)
9 changes: 6 additions & 3 deletions grizzly/listeners/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,16 @@ def quitting(**_kwargs: Any) -> None:
logger.debug('locust quitting')
global producer_greenlet, producer # noqa: PLW0603
if producer is not None:
logger.debug('stopping producer')
producer.stop()
producer = None

if producer_greenlet is not None:
producer_greenlet.kill(block=True)
producer_greenlet = None
try:
producer_greenlet.kill(block=False)
except: # noqa: S110
pass
finally:
producer_greenlet = None


def grizzly_worker_quit(environment: Environment, msg: Message, **_kwargs: Any) -> None:
Expand Down
55 changes: 34 additions & 21 deletions grizzly/locust.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import TYPE_CHECKING, Any, Callable, NoReturn, Optional, SupportsIndex, TypeVar, cast

import gevent
import gevent.event
from locust import events
from locust import stats as lstats
from locust.dispatch import UsersDispatcher
Expand Down Expand Up @@ -47,7 +48,7 @@


unhandled_greenlet_exception: bool = False
abort_test: bool = False
abort_test: gevent.event.Event = gevent.event.Event()


logger = logging.getLogger('grizzly.locust')
Expand Down Expand Up @@ -818,7 +819,7 @@ def create_separator(max_length_iterations: int, max_length_status: int, max_len

stat = stats.get(scenario.locust_name, RequestType.SCENARIO())
if stat.num_requests > 0:
if abort_test:
if abort_test.is_set():
status = Status.skipped
stat.num_requests -= 1
elif stat.num_failures == 0 and stat.num_requests == scenario.iterations and total_errors == 0:
Expand Down Expand Up @@ -848,11 +849,21 @@ def create_separator(max_length_iterations: int, max_length_status: int, max_len
print(separator)


def grizzly_test_abort(*_args: Any, **_kwargs: Any) -> None:
global abort_test # noqa: PLW0603
def sig_trap(msg: Message, **_kwargs: Environment) -> None:
if not abort_test.is_set():
abort_test.set()
logger.info('worker %s triggered test abort on master', msg.node_id)


def return_code(environment: Environment, msg: Message) -> None:
rc = int(msg.data)
old_rc = environment.process_exit_code

environment.process_exit_code = max(environment.process_exit_code or -1, rc)

if old_rc != rc:
logger.info('worker %s changed environment.process_exit_code: %r -> %r', msg.node_id, old_rc, environment.process_exit_code)

if not abort_test:
abort_test = True

def shutdown_external_processes(processes: dict[str, subprocess.Popen], greenlet: Optional[gevent.Greenlet]) -> None:
if len(processes) < 1:
Expand All @@ -861,7 +872,7 @@ def shutdown_external_processes(processes: dict[str, subprocess.Popen], greenlet
if greenlet is not None:
greenlet.kill(block=False)

stop_method = 'killing' if abort_test else 'stopping'
stop_method = 'killing' if abort_test.is_set() else 'stopping'

for dependency, process in processes.items():
logger.info('%s %s', stop_method, dependency)
Expand Down Expand Up @@ -1036,7 +1047,8 @@ def watch_running_external_processes() -> None:
grizzly.state.locust.register_message(message_type, callback)
logger.info('registered callback for message type "%s"', message_type)

runner.register_message('client_aborted', grizzly_test_abort)
runner.register_message('sig_trap', sig_trap)
runner.register_message('return_code', return_code)

main_greenlet = runner.greenlet

Expand Down Expand Up @@ -1143,9 +1155,9 @@ def watch_running_users() -> None:
logger.debug('user_count=%d, user_classes_count=%r', runner.user_count, user_classes_count)
count = 0

logger.info('runner.user_count=%d, quit %s, abort_test=%r', runner.user_count, runner.__class__.__name__, abort_test)
logger.info('runner.user_count=%d, quit %s, abort_test=%r', runner.user_count, runner.__class__.__name__, abort_test.is_set())
# has already been fired if abort_test = True
if not abort_test:
if not abort_test.is_set():
runner.environment.events.quitting.fire(environment=runner.environment, reverse=True)

if isinstance(runner, MasterRunner):
Expand Down Expand Up @@ -1200,20 +1212,16 @@ def sig_handler(signum: int) -> Callable[[], None]:
signame = 'UNKNOWN'

def wrapper() -> None:
global abort_test # noqa: PLW0603
if abort_test:
if abort_test.is_set():
return

logger.info('handling signal %s (%d)', signame, signum)

logger.debug('shutdown external processes')

abort_test = True
shutdown_external_processes(external_processes, watch_running_external_processes_greenlet)
abort_test.set()

if isinstance(runner, WorkerRunner):
runner._send_stats()
runner.client.send(Message('client_aborted', None, runner.client_id))
runner.client.send(Message('sig_trap', None, runner.client_id))

runner.environment.events.quitting.fire(environment=runner.environment, reverse=True, abort=True)

Expand All @@ -1224,9 +1232,8 @@ def wrapper() -> None:

try:
main_greenlet.join()
logger.debug('main greenlet finished')
finally:
if abort_test:
if abort_test.is_set() and not isinstance(runner, MasterRunner):
code = SIGTERM.value
elif unhandled_greenlet_exception:
code = 2
Expand All @@ -1237,10 +1244,16 @@ def wrapper() -> None:
else:
code = 0

if isinstance(runner, WorkerRunner):
runner.client.send(Message('return_code', code, runner.client_id))
else:
code = max(code, environment.process_exit_code or -1)

logger.debug('main greenlet finished, rc = %d', code)

return code
finally:
if not abort_test:
shutdown_external_processes(external_processes, watch_running_external_processes_greenlet)
shutdown_external_processes(external_processes, watch_running_external_processes_greenlet)


def _grizzly_sort_stats(stats: lstats.RequestStats) -> list[tuple[str, str, int]]:
Expand Down
24 changes: 14 additions & 10 deletions grizzly/scenarios/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@
from os import environ
from typing import TYPE_CHECKING, Any, Callable, Optional, Union, cast

from gevent.event import Event
from locust.exception import LocustError
from locust.user.sequential_taskset import SequentialTaskSet

from grizzly.context import GrizzlyContext
from grizzly.exceptions import StopScenario
from grizzly.gevent import GreenletWithExceptionCatching
from grizzly.gevent import GreenletFactory
from grizzly.tasks import GrizzlyTask, grizzlytask
from grizzly.testdata.communication import TestdataConsumer
from grizzly.types import ScenarioState
from grizzly.types.locust import StopUser

if TYPE_CHECKING: # pragma: no cover
from gevent import Greenlet
from locust.user.task import TaskSet

from grizzly.users import GrizzlyUser
Expand All @@ -26,9 +28,9 @@ class GrizzlyScenario(SequentialTaskSet):
consumer: TestdataConsumer
logger: logging.Logger
grizzly: GrizzlyContext
task_greenlet: Optional[GreenletWithExceptionCatching]
task_greenlet_factory: GreenletWithExceptionCatching
abort: bool
task_greenlet: Optional[Greenlet]
task_greenlet_factory: GreenletFactory
abort: Event
spawning_complete: bool

_task_index: int
Expand All @@ -39,8 +41,8 @@ def __init__(self, parent: GrizzlyUser) -> None:
self.grizzly = GrizzlyContext()
self.user.scenario_state = ScenarioState.STOPPED
self.task_greenlet = None
self.task_greenlet_factory = GreenletWithExceptionCatching()
self.abort = False
self.task_greenlet_factory = GreenletFactory(logger=self.logger, ignore_exceptions=[StopScenario])
self.abort = Event()
self.spawning_complete = False
self.parent.environment.events.quitting.add_listener(self.on_quitting)
self._task_index = 0
Expand Down Expand Up @@ -115,9 +117,10 @@ def on_quitting(self, *_args: Any, **kwargs: Any) -> None:
"""When locust is quitting, with abort=True (signal received) we should force the
running task to stop by throwing an exception in the greenlet where it is running.
"""
if self.task_greenlet is not None and kwargs.get('abort', False):
self.abort = True
if self.task_greenlet is not None and kwargs.get('abort', False) and not self.abort.is_set():
self.abort.set()
self.task_greenlet.kill(StopScenario, block=False)
self.logger.debug('killed task (greenlet)')

def get_next_task(self) -> Union[TaskSet, Callable]:
"""Use old way of getting task, so we can reset which task to start from."""
Expand All @@ -130,12 +133,13 @@ def get_next_task(self) -> Union[TaskSet, Callable]:

return task

def execute_next_task(self) -> None:
def execute_next_task(self, index: int, total: int, description: str) -> None: # type: ignore[override]
"""Execute task in a greenlet, so that we have the possibility to stop it on demand. Any exceptions
raised in the greenlet should be caught else where.
"""
try:
self.task_greenlet_factory.spawn_blocking(super().execute_next_task)
with self.task_greenlet_factory.spawn_task(super().execute_next_task, index, total, description) as greenlet:
self.task_greenlet = greenlet
finally:
self.task_greenlet = None

Expand Down
Loading

0 comments on commit e1a01ce

Please sign in to comment.