Refactor log handler and support TCP/UDP communications
jangsutsr committed Apr 25, 2017
1 parent 1f99a0d commit 8d2ee8c
Showing 9 changed files with 446 additions and 206 deletions.
23 changes: 22 additions & 1 deletion awx/main/conf.py
@@ -320,9 +320,30 @@ def _load_default_license_from_file():
'LOG_AGGREGATOR_TOWER_UUID',
field_class=fields.CharField,
allow_blank=True,
default='',
label=_('Cluster-wide Tower unique identifier.'),
help_text=_('Useful to uniquely identify Tower instances.'),
category=_('Logging'),
category_slug='logging',
default='',
)
register(
'LOG_AGGREGATOR_PROTOCOL',
field_class=fields.ChoiceField,
choices=['https', 'tcp', 'udp'],
default='https',
label=_('Logging Aggregator Protocol'),
help_text=_('Protocol used to communicate with log aggregator.'),
category=_('Logging'),
category_slug='logging',
)
register(
'LOG_AGGREGATOR_TCP_TIMEOUT',
field_class=fields.IntegerField,
default=5,
label=_('TCP Connection Timeout'),
help_text=_('Number of seconds for a TCP connection to external log '
'aggregator to timeout. Applies to HTTPS and TCP log '
'aggregator protocols.'),
category=_('Logging'),
category_slug='logging',
)
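The two new registrations expose the transport selector and the shared connection timeout through Tower's settings system. A minimal sketch of overriding them in a deployment settings file (LOG_AGGREGATOR_HOST and LOG_AGGREGATOR_PORT are Tower's existing logging settings; the file location is an assumed convention, not part of this commit):

# local_settings.py -- assumed override location
LOG_AGGREGATOR_ENABLED = True
LOG_AGGREGATOR_HOST = 'logs.example.com'
LOG_AGGREGATOR_PORT = 1514
LOG_AGGREGATOR_PROTOCOL = 'tcp'     # one of 'https', 'tcp', 'udp'
LOG_AGGREGATOR_TCP_TIMEOUT = 10     # seconds; applies to https and tcp, never udp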
4 changes: 3 additions & 1 deletion awx/main/tests/functional/api/test_settings.py
@@ -225,7 +225,9 @@ def test_logging_aggregrator_connection_test_valid(mocker, get, post, admin):
('LOG_AGGREGATOR_LOGGERS', ['awx', 'activity_stream', 'job_events', 'system_tracking']),
('LOG_AGGREGATOR_INDIVIDUAL_FACTS', False),
('LOG_AGGREGATOR_ENABLED', False),
('LOG_AGGREGATOR_TOWER_UUID', '')
('LOG_AGGREGATOR_TOWER_UUID', ''),
('LOG_AGGREGATOR_PROTOCOL', 'https'),
('LOG_AGGREGATOR_TCP_TIMEOUT', 5),
]))


287 changes: 189 additions & 98 deletions awx/main/tests/unit/utils/test_handlers.py

Large diffs are not rendered by default.
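Because the test diff is collapsed, here is a representative sketch of the kind of unit test the new code invites, written against the _encode_payload_for_socket helper that appears in awx/main/utils/handlers.py below (the commit's actual tests may differ):

import json

from awx.main.utils.handlers import _encode_payload_for_socket


def test_encode_payload_for_socket_serializes_dicts():
    # dict payloads are JSON-serialized, then encoded to UTF-8 bytes
    payload = _encode_payload_for_socket({'message': u'caf\xe9'})
    expected = json.dumps({'message': u'caf\xe9'}, ensure_ascii=False).encode('utf-8')
    assert payload == expected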

264 changes: 169 additions & 95 deletions awx/main/utils/handlers.py
@@ -7,6 +7,9 @@
import requests
import time
import urlparse
import socket
import select
import six
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from requests.exceptions import RequestException
@@ -20,7 +23,8 @@
from awx.main.utils.formatters import LogstashFormatter


__all__ = ['HTTPSNullHandler', 'BaseHTTPSHandler', 'configure_external_logger']
__all__ = ['HTTPSNullHandler', 'BaseHTTPSHandler', 'TCPHandler', 'UDPHandler',
'configure_external_logger']

logger = logging.getLogger('awx.main.utils.handlers')

@@ -39,7 +43,7 @@
'enabled_loggers': 'LOG_AGGREGATOR_LOGGERS',
'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS',
'enabled_flag': 'LOG_AGGREGATOR_ENABLED',
'http_timeout': 'LOG_AGGREGATOR_HTTP_TIMEOUT',
'tcp_timeout': 'LOG_AGGREGATOR_TCP_TIMEOUT',
}


@@ -84,23 +88,125 @@ def _wrapped(*args, **kwargs):
**kwargs)


class BaseHTTPSHandler(logging.Handler):
def __init__(self, fqdn=False, **kwargs):
super(BaseHTTPSHandler, self).__init__()
self.fqdn = fqdn
class BaseHandler(logging.Handler):
def __init__(self, **kwargs):
super(BaseHandler, self).__init__()
for fd in PARAM_NAMES:
setattr(self, fd, kwargs.get(fd, None))
self.session = FuturesSession(executor=VerboseThreadPoolExecutor(
max_workers=2 # this is the default used by requests_futures
))
self.add_auth_information()

@classmethod
def from_django_settings(cls, settings, *args, **kwargs):
for param, django_setting_name in PARAM_NAMES.items():
kwargs[param] = getattr(settings, django_setting_name, None)
return cls(*args, **kwargs)

def get_full_message(self, record):
if record.exc_info:
return '\n'.join(traceback.format_exception(*record.exc_info))
else:
return record.getMessage()

def _send(self, payload):
"""Actually send message to log aggregator.
"""
return payload

def _send_and_queue_system_tracking(self, payload_data):
# Special action for System Tracking, queue up multiple log messages
ret = []
module_name = payload_data['module_name']
if module_name in ['services', 'packages', 'files']:
facts_dict = payload_data.pop(module_name)
for key in facts_dict:
fact_payload = copy(payload_data)
fact_payload.update(facts_dict[key])
ret.append(self._send(fact_payload))
return ret

def _format_and_send_record(self, record):
ret = []
payload = self.format(record)
if self.indv_facts:
payload_data = json.loads(payload)
if record.name.startswith('awx.analytics.system_tracking'):
ret.extend(self._send_and_queue_system_tracking(payload_data))
if len(ret) == 0:
ret.append(self._send(payload))
return ret

def _skip_log(self, logger_name):
if self.host == '' or (not self.enabled_flag):
return True
# Don't send handler-related records.
if logger_name == logger.name:
return True
# Tower log emission is only turned off by enablement setting
if not logger_name.startswith('awx.analytics'):
return False
return self.enabled_loggers is None or logger_name[len('awx.analytics.'):] not in self.enabled_loggers

def emit(self, record):
"""
Emit a log record. Returns a list of zero or more
implementation-specific objects for tests.
"""
if self._skip_log(record.name):
return []
try:
return self._format_and_send_record(record)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)

def _get_host(self, scheme='', hostname_only=False):
"""Return the host name of log aggregator.
"""
host = self.host or ''
# urlparse requires '//' to be provided if scheme is not specified
if not urlparse.urlsplit(host).scheme and not host.startswith('//'):
host = '%s://%s' % (scheme, host) if scheme else '//%s' % host
parsed = urlparse.urlsplit(host)

if hostname_only:
return parsed.hostname

try:
port = parsed.port or self.port
except ValueError:
port = self.port
netloc = parsed.netloc if port is None else '%s:%s' % (parsed.hostname, port)

url_components = list(parsed)
url_components[1] = netloc
ret = urlparse.urlunsplit(url_components)
return ret.lstrip('/')
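
    # Illustrative behaviors of _get_host (editorial examples, not part of
    # this commit), assuming Python 2's urlparse:
    #   host='localhost', port=8080:
    #       _get_host()                   -> 'localhost:8080'
    #   host='https://h:514/path':
    #       _get_host(scheme='http')      -> 'https://h:514/path'  (scheme kept)
    #   host='logs.example.com:514':
    #       _get_host(hostname_only=True) -> 'logs.example.com'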


class BaseHTTPSHandler(BaseHandler):
def _add_auth_information(self):
if self.message_type == 'logstash':
if not self.username:
# Logstash authentication not enabled
return
logstash_auth = requests.auth.HTTPBasicAuth(self.username, self.password)
self.session.auth = logstash_auth
elif self.message_type == 'splunk':
auth_header = "Splunk %s" % self.password
headers = {
"Authorization": auth_header,
"Content-Type": "application/json"
}
self.session.headers.update(headers)

def __init__(self, fqdn=False, **kwargs):
self.fqdn = fqdn
super(BaseHTTPSHandler, self).__init__(**kwargs)
self.session = FuturesSession(executor=VerboseThreadPoolExecutor(
max_workers=2 # this is the default used by requests_futures
))
self._add_auth_information()

@classmethod
def perform_test(cls, settings):
"""
@@ -126,47 +232,7 @@ def perform_test(cls, settings):
except RequestException as e:
raise LoggingConnectivityException(str(e))

def get_full_message(self, record):
if record.exc_info:
return '\n'.join(traceback.format_exception(*record.exc_info))
else:
return record.getMessage()

def add_auth_information(self):
if self.message_type == 'logstash':
if not self.username:
# Logstash authentication not enabled
return
logstash_auth = requests.auth.HTTPBasicAuth(self.username, self.password)
self.session.auth = logstash_auth
elif self.message_type == 'splunk':
auth_header = "Splunk %s" % self.password
headers = {
"Authorization": auth_header,
"Content-Type": "application/json"
}
self.session.headers.update(headers)

def get_http_host(self):
host = self.host or ''
# urlparse requires scheme to be provided, default to use http if
# missing
if not urlparse.urlsplit(host).scheme:
host = 'http://%s' % host
parsed = urlparse.urlsplit(host)
# Insert self.port if its special and port number is either not
# given in host or given as non-numerical
try:
port = parsed.port or self.port
except ValueError:
port = self.port
if port not in (80, None):
new_netloc = '%s:%s' % (parsed.hostname, port)
return urlparse.urlunsplit((parsed.scheme, new_netloc, parsed.path,
parsed.query, parsed.fragment))
return host

def get_post_kwargs(self, payload_input):
def _get_post_kwargs(self, payload_input):
if self.message_type == 'splunk':
# Splunk needs data nested under key "event"
if not isinstance(payload_input, dict):
@@ -177,56 +243,64 @@ def get_post_kwargs(self, payload_input):
else:
payload_str = payload_input
return dict(data=payload_str, background_callback=unused_callback,
timeout=self.http_timeout)
timeout=self.tcp_timeout)

def skip_log(self, logger_name):
if self.host == '' or (not self.enabled_flag):
return True
if not logger_name.startswith('awx.analytics'):
# Tower log emission is only turned off by enablement setting
return False
return self.enabled_loggers is None or logger_name[len('awx.analytics.'):] not in self.enabled_loggers

def emit(self, record):
"""
Emit a log record. Returns a list of zero or more
``concurrent.futures.Future`` objects.
See:
def _send(self, payload):
"""See:
https://docs.python.org/3/library/concurrent.futures.html#future-objects
http://pythonhosted.org/futures/
"""
if self.skip_log(record.name):
return []
return self.session.post(self._get_host(scheme='http'),
**self._get_post_kwargs(payload))


def _encode_payload_for_socket(payload):
encoded_payload = payload
if isinstance(encoded_payload, dict):
encoded_payload = json.dumps(encoded_payload, ensure_ascii=False)
if isinstance(encoded_payload, six.text_type):
encoded_payload = encoded_payload.encode('utf-8')
return encoded_payload


class TCPHandler(BaseHandler):
def _send(self, payload):
payload = _encode_payload_for_socket(payload)
sok = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
payload = self.format(record)

# Special action for System Tracking, queue up multiple log messages
if self.indv_facts:
payload_data = json.loads(payload)
if record.name.startswith('awx.analytics.system_tracking'):
module_name = payload_data['module_name']
if module_name in ['services', 'packages', 'files']:
facts_dict = payload_data.pop(module_name)
async_futures = []
for key in facts_dict:
fact_payload = copy(payload_data)
fact_payload.update(facts_dict[key])
async_futures.append(self._send(fact_payload))
return async_futures

return [self._send(payload)]
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
sok.connect((self._get_host(hostname_only=True), self.port or 0))
sok.setblocking(0)
_, ready_to_send, _ = select.select([], [sok], [], float(self.tcp_timeout))
if len(ready_to_send) == 0:
logger.warning("Socket currently busy, failed to send message")
sok.close()
return
sok.send(payload)
except Exception as e:
logger.exception("Error sending message from %s: %s" %
(TCPHandler.__name__, e.message))
sok.close()


class UDPHandler(BaseHandler):
def __init__(self, **kwargs):
super(UDPHandler, self).__init__(**kwargs)
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

def _send(self, payload):
return self.session.post(self.get_http_host(),
**self.get_post_kwargs(payload))
payload = _encode_payload_for_socket(payload)
return self.socket.sendto(payload, (self._get_host(hostname_only=True), self.port or 0))


def add_or_remove_logger(address, instance):
HANDLER_MAPPING = {
'https': BaseHTTPSHandler,
'tcp': TCPHandler,
'udp': UDPHandler,
}


def _add_or_remove_logger(address, instance):
specific_logger = logging.getLogger(address)
for i, handler in enumerate(specific_logger.handlers):
if isinstance(handler, (HTTPSNullHandler, BaseHTTPSHandler)):
@@ -238,19 +312,19 @@ def add_or_remove_logger(address, instance):


def configure_external_logger(settings_module, is_startup=True):

is_enabled = settings_module.LOG_AGGREGATOR_ENABLED
if is_startup and (not is_enabled):
# Pass-through if external logging not being used
return

instance = None
if is_enabled:
instance = BaseHTTPSHandler.from_django_settings(settings_module)
handler_class = HANDLER_MAPPING[settings_module.LOG_AGGREGATOR_PROTOCOL]
instance = handler_class.from_django_settings(settings_module)
instance.setFormatter(LogstashFormatter(settings_module=settings_module))
awx_logger_instance = instance
if is_enabled and 'awx' not in settings_module.LOG_AGGREGATOR_LOGGERS:
awx_logger_instance = None

add_or_remove_logger('awx.analytics', instance)
add_or_remove_logger('awx', awx_logger_instance)
_add_or_remove_logger('awx.analytics', instance)
_add_or_remove_logger('awx', awx_logger_instance)
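
The dispatch is driven by HANDLER_MAPPING: configure_external_logger keys it with the new LOG_AGGREGATOR_PROTOCOL setting, and each handler class only overrides _send(). For the TCP path, note that the handler opens a fresh connection per message and bounds the send with select() on a non-blocking socket rather than a socket-level timeout. A standalone sketch of that pattern (function name and return values are illustrative, not from the commit):

import select
import socket


def send_with_timeout(host, port, payload, timeout=5):
    # Mirrors TCPHandler._send above: connect, flip the socket to
    # non-blocking mode, then wait up to `timeout` seconds for it to
    # become writable before sending. `payload` must already be bytes
    # (see _encode_payload_for_socket).
    sok = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sok.connect((host, port))
        sok.setblocking(0)
        _, ready_to_send, _ = select.select([], [sok], [], float(timeout))
        if not ready_to_send:
            return False  # aggregator busy; the message is dropped
        sok.send(payload)
        return True
    finally:
        sok.close()

One difference worth flagging: the try/finally here closes the socket on the successful-send path as well, whereas TCPHandler._send above only closes it on the busy and exception branches.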
2 changes: 1 addition & 1 deletion awx/settings/defaults.py
@@ -877,7 +877,7 @@ def IS_TESTING(argv=None):
TOWER_SETTINGS_MANIFEST = {}

LOG_AGGREGATOR_ENABLED = False
LOG_AGGREGATOR_HTTP_TIMEOUT = 5
LOG_AGGREGATOR_TCP_TIMEOUT = 5

# The number of retry attempts for websocket session establishment
# If you're encountering issues establishing websockets in clustered Tower,
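
Note the rename rather than an addition here: LOG_AGGREGATOR_HTTP_TIMEOUT becomes LOG_AGGREGATOR_TCP_TIMEOUT because the timeout now governs both the HTTPS and raw TCP protocols, while UDP sends never block on the aggregator. A deployment overriding the old name would need the same one-line change, e.g.:

# was: LOG_AGGREGATOR_HTTP_TIMEOUT = 10
LOG_AGGREGATOR_TCP_TIMEOUT = 10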