From 53acaf044b1298d3d1876d6747a7b4252689abbe Mon Sep 17 00:00:00 2001
From: Pierre MORVAN
Date: Mon, 2 Oct 2023 12:26:59 +0200
Subject: [PATCH] Partition batched creations / alter / delete to support
 kraft batched operations (#158)

Skip python2 installation when specified
---
 module_utils/kafka_manager.py          | 159 ++++++++++++++-----------
 molecule/default/Dockerfile.j2         |   4 +-
 molecule/scram-kafka-270/Dockerfile.j2 |   4 +-
 3 files changed, 95 insertions(+), 72 deletions(-)

diff --git a/module_utils/kafka_manager.py b/module_utils/kafka_manager.py
index 20aaacf3..15f893f6 100644
--- a/module_utils/kafka_manager.py
+++ b/module_utils/kafka_manager.py
@@ -64,6 +64,11 @@
 )
 
 
+# Max entries that can be inserted inside one kraft entry
+# @see QuorumController.java
+KRAFT_MAX_OPERATIONS = 10000
+
+
 class KafkaManager:
     """
     A class used to interact with Kafka and Zookeeper
@@ -156,7 +161,7 @@ def create_topics(self, topics):
         Creates a topic
         Usable for Kafka version >= 0.10.1
         """
-        request = CreateTopicsRequest_v0(
+        requests = [CreateTopicsRequest_v0(
             create_topic_requests=[(
                 topic['name'],
                 topic['partitions'],
@@ -164,20 +169,22 @@ def create_topics(self, topics):
                 topic['replica_assignment']
                 if 'replica_assignment' in topic else [],
                 topic['options'].items() if 'options' in topic else []
-            ) for topic in topics],
+            ) for topic in partitioned_topics],
             timeout=self.request_timeout_ms
-        )
-        response = self.send_request_and_get_response(request)
+        ) for partitioned_topics in
+            self._list_partition_by(topics, KRAFT_MAX_OPERATIONS)]
+        for request in requests:
+            response = self.send_request_and_get_response(request)
 
-        for topic, error_code in response.topic_errors:
-            if error_code != self.SUCCESS_CODE:
-                raise KafkaManagerError(
-                    'Error while creating topic %s. '
-                    'Error key is %s, %s.' % (
-                        topic, kafka.errors.for_code(error_code).message,
-                        kafka.errors.for_code(error_code).description
+            for topic, error_code in response.topic_errors:
+                if error_code != self.SUCCESS_CODE:
+                    raise KafkaManagerError(
+                        'Error while creating topic %s. '
+                        'Error key is %s, %s.' % (
+                            topic, kafka.errors.for_code(error_code).message,
+                            kafka.errors.for_code(error_code).description
+                        )
                     )
-                )
 
     def delete_topics(self, topics):
         """
@@ -414,52 +421,62 @@ def describe_acls(self, acl_resource, api_version):
 
         return acl_list
 
+    def _list_partition_by(self, lst, size):
+        for i in range(0, len(lst), size):
+            yield list(itertools.islice(lst, i, i + size))
+
     def create_acls(self, acl_resources, api_version):
         """Create a set of ACLs"""
         if api_version < parse_version('2.0.0'):
-            request = CreateAclsRequest_v0(
+            requests = [CreateAclsRequest_v0(
                 creations=[self._convert_create_acls_resource_request_v0(
                     acl_resource) for acl_resource in acl_resources]
-            )
+            )]
         else:
-            request = CreateAclsRequest_v1(
+            requests = [CreateAclsRequest_v1(
                 creations=[self._convert_create_acls_resource_request_v1(
-                    acl_resource) for acl_resource in acl_resources]
-            )
-        response = self.send_request_and_get_response(request)
+                    acl_resource) for acl_resource
+                    in partitioned_acl_resources]
+            ) for partitioned_acl_resources in
+                self._list_partition_by(acl_resources, KRAFT_MAX_OPERATIONS)]
+        for request in requests:
+            response = self.send_request_and_get_response(request)
 
-        for error_code, error_message in response.creation_responses:
-            if error_code != self.SUCCESS_CODE:
-                raise KafkaManagerError(
-                    'Error while creating ACL %s. Error %s: %s.' % (
-                        acl_resources, error_code, error_message
+            for error_code, error_message in response.creation_responses:
+                if error_code != self.SUCCESS_CODE:
+                    raise KafkaManagerError(
+                        'Error while creating ACL %s. Error %s: %s.' % (
+                            acl_resources, error_code, error_message
+                        )
                     )
-                )
 
     def delete_acls(self, acl_resources, api_version):
         """Delete a set of ACLSs"""
         if api_version < parse_version('2.0.0'):
-            request = DeleteAclsRequest_v0(
+            requests = [DeleteAclsRequest_v0(
                 filters=[self._convert_delete_acls_resource_request_v0(
                     acl_resource) for acl_resource in acl_resources]
-            )
+            )]
         else:
-            request = DeleteAclsRequest_v1(
+            requests = [DeleteAclsRequest_v1(
                 filters=[self._convert_delete_acls_resource_request_v1(
-                    acl_resource) for acl_resource in acl_resources]
-            )
+                    acl_resource) for acl_resource in
+                    partitioned_acl_resources]
+            ) for partitioned_acl_resources in
+                self._list_partition_by(acl_resources, KRAFT_MAX_OPERATIONS)]
 
-        response = self.send_request_and_get_response(request)
+        for request in requests:
+            response = self.send_request_and_get_response(request)
 
-        for error_code, error_message, _ in response.filter_responses:
-            if error_code != self.SUCCESS_CODE:
-                raise KafkaManagerError(
-                    'Error while deleting ACL %s. Error %s: %s.' % (
-                        acl_resources, error_code, error_message
+            for error_code, error_message, _ in response.filter_responses:
+                if error_code != self.SUCCESS_CODE:
+                    raise KafkaManagerError(
+                        'Error while deleting ACL %s. Error %s: %s.' % (
+                            acl_resources, error_code, error_message
+                        )
                     )
-                )
 
     def send_request_and_get_response(self, request, node_id=None):
         """
@@ -1613,32 +1630,36 @@ def _map_to_quota_resources(entries):
             }
         } for entry in entries]
 
-    @staticmethod
-    def _map_to_quota_request(entries):
-        return AlterClientQuotasRequest_v0(entries=[
-            (
-                [(
-                    entity['entity_type'],
-                    entity['entity_name']
-                ) for entity in entry['entity']],
-                [(
-                    key,
-                    value,
-                    True
-                ) for key, value in entry['quotas_to_delete'].items()] +
-                [(
-                    key,
-                    value,
-                    False
-                ) for key, value in entry['quotas_to_alter'].items()] +
-                [(
-                    key,
-                    value,
-                    False
-                ) for key, value in entry['quotas_to_add'].items()]
-            )
-            for entry in entries
-        ], validate_only=False)
+    def _map_to_quota_requests(self, entries):
+        return [AlterClientQuotasRequest_v0(entries=[
+            (
+                [(
+                    entity['entity_type'],
+                    entity['entity_name']
+                ) for entity in entry['entity']],
+                [(
+                    key,
+                    value,
+                    True
+                ) for key, value in
+                    entry['quotas_to_delete'].items()] +
+                [(
+                    key,
+                    value,
+                    False
+                ) for key, value in
+                    entry['quotas_to_alter'].items()] +
+                [(
+                    key,
+                    value,
+                    False
+                ) for key, value in
+                    entry['quotas_to_add'].items()]
+            )
+            for entry in partitioned_entries
+        ], validate_only=False
+        ) for partitioned_entries in
+            self._list_partition_by(entries, KRAFT_MAX_OPERATIONS)]
 
     def describe_quotas(self):
         if parse_version(self.get_api_version()) >= parse_version('2.6.0'):
@@ -1725,12 +1746,14 @@ def describe_quotas(self):
 
     def alter_quotas(self, quotas):
         if parse_version(self.get_api_version()) >= parse_version('2.6.0'):
-            request = self._map_to_quota_request(quotas)
-            response = self.send_request_and_get_response(request)
-            response_entries = response.to_object()['entries']
-            for response_entry in response_entries:
-                if response_entry['error_code'] != 0:
-                    raise KafkaManagerError(response_entry['error_message'])
+            requests = self._map_to_quota_requests(quotas)
+            for request in requests:
+                response = self.send_request_and_get_response(request)
+                response_entries = response.to_object()['entries']
+                for response_entry in response_entries:
+                    if response_entry['error_code'] != 0:
+                        raise KafkaManagerError(
+                            response_entry['error_message'])
         else:
             # Use zookeeper when kafka < 2.6.0
             try:
diff --git a/molecule/default/Dockerfile.j2 b/molecule/default/Dockerfile.j2
index 37323368..1148986c 100644
--- a/molecule/default/Dockerfile.j2
+++ b/molecule/default/Dockerfile.j2
@@ -6,8 +6,8 @@ FROM {{ item.registry.url }}/{{ item.image }}
 FROM {{ item.image }}
 {% endif %}
 
-RUN if [ $(command -v apt-get) ]; then apt-get update && apt-get upgrade -y && apt-get install -y python2 sudo bash ca-certificates iproute2 && apt-get clean; \
+RUN if [ $(command -v apt-get) ]; then apt-get update && apt-get upgrade -y && apt-get install -y python2 || apt-get install -y python3 && apt-get install -y sudo bash ca-certificates iproute2 && apt-get clean; \
     elif [ $(command -v dnf) ]; then dnf makecache && dnf --assumeyes install python sudo python-devel python2-dnf bash iproute2 && dnf clean all; \
     elif [ $(command -v yum) ]; then yum makecache fast && yum update -y && yum install -y python sudo yum-plugin-ovl bash iproute2 && sed -i 's/plugins=0/plugins=1/g' /etc/yum.conf && yum clean all; \
     elif [ $(command -v zypper) ]; then zypper refresh && zypper update -y && zypper install -y python sudo bash python-xml iproute2 && zypper clean -a; \
-    elif [ $(command -v apk) ]; then apk update && apk add --no-cache python2 sudo bash ca-certificates iproute2; fi
+    elif [ $(command -v apk) ]; then apk update && apk add --no-cache python2 || apk add --no-cache python3 && apk add --no-cache sudo bash ca-certificates iproute2; fi
diff --git a/molecule/scram-kafka-270/Dockerfile.j2 b/molecule/scram-kafka-270/Dockerfile.j2
index 37323368..1148986c 100644
--- a/molecule/scram-kafka-270/Dockerfile.j2
+++ b/molecule/scram-kafka-270/Dockerfile.j2
@@ -6,8 +6,8 @@ FROM {{ item.registry.url }}/{{ item.image }}
 FROM {{ item.image }}
 {% endif %}
 
-RUN if [ $(command -v apt-get) ]; then apt-get update && apt-get upgrade -y && apt-get install -y python2 sudo bash ca-certificates iproute2 && apt-get clean; \
+RUN if [ $(command -v apt-get) ]; then apt-get update && apt-get upgrade -y && apt-get install -y python2 || apt-get install -y python3 && apt-get install -y sudo bash ca-certificates iproute2 && apt-get clean; \
    elif [ $(command -v dnf) ]; then dnf makecache && dnf --assumeyes install python sudo python-devel python2-dnf bash iproute2 && dnf clean all; \
    elif [ $(command -v yum) ]; then yum makecache fast && yum update -y && yum install -y python sudo yum-plugin-ovl bash iproute2 && sed -i 's/plugins=0/plugins=1/g' /etc/yum.conf && yum clean all; \
    elif [ $(command -v zypper) ]; then zypper refresh && zypper update -y && zypper install -y python sudo bash python-xml iproute2 && zypper clean -a; \
-    elif [ $(command -v apk) ]; then apk update && apk add --no-cache python2 sudo bash ca-certificates iproute2; fi
+    elif [ $(command -v apk) ]; then apk update && apk add --no-cache python2 || apk add --no-cache python3 && apk add --no-cache sudo bash ca-certificates iproute2; fi
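
Note: for context, below is a minimal standalone sketch of the chunking pattern this patch introduces. It mirrors the _list_partition_by() helper and the KRAFT_MAX_OPERATIONS constant added in kafka_manager.py; the topic names and the __main__ driver are made up purely for illustration and are not part of the patch.

# Illustrative sketch only -- not part of the patch.
import itertools

# Mirrors the constant added above: the KRaft QuorumController rejects
# batches larger than this, so requests are split into chunks of at most
# this many entries.
KRAFT_MAX_OPERATIONS = 10000


def list_partition_by(lst, size):
    """Yield successive chunks of at most `size` items from `lst`."""
    for i in range(0, len(lst), size):
        yield list(itertools.islice(lst, i, i + size))


if __name__ == '__main__':
    # 25000 hypothetical topic names are split into 3 chunks.
    topics = ['topic-%d' % i for i in range(25000)]
    batches = list(list_partition_by(topics, KRAFT_MAX_OPERATIONS))
    print([len(b) for b in batches])  # -> [10000, 10000, 5000]

Each chunk then becomes one CreateTopicsRequest / CreateAclsRequest / DeleteAclsRequest / AlterClientQuotasRequest, and the per-request error handling runs inside the loop over requests, as shown in the diff above.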