---
title: "pskafka"
output:
md_document:
toc: true
toc_depth: 3
variant: markdown_strict+backtick_code_blocks
pandoc_args: ["--atx-headers"]
---
```{r setup, include=FALSE, echo=FALSE}
knitr::opts_chunk$set(echo = TRUE)
```
`pskafka`: Enhancing the Kafka CLI with PowerShell flavor.
Read this on [GitHub](https://github.com/dm3ll3n/pskafka) or [my site](https://www.donaldmellenbruch.com/project/pskafka/).
## Overview
[Apache Kafka](https://kafka.apache.org/) is a distributed publish & subscribe messaging system. Data is transmitted, or "produced", to Kafka as "messages" that are later retrieved, or "consumed", by any number of recipients. A simple way to produce and consume messages is with the default Kafka command-line interface, which uses Java to interact with a Kafka instance. An alternative Kafka CLI, [kafkacat](https://github.com/edenhill/kafkacat), depends on the C/C++ library [librdkafka](https://github.com/edenhill/librdkafka). This PowerShell module, pskafka, wraps around *either* the default Kafka CLI or kafkacat to provide the following:
1. a syntax friendly to PowerShell developers.
2. easy reuse of Kafka producer(s) throughout a pipeline by communicating with the Kafka CLI over the standard input stream.
3. easy spawning of, and reading from, multiple Kafka consumers in separate threads.
PowerShell is an object-oriented scripting language that was recently made open-source and cross-platform. PowerShell can natively convert to and from JSON, a common format for Kafka messages. Parsing a JSON message into a PowerShell object makes command-line transformations much easier.
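For example, here is a minimal sketch (not evaluated here; the message content is illustrative) of parsing a JSON message, deriving a new field, and serializing it back to JSON:
```{bash, engine.path='/snap/bin/pwsh', eval=FALSE}
# Parse a JSON message into a PowerShell object, add a computed field,
# and serialize the result back to compact JSON.
'{"TsTicks": 636815882380000000, "Message": "Hello world #0"}' |
  ConvertFrom-Json |
  Select-Object *, @{Name='MessageLength'; Expression={ $_.Message.Length }} |
  ConvertTo-Json -Compress
```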
pskafka has comment-based help (i.e., docstrings) that can be explored using PowerShell's help system.
```{bash, engine.path='/snap/bin/pwsh'}
Import-Module ./pskafka.psd1
# List all commands in the `pskafka` module.
Get-Command -Module pskafka | Select-Object CommandType, Name
```
```{bash, engine.path='/snap/bin/pwsh'}
Import-Module ./pskafka.psd1
# Get help for a command.
Get-Help -Name 'Start-KafkaConsumer'
```
## Requirements
* A Kafka instance (if you don't have one, follow steps 1-3 of the [Kafka quickstart guide](https://kafka.apache.org/quickstart)).
* PowerShell v5+ (if you're on a non-Windows system, install [PowerShell Core](https://docs.microsoft.com/en-us/powershell/scripting/install/installing-powershell)).
* The `ThreadJob` module (ships with PowerShell Core; if necessary, install it with `Install-Module -Name 'ThreadJob'`).
* The `pskafka` module itself; install it with `Install-Module -Name 'pskafka'`.
You will also need a local Kafka command-line interface: either [kafkacat](https://github.com/edenhill/kafkacat) or the standard Kafka CLI. `pskafka` ships with compiled builds of kafkacat v1.4.0RC1 for Debian Linux, macOS, and Windows. Either CLI has dependencies of its own that may need to be resolved; consult its documentation if necessary.
```{bash, engine.path='/snap/bin/pwsh', include=FALSE, eval=FALSE}
# unix
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
# windows
bin/windows/zookeeper-server-start.bat config/zookeeper.properties
bin/windows/kafka-server-start.bat config/server.properties
```
```{bash engine.path='/snap/bin/pwsh', eval=TRUE, echo=FALSE, include=FALSE}
Import-Module ./pskafka.psd1
Set-KafkaHome '~/kafka'
$topic = 'test'
$topic2 = 'test_two'
$zookeeper = 'localhost:2181'
# delete topic if exists
& "$env:KAFKA_HOME/bin/kafka-topics.sh" --delete --if-exists --topic $topic --zookeeper $zookeeper
& "$env:KAFKA_HOME/bin/kafka-topics.sh" --delete --if-exists --topic $topic2 --zookeeper $zookeeper
# create new topic
& "$env:KAFKA_HOME/bin/kafka-topics.sh" --create --topic $topic --zookeeper $zookeeper --replication-factor 1 --partitions 3
& "$env:KAFKA_HOME/bin/kafka-topics.sh" --create --topic $topic2 --zookeeper $zookeeper --replication-factor 1 --partitions 3
```
## Topics
First, get a list of all existing topics.
Using the Kafka CLI:
```{bash engine.path='/snap/bin/pwsh', eval=FALSE}
~/kafka/bin/kafka-topics.sh --zookeeper localhost --list
```
Using kafkacat:
```{bash engine.path='/snap/bin/pwsh', eval=FALSE}
./bin/deb/kafkacat -b localhost -L
```
Using pskafka:
```{bash engine.path='/snap/bin/pwsh', include=TRUE}
Import-Module ./pskafka.psd1
Get-KafkaTopics -BrokerList localhost -Verbose
```
Notice that, with `-Verbose` specified, any pskafka command will output the command it issues to the underlying CLI. Above, the kafkacat build that ships with pskafka was used. To use the Java-based Kafka CLI, or another installation of kafkacat, specify its path in the `KAFKA_HOME` environment variable. pskafka provides the command `Set-KafkaHome`, which sets `KAFKA_HOME` for the session.
```{bash engine.path='/snap/bin/pwsh'}
Import-Module ./pskafka.psd1
Set-KafkaHome '~/kafka'
Get-KafkaTopics -BrokerList localhost -Verbose
```
## Produce
When producing streams of messages, Kafka gains efficiency by queueing messages until either a specified message count has been reached or a time period has elapsed. A batch of messages is sent when whichever threshold is reached first.
Producing with the Kafka CLI:
```{bash engine.path='/snap/bin/pwsh'}
0..9999 |
Select-Object @{Name='TsTicks';Expression={(Get-Date).Ticks}}, `
@{Name='Message'; Expression={ 'Hello world #' + $_.ToString() }} |
ForEach-Object { $_ | ConvertTo-JSON -Compress } |
~/kafka/bin/kafka-console-producer.sh --broker-list 'localhost:9092' --topic 'test' --batch-size 100 --timeout 1000 | Out-Null
```
Producing with kafkacat:
```{bash engine.path='/snap/bin/pwsh'}
0..9999 |
Select-Object @{Name='TsTicks';Expression={(Get-Date).Ticks}}, `
@{Name='Message'; Expression={ 'Hello world #' + $_.ToString() }} |
ForEach-Object { $_ | ConvertTo-JSON -Compress } |
./bin/deb/kafkacat -b 'localhost:9092' -t 'test' -P -X queue.buffering.max.messages=100,queue.buffering.max.ms=1000
```
Producing with pskafka using `Out-KafkaTopic`:
```{bash engine.path='/snap/bin/pwsh'}
Import-Module ./pskafka.psd1
0..9999 |
Select-Object @{Name='TsTicks';Expression={(Get-Date).Ticks}}, `
@{Name='Message'; Expression={ 'Hello world #' + $_.ToString() }} |
Out-KafkaTopic -TopicName 'test' -BrokerList 'localhost:9092' -BatchSize 100 -Verbose -ErrorAction Stop
```
### Persistent Producer
A useful feature of pskafka is the ability to start a Kafka CLI producer and write to it later. This allows for a more flexible workflow, such as conditionally writing messages to one or more Kafka topics. The example below starts a Kafka producer, produces messages for a short duration, then stops the producer.
```{bash, engine.path='/snap/bin/pwsh'}
Import-Module ./pskafka.psd1
# start producer process
$p = Start-KafkaProducer -TopicName 'test' -BrokerList 'localhost:9092' -BatchSize 100 -TimeoutMS 1000 -Verbose
# start a timer
$timer = New-Object System.Diagnostics.Stopwatch
$timer.Start()
for ($i = 0; $timer.Elapsed.TotalSeconds -lt 5; $i++)
{
$obj = New-Object PSObject -Property @{
'TsTicks'=(Get-Date).Ticks;
'Message'="Hello Kafka #$i"
}
# write to producer process over STDIN.
$obj | Out-KafkaTopic -Producer $p
}
# stop timer
$timer.Stop()
# stop producer
$p | Stop-KafkaProducer | Out-Null
Write-Host $("Produced {0} messages in {1} seconds." -f $i, [math]::Round($timer.Elapsed.TotalSeconds, 2))
```
## Consume
Kafka consumers read messages from a topic. A consumer starts reading from a specific *offset*, which is typically one of:
1. the latest offset: the end of the topic (default).
2. the earliest offset: the beginning of the topic.
3. a stored offset: the offset saved for a consumer group.
A useful feature of Kafka is its ability to efficiently store offsets for consumers in a "consumer group". A stored offset allows a consumer to begin reading where it last left off. In addition, all consumers in a group *share* the workload across Kafka topic partitions; no single message is sent to two consumers in the same group.
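For example, here is a minimal sketch (not evaluated here; the consumer group name is illustrative) of how a stored offset lets a second read resume where the first left off:
```{bash, engine.path='/snap/bin/pwsh', eval=FALSE}
Import-Module ./pskafka.psd1
# First read: the group has no stored offset yet, so -FromBeginning applies.
Read-KafkaTopic -TopicName 'test' -BrokerList 'localhost:9092' -ConsumerGroup 'offset_demo' -MessageCount 500 -FromBeginning | Out-Null
# Second read: resumes from the offset stored for 'offset_demo'.
Read-KafkaTopic -TopicName 'test' -BrokerList 'localhost:9092' -ConsumerGroup 'offset_demo' -MessageCount 500
```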
### Simple Consumer
Offsets are not committed for a simple consumer, so a simple consumer will begin reading either from the end of a topic (the default) or from the beginning (if specified).
Consuming with the Kafka CLI:
```{bash, engine.path='/snap/bin/pwsh'}
$messages = ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server 'localhost:9092' --topic 'test' --max-messages 1000 --from-beginning
Write-Host $("{0} total messages consumed" -f $messages.Length)
```
Consuming with kafkacat:
```{bash, engine.path='/snap/bin/pwsh'}
$messages = ./bin/deb/kafkacat -C -b 'localhost:9092' -t 'test' -o beginning -c 1000
Write-Host $("{0} total messages consumed" -f $messages.Length)
```
Consuming with pskafka:
```{bash, engine.path='/snap/bin/pwsh'}
Import-Module ./pskafka.psd1
$messages = Read-KafkaTopic -TopicName 'test' -BrokerList 'localhost:9092' -MessageCount 1000 -FromBeginning -Verbose
Write-Host $("{0} total messages consumed" -f $messages.Length)
```
Consuming with pskafka (multiple consumers):
```{bash, engine.path='/snap/bin/pwsh'}
Import-Module ./pskafka.psd1
$messages = Read-KafkaTopic -TopicName 'test' -BrokerList 'localhost:9092' -Instances 3 -MessageCount 1000 -FromBeginning -Verbose
Write-Host $("{0} total messages consumed" -f $messages.Length)
Write-Host $("{0} unique messages consumed" -f @($messages | Select-Object -Unique).Length)
```
In the example above, notice how three consumers were created (`-Instances 3`), and 3,000 messages were consumed, but only 1,000 of the messages are unique. This is because each consumer received the same set of messages from the topic.
Consuming with pskafka (multiple consumers in a consumer group):
In the example below, the three consumers are made part of the same consumer group with the `-ConsumerGroup` parameter. Thus, all 3,000 consumed messages are distinct; i.e., each consumer received a unique set of messages from the topic.
```{bash, engine.path='/snap/bin/pwsh'}
Import-Module ./pskafka.psd1
$messages = Read-KafkaTopic -TopicName 'test' -BrokerList 'localhost:9092' -ConsumerGroup 'my_consumer_group' -Instances 3 -MessageCount 1000 -FromBeginning -Verbose
Write-Host $("{0} total messages consumed" -f $messages.Length)
Write-Host $("{0} unique messages consumed" -f @($messages | Select-Object -Unique).Length)
```
> Note that `-FromBeginning` is only applicable for a consumer group that does not already have a stored offset to read from.
### MultiTopic Consumer
Specify an array of topic names to `-TopicName` in order to spawn a consumer for each topic. If `-Instances` is greater than 1, *each* topic gets that number of consumer instances (e.g., 3 topics with 2 instances each = 6 instances in total).
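For example, a minimal sketch (not evaluated here): two topics with two instances each yields four consumers in total.
```{bash, engine.path='/snap/bin/pwsh', eval=FALSE}
Import-Module ./pskafka.psd1
# Two topics x two instances each = four consumer threads.
Read-KafkaTopic -TopicName 'test','test_two' -BrokerList 'localhost:9092' -Instances 2 -FromBeginning -Verbose
```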
```{bash, engine.path='/snap/bin/pwsh', echo=FALSE, include=FALSE, eval=TRUE}
Import-Module ./pskafka.psd1
0..999 | Out-KafkaTopic -TopicName 'test_two' -BrokerList 'localhost:9092'
```
```{bash, engine.path='/snap/bin/pwsh'}
Import-Module ./pskafka.psd1
$one = Read-KafkaTopic -TopicName 'test' -BrokerList 'localhost:9092' -FromBeginning -Verbose |
Measure-Object | Select-Object -ExpandProperty Count
$two = Read-KafkaTopic -TopicName 'test_two' -BrokerList 'localhost:9092' -FromBeginning -Verbose |
Measure-Object | Select-Object -ExpandProperty Count
$one_and_two = Read-KafkaTopic -TopicName 'test','test_two' -BrokerList 'localhost:9092' -FromBeginning -Verbose |
Measure-Object | Select-Object -ExpandProperty Count
($one + $two) -eq $one_and_two
```
### Persistent Consumer
By default, a consumer will exit soon after all topic messages have been processed. Include the `-Persist` parameter to instruct a consumer to persist after reaching the end of a topic. The `-TimeoutMS` parameter instructs the consumer to exit if no messages have been received within the specified duration; without it, the consumer would persist indefinitely, passing messages down the pipeline as they arrive.
```{bash, engine.path='/snap/bin/pwsh'}
Import-Module ./pskafka.psd1
Read-KafkaTopic -TopicName 'test' -BrokerList 'localhost:9092' -ConsumerGroup 'my_consumer_group_3' -Instances 3 -FromBeginning -Persist -TimeoutMS 30000 -Verbose |
ForEach-Object `
-Begin {
$i=0
$timer = New-Object System.Diagnostics.Stopwatch
$timer.Start()
} `
-Process {
$i++
if ($i % 10000 -eq 0) {
Write-Host $( '{0} msg/sec; {1} total messages.' -f ($i / $timer.Elapsed.TotalSeconds ).ToString(), $i )
}
} `
-End {
Write-Host "Consumed $i total messages."
$timer.Stop()
}
```
### Consumer Threads
The command `Read-KafkaTopic` actually encapsulates three aptly named commands:
1. `Start-KafkaConsumer`: starts consumer processes in separate threads; each consumer immediately begins consuming messages in the background.
2. `Read-KafkaConsumer`: reads and clears the output streams from a thread.
3. `Stop-KafkaConsumer`: stops a thread.
The object returned from `Start-KafkaConsumer` is a [ThreadJob](https://github.com/PaulHigin/PSThreadJob) that is compatible with the standard PowerShell job commands (`Get-Job`, `Wait-Job`, `Receive-Job`). In fact, the commands `Get-KafkaConsumer`, `Wait-KafkaConsumer`, and `Receive-KafkaConsumer` are just aliases for these native PowerShell commands.
It is very easy to start a background consumer with `Start-KafkaConsumer` and never read from or stop it. If this happens, the consumer could read an unbounded number of messages until system resources are exhausted. Be responsible with calls to `Start-KafkaConsumer` by following up with `Read-KafkaConsumer` and `Stop-KafkaConsumer`, as sketched below. When in doubt, kill all background jobs with `Get-Job | Remove-Job -Force`.
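Here is a minimal sketch (not evaluated here) of that start/read/stop pattern. It assumes `Start-KafkaConsumer` accepts the same core parameters as `Read-KafkaTopic`; consult `Get-Help` for the exact signatures.
```{bash, engine.path='/snap/bin/pwsh', eval=FALSE}
Import-Module ./pskafka.psd1
# Start a background consumer thread (assumed parameters mirror Read-KafkaTopic).
$consumer = Start-KafkaConsumer -TopicName 'test' -BrokerList 'localhost:9092' -MessageCount 1000 -FromBeginning
# Read (and clear) whatever the consumer thread has output so far.
$messages = $consumer | Read-KafkaConsumer
# Always stop the consumer so it cannot run unbounded.
$consumer | Stop-KafkaConsumer | Out-Null
```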
## PowerShell Object Example
Earlier, I alluded to PowerShell's powerful object-oriented approach to the shell. I'll conclude this walkthrough with an example that illustrates it. The following example:
1. reads messages in JSON format.
2. converts them to PowerShell objects.
3. augments the original message.
4. outputs the new message to Kafka.
5. outputs the new message to a local CSV file.
```{bash, engine.path='/snap/bin/pwsh'}
Import-Module ./pskafka.psd1
Read-KafkaTopic -TopicName 'test' -BrokerList 'localhost:9092' -FromBeginning -MessageCount 100 -Verbose |
ConvertFrom-Json |
Select-Object *, @{Name='Timestamp'; Expression={ ([datetime]$_.TsTicks).ToLongTimeString() }} |
Out-KafkaTopic -TopicName 'test_two' -BrokerList 'localhost:9092' -BatchSize 100 -PassThru -Verbose |
Export-Csv 'test.csv'
```
```{bash, engine.path='/snap/bin/pwsh', eval=TRUE, include=FALSE, echo=FALSE}
Remove-Item 'test.csv'
```