Skip to content

Commit

Permalink
rewrite of testdata communication, and other improvements (#357)
Browse files Browse the repository at this point in the history
* custom messages should run concurrent

* testdata* communication rewrite, et al

simplified `GrizzlyContext` by removing singleton logic from instance creation, and instead creates a global instance which should be imported where needed.

rewrite of `Testdata*` communication, to use locust custom messages instead of their own ZMQ socket and connection.

* better handling for detecting when spawning is complete

replace bool with a Semaphore, which is acquired on locust init, and released on "spawning complete" event.

master/local uses the semaphore before starting the check that there are users running.

workers/local uses the semaphore to not stop any users before all users has been spawned. this is to prevent locust to spawn additional users that has stopped before complete spawning has finished.
stopping a users after all users has spawned will not result in additional users spawning after being stopped.

* removed unnecessary logger, use the users logger instead, which also gives better traceability.

* timeout after waiting for a TestdataProducer response for 10 seconds

* cleanup of AsyncServiceBusHandler logging
  • Loading branch information
mgor authored Nov 11, 2024
1 parent f5c079b commit cf9409e
Show file tree
Hide file tree
Showing 31 changed files with 1,062 additions and 1,400 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/code-quality.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ jobs:

- name: coverage
id: coverage
run: python -m coverage report --omit=**/*messagequeue*,**/mq/*,**/__version__.py --fail-under=89
run: |
python -m coverage report -i --omit=**/*messagequeue*,**/mq/*,**/__version__.py --fail-under=89 || python -m coverage debug data
test-e2e:
name: 'e2e tests ${{ matrix.run_mode }}'
Expand Down
7 changes: 4 additions & 3 deletions grizzly/behave.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ def before_feature(context: Context, feature: Feature, *_args: Any, **_kwargs: A

destroy_variables()

with suppress(ValueError):
GrizzlyContext.destroy()
from grizzly import context as grizzly_context

grizzly = GrizzlyContext()
grizzly_context.grizzly = GrizzlyContext()

grizzly = grizzly_context.grizzly
grizzly.state.verbose = context.config.verbose

persistent_file = Path(context.config.base_dir) / 'persistent' / f'{Path(feature.filename).stem}.json'
Expand Down
104 changes: 40 additions & 64 deletions grizzly/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
from datetime import datetime, timezone
from os import environ
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Optional, Union, cast
from typing import TYPE_CHECKING, Any, Callable, Optional, cast

import yaml
from gevent.lock import Semaphore
from jinja2 import DebugUndefined, Environment, FileSystemLoader
from jinja2.filters import FILTERS
from typing_extensions import Self

from grizzly.events import events
from grizzly.testdata import GrizzlyVariables
from grizzly.types import MessageCallback, MessageDirection
from grizzly.utils import MergeYamlTag, flatten, merge_dicts
Expand All @@ -21,6 +23,7 @@
from locust.dispatch import UsersDispatcher

from grizzly.events import GrizzlyEvents
from grizzly.testdata.communication import TestdataProducer
from grizzly.types.behave import Scenario
from grizzly.types.locust import LocalRunner, MasterRunner, WorkerRunner

Expand Down Expand Up @@ -62,67 +65,6 @@ def load_configuration_file() -> dict[str, Any]:
return configuration


class GrizzlyContext:
__instance: Optional[GrizzlyContext] = None

_initialized: bool
_state: GrizzlyContextState
_setup: GrizzlyContextSetup
_scenarios: GrizzlyContextScenarios
_events: GrizzlyEvents


def __new__(cls, *_args: Any, **_kwargs: Any) -> GrizzlyContext: # noqa: PYI034
"""Class is a singleton, there should only be once instance of it."""
if cls.__instance is None:
cls.__instance = super().__new__(cls)
cls.__instance._initialized = False

return cls.__instance

@classmethod
def destroy(cls) -> None:
if cls.__instance is None:
message = f"'{cls.__name__}' is not instantiated"
raise ValueError(message)

cls.__instance = None

def __init__(self) -> None:
if not self._initialized:
from grizzly.events import events
self._setup = GrizzlyContextSetup()
self._scenarios = GrizzlyContextScenarios(self)
self._state = GrizzlyContextState()
self._events = events
self._initialized = True

@property
def setup(self) -> GrizzlyContextSetup:
return self._setup

@property
def state(self) -> GrizzlyContextState:
return self._state

@property
def scenario(self) -> GrizzlyContextScenario:
"""Read-only scenario child instance. Shortcut to the current (latest) scenario in the context."""
if len(self._scenarios) < 1:
message = 'no scenarios created!'
raise ValueError(message)

return self._scenarios[self._scenarios._active]

@property
def scenarios(self) -> GrizzlyContextScenarios:
return self._scenarios

@property
def events(self) -> GrizzlyEvents:
return self._events


class DebugChainableUndefined(DebugUndefined):
_undefined_name: str | None

Expand Down Expand Up @@ -150,11 +92,13 @@ def jinja2_environment_factory() -> Environment:

@dataclass
class GrizzlyContextState:
spawning_complete: bool = field(default=False)
spawning_complete: Semaphore = field(default_factory=Semaphore)
background_done: bool = field(default=False)
configuration: dict[str, Any] = field(init=False, default_factory=load_configuration_file)
verbose: bool = field(default=False)
locust: Union[MasterRunner, WorkerRunner, LocalRunner] = field(init=False, repr=False)
locust: MasterRunner | WorkerRunner | LocalRunner = field(init=False, repr=False)
producer: TestdataProducer | None = field(init=False, repr=False, default=None)



@dataclass
Expand Down Expand Up @@ -427,3 +371,35 @@ def create(self, behave: Scenario) -> GrizzlyContextScenario:
self.deselect()

return grizzly_scenario


@dataclass
class GrizzlyContext:
state: GrizzlyContextState = field(init=False, default_factory=GrizzlyContextState)
setup: GrizzlyContextSetup = field(init=False, default_factory=GrizzlyContextSetup)
_scenarios: GrizzlyContextScenarios = field(init=False)
_events: GrizzlyEvents = field(init=False)

def __post_init__(self) -> None:
self._scenarios = GrizzlyContextScenarios(self)
self._events = events

@property
def scenario(self) -> GrizzlyContextScenario:
"""Read-only scenario child instance. Shortcut to the current (latest) scenario in the context."""
if len(self._scenarios) < 1:
message = 'no scenarios created!'
raise ValueError(message)

return self._scenarios[self._scenarios._active]

@property
def scenarios(self) -> GrizzlyContextScenarios:
return self._scenarios

@property
def events(self) -> GrizzlyEvents:
return self._events


grizzly = GrizzlyContext()
80 changes: 20 additions & 60 deletions grizzly/listeners/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
from __future__ import annotations

import logging
from os import environ
from typing import TYPE_CHECKING, Any, Callable, Optional, cast
from urllib.parse import urlparse

import gevent
from locust.stats import (
RequestStats,
StatsEntry,
Expand All @@ -16,7 +14,7 @@
)
from typing_extensions import Concatenate, ParamSpec

from grizzly.testdata.communication import TestdataProducer
from grizzly.testdata.communication import TestdataConsumer, TestdataProducer
from grizzly.types import MessageDirection, RequestType, TestdataType
from grizzly.types.behave import Status
from grizzly.types.locust import Environment, LocustRunner, MasterRunner, Message, WorkerRunner
Expand All @@ -26,47 +24,21 @@

P = ParamSpec('P')

producer: Optional[TestdataProducer] = None

producer_greenlet: Optional[gevent.Greenlet] = None

logger = logging.getLogger(__name__)


def _init_testdata_producer(grizzly: GrizzlyContext, port: str, testdata: TestdataType) -> Callable[[], None]:
def gtestdata_producer() -> None:
global producer # noqa: PLW0603
producer_address = f'tcp://0.0.0.0:{port}'
producer = TestdataProducer(
grizzly=grizzly,
address=producer_address,
testdata=testdata,
)
producer.run()

return gtestdata_producer


def init(grizzly: GrizzlyContext, testdata: Optional[TestdataType] = None) -> Callable[Concatenate[LocustRunner, P], None]:
def ginit(runner: LocustRunner, **_kwargs: P.kwargs) -> None:
producer_port = environ.get('TESTDATA_PRODUCER_PORT', '5555')
if not isinstance(runner, MasterRunner):
producer_address = runner.master_host if isinstance(runner, WorkerRunner) else '127.0.0.1'

producer_address = f'tcp://{producer_address}:{producer_port}'
logger.debug('producer_address=%s', producer_address)
environ['TESTDATA_PRODUCER_ADDRESS'] = producer_address
# acquire lock, that will be released when all users has spawned (on_spawning_complete)
grizzly.state.spawning_complete.acquire()

if not isinstance(runner, WorkerRunner):
if testdata is not None:
global producer_greenlet # noqa: PLW0603
producer_greenlet = gevent.spawn(
_init_testdata_producer(
grizzly,
producer_port,
testdata,
),
grizzly.state.producer = TestdataProducer(
runner=runner,
testdata=testdata,
)
runner.register_message('produce_testdata', grizzly.state.producer.handle_request, concurrent=True)
else:
logger.error('there is no test data!')
else:
Expand All @@ -75,11 +47,13 @@ def ginit(runner: LocustRunner, **_kwargs: P.kwargs) -> None:

if not isinstance(runner, MasterRunner):
for message_type, callback in grizzly.setup.locust.messages.get(MessageDirection.SERVER_CLIENT, {}).items():
runner.register_message(message_type, callback)
runner.register_message(message_type, callback, concurrent=True)

runner.register_message('consume_testdata', TestdataConsumer.handle_response, concurrent=True)

if not isinstance(runner, WorkerRunner):
for message_type, callback in grizzly.setup.locust.messages.get(MessageDirection.CLIENT_SERVER, {}).items():
runner.register_message(message_type, callback)
runner.register_message(message_type, callback, concurrent=True)

return cast(Callable[Concatenate[LocustRunner, P], None], ginit)

Expand Down Expand Up @@ -122,39 +96,25 @@ def gtest_start(environment: Environment, **_kwargs: P.kwargs) -> None:
return cast(Callable[Concatenate[Environment, P], None], gtest_start)


def locust_test_stop(**_kwargs: Any) -> None:
if producer is not None:
producer.on_test_stop()
def locust_test_stop(grizzly: GrizzlyContext) -> Callable[Concatenate[Environment, P], None]:
def gtest_stop(environment: Environment, **_kwargs: P.kwargs) -> None: # noqa: ARG001
if grizzly.state.producer is not None:
grizzly.state.producer.on_test_stop()

return cast(Callable[Concatenate[Environment, P], None], gtest_stop)


def spawning_complete(grizzly: GrizzlyContext) -> Callable[..., None]:
def gspawning_complete(**_kwargs: Any) -> None:
logger.debug('spawning complete!')
grizzly.state.spawning_complete = True
def gspawning_complete(user_count: int, **_kwargs: Any) -> None:
logger.debug('spawning of %d users completed', user_count)
grizzly.state.spawning_complete.release()

return gspawning_complete


def worker_report(client_id: str, data: dict[str, Any]) -> None: # noqa: ARG001
logger.debug('received worker_report from %s', client_id)


def quitting(**_kwargs: Any) -> None:
logger.debug('locust quitting')
global producer_greenlet, producer # noqa: PLW0603
if producer is not None:
producer.stop()
producer = None

if producer_greenlet is not None:
try:
producer_greenlet.kill(block=False)
except: # noqa: S110
pass
finally:
producer_greenlet = None


def grizzly_worker_quit(environment: Environment, msg: Message, **_kwargs: Any) -> None:
logger.debug('received message grizzly_worker_quit: msg=%r', msg)
runner = environment.runner
Expand Down
4 changes: 2 additions & 2 deletions grizzly/listeners/influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError

from grizzly.context import GrizzlyContext
from grizzly.types.locust import CatchResponseError, Environment

if TYPE_CHECKING: # pragma: no cover
Expand Down Expand Up @@ -152,14 +151,15 @@ def __init__(
self.run_events_greenlet = gevent.spawn(self.run_events)
self.run_user_count_greenlet = gevent.spawn(self.run_user_count)
self.connection = self.create_client().connect()
self.grizzly = GrizzlyContext()
self.logger = logging.getLogger(__name__)
self.environment.events.request.add_listener(self.request)
self.environment.events.heartbeat_sent.add_listener(self.heartbeat_sent)
self.environment.events.heartbeat_received.add_listener(self.heartbeat_received)
self.environment.events.usage_monitor.add_listener(self.usage_monitor)
self.environment.events.quit.add_listener(self.on_quit)

from grizzly.context import grizzly
self.grizzly = grizzly
self.grizzly.events.keystore_request.add_listener(self.on_grizzly_event)
self.grizzly.events.testdata_request.add_listener(self.on_grizzly_event)
self.grizzly.events.user_event.add_listener(self.on_grizzly_event)
Expand Down
Loading

0 comments on commit cf9409e

Please sign in to comment.