Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSV writers keep files open during test + user events for the writing #355

Merged
merged 5 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions grizzly/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def __call__(
def event(
hook: GrizzlyInternalEventHook,
*,
decoder: GrizzlyEventDecoder,
decoder: GrizzlyEventDecoder | None = None,
measurement: str | None = None,
tags: dict[str, str | None] | None = None,
) -> Callable[..., Any]:
Expand All @@ -168,7 +168,11 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> Any:
timestamp = datetime.now(timezone.utc).isoformat()

try:
metrics, decoded_tags = decoder(*args, tags=tags, return_value=return_value, exception=exception, **kwargs)
if decoder:
metrics, decoded_tags = decoder(*args, tags=tags, return_value=return_value, exception=exception, **kwargs)
else:
metrics = {}
decoded_tags = {}

if tags is not None:
decoded_tags.update(tags)
Expand Down
9 changes: 7 additions & 2 deletions grizzly/locust.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from .context import GrizzlyContext
from .listeners import init, init_statistics_listener, locust_test_start, locust_test_stop, quitting, spawning_complete, validate_result, worker_report
from .testdata.utils import initialize_testdata
from .testdata.variables.csv_writer import open_files
from .types import RequestType, TestdataType
from .types.behave import Context, Status
from .types.locust import Environment, LocustRunner, MasterRunner, Message, WorkerRunner
Expand All @@ -44,6 +45,7 @@
if TYPE_CHECKING:
from collections.abc import Generator, Iterator

from gevent.fileobject import FileObjectThread
from locust.runners import WorkerNode


Expand Down Expand Up @@ -865,7 +867,7 @@ def return_code(environment: Environment, msg: Message) -> None:
logger.info('worker %s changed environment.process_exit_code: %r -> %r', msg.node_id, old_rc, environment.process_exit_code)


def shutdown_external_processes(processes: dict[str, subprocess.Popen], greenlet: Optional[gevent.Greenlet]) -> None:
def cleanup_resources(processes: dict[str, subprocess.Popen], greenlet: Optional[gevent.Greenlet], file_handle_cache: dict[str, FileObjectThread]) -> None:
if len(processes) < 1:
return

Expand All @@ -888,6 +890,9 @@ def shutdown_external_processes(processes: dict[str, subprocess.Popen], greenlet

processes.clear()

for file_handle in file_handle_cache.values():
file_handle.close()
boffman marked this conversation as resolved.
Show resolved Hide resolved


def run(context: Context) -> int: # noqa: C901, PLR0915, PLR0912
grizzly = cast(GrizzlyContext, context.grizzly)
Expand Down Expand Up @@ -1260,7 +1265,7 @@ def wrapper() -> None:

return code
finally:
shutdown_external_processes(external_processes, watch_running_external_processes_greenlet)
cleanup_resources(external_processes, watch_running_external_processes_greenlet, open_files)


def _grizzly_sort_stats(stats: lstats.RequestStats) -> list[tuple[str, str, int]]:
Expand Down
51 changes: 46 additions & 5 deletions grizzly/testdata/variables/csv_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar, Optional, cast

from gevent.fileobject import FileObjectThread

from grizzly.events import GrizzlyEventDecoder, event, events
from grizzly.types import bool_type, list_type
from grizzly.types.locust import Environment, MasterRunner, Message
from grizzly_extras.arguments import parse_arguments, split_value
Expand All @@ -40,6 +43,8 @@
from grizzly.types.locust import MessageHandler


open_files: dict[str, FileObjectThread] = {}

def atomiccsvwriter__base_type__(value: str) -> str:
"""Validate values that `AtomicCsvWriter` can be initialized with."""
grizzly_context_requests = Path(environ.get('GRIZZLY_CONTEXT_ROOT', '')) / 'requests'
Expand Down Expand Up @@ -82,6 +87,35 @@ def atomiccsvwriter__base_type__(value: str) -> str:
return value


class CsvMessageDecoder(GrizzlyEventDecoder):
def __call__(
self,
*args: Any,
tags: dict[str, str | None] | None,
return_value: Any, # noqa: ARG002
exception: Exception | None,
**kwargs: Any,
) -> tuple[dict[str, Any], dict[str, str | None]]:
if tags is None:
tags = {}

message = args[self.arg] if isinstance(self.arg, int) else kwargs.get(self.arg)

tags = {
'filename': message.data['destination'],
**tags,
}

metrics: dict[str, Any] = {
'error': None,
}

if exception is not None:
metrics.update({'error': str(exception)})

return metrics, tags

@event(events.user_event, tags={'type': 'testdata::atomiccsvwriter'}, decoder=CsvMessageDecoder(arg='msg'))
def atomiccsvwriter_message_handler(environment: Environment, msg: Message, **_kwargs: Any) -> None: # noqa: ARG001
"""Receive messages containing CSV data.
Write the data to a CSV file.
Expand All @@ -93,15 +127,22 @@ def atomiccsvwriter_message_handler(environment: Environment, msg: Message, **_k
context_root = Path(environ.get('GRIZZLY_CONTEXT_ROOT', '')) / 'requests'

output_path = context_root / destination_file
file_key = str(output_path.absolute())
boffman marked this conversation as resolved.
Show resolved Hide resolved

exists = output_path.exists()

with output_path.open('a+', newline='') as csv_file:
writer = DictWriter(csv_file, fieldnames=headers)
if not exists:
writer.writeheader()
if file_key in open_files:
csv_file = open_files[file_key]
else:
csv_file = FileObjectThread(output_path.open('a+', newline=''))
open_files[file_key] = csv_file

writer = DictWriter(csv_file, fieldnames=headers)
if not exists:
writer.writeheader()

writer.writerow(data['row'])
writer.writerow(data['row'])
csv_file.flush()


class AtomicCsvWriter(AtomicVariable[str], AtomicVariableSettable):
Expand Down
Loading