-
Notifications
You must be signed in to change notification settings - Fork 46
/
kafka_info.py
126 lines (109 loc) · 3.43 KB
/
kafka_info.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Ansible module for Kafka information
"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from pkg_resources import parse_version
# XXX: fix kafka-python import broken for Python 3.12
import ansible.module_utils.kafka_fix_import # noqa
from ansible.module_utils.ansible_release import __version__ as ansible_version
# import module snippets
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.pycompat24 import get_exception
from kafka.errors import KafkaError
from ansible.module_utils.kafka_lib_commons import (
module_commons, DOCUMENTATION_COMMON, get_manager_from_params,
maybe_clean_kafka_ssl_files
)
# Metadata block for this module; only the metadata format version is declared.
# NOTE(review): presumably read statically by Ansible tooling, so it is kept
# as a plain dict literal -- confirm before restructuring.
ANSIBLE_METADATA = {'metadata_version': '1.0'}
# Module documentation (YAML). The options shared by every Kafka module are
# appended from DOCUMENTATION_COMMON.
# NOTE(review): assumes DOCUMENTATION_COMMON continues the `options` mapping
# with option names indented by two spaces -- confirm against
# kafka_lib_commons.
# The `choices` list mirrors the `resource` choices accepted by main()'s
# argument_spec, including `user`.
DOCUMENTATION = '''
---
module: kafka_info
short_description: Gather Kafka information
description:
  - Gather Kafka information.
  - Not compatible with Kafka version < 0.11.0.
author:
  - Stephen SORRIAUX
options:
  resource:
    description:
      - 'the type of resource to get information about'
    required: True
    choices: [topic, broker, consumer_group, acl,
              topic-config, user]
  include_defaults:
    description:
      - 'Include defaults configuration when using topic-config resource'
    type: bool
    required: False
    default: True
  include_internal:
    description:
      - 'Include internal topics when using topic or topic-config resource'
    type: bool
    required: False
    default: False
''' + DOCUMENTATION_COMMON
# Usage example (YAML): a single task that calls kafka_info for the `topic`
# resource and registers the result. Module arguments are nested under the
# module name and `register` sits at task level so the snippet is valid YAML.
EXAMPLES = '''
- name: Get topics from Kafka cluster
  kafka_info:
    resource: "topic"
    bootstrap_servers: "{{ ansible_ssh_host }}:9094"
    api_version: "{{ kafka_api_version }}"
    sasl_mechanism: "PLAIN"
    security_protocol: "SASL_SSL"
    sasl_plain_username: "admin"
    sasl_plain_password: "{{ kafka_admin_password }}"
    ssl_check_hostname: False
    ssl_cafile: "{{ kafka_cacert | default('/etc/ssl/certs/cacert.crt') }}"
    ignore_empty_partition: True
  register: my_topics
'''
def main():
    """Gather information about a Kafka resource and hand it back to Ansible.

    The module accepts the shared ``module_commons`` options plus:

    * ``resource`` (required): one of ``topic``, ``broker``,
      ``consumer_group``, ``acl``, ``topic-config`` or ``user``.
    * ``include_defaults`` (bool, default ``True``).
    * ``include_internal`` (bool, default ``False``).

    A manager is obtained from ``get_manager_from_params`` and asked for the
    requested resource through ``manager.get_resource``. Errors are reported
    with ``module.fail_json``; on success the data is returned with
    ``module.exit_json`` under ``results`` (Ansible < 2.8.0) or
    ``ansible_module_results`` (Ansible >= 2.8.0).
    """
    module = AnsibleModule(
        argument_spec=dict(
            resource=dict(
                choices=[
                    'topic',
                    'broker',
                    'consumer_group',
                    'acl',
                    'topic-config',
                    'user'
                ],
                required=True
            ),
            include_defaults=dict(type='bool', default=True),
            include_internal=dict(type='bool', default=False),
            **module_commons
        )
    )

    params = module.params
    resource = params['resource']

    # Both stay None when the manager cannot be built, so the cleanup and
    # exit paths below never touch an unbound name.
    manager = None
    results = None

    try:
        manager = get_manager_from_params(params)
        results = manager.get_resource(resource, params)
    except KafkaError as e:
        module.fail_json(
            msg='Error while getting %s from Kafka: %s ' % (resource, e)
        )
    except Exception as e:
        # Top-level boundary: report any unexpected error through Ansible
        # instead of letting a raw traceback escape.
        module.fail_json(
            msg='Something went wrong: %s ' % e
        )
    finally:
        # Nested try/finally: the SSL file cleanup must run even when
        # closing the manager itself raises.
        try:
            if manager:
                manager.close()
        finally:
            maybe_clean_kafka_ssl_files(params)

    # Ansible deprecated the module 'results' key starting with 2.8.0, so the
    # data is returned under a different key on newer versions.
    # NOTE(review): an information-gathering module would normally report
    # changed=False; kept as changed=True to preserve the result callers
    # currently see -- confirm before changing.
    if parse_version(ansible_version) < parse_version('2.8.0'):
        module.exit_json(changed=True, results=results)
    else:
        module.exit_json(changed=True, ansible_module_results=results)
# Script entry point: run the module only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()