diff --git a/.gitignore b/.gitignore index dc6c4b49..d6048e51 100644 --- a/.gitignore +++ b/.gitignore @@ -72,3 +72,6 @@ target/ # Vim *.swp + +# VScode +.vscode/ diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 7ef0a5fd..5858c592 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -9,6 +9,10 @@ TBD but options like 'fanout' can now be supported - Better handling of Pika errors - Updated how AutoBrewtils maps functions, and will skip auto marking commands with annotations +- When SystemClient is self referencing to the Plugin, child requests will be generated + locally, then uploaded to Beer-Garden once the request is completed. +- Must upgrade to a minimum version of Beer Garden 3.23.0 to support new APIs + 3.22.0 ------ diff --git a/brewtils/plugin.py b/brewtils/plugin.py index 610eda6b..9f874aa8 100644 --- a/brewtils/plugin.py +++ b/brewtils/plugin.py @@ -170,7 +170,7 @@ class Plugin(object): logging configuration. worker_shutdown_timeout (int): Time to wait during shutdown to finish processing - max_concurrent (int): Maximum number of requests to process concurrently + max_concurrent (int): Maximum number of requests to process concurrently from RabbitMQ max_attempts (int): Number of times to attempt updating of a Request before giving up. Negative numbers are interpreted as no maximum. max_timeout (int): Maximum amount of time to wait between Request update @@ -189,7 +189,6 @@ class Plugin(object): """ def __init__(self, client=None, system=None, logger=None, **kwargs): - self._client = None self._instance = None self._admin_processor = None self._request_processor = None @@ -215,9 +214,13 @@ def __init__(self, client=None, system=None, logger=None, **kwargs): # Now set up the system self._system = self._setup_system(system, kwargs) + global CLIENT # Make sure this is set after self._system if client: - self.client = client + self._client = client + CLIENT = client + else: + self._client = None # Now that the config is loaded we can create the EasyClient self._ez_client = EasyClient(logger=self._logger, **self._config) @@ -305,6 +308,7 @@ def client(self, new_client): ) self._client = new_client + brewtils.plugin.CLIENT = new_client @property def system(self): @@ -594,7 +598,7 @@ def _initialize_processors(self): max_workers=1, ) request_processor = RequestProcessor( - target=self._client, + target=CLIENT, updater=updater, consumer=request_consumer, validation_funcs=[self._correct_system, self._is_running], @@ -843,14 +847,14 @@ def _validate_system(self): if not self._system.version: raise ValidationError("Plugin system must have a version") - client_name = getattr(self._client, "_bg_name", None) + client_name = getattr(CLIENT, "_bg_name", None) if client_name and client_name != self._system.name: raise ValidationError( "System name '%s' doesn't match name from client decorator: " "@system(bg_name=%s)" % (self._system.name, client_name) ) - client_version = getattr(self._client, "_bg_version", None) + client_version = getattr(CLIENT, "_bg_version", None) if client_version and client_version != self._system.version: raise ValidationError( "System version '%s' doesn't match version from client decorator: " diff --git a/brewtils/request_handling.py b/brewtils/request_handling.py index 3ff50dc8..ecb2096f 100644 --- a/brewtils/request_handling.py +++ b/brewtils/request_handling.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import abc +import copy import json import logging import sys @@ -21,9 +22,130 @@ parse_exception_as_json, ) from brewtils.models import Request +from brewtils.resolvers.manager import ResolutionManager from brewtils.schema_parser import SchemaParser +class LocalRequestProcessor(object): + """ """ + + def __init__( + self, + logger=None, + system=None, + easy_client=None, + resolver=None, + ): + self.logger = logger or logging.getLogger(__name__) + self._system = system + self._resolver = resolver or ResolutionManager(easy_client=easy_client) + self._ez_client = easy_client + + def process_command(self, request): + """Process a command locally. + + Will update the child request map with the generated Request object to be + sent to Beer Garden + + Args: + command_name: the command to be processed + + Returns: + Any + """ + + parent_request = copy.deepcopy(brewtils.plugin.request_context.current_request) + + request.parent = Request(id=str(parent_request.id)) + request.has_parent = True + + request.status = "IN_PROGRESS" + + request = self._ez_client.put_request(request) + + try: + output = self._invoke_command(brewtils.plugin.CLIENT, request) + except Exception as exc: + self._handle_invoke_failure(request, exc) + request = self._ez_client.put_request(request) + brewtils.plugin.request_context.current_request = parent_request + raise exc + else: + self._handle_invoke_success(request, output) + request = self._ez_client.put_request(request) + brewtils.plugin.request_context.current_request = parent_request + return output + + def _invoke_command(self, target, request): + """Invoke the function named in request.command + + Args: + target: The object to search for the function implementation. + request: The request to process + + Returns: + The output of the function call + + Raises: + RequestProcessingError: The specified target does not define a + callable implementation of request.command + """ + if not callable(getattr(target, request.command, None)): + raise RequestProcessingError( + "Could not find an implementation of command '%s'" % request.command + ) + + # Get the command to use the parameter definitions when resolving + command = None + if self._system: + command = self._system.get_command_by_name(request.command) + + # Now resolve parameters, if necessary + if request.is_ephemeral or not command: + parameters = request.parameters or {} + else: + parameters = self._resolver.resolve( + request.parameters, + definitions=command.parameters, + upload=False, + ) + + return getattr(target, request.command)(**parameters) + + def _handle_invoke_success(self, request, output): + request.status = "SUCCESS" + request.output = self._format_output(output) + + def _handle_invoke_failure(self, request, exc): + self.logger.log( + getattr(exc, "_bg_error_log_level", logging.ERROR), + "Raised an exception while processing request %s: %s", + str(request), + exc, + exc_info=not getattr(exc, "_bg_suppress_stacktrace", False), + ) + request.status = "ERROR" + request.output = self._format_error_output(request, exc) + request.error_class = type(exc).__name__ + + @staticmethod + def _format_error_output(request, exc): + if request.is_json: + return parse_exception_as_json(exc) + else: + return str(exc) + + @staticmethod + def _format_output(output): + if isinstance(output, six.string_types): + return output + + try: + return json.dumps(output) + except (TypeError, ValueError): + return str(output) + + class RequestProcessor(object): """Class responsible for coordinating Request processing @@ -89,6 +211,7 @@ def on_message_received(self, message, headers): DiscardMessageException: The request failed to parse correctly RequestProcessException: Validation failures should raise a subclass of this """ + request = self._parse(message) for func in self._validation_funcs: @@ -116,6 +239,7 @@ def process_message(self, target, request, headers): Returns: None """ + request.status = "IN_PROGRESS" self._updater.update_request(request, headers) @@ -127,6 +251,7 @@ def process_message(self, target, request, headers): brewtils.plugin.request_context.current_request = request output = self._invoke_command(target, request, headers) + except Exception as exc: self._handle_invoke_failure(request, exc) else: diff --git a/brewtils/rest/client.py b/brewtils/rest/client.py index a0aedc38..81b415f1 100644 --- a/brewtils/rest/client.py +++ b/brewtils/rest/client.py @@ -543,6 +543,21 @@ def post_requests(self, payload, **kwargs): self.request_url, data=payload, headers=self.JSON_HEADERS, params=kwargs ) + @enable_auth + def put_request(self, payload): + # type: (str, **Any) -> Response + """Performs a PUT on the Request URL + + Args: + payload: Completed Request definition + + Returns: + Requests Response object + """ + return self.session.put( + self.request_url, data=payload, headers=self.JSON_HEADERS + ) + @enable_auth def patch_request(self, request_id, payload): # type: (str, str) -> Response diff --git a/brewtils/rest/easy_client.py b/brewtils/rest/easy_client.py index f842301f..575d728a 100644 --- a/brewtils/rest/easy_client.py +++ b/brewtils/rest/easy_client.py @@ -707,6 +707,20 @@ def update_request(self, request_id, status=None, output=None, error_class=None) request_id, SchemaParser.serialize_patch(operations, many=True) ) + @wrap_response( + parse_method="parse_request", parse_many=False, default_exc=SaveError + ) + def put_request(self, request): + """Creates or Updates Request with a completed requests + + Args: + request: Completed Request + Returns: + Request: The created Request + + """ + return self.client.put_request(SchemaParser.serialize_request(request)) + @wrap_response(return_boolean=True) def publish_event(self, *args, **kwargs): """Publish a new event diff --git a/brewtils/rest/publish_client.py b/brewtils/rest/publish_client.py index 536f6adf..194234d4 100644 --- a/brewtils/rest/publish_client.py +++ b/brewtils/rest/publish_client.py @@ -1,8 +1,8 @@ # -*- coding: utf-8 -*- import logging -from brewtils.errors import BrewtilsException import brewtils.plugin +from brewtils.errors import BrewtilsException from brewtils.models import Event, Events, Request from brewtils.rest.easy_client import EasyClient diff --git a/brewtils/rest/system_client.py b/brewtils/rest/system_client.py index 05fec6c3..d41cd78b 100644 --- a/brewtils/rest/system_client.py +++ b/brewtils/rest/system_client.py @@ -18,6 +18,7 @@ _deprecate, ) from brewtils.models import Request, System +from brewtils.request_handling import LocalRequestProcessor from brewtils.resolvers.manager import ResolutionManager from brewtils.rest.easy_client import EasyClient @@ -215,7 +216,7 @@ def __init__(self, *args, **kwargs): # Now need to determine if the intended target is the current running plugin. # Start by ensuring there's a valid Plugin context active - target_self = bool(brewtils.plugin.CONFIG) + self.target_self = bool(brewtils.plugin.CONFIG) # If ANY of the target specification arguments don't match the current plugin # then the target is different @@ -230,15 +231,16 @@ def __init__(self, *args, **kwargs): kwargs.get(key) is not None and kwargs.get(key) != brewtils.plugin.CONFIG[value] ): - target_self = False + self.target_self = False break # Now assign self._system_name, etc based on the value of target_self - if target_self: + if self.target_self: self._system_name = brewtils.plugin.CONFIG.name self._version_constraint = brewtils.plugin.CONFIG.version self._default_instance = brewtils.plugin.CONFIG.instance_name self._system_namespace = brewtils.plugin.CONFIG.namespace or "" + else: self._system_name = kwargs.get("system_name") self._version_constraint = kwargs.get("version_constraint", "latest") @@ -275,6 +277,10 @@ def __init__(self, *args, **kwargs): self._easy_client = EasyClient(*args, **kwargs) self._resolver = ResolutionManager(easy_client=self._easy_client) + self.local_request_handler = LocalRequestProcessor( + system=self._system, + easy_client=self._easy_client, + ) def __getattr__(self, item): # type: (str) -> partial @@ -401,9 +407,6 @@ def send_bg_request(self, *args, **kwargs): # check for a new version and retry try: request = self._construct_bg_request(**kwargs) - request = self._easy_client.create_request( - request, blocking=blocking, timeout=timeout - ) except ValidationError: if self._system and self._version_constraint == "latest": old_version = self._system.version @@ -414,16 +417,28 @@ def send_bg_request(self, *args, **kwargs): kwargs["_system_version"] = self._system.version return self.send_bg_request(**kwargs) raise + if not self.target_self: + request = self._easy_client.create_request( + request, blocking=blocking, timeout=timeout + ) # If not blocking just return the future if not blocking: - return self._thread_pool.submit( - self._wait_for_request, request, raise_on_error, timeout - ) + if not self.target_self: + return self._thread_pool.submit( + self._wait_for_request, request, raise_on_error, timeout + ) + else: + return self._thread_pool.submit( + self.local_request_handler.process_command, request + ) # Brew-view before 2.4 doesn't support the blocking flag, so make sure # the request is actually complete before returning - return self._wait_for_request(request, raise_on_error, timeout) + if not self.target_self: + return self._wait_for_request(request, raise_on_error, timeout) + + return self.local_request_handler.process_command(request) def load_bg_system(self): # type: () -> None diff --git a/test/request_handling_test.py b/test/request_handling_test.py index 14b9baef..4920bdb6 100644 --- a/test/request_handling_test.py +++ b/test/request_handling_test.py @@ -8,6 +8,7 @@ from mock import ANY, MagicMock, Mock from requests import ConnectionError as RequestsConnectionError +import brewtils.plugin from brewtils.errors import ( DiscardMessageException, ErrorLogLevelCritical, @@ -21,8 +22,12 @@ SuppressStacktrace, TooLargeError, ) -from brewtils.models import Request -from brewtils.request_handling import HTTPRequestUpdater, RequestProcessor +from brewtils.models import Command, Request, System +from brewtils.request_handling import ( + HTTPRequestUpdater, + LocalRequestProcessor, + RequestProcessor, +) from brewtils.schema_parser import SchemaParser from brewtils.test.comparable import assert_request_equal @@ -294,6 +299,18 @@ def __init__(self, foo): def test_format(self, processor, output, expected): assert processor._format_output(output) == expected + def test_process_children( + self, processor, target_mock, updater_mock, invoke_mock, format_mock + ): + request_mock = Mock() + + processor.process_message(target_mock, request_mock, {}) + invoke_mock.assert_called_once_with(target_mock, request_mock, {}) + format_mock.assert_called_once_with(invoke_mock.return_value) + assert updater_mock.update_request.call_count == 2 + assert request_mock.status == "SUCCESS" + assert request_mock.output == format_mock.return_value + class TestParse(object): def test_success(self, processor, bg_request): serialized = SchemaParser.serialize_request(bg_request) @@ -496,3 +513,61 @@ def test_create_connection_poll_thread(self, client): shutdown_event.set() updater.connection_poll_thread.join() assert not updater.connection_poll_thread.is_alive() + + +class TestLocalRequestProcessor(object): + @pytest.fixture + def client(self): + class ClientTest(object): + def command_one(self): + return True + + def command_two(self): + return False + + return ClientTest() + + @pytest.fixture + def system_client(self): + return System( + commands=[Command(name="command_one"), Command(name="command_two")] + ) + + @pytest.fixture + def resolver_mock(self): + def resolve(values, **_): + return values + + resolver = Mock() + resolver.resolve.side_effect = resolve + + return resolver + + @pytest.fixture + def local_request_processor(self, system_client, client, resolver_mock): + brewtils.plugin.CLIENT = client + + def return_input_side_effect(*args, **kwargs): + return args[0] + + _ez_client = Mock() + _ez_client.put_request.side_effect = return_input_side_effect + + return LocalRequestProcessor( + system=system_client, easy_client=_ez_client, resolver=resolver_mock + ) + + def setup_request_context(self): + brewtils.plugin.request_context = threading.local() + brewtils.plugin.request_context.current_request = None + + def test_process_command(self, local_request_processor): + self.setup_request_context() + brewtils.plugin.request_context.current_request = Request(id="1") + + assert local_request_processor.process_command( + Request(command="command_one", parameters={}) + ) + assert not local_request_processor.process_command( + Request(command="command_two", parameters={}) + ) diff --git a/test/rest/system_client_test.py b/test/rest/system_client_test.py index e0bb8c0f..3dafd8c1 100644 --- a/test/rest/system_client_test.py +++ b/test/rest/system_client_test.py @@ -355,11 +355,12 @@ def test_retry_send_different_version( client.load_bg_system() easy_client.find_systems.return_value = [bg_system_2] - easy_client.create_request.side_effect = [ValidationError, mock_success] + client._construct_bg_request = Mock(side_effect=[ValidationError, mock_success]) + easy_client.create_request.return_value = mock_success client.speak() assert client._system.version == "2.0.0" - assert easy_client.create_request.call_count == 2 + assert easy_client.create_request.call_count == 1 class TestExecuteNonBlocking(object):