diff --git a/awx/main/conf.py b/awx/main/conf.py index 674c7fabee56..7610a8fa0cb3 100644 --- a/awx/main/conf.py +++ b/awx/main/conf.py @@ -320,9 +320,30 @@ def _load_default_license_from_file(): 'LOG_AGGREGATOR_TOWER_UUID', field_class=fields.CharField, allow_blank=True, + default='', label=_('Cluster-wide Tower unique identifier.'), help_text=_('Useful to uniquely identify Tower instances.'), category=_('Logging'), category_slug='logging', - default='', +) +register( + 'LOG_AGGREGATOR_PROTOCOL', + field_class=fields.ChoiceField, + choices=['https', 'tcp', 'udp'], + default='https', + label=_('Logging Aggregator Protocol'), + help_text=_('Protocol used to communicate with log aggregator.'), + category=_('Logging'), + category_slug='logging', +) +register( + 'LOG_AGGREGATOR_TCP_TIMEOUT', + field_class=fields.IntegerField, + default=5, + label=_('TCP Connection Timeout'), + help_text=_('Number of seconds for a TCP connection to external log ' + 'aggregator to timeout. Applies to HTTPS and TCP log ' + 'aggregator protocols.'), + category=_('Logging'), + category_slug='logging', ) diff --git a/awx/main/tests/functional/api/test_settings.py b/awx/main/tests/functional/api/test_settings.py index f10daf5162b2..b483c5223b3d 100644 --- a/awx/main/tests/functional/api/test_settings.py +++ b/awx/main/tests/functional/api/test_settings.py @@ -225,7 +225,9 @@ def test_logging_aggregrator_connection_test_valid(mocker, get, post, admin): ('LOG_AGGREGATOR_LOGGERS', ['awx', 'activity_stream', 'job_events', 'system_tracking']), ('LOG_AGGREGATOR_INDIVIDUAL_FACTS', False), ('LOG_AGGREGATOR_ENABLED', False), - ('LOG_AGGREGATOR_TOWER_UUID', '') + ('LOG_AGGREGATOR_TOWER_UUID', ''), + ('LOG_AGGREGATOR_PROTOCOL', 'https'), + ('LOG_AGGREGATOR_TCP_TIMEOUT', 5), ])) diff --git a/awx/main/tests/unit/utils/test_handlers.py b/awx/main/tests/unit/utils/test_handlers.py index e398e72ff839..055e2d3286cd 100644 --- a/awx/main/tests/unit/utils/test_handlers.py +++ 
b/awx/main/tests/unit/utils/test_handlers.py @@ -1,16 +1,21 @@ +# -*- coding: utf-8 -*- import base64 import cStringIO import json import logging +import socket from uuid import uuid4 +import mock + from django.conf import settings from django.conf import LazySettings import pytest import requests from requests_futures.sessions import FuturesSession -from awx.main.utils.handlers import (BaseHTTPSHandler as HTTPSHandler, +from awx.main.utils.handlers import (BaseHandler, BaseHTTPSHandler as HTTPSHandler, + TCPHandler, UDPHandler, _encode_payload_for_socket, PARAM_NAMES, LoggingConnectivityException) from awx.main.utils.formatters import LogstashFormatter @@ -57,6 +62,16 @@ def send(self, request, **kwargs): return ConnectionErrorAdapter() +@pytest.fixture +def fake_socket(tmpdir_factory, request): + sok = socket._socketobject + sok.send = mock.MagicMock() + sok.connect = mock.MagicMock() + sok.setblocking = mock.MagicMock() + sok.close = mock.MagicMock() + return sok + + def test_https_logging_handler_requests_async_implementation(): handler = HTTPSHandler() assert isinstance(handler.session, FuturesSession) @@ -64,31 +79,121 @@ def test_https_logging_handler_requests_async_implementation(): def test_https_logging_handler_has_default_http_timeout(): handler = HTTPSHandler.from_django_settings(settings) - assert handler.http_timeout == 5 + assert handler.tcp_timeout == 5 @pytest.mark.parametrize('param', PARAM_NAMES.keys()) -def test_https_logging_handler_defaults(param): - handler = HTTPSHandler() +def test_base_logging_handler_defaults(param): + handler = BaseHandler() assert hasattr(handler, param) and getattr(handler, param) is None @pytest.mark.parametrize('param', PARAM_NAMES.keys()) -def test_https_logging_handler_kwargs(param): - handler = HTTPSHandler(**{param: 'EXAMPLE'}) +def test_base_logging_handler_kwargs(param): + handler = BaseHandler(**{param: 'EXAMPLE'}) assert hasattr(handler, param) and getattr(handler, param) == 'EXAMPLE' 
@pytest.mark.parametrize('param, django_settings_name', PARAM_NAMES.items()) -def test_https_logging_handler_from_django_settings(param, django_settings_name): +def test_base_logging_handler_from_django_settings(param, django_settings_name): settings = LazySettings() settings.configure(**{ django_settings_name: 'EXAMPLE' }) - handler = HTTPSHandler.from_django_settings(settings) + handler = BaseHandler.from_django_settings(settings) assert hasattr(handler, param) and getattr(handler, param) == 'EXAMPLE' +@pytest.mark.parametrize('params, logger_name, expected', [ + # skip all records if enabled_flag = False + ({'enabled_flag': False}, 'awx.main', True), + # skip all records if the host is undefined + ({'host': '', 'enabled_flag': True}, 'awx.main', True), + # skip all records if underlying logger is used by handlers themselves + ({'host': '127.0.0.1', 'enabled_flag': True}, 'awx.main.utils.handlers', True), + ({'host': '127.0.0.1', 'enabled_flag': True}, 'awx.main', False), + ({'host': '127.0.0.1', 'enabled_flag': True, 'enabled_loggers': ['abc']}, 'awx.analytics.xyz', True), + ({'host': '127.0.0.1', 'enabled_flag': True, 'enabled_loggers': ['xyz']}, 'awx.analytics.xyz', False), +]) +def test_base_logging_handler_skip_log(params, logger_name, expected): + handler = BaseHandler(**params) + assert handler._skip_log(logger_name) is expected + + +def test_base_logging_handler_emit(dummy_log_record): + handler = BaseHandler(host='127.0.0.1', enabled_flag=True, + message_type='logstash', + enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) + handler.setFormatter(LogstashFormatter()) + sent_payloads = handler.emit(dummy_log_record) + + assert len(sent_payloads) == 1 + body = json.loads(sent_payloads[0]) + + assert body['level'] == 'INFO' + assert body['logger_name'] == 'awx' + assert body['message'] == 'User joe logged in' + + +def test_base_logging_handler_emit_one_record_per_fact(): + handler = BaseHandler(host='127.0.0.1', enabled_flag=True, + 
message_type='logstash', indv_facts=True, + enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) + handler.setFormatter(LogstashFormatter()) + record = logging.LogRecord( + 'awx.analytics.system_tracking', # logger name + 20, # loglevel INFO + './awx/some/module.py', # pathname + 100, # lineno + None, # msg + tuple(), # args, + None # exc_info + ) + record.module_name = 'packages' + record.facts_data = [{ + "name": "ansible", + "version": "2.2.1.0" + }, { + "name": "ansible-tower", + "version": "3.1.0" + }] + sent_payloads = handler.emit(record) + + assert len(sent_payloads) == 2 + sent_payloads.sort(key=lambda payload: payload['version']) + + assert sent_payloads[0]['level'] == 'INFO' + assert sent_payloads[0]['logger_name'] == 'awx.analytics.system_tracking' + assert sent_payloads[0]['name'] == 'ansible' + assert sent_payloads[0]['version'] == '2.2.1.0' + assert sent_payloads[1]['level'] == 'INFO' + assert sent_payloads[1]['logger_name'] == 'awx.analytics.system_tracking' + assert sent_payloads[1]['name'] == 'ansible-tower' + assert sent_payloads[1]['version'] == '3.1.0' + + +@pytest.mark.parametrize('host, port, normalized, hostname_only', [ + ('localhost', None, 'http://localhost', False), + ('localhost', 8080, 'http://localhost:8080', False), + ('http://localhost', None, 'http://localhost', False), + ('http://localhost', 8080, 'http://localhost:8080', False), + ('https://localhost', 443, 'https://localhost:443', False), + ('ftp://localhost', 443, 'ftp://localhost:443', False), + ('https://localhost:550', 443, 'https://localhost:550', False), + ('https://localhost:yoho/foobar', 443, 'https://localhost:443/foobar', False), + ('https://localhost:yoho/foobar', None, 'https://localhost:yoho/foobar', False), + ('http://splunk.server:8088/services/collector/event', 80, + 'http://splunk.server:8088/services/collector/event', False), + ('http://splunk.server/services/collector/event', 8088, + 'http://splunk.server:8088/services/collector/event', 
False), + ('localhost', 4399, 'localhost', True), + ('tcp://localhost:4399/foo/bar', 4399, 'localhost', True), +]) +def test_base_logging_handler_host_format(host, port, normalized, hostname_only): + handler = BaseHandler(host=host, port=port) + assert handler._get_host(scheme='http', hostname_only=hostname_only) == normalized + + @pytest.mark.parametrize( 'status, reason, exc', [(200, '200 OK', None), (404, 'Not Found', LoggingConnectivityException)] @@ -127,7 +232,7 @@ def emit(self, record): def test_https_logging_handler_logstash_auth_info(): handler = HTTPSHandler(message_type='logstash', username='bob', password='ansible') - handler.add_auth_information() + handler._add_auth_information() assert isinstance(handler.session.auth, requests.auth.HTTPBasicAuth) assert handler.session.auth.username == 'bob' assert handler.session.auth.password == 'ansible' @@ -135,47 +240,11 @@ def test_https_logging_handler_logstash_auth_info(): def test_https_logging_handler_splunk_auth_info(): handler = HTTPSHandler(message_type='splunk', password='ansible') - handler.add_auth_information() + handler._add_auth_information() assert handler.session.headers['Authorization'] == 'Splunk ansible' assert handler.session.headers['Content-Type'] == 'application/json' -@pytest.mark.parametrize('host, port, normalized', [ - ('localhost', None, 'http://localhost'), - ('localhost', 80, 'http://localhost'), - ('localhost', 8080, 'http://localhost:8080'), - ('http://localhost', None, 'http://localhost'), - ('http://localhost', 80, 'http://localhost'), - ('http://localhost', 8080, 'http://localhost:8080'), - ('https://localhost', 443, 'https://localhost:443'), - ('ftp://localhost', 443, 'ftp://localhost:443'), - ('https://localhost:550', 443, 'https://localhost:550'), - ('https://localhost:yoho/foobar', 443, 'https://localhost:443/foobar'), - ('https://localhost:yoho/foobar', None, 'https://localhost:yoho/foobar'), - ('http://splunk.server:8088/services/collector/event', 80, - 
'http://splunk.server:8088/services/collector/event'), - ('http://splunk.server/services/collector/event', 80, - 'http://splunk.server/services/collector/event'), - ('http://splunk.server/services/collector/event', 8088, - 'http://splunk.server:8088/services/collector/event'), -]) -def test_https_logging_handler_http_host_format(host, port, normalized): - handler = HTTPSHandler(host=host, port=port) - assert handler.get_http_host() == normalized - - -@pytest.mark.parametrize('params, logger_name, expected', [ - ({'enabled_flag': False}, 'awx.main', True), # skip all records if enabled_flag = False - ({'host': '', 'enabled_flag': True}, 'awx.main', True), # skip all records if the host is undefined - ({'host': '127.0.0.1', 'enabled_flag': True}, 'awx.main', False), - ({'host': '127.0.0.1', 'enabled_flag': True, 'enabled_loggers': ['abc']}, 'awx.analytics.xyz', True), - ({'host': '127.0.0.1', 'enabled_flag': True, 'enabled_loggers': ['xyz']}, 'awx.analytics.xyz', False), -]) -def test_https_logging_handler_skip_log(params, logger_name, expected): - handler = HTTPSHandler(**params) - assert handler.skip_log(logger_name) is expected - - def test_https_logging_handler_connection_error(connection_error_adapter, dummy_log_record): handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True, @@ -204,8 +273,8 @@ def test_https_logging_handler_connection_error(connection_error_adapter, @pytest.mark.parametrize('message_type', ['logstash', 'splunk']) -def test_https_logging_handler_emit(http_adapter, dummy_log_record, - message_type): +def test_https_logging_handler_emit_without_cred(http_adapter, dummy_log_record, + message_type): handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True, message_type=message_type, enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) @@ -218,21 +287,14 @@ def test_https_logging_handler_emit(http_adapter, dummy_log_record, request = http_adapter.requests[0] assert request.url == 'http://127.0.0.1/' assert 
request.method == 'POST' - body = json.loads(request.body) if message_type == 'logstash': # A username + password weren't used, so this header should be missing assert 'Authorization' not in request.headers if message_type == 'splunk': - # splunk messages are nested under the 'event' key - body = body['event'] assert request.headers['Authorization'] == 'Splunk None' - assert body['level'] == 'INFO' - assert body['logger_name'] == 'awx' - assert body['message'] == 'User joe logged in' - def test_https_logging_handler_emit_logstash_with_creds(http_adapter, dummy_log_record): @@ -265,49 +327,78 @@ def test_https_logging_handler_emit_splunk_with_creds(http_adapter, assert request.headers['Authorization'] == 'Splunk pass' -def test_https_logging_handler_emit_one_record_per_fact(http_adapter): - handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True, - message_type='logstash', indv_facts=True, - enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) - handler.setFormatter(LogstashFormatter()) - handler.session.mount('http://', http_adapter) - record = logging.LogRecord( - 'awx.analytics.system_tracking', # logger name - 20, # loglevel INFO - './awx/some/module.py', # pathname - 100, # lineno - None, # msg - tuple(), # args, - None # exc_info - ) - record.module_name = 'packages' - record.facts_data = [{ - "name": "ansible", - "version": "2.2.1.0" - }, { - "name": "ansible-tower", - "version": "3.1.0" - }] - async_futures = handler.emit(record) - [future.result() for future in async_futures] +@pytest.mark.parametrize('payload, encoded_payload', [ + ('foobar', 'foobar'), + ({'foo': 'bar'}, '{"foo": "bar"}'), + ({u'测试键': u'测试值'}, '{"测试键": "测试值"}'), +]) +def test_encode_payload_for_socket(payload, encoded_payload): + assert _encode_payload_for_socket(payload) == encoded_payload - assert len(http_adapter.requests) == 2 - requests = sorted(http_adapter.requests, key=lambda request: json.loads(request.body)['version']) - request = requests[0] - assert 
request.url == 'http://127.0.0.1/' - assert request.method == 'POST' - body = json.loads(request.body) - assert body['level'] == 'INFO' - assert body['logger_name'] == 'awx.analytics.system_tracking' - assert body['name'] == 'ansible' - assert body['version'] == '2.2.1.0' +def test_udp_handler_create_socket_at_init(): + handler = UDPHandler(host='127.0.0.1', port=4399, + enabled_flag=True, message_type='splunk', + enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) + assert hasattr(handler, 'socket') + assert isinstance(handler.socket, socket.socket) + assert handler.socket.family == socket.AF_INET + assert handler.socket.type == socket.SOCK_DGRAM - request = requests[1] - assert request.url == 'http://127.0.0.1/' - assert request.method == 'POST' - body = json.loads(request.body) - assert body['level'] == 'INFO' - assert body['logger_name'] == 'awx.analytics.system_tracking' - assert body['name'] == 'ansible-tower' - assert body['version'] == '3.1.0' + +def test_udp_handler_send(dummy_log_record): + handler = UDPHandler(host='127.0.0.1', port=4399, + enabled_flag=True, message_type='splunk', + enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) + handler.setFormatter(LogstashFormatter()) + with mock.patch('awx.main.utils.handlers._encode_payload_for_socket', return_value="des") as encode_mock,\ + mock.patch.object(handler, 'socket') as socket_mock: + handler.emit(dummy_log_record) + encode_mock.assert_called_once_with(handler.format(dummy_log_record)) + socket_mock.sendto.assert_called_once_with("des", ('127.0.0.1', 4399)) + + +def test_tcp_handler_send(fake_socket, dummy_log_record): + handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5, + enabled_flag=True, message_type='splunk', + enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) + handler.setFormatter(LogstashFormatter()) + with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\ + 
mock.patch('select.select', return_value=([], [fake_socket], [])): + handler.emit(dummy_log_record) + sok_init_mock.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM) + fake_socket.connect.assert_called_once_with(('127.0.0.1', 4399)) + fake_socket.setblocking.assert_called_once_with(0) + fake_socket.send.assert_called_once_with(handler.format(dummy_log_record)) + fake_socket.close.assert_called_once() + + +def test_tcp_handler_return_if_socket_unavailable(fake_socket, dummy_log_record): + handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5, + enabled_flag=True, message_type='splunk', + enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) + handler.setFormatter(LogstashFormatter()) + with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\ + mock.patch('select.select', return_value=([], [], [])): + handler.emit(dummy_log_record) + sok_init_mock.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM) + fake_socket.connect.assert_called_once_with(('127.0.0.1', 4399)) + fake_socket.setblocking.assert_called_once_with(0) + assert not fake_socket.send.called + fake_socket.close.assert_called_once() + + +def test_tcp_handler_log_exception(fake_socket, dummy_log_record): + handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5, + enabled_flag=True, message_type='splunk', + enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) + handler.setFormatter(LogstashFormatter()) + with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\ + mock.patch('select.select', return_value=([], [], [])),\ + mock.patch('awx.main.utils.handlers.logger') as logger_mock: + fake_socket.connect.side_effect = Exception("foo") + handler.emit(dummy_log_record) + sok_init_mock.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM) + logger_mock.exception.assert_called_once() + fake_socket.close.assert_called_once() + assert not fake_socket.send.called diff --git 
a/awx/main/utils/handlers.py b/awx/main/utils/handlers.py index ea2cb63be7cc..2cddfea09585 100644 --- a/awx/main/utils/handlers.py +++ b/awx/main/utils/handlers.py @@ -7,6 +7,9 @@ import requests import time import urlparse +import socket +import select +import six from concurrent.futures import ThreadPoolExecutor from copy import copy from requests.exceptions import RequestException @@ -20,7 +23,8 @@ from awx.main.utils.formatters import LogstashFormatter -__all__ = ['HTTPSNullHandler', 'BaseHTTPSHandler', 'configure_external_logger'] +__all__ = ['HTTPSNullHandler', 'BaseHTTPSHandler', 'TCPHandler', 'UDPHandler', + 'configure_external_logger'] logger = logging.getLogger('awx.main.utils.handlers') @@ -39,7 +43,7 @@ 'enabled_loggers': 'LOG_AGGREGATOR_LOGGERS', 'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS', 'enabled_flag': 'LOG_AGGREGATOR_ENABLED', - 'http_timeout': 'LOG_AGGREGATOR_HTTP_TIMEOUT', + 'tcp_timeout': 'LOG_AGGREGATOR_TCP_TIMEOUT', } @@ -84,16 +88,11 @@ def _wrapped(*args, **kwargs): **kwargs) -class BaseHTTPSHandler(logging.Handler): - def __init__(self, fqdn=False, **kwargs): - super(BaseHTTPSHandler, self).__init__() - self.fqdn = fqdn +class BaseHandler(logging.Handler): + def __init__(self, **kwargs): + super(BaseHandler, self).__init__() for fd in PARAM_NAMES: setattr(self, fd, kwargs.get(fd, None)) - self.session = FuturesSession(executor=VerboseThreadPoolExecutor( - max_workers=2 # this is the default used by requests_futures - )) - self.add_auth_information() @classmethod def from_django_settings(cls, settings, *args, **kwargs): @@ -101,6 +100,113 @@ def from_django_settings(cls, settings, *args, **kwargs): kwargs[param] = getattr(settings, django_setting_name, None) return cls(*args, **kwargs) + def get_full_message(self, record): + if record.exc_info: + return '\n'.join(traceback.format_exception(*record.exc_info)) + else: + return record.getMessage() + + def _send(self, payload): + """Actually send message to log aggregator. 
+ """ + return payload + + def _send_and_queue_system_tracking(self, payload_data): + # Special action for System Tracking, queue up multiple log messages + ret = [] + module_name = payload_data['module_name'] + if module_name in ['services', 'packages', 'files']: + facts_dict = payload_data.pop(module_name) + for key in facts_dict: + fact_payload = copy(payload_data) + fact_payload.update(facts_dict[key]) + ret.append(self._send(fact_payload)) + return ret + + def _format_and_send_record(self, record): + ret = [] + payload = self.format(record) + if self.indv_facts: + payload_data = json.loads(payload) + if record.name.startswith('awx.analytics.system_tracking'): + ret.extend(self._send_and_queue_system_tracking(payload_data)) + if len(ret) == 0: + ret.append(self._send(payload)) + return ret + + def _skip_log(self, logger_name): + if self.host == '' or (not self.enabled_flag): + return True + # Don't send handler-related records. + if logger_name == logger.name: + return True + # Tower log emission is only turned off by enablement setting + if not logger_name.startswith('awx.analytics'): + return False + return self.enabled_loggers is None or logger_name[len('awx.analytics.'):] not in self.enabled_loggers + + def emit(self, record): + """ + Emit a log record. Returns a list of zero or more + implementation-specific objects for tests. + """ + if self._skip_log(record.name): + return [] + try: + return self._format_and_send_record(record) + except (KeyboardInterrupt, SystemExit): + raise + except: + self.handleError(record) + + def _get_host(self, scheme='', hostname_only=False): + """Return the host name of log aggregator. 
+ """ + host = self.host or '' + # urlparse requires '//' to be provided if scheme is not specified + if not urlparse.urlsplit(host).scheme and not host.startswith('//'): + host = '%s://%s' % (scheme, host) if scheme else '//%s' % host + parsed = urlparse.urlsplit(host) + + if hostname_only: + return parsed.hostname + + try: + port = parsed.port or self.port + except ValueError: + port = self.port + netloc = parsed.netloc if port is None else '%s:%s' % (parsed.hostname, port) + + url_components = list(parsed) + url_components[1] = netloc + ret = urlparse.urlunsplit(url_components) + return ret.lstrip('/') + + +class BaseHTTPSHandler(BaseHandler): + def _add_auth_information(self): + if self.message_type == 'logstash': + if not self.username: + # Logstash authentication not enabled + return + logstash_auth = requests.auth.HTTPBasicAuth(self.username, self.password) + self.session.auth = logstash_auth + elif self.message_type == 'splunk': + auth_header = "Splunk %s" % self.password + headers = { + "Authorization": auth_header, + "Content-Type": "application/json" + } + self.session.headers.update(headers) + + def __init__(self, fqdn=False, **kwargs): + self.fqdn = fqdn + super(BaseHTTPSHandler, self).__init__(**kwargs) + self.session = FuturesSession(executor=VerboseThreadPoolExecutor( + max_workers=2 # this is the default used by requests_futures + )) + self._add_auth_information() + @classmethod def perform_test(cls, settings): """ @@ -126,47 +232,7 @@ def perform_test(cls, settings): except RequestException as e: raise LoggingConnectivityException(str(e)) - def get_full_message(self, record): - if record.exc_info: - return '\n'.join(traceback.format_exception(*record.exc_info)) - else: - return record.getMessage() - - def add_auth_information(self): - if self.message_type == 'logstash': - if not self.username: - # Logstash authentication not enabled - return - logstash_auth = requests.auth.HTTPBasicAuth(self.username, self.password) - self.session.auth = 
logstash_auth - elif self.message_type == 'splunk': - auth_header = "Splunk %s" % self.password - headers = { - "Authorization": auth_header, - "Content-Type": "application/json" - } - self.session.headers.update(headers) - - def get_http_host(self): - host = self.host or '' - # urlparse requires scheme to be provided, default to use http if - # missing - if not urlparse.urlsplit(host).scheme: - host = 'http://%s' % host - parsed = urlparse.urlsplit(host) - # Insert self.port if its special and port number is either not - # given in host or given as non-numerical - try: - port = parsed.port or self.port - except ValueError: - port = self.port - if port not in (80, None): - new_netloc = '%s:%s' % (parsed.hostname, port) - return urlparse.urlunsplit((parsed.scheme, new_netloc, parsed.path, - parsed.query, parsed.fragment)) - return host - - def get_post_kwargs(self, payload_input): + def _get_post_kwargs(self, payload_input): if self.message_type == 'splunk': # Splunk needs data nested under key "event" if not isinstance(payload_input, dict): @@ -177,56 +243,64 @@ def get_post_kwargs(self, payload_input): else: payload_str = payload_input return dict(data=payload_str, background_callback=unused_callback, - timeout=self.http_timeout) + timeout=self.tcp_timeout) - def skip_log(self, logger_name): - if self.host == '' or (not self.enabled_flag): - return True - if not logger_name.startswith('awx.analytics'): - # Tower log emission is only turned off by enablement setting - return False - return self.enabled_loggers is None or logger_name[len('awx.analytics.'):] not in self.enabled_loggers - def emit(self, record): - """ - Emit a log record. Returns a list of zero or more - ``concurrent.futures.Future`` objects. 
- - See: + def _send(self, payload): + """See: https://docs.python.org/3/library/concurrent.futures.html#future-objects http://pythonhosted.org/futures/ """ - if self.skip_log(record.name): - return [] + return self.session.post(self._get_host(scheme='http'), + **self._get_post_kwargs(payload)) + + +def _encode_payload_for_socket(payload): + encoded_payload = payload + if isinstance(encoded_payload, dict): + encoded_payload = json.dumps(encoded_payload, ensure_ascii=False) + if isinstance(encoded_payload, six.text_type): + encoded_payload = encoded_payload.encode('utf-8') + return encoded_payload + + +class TCPHandler(BaseHandler): + def _send(self, payload): + payload = _encode_payload_for_socket(payload) + sok = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: - payload = self.format(record) - - # Special action for System Tracking, queue up multiple log messages - if self.indv_facts: - payload_data = json.loads(payload) - if record.name.startswith('awx.analytics.system_tracking'): - module_name = payload_data['module_name'] - if module_name in ['services', 'packages', 'files']: - facts_dict = payload_data.pop(module_name) - async_futures = [] - for key in facts_dict: - fact_payload = copy(payload_data) - fact_payload.update(facts_dict[key]) - async_futures.append(self._send(fact_payload)) - return async_futures - - return [self._send(payload)] - except (KeyboardInterrupt, SystemExit): - raise - except: - self.handleError(record) + sok.connect((self._get_host(hostname_only=True), self.port or 0)) + sok.setblocking(0) + _, ready_to_send, _ = select.select([], [sok], [], float(self.tcp_timeout)) + if len(ready_to_send) == 0: + logger.warning("Socket currently busy, failed to send message") + sok.close() + return + sok.send(payload) + except Exception as e: + logger.exception("Error sending message from %s: %s" % + (TCPHandler.__name__, e.message)) + sok.close() + + +class UDPHandler(BaseHandler): + def __init__(self, **kwargs): + super(UDPHandler, 
self).__init__(**kwargs) + self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) def _send(self, payload): - return self.session.post(self.get_http_host(), - **self.get_post_kwargs(payload)) + payload = _encode_payload_for_socket(payload) + return self.socket.sendto(payload, (self._get_host(hostname_only=True), self.port or 0)) -def add_or_remove_logger(address, instance): +HANDLER_MAPPING = { + 'https': BaseHTTPSHandler, + 'tcp': TCPHandler, + 'udp': UDPHandler, +} + + +def _add_or_remove_logger(address, instance): specific_logger = logging.getLogger(address) for i, handler in enumerate(specific_logger.handlers): if isinstance(handler, (HTTPSNullHandler, BaseHTTPSHandler)): @@ -238,7 +312,6 @@ def add_or_remove_logger(address, instance): def configure_external_logger(settings_module, is_startup=True): - is_enabled = settings_module.LOG_AGGREGATOR_ENABLED if is_startup and (not is_enabled): # Pass-through if external logging not being used @@ -246,11 +319,12 @@ def configure_external_logger(settings_module, is_startup=True): instance = None if is_enabled: - instance = BaseHTTPSHandler.from_django_settings(settings_module) + handler_class = HANDLER_MAPPING[settings_module.LOG_AGGREGATOR_PROTOCOL] + instance = handler_class.from_django_settings(settings_module) instance.setFormatter(LogstashFormatter(settings_module=settings_module)) awx_logger_instance = instance if is_enabled and 'awx' not in settings_module.LOG_AGGREGATOR_LOGGERS: awx_logger_instance = None - add_or_remove_logger('awx.analytics', instance) - add_or_remove_logger('awx', awx_logger_instance) + _add_or_remove_logger('awx.analytics', instance) + _add_or_remove_logger('awx', awx_logger_instance) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index dd8a0a9a73db..10974033e7d7 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -877,7 +877,7 @@ def IS_TESTING(argv=None): TOWER_SETTINGS_MANIFEST = {} LOG_AGGREGATOR_ENABLED = False -LOG_AGGREGATOR_HTTP_TIMEOUT 
= 5 +LOG_AGGREGATOR_TCP_TIMEOUT = 5 # The number of retry attempts for websocket session establishment # If you're encountering issues establishing websockets in clustered Tower, diff --git a/docs/logging_integration.md b/docs/logging_integration.md index cca7434db489..14bc34e1cdf5 100644 --- a/docs/logging_integration.md +++ b/docs/logging_integration.md @@ -4,7 +4,8 @@ This feature builds in the capability to send detailed logs to several kinds of 3rd party external log aggregation services. Services connected to this data feed should be useful in order to gain insights into Tower usage or technical trends. The data is intended to be -sent in JSON format over a HTTP connection using minimal service-specific +sent in JSON format via three ways: over a HTTP connection, a direct TCP +connection or a direct UDP connection. It uses minimal service-specific tweaks engineered in a custom handler or via an imported library. ## Loggers @@ -169,14 +170,24 @@ supported services: - A flag to indicate how system tracking records will be sent - Selecting which loggers to send - Enabling sending logs + - Connection type, which can be HTTPS, TCP and UDP. + - Timeout value if connection type is based on TCP protocol (HTTPS and TCP). Some settings for the log handler will not be exposed to the user via -this mechanism. In particular, threading (enabled), and connection type -(designed for HTTP/HTTPS). +this mechanism. For example, threading (enabled). Parameters for the items listed above should be configurable through the Configure-Tower-in-Tower interface. +One note on configuring Host and Port: When entering URL it is customary to +include port number, like `https://localhost:4399/foo/bar`. So for the convenience +of users, when connection type is HTTPS, we allow entering hostname as a URL +with port number and thus ignore Port field. In other words, Port field is +optional in this case. On the other hand, TCP and UDP connections are determined +by `(host, port)` tuple rather than URL. 
So in the case of TCP/UDP +connection, Port field is supposed to be provided and Host field is supposed to +contain hostname only. If instead a URL is entered in Host field, its hostname +portion will be extracted as the actual hostname. # Acceptance Criteria Notes @@ -204,4 +215,3 @@ request-response cycle. For example, loggly examples use threading work to fire the message without interfering with other operations. A timeout on the part of the log aggregation service should not cause Tower operations to hang. - diff --git a/tools/docker-compose/logstash.conf b/tools/docker-compose/logstash.conf index b340bfe1957d..47fbbb5d3a03 100644 --- a/tools/docker-compose/logstash.conf +++ b/tools/docker-compose/logstash.conf @@ -4,6 +4,12 @@ input { user => awx_logger password => "workflows" } + udp { + port => 8086 + } + tcp { + port => 8087 + } } ## Add your filters / logstash plugins configuration here diff --git a/tools/elastic/README.md b/tools/elastic/README.md index c872831eafaa..7681acae694c 100644 --- a/tools/elastic/README.md +++ b/tools/elastic/README.md @@ -35,8 +35,8 @@ by going to `{server address}:5601`. ### Authentication -The default logstash configuration makes use of basic auth, so a username -and password is needed in the configuration, in addition to the other +The default HTTPS logstash configuration makes use of basic auth, so a username +and password is needed in HTTPS configuration, in addition to the other parameters. The following settings are supported: ``` @@ -53,10 +53,47 @@ parameters. 
The following settings are supported: "system_tracking" ], "LOG_AGGREGATOR_INDIVIDUAL_FACTS": false, - "LOG_AGGREGATOR_ENABLED": true + "LOG_AGGREGATOR_ENABLED": true, + "LOG_AGGREGATOR_PROTOCOL": "https", + "LOG_AGGREGATOR_TCP_TIMEOUT": 5 +} +``` +and +``` +{ + "LOG_AGGREGATOR_HOST": "logstash", + "LOG_AGGREGATOR_PORT": 8086, + "LOG_AGGREGATOR_TYPE": "logstash", + "LOG_AGGREGATOR_LOGGERS": [ + "awx", + "activity_stream", + "job_events", + "system_tracking" + ], + "LOG_AGGREGATOR_INDIVIDUAL_FACTS": false, + "LOG_AGGREGATOR_ENABLED": true, + "LOG_AGGREGATOR_PROTOCOL": "udp", + "LOG_AGGREGATOR_TCP_TIMEOUT": 5 +} +``` +and +``` +{ + "LOG_AGGREGATOR_HOST": "logstash", + "LOG_AGGREGATOR_PORT": 8087, + "LOG_AGGREGATOR_TYPE": "logstash", + "LOG_AGGREGATOR_LOGGERS": [ + "awx", + "activity_stream", + "job_events", + "system_tracking" + ], + "LOG_AGGREGATOR_INDIVIDUAL_FACTS": false, + "LOG_AGGREGATOR_ENABLED": true, + "LOG_AGGREGATOR_PROTOCOL": "tcp", + "LOG_AGGREGATOR_TCP_TIMEOUT": 5 } ``` - These can be entered via Configure-Tower-in-Tower by making a POST to `/api/v1/settings/logging/`. @@ -81,4 +118,3 @@ Nov 18, 2016 - Original branch point `b5a4deee142b152d4f9232ebac5bbabb2d2cef3c` Sep 25, 2016, before X-Pack support - diff --git a/tools/elastic/docker-compose.logstash-link.yml b/tools/elastic/docker-compose.logstash-link.yml index 7092efcffe6f..9138dcf8e463 100644 --- a/tools/elastic/docker-compose.logstash-link.yml +++ b/tools/elastic/docker-compose.logstash-link.yml @@ -3,4 +3,4 @@ services: # Primary Tower Development Container tower: links: - - logstash \ No newline at end of file + - logstash