-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka.yaml
234 lines (234 loc) · 8.08 KB
/
kafka.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
---
# Headless service (clusterIP: None) providing stable per-pod DNS names
# (<pod>.kafka-hs) for the Kafka StatefulSet, plus the JMX-exporter
# metrics port for Prometheus annotation-based scraping.
apiVersion: v1
kind: Service
metadata:
  name: kafka-hs
  labels:
    app: kafka
  annotations:
    # prometheus.io/port: "9308"
    prometheus.io/port: "5556"
    prometheus.io/scrape: "true"
spec:
  ports:
    - port: 9092
      name: server
    # - port: 9308
    #   name: metrics
    - port: 5556
      name: metrics
  clusterIP: None
  selector:
    app: kafka
---
# Allow at most one Kafka broker pod to be down voluntarily at a time,
# preserving quorum for topics with replication factor 3 / min ISR 2.
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: kafka-pdb
spec:
  selector:
    matchLabels:
      app: kafka
  maxUnavailable: 1
---
# 3-broker Kafka StatefulSet (ZooKeeper mode, Confluent cp-kafka image)
# with a JMX Prometheus exporter sidecar on port 5556.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka
spec:
  serviceName: kafka-hs
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  podManagementPolicy: Parallel
  updateStrategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: kafka
      annotations:
        # Prometheus annotation-based discovery expects a numeric port,
        # not a container-port name; 5556 is the jmx-exporter sidecar port
        # (matches the kafka-hs Service annotation).
        prometheus.io/port: "5556"
        prometheus.io/scrape: "true"
    spec:
      securityContext:
        runAsUser: 1000
        fsGroup: 1000
      initContainers:
        # WARNING: this wipes the Kafka log directory on EVERY pod
        # (re)start, destroying all data persisted on this broker's volume.
        # Only acceptable for throwaway dev/test clusters — remove it for
        # any environment whose data matters.
        - name: clean-datadir
          image: busybox:1.34.1
          command: ["sh", "-c", "rm -rf /var/lib/kafka/data/*"]
          securityContext:
            privileged: true
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka/data
      containers:
        # - name: kafka-exporter
        #   image: danielqsj/kafka-exporter:v1.7.0
        #   imagePullPolicy: IfNotPresent
        #   ports:
        #     - containerPort: 9308
        #       name: metrics
        #   command:
        #     - sh
        #     - -exc
        #     - |
        #       until nc -z -v -w30 $POD_IP 9092; do echo 'Waiting for kafka...'; sleep 2; done && \
        #       kafka_exporter \
        #         --kafka.server=$POD_IP:9092 \
        #         --kafka.version=3.6.0 \
        #         --web.listen-address=:9308 \
        #         --log.level=info
        #   env:
        #     - name: POD_IP
        #       valueFrom:
        #         fieldRef:
        #           fieldPath: status.podIP
        #   resources:
        #     requests:
        #       memory: "200Mi"
        #       cpu: "250m"
        #     limits:
        #       memory: "200Mi"
        #       cpu: "250m"
        # Sidecar: exposes the broker's JMX metrics (port 5555) as
        # Prometheus metrics on port 5556.
        - name: prometheus-jmx-exporter
          image: solsson/kafka-prometheus-jmx-exporter@sha256:6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 5556
              name: metrics
          command:
            - java
            - -XX:+UnlockExperimentalVMOptions
            - -XX:+UseCGroupMemoryLimitForHeap
            - -XX:MaxRAMFraction=1
            - -XshowSettings:vm
            - -jar
            - jmx_prometheus_httpserver.jar
            - "5556"
            - /etc/jmx-kafka/jmx-kafka-prometheus.yml
          volumeMounts:
            - name: jmx-config
              mountPath: /etc/jmx-kafka
        - name: kafka
          # if KAFKA_BROKER_ID is not specified, it will automatically be generated
          image: confluentinc/cp-kafka:7.6.0
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
              name: kafka
            - containerPort: 5555
              name: jmx
          env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: HOST_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: POD_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            - name: KAFKA_HEAP_OPTS
              value: "-Xms512M -Xmx512M"
            - name: KAFKA_JMX_PORT
              value: "5555"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: "zk-cs:2181"
            - name: KAFKA_LOG_DIRS
              value: /var/lib/kafka/data
            # For subscribed consumers, committed offset of a specific partition will be expired and discarded when
            # 1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty)
            # 2) this retention period has elapsed since the last time an offset is committed for the partition and the group is no longer subscribed to the corresponding topic
            # default 7 days
            - name: KAFKA_OFFSETS_RETENTION_MINUTES
              value: "10080"
            # The replication factor for the offsets topic (set higher to ensure availability)
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "3"
            # The number of partitions for the offset commit topic (should not change after deployment)
            - name: KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS
              value: "50"
            # set to 256GB
            - name: KAFKA_LOG_RETENTION_BYTES
              value: "274877906944"
            # - name: KAFKA_LOG_RETENTION_HOURS
            #   value: "72"
            - name: KAFKA_LOG_RETENTION_MINUTES
              value: "4320"
            - name: KAFKA_BACKGROUND_THREADS
              value: "15"
            - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
              value: "true"
            - name: KAFKA_AUTO_LEADER_REBALANCE_ENABLE
              value: "true"
            - name: KAFKA_DEFAULT_REPLICATION_FACTOR
              value: "3"
            # The default number of partitions per topic (default to 1)
            - name: KAFKA_NUM_PARTITIONS
              value: "1"
            # The number of acknowledgments the producer requires the leader to have received
            # 1 means the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers
            # if the leader fails immediately after acknowledging the record but before the followers have replicated it then the record will be lost
            # all means the leader will wait for the full set of in-sync replicas to acknowledge the record
            # default=1
            - name: KAFKA_ACKS
              value: "all"
            # When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of in-sync replicas that must acknowledge the write
            # set to 2 to ensure quorum acknowledgement under replication factor = 3 and acks = all
            - name: KAFKA_MIN_INSYNC_REPLICAS
              value: "2"
            # When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream (using an internal sequence number)
            # If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream
            # enabling idempotence requires max.in.flight.requests.per.connection <= 5 (default=5), retries > 0 (default=2147483647) and acks must be 'all'
            - name: KAFKA_ENABLE_IDEMPOTENCE
              value: "false"
            # The number of threads that the server uses for processing requests, which may include disk I/O
            # default=8
            - name: KAFKA_NUM_IO_THREADS
              value: "8"
            # The number of threads that the server uses for receiving requests from the network and sending responses to the network
            # default=3
            - name: KAFKA_NUM_NETWORK_THREADS
              value: "4"
            # default=1048588 (1MB)
            - name: KAFKA_MESSAGE_MAX_BYTES
              value: "1048588"
            # The amount of time the group coordinator will wait for more consumers to join a new group before performing the first rebalance (default=3000)
            - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
              value: "3500"
            - name: KAFKA_LOG4J_ROOT_LOGLEVEL
              value: "ERROR"
            - name: KAFKA_LOG4J_LOGGERS
              value: "org.apache.zookeeper=ERROR,org.apache.kafka=ERROR,kafka=ERROR,kafka.cluster=ERROR,kafka.controller=ERROR,kafka.coordinator=ERROR,kafka.log=ERROR,kafka.server=ERROR,kafka.zookeeper=ERROR,state.change.logger=ERROR"
            - name: KAFKA_TOOLS_LOG4J_LOGLEVEL
              value: "ERROR"
          command:
            - sh
            - -exc
            # Advertise the pod's stable headless-service DNS name so clients
            # can reach each broker individually.
            - |
              export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${POD_NAME}.kafka-hs:9092 && \
              exec /etc/confluent/docker/run
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka/data
          livenessProbe:
            tcpSocket:
              port: 9092
            initialDelaySeconds: 15
            periodSeconds: 30
      volumes:
        - name: jmx-config
          configMap:
            name: kafka-jmx-configmap