Skip to content

Commit

Permalink
Max Concurrence Fix (#436)
Browse files Browse the repository at this point in the history
  • Loading branch information
TheBurchLog authored Dec 27, 2023
1 parent da172c0 commit 2f4b058
Show file tree
Hide file tree
Showing 10 changed files with 277 additions and 21 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,6 @@ target/

# Vim
*.swp

# VScode
.vscode/
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ TBD
but options like 'fanout' can now be supported
- Better handling of Pika errors
- Updated how AutoBrewtils maps functions, and will skip auto marking commands with annotations
- When SystemClient is self referencing to the Plugin, child requests will be generated
locally, then uploaded to Beer-Garden once the request is completed.
- Must upgrade to a minimum version of Beer Garden 3.23.0 to support new APIs


3.22.0
------
Expand Down
16 changes: 10 additions & 6 deletions brewtils/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class Plugin(object):
logging configuration.
worker_shutdown_timeout (int): Time to wait during shutdown to finish processing
max_concurrent (int): Maximum number of requests to process concurrently
max_concurrent (int): Maximum number of requests to process concurrently from RabbitMQ
max_attempts (int): Number of times to attempt updating of a Request
before giving up. Negative numbers are interpreted as no maximum.
max_timeout (int): Maximum amount of time to wait between Request update
Expand All @@ -189,7 +189,6 @@ class Plugin(object):
"""

def __init__(self, client=None, system=None, logger=None, **kwargs):
self._client = None
self._instance = None
self._admin_processor = None
self._request_processor = None
Expand All @@ -215,9 +214,13 @@ def __init__(self, client=None, system=None, logger=None, **kwargs):
# Now set up the system
self._system = self._setup_system(system, kwargs)

global CLIENT
# Make sure this is set after self._system
if client:
self.client = client
self._client = client
CLIENT = client
else:
self._client = None

# Now that the config is loaded we can create the EasyClient
self._ez_client = EasyClient(logger=self._logger, **self._config)
Expand Down Expand Up @@ -305,6 +308,7 @@ def client(self, new_client):
)

self._client = new_client
brewtils.plugin.CLIENT = new_client

@property
def system(self):
Expand Down Expand Up @@ -594,7 +598,7 @@ def _initialize_processors(self):
max_workers=1,
)
request_processor = RequestProcessor(
target=self._client,
target=CLIENT,
updater=updater,
consumer=request_consumer,
validation_funcs=[self._correct_system, self._is_running],
Expand Down Expand Up @@ -843,14 +847,14 @@ def _validate_system(self):
if not self._system.version:
raise ValidationError("Plugin system must have a version")

client_name = getattr(self._client, "_bg_name", None)
client_name = getattr(CLIENT, "_bg_name", None)
if client_name and client_name != self._system.name:
raise ValidationError(
"System name '%s' doesn't match name from client decorator: "
"@system(bg_name=%s)" % (self._system.name, client_name)
)

client_version = getattr(self._client, "_bg_version", None)
client_version = getattr(CLIENT, "_bg_version", None)
if client_version and client_version != self._system.version:
raise ValidationError(
"System version '%s' doesn't match version from client decorator: "
Expand Down
125 changes: 125 additions & 0 deletions brewtils/request_handling.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import abc
import copy
import json
import logging
import sys
Expand All @@ -21,9 +22,130 @@
parse_exception_as_json,
)
from brewtils.models import Request
from brewtils.resolvers.manager import ResolutionManager
from brewtils.schema_parser import SchemaParser


class LocalRequestProcessor(object):
""" """

def __init__(
self,
logger=None,
system=None,
easy_client=None,
resolver=None,
):
self.logger = logger or logging.getLogger(__name__)
self._system = system
self._resolver = resolver or ResolutionManager(easy_client=easy_client)
self._ez_client = easy_client

def process_command(self, request):
"""Process a command locally.
Will update the child request map with the generated Request object to be
sent to Beer Garden
Args:
command_name: the command to be processed
Returns:
Any
"""

parent_request = copy.deepcopy(brewtils.plugin.request_context.current_request)

request.parent = Request(id=str(parent_request.id))
request.has_parent = True

request.status = "IN_PROGRESS"

request = self._ez_client.put_request(request)

try:
output = self._invoke_command(brewtils.plugin.CLIENT, request)
except Exception as exc:
self._handle_invoke_failure(request, exc)
request = self._ez_client.put_request(request)
brewtils.plugin.request_context.current_request = parent_request
raise exc
else:
self._handle_invoke_success(request, output)
request = self._ez_client.put_request(request)
brewtils.plugin.request_context.current_request = parent_request
return output

def _invoke_command(self, target, request):
"""Invoke the function named in request.command
Args:
target: The object to search for the function implementation.
request: The request to process
Returns:
The output of the function call
Raises:
RequestProcessingError: The specified target does not define a
callable implementation of request.command
"""
if not callable(getattr(target, request.command, None)):
raise RequestProcessingError(
"Could not find an implementation of command '%s'" % request.command
)

# Get the command to use the parameter definitions when resolving
command = None
if self._system:
command = self._system.get_command_by_name(request.command)

# Now resolve parameters, if necessary
if request.is_ephemeral or not command:
parameters = request.parameters or {}
else:
parameters = self._resolver.resolve(
request.parameters,
definitions=command.parameters,
upload=False,
)

return getattr(target, request.command)(**parameters)

def _handle_invoke_success(self, request, output):
request.status = "SUCCESS"
request.output = self._format_output(output)

def _handle_invoke_failure(self, request, exc):
self.logger.log(
getattr(exc, "_bg_error_log_level", logging.ERROR),
"Raised an exception while processing request %s: %s",
str(request),
exc,
exc_info=not getattr(exc, "_bg_suppress_stacktrace", False),
)
request.status = "ERROR"
request.output = self._format_error_output(request, exc)
request.error_class = type(exc).__name__

@staticmethod
def _format_error_output(request, exc):
if request.is_json:
return parse_exception_as_json(exc)
else:
return str(exc)

@staticmethod
def _format_output(output):
if isinstance(output, six.string_types):
return output

try:
return json.dumps(output)
except (TypeError, ValueError):
return str(output)


class RequestProcessor(object):
"""Class responsible for coordinating Request processing
Expand Down Expand Up @@ -89,6 +211,7 @@ def on_message_received(self, message, headers):
DiscardMessageException: The request failed to parse correctly
RequestProcessException: Validation failures should raise a subclass of this
"""

request = self._parse(message)

for func in self._validation_funcs:
Expand Down Expand Up @@ -116,6 +239,7 @@ def process_message(self, target, request, headers):
Returns:
None
"""

request.status = "IN_PROGRESS"
self._updater.update_request(request, headers)

Expand All @@ -127,6 +251,7 @@ def process_message(self, target, request, headers):
brewtils.plugin.request_context.current_request = request

output = self._invoke_command(target, request, headers)

except Exception as exc:
self._handle_invoke_failure(request, exc)
else:
Expand Down
15 changes: 15 additions & 0 deletions brewtils/rest/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,21 @@ def post_requests(self, payload, **kwargs):
self.request_url, data=payload, headers=self.JSON_HEADERS, params=kwargs
)

@enable_auth
def put_request(self, payload):
# type: (str, **Any) -> Response
"""Performs a PUT on the Request URL
Args:
payload: Completed Request definition
Returns:
Requests Response object
"""
return self.session.put(
self.request_url, data=payload, headers=self.JSON_HEADERS
)

@enable_auth
def patch_request(self, request_id, payload):
# type: (str, str) -> Response
Expand Down
14 changes: 14 additions & 0 deletions brewtils/rest/easy_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,20 @@ def update_request(self, request_id, status=None, output=None, error_class=None)
request_id, SchemaParser.serialize_patch(operations, many=True)
)

@wrap_response(
parse_method="parse_request", parse_many=False, default_exc=SaveError
)
def put_request(self, request):
"""Creates or Updates Request with a completed requests
Args:
request: Completed Request
Returns:
Request: The created Request
"""
return self.client.put_request(SchemaParser.serialize_request(request))

@wrap_response(return_boolean=True)
def publish_event(self, *args, **kwargs):
"""Publish a new event
Expand Down
2 changes: 1 addition & 1 deletion brewtils/rest/publish_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# -*- coding: utf-8 -*-
import logging

from brewtils.errors import BrewtilsException
import brewtils.plugin
from brewtils.errors import BrewtilsException
from brewtils.models import Event, Events, Request
from brewtils.rest.easy_client import EasyClient

Expand Down
35 changes: 25 additions & 10 deletions brewtils/rest/system_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
_deprecate,
)
from brewtils.models import Request, System
from brewtils.request_handling import LocalRequestProcessor
from brewtils.resolvers.manager import ResolutionManager
from brewtils.rest.easy_client import EasyClient

Expand Down Expand Up @@ -215,7 +216,7 @@ def __init__(self, *args, **kwargs):

# Now need to determine if the intended target is the current running plugin.
# Start by ensuring there's a valid Plugin context active
target_self = bool(brewtils.plugin.CONFIG)
self.target_self = bool(brewtils.plugin.CONFIG)

# If ANY of the target specification arguments don't match the current plugin
# then the target is different
Expand All @@ -230,15 +231,16 @@ def __init__(self, *args, **kwargs):
kwargs.get(key) is not None
and kwargs.get(key) != brewtils.plugin.CONFIG[value]
):
target_self = False
self.target_self = False
break

# Now assign self._system_name, etc based on the value of target_self
if target_self:
if self.target_self:
self._system_name = brewtils.plugin.CONFIG.name
self._version_constraint = brewtils.plugin.CONFIG.version
self._default_instance = brewtils.plugin.CONFIG.instance_name
self._system_namespace = brewtils.plugin.CONFIG.namespace or ""

else:
self._system_name = kwargs.get("system_name")
self._version_constraint = kwargs.get("version_constraint", "latest")
Expand Down Expand Up @@ -275,6 +277,10 @@ def __init__(self, *args, **kwargs):

self._easy_client = EasyClient(*args, **kwargs)
self._resolver = ResolutionManager(easy_client=self._easy_client)
self.local_request_handler = LocalRequestProcessor(
system=self._system,
easy_client=self._easy_client,
)

def __getattr__(self, item):
# type: (str) -> partial
Expand Down Expand Up @@ -401,9 +407,6 @@ def send_bg_request(self, *args, **kwargs):
# check for a new version and retry
try:
request = self._construct_bg_request(**kwargs)
request = self._easy_client.create_request(
request, blocking=blocking, timeout=timeout
)
except ValidationError:
if self._system and self._version_constraint == "latest":
old_version = self._system.version
Expand All @@ -414,16 +417,28 @@ def send_bg_request(self, *args, **kwargs):
kwargs["_system_version"] = self._system.version
return self.send_bg_request(**kwargs)
raise
if not self.target_self:
request = self._easy_client.create_request(
request, blocking=blocking, timeout=timeout
)

# If not blocking just return the future
if not blocking:
return self._thread_pool.submit(
self._wait_for_request, request, raise_on_error, timeout
)
if not self.target_self:
return self._thread_pool.submit(
self._wait_for_request, request, raise_on_error, timeout
)
else:
return self._thread_pool.submit(
self.local_request_handler.process_command, request
)

# Brew-view before 2.4 doesn't support the blocking flag, so make sure
# the request is actually complete before returning
return self._wait_for_request(request, raise_on_error, timeout)
if not self.target_self:
return self._wait_for_request(request, raise_on_error, timeout)

return self.local_request_handler.process_command(request)

def load_bg_system(self):
# type: () -> None
Expand Down
Loading

0 comments on commit 2f4b058

Please sign in to comment.