Skip to content

Commit

Permalink
upload psot
Browse files Browse the repository at this point in the history
  • Loading branch information
BlueHorn07 committed Dec 26, 2024
1 parent df4a2ed commit d2024a6
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ excerpt: "κΌ­ Java둜 Kafka Producer κ°œλ°œν•  ν•„μš˜ μ—†μž–μ•„ Β―\\\_(ツ)_/Β―

# μ™œ Python Producer에 관심을 κ°–κ²Œ λ˜μ—ˆλ‚˜μš”?

"Kafka κ°œλ°œμ€ Javaκ°€ 일반적이죠"λΌλŠ” 말을 μ°Έ 많이 듀은 것 κ°™λ‹€. 그런데, μ§€κΈˆ λ‹€λ‹ˆλŠ” νšŒμ‚¬μ˜ Kafka Producer Application은 Python으둜 개발 λ˜μ–΄ 있고, 이걸둜 κ½€ λ§Žμ€ 데이터가 처리되고 μžˆλ‹€!! (μ§€κΈˆλ„!)
"Kafka κ°œλ°œμ€ Javaκ°€ 일반적이죠"λΌλŠ” 말을 μ°Έ 많이 듀은 것 κ°™λ‹€. 그런데, νšŒμ‚¬μ—μ„œμ˜ Kafka Producer Application은 Python으둜 개발 λ˜μ–΄ 있고, 이걸둜 κ½€ λ§Žμ€ 데이터가 처리되고 μžˆλ‹€!! (μ§€κΈˆλ„!)

κ·Έλ ‡κ²Œ μƒκ°ν•˜λ‹ˆ 'Python Producer도 λ‚˜μ˜μ§€ μ•ŠλŠ”λ°?'λΌλŠ” 생각이 듀기도 ν•˜κ³ , ν•œλ²ˆ μ œλŒ€λ‘œ 정리해보면 쒋을 것 κ°™λ‹€λŠ” 생각이 λ“€μ—ˆλ‹€. 이 글은 그런 생각이 듀은 λ‚΄κ°€ ν…ŒμŠ€νŠΈ ν•΄λ³Έ Python Producer μ½”λ“œλ“€μ„ μ•„μΉ΄μ΄λΈŒ ν•˜κ³  깨달은 점을 적은 글이닀.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ excerpt: "Kafka Connect둜 Topic 데이터λ₯Ό μ†μ‰½κ²Œ μ „λ‹¬ν•˜κΈ° ✌️ File

# λ“€μ–΄κ°€λ©°

μ§€κΈˆ λ‹€λ‹ˆλŠ” νšŒμ‚¬λŠ” Confluentλ₯Ό 톡해 Kafka ν΄λŸ¬μŠ€ν„°λ₯Ό μ‚¬μš©ν•˜κ³  μžˆμŠ΅λ‹ˆλ‹€. λŒ€λΆ€λΆ„μ˜ 경우 Confluent-managed Connectorλ₯Ό μ‚¬μš© ν–ˆμ—ˆμœΌλ‚˜, 졜근 Confluentμ—μ„œ μ§€μ›ν•˜μ§€ μ•ŠλŠ” λ°μ΄ν„°λ² μ΄μŠ€λ‘œ Topic의 데이터λ₯Ό μ μž¬ν•΄μ•Ό ν•  일이 μƒκ²ΌμŠ΅λ‹ˆλ‹€...! μ €λŠ” Confluentλ₯Ό 이해할 쒋은 기회라고 μƒκ°ν–ˆκ³ , Kubernetes μœ„μ—μ„œ Sink Connectorλ₯Ό μš΄μ˜ν•˜λŠ” κ²½ν—˜μ„ ν•΄λ³Ό 수 μžˆμ—ˆμŠ΅λ‹ˆλ‹€.
νšŒμ‚¬μ—μ„œ Confluentλ₯Ό 톡해 Kafka ν΄λŸ¬μŠ€ν„°λ₯Ό μ‚¬μš©ν•˜κ³  μžˆμŠ΅λ‹ˆλ‹€. λŒ€λΆ€λΆ„μ˜ 경우 Confluent-managed Connectorλ₯Ό μ‚¬μš© ν–ˆμ—ˆμœΌλ‚˜, 졜근 Confluentμ—μ„œ μ§€μ›ν•˜μ§€ μ•ŠλŠ” λ°μ΄ν„°λ² μ΄μŠ€λ‘œ Topic의 데이터λ₯Ό μ μž¬ν•΄μ•Ό ν•  일이 μƒκ²ΌμŠ΅λ‹ˆλ‹€...! μ €λŠ” Confluentλ₯Ό 이해할 쒋은 기회라고 μƒκ°ν–ˆκ³ , Kubernetes μœ„μ—μ„œ Sink Connectorλ₯Ό μš΄μ˜ν•˜λŠ” κ²½ν—˜μ„ ν•΄λ³Ό 수 μžˆμ—ˆμŠ΅λ‹ˆλ‹€.

ꡬ글에 검색해보면, [Strimzi](https://strimzi.io/)λ₯Ό μ‚¬μš©ν•΄ Kafka Cluster와 ConnectorκΉŒμ§€ λ„μš°λŠ” 경우λ₯Ό 많이 λ΄€λŠ”λ°μš”. Strimzi도 맀λ ₯적인 λ„κ΅¬μ΄μ§€λ§Œ, 이번 μž‘μ—…μ—μ„œλŠ” 그것 없이 Sink Connectorλ₯Ό λ„μ›Œλ³΄κ³  μ‹Άμ—ˆμŠ΅λ‹ˆλ‹€! κ·Έλž˜μ„œ μ € 말고도 이런 λ‹ˆμ¦ˆλ₯Ό 가진 뢄듀이 μžˆμ„κΉŒ ν•˜μ—¬ λ‚΄μš©μ„ μ •λ¦¬ν•΄λ³΄μ•˜μŠ΅λ‹ˆλ‹€ γ…Žγ…Ž

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
---
title: "Kafka Connector on k8s - Distributed Mode"
toc: true
toc_sticky: true
categories: ["Kafka"]
excerpt: ""
---

이번 ν¬μŠ€νŠΈλŠ” [Kafka Connector on k8s - Standalone Mode](/2024/12/17/kafka-connector-standalone-mode/)μ—μ„œ λ‚΄μš©μ΄ μ΄μ–΄μ§‘λ‹ˆλ‹€. πŸ™


# μ™œ Distributed Mode에 관심을 κ°–κ²Œ λ˜μ—ˆλ‚˜μš”?

νšŒμ‚¬μ—μ„œ Confluentλ₯Ό 톡해 Kafka ν΄λŸ¬μŠ€ν„°λ₯Ό μ‚¬μš©ν•˜κ³  μžˆμŠ΅λ‹ˆλ‹€. λŒ€λΆ€λΆ„μ˜ 경우 Confluent-managed Connectorλ₯Ό μ‚¬μš© ν–ˆμ—ˆμœΌλ‚˜, 졜근 Confluentμ—μ„œ μ§€μ›ν•˜μ§€ μ•ŠλŠ” λ°μ΄ν„°λ² μ΄μŠ€λ‘œ Topic의 데이터λ₯Ό μ μž¬ν•΄μ•Ό ν•  일이 생겼고, 이λ₯Ό K8s에 Standalone mode둜 λ„μ› μŠ΅λ‹ˆλ‹€.

κ·Έλ ‡κ²Œ ν•œ 달 정도 잘 μš΄μ˜ν•˜κ³  μžˆλ‹€κ°€ Confluent μ •κΈ° λ―ΈνŒ…μ—μ„œ Connectorλ₯Ό Standalone λͺ¨λ“œλ‘œ μ“°κ³  μžˆλ‹€κ³  말씀 λ“œλ¦¬λ‹ˆ, Standalone λͺ¨λ“œλŠ” λ¬Έμ„œ μ–΄λ””λ₯Ό 봐도 "*개발, ν…ŒμŠ€νŠΈ λͺ©μ μœΌλ‘œ μ“°μ‹œμ˜€*"라고 λ‚˜μ™€μžˆμ§€ Prod ν™˜κ²½μ—μ„œλŠ” "Distributed" λͺ¨λ“œλ‘œ λŒλ¦¬λŠ” 것이 ꢌμž₯ μ‚¬ν•­μ΄λΌλŠ” ν”Όλ“œλ°±μ„ λ“€μ—ˆμŠ΅λ‹ˆλ‹€! μ•”νŠΌ μ—¬κΈ°κΉŒμ§€κ°€ λ°°κ²½μ΄μ—ˆκ΅¬μš”! μ–΄λ–»κ²Œ κ΅¬ν˜„ν–ˆλŠ”μ§€ μ‚΄νŽ΄λ³΄κ² μŠ΅λ‹ˆλ‹€.

# Standalone vs. Distributed: Scalability

![](/images/development/kafka/kafka-connect-standalone-vs-distributed.png){: style="width: 100%" }
{: .small .gray .text-center }

Kafka Connectλ₯Ό Standalone λͺ¨λ“œλ‘œ λ””ν”Œλ‘œμ΄ ν•œ μƒνƒœμ—μ„œ Pod Replicaλ₯Ό 1μ—μ„œ 2둜 늘리게 λœλ‹€λ©΄, 각 Pod이 동일 데이터λ₯Ό μ²˜λ¦¬ν•˜κ²Œ λ©λ‹ˆλ‹€. 즉, Throughput이 2λ°°κ°€ λ˜λŠ” 것은 λ§žμ§€λ§Œ 데이터도 2λ°°μ”© μ€‘λ³΅ν•΄μ„œ λ“€μ–΄μ˜€κ²Œ λ©λ‹ˆλ‹€!!

λ°˜λ©΄μ— Distributed λͺ¨λ“œλ‘œ λ””ν”Œλ‘œμ΄ ν•œ μƒνƒœμ—μ„œ Pod Replicaλ₯Ό 2둜 늘리게 되면, 각 Pod이 데이터λ₯Ό "μ ˆλ°˜μ”© λ‚˜λˆ„μ–΄" μ²˜λ¦¬ν•˜κ²Œ λ©λ‹ˆλ‹€. 즉, Throughput을 2배둜 λŠ˜λ¦¬λ©΄μ„œ 데이터도 μ€‘λ³΅λ˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€!


# Standalone vs. Distributed: Config Topics

Standalone λͺ¨λ“œμ—μ„œλŠ” Connectorλ₯Ό μ‹€ν–‰ν•  λ•Œ, μ•„λž˜μ™€ 같이 μ‹€ν–‰ν•©λ‹ˆλ‹€.

```bash
$ connect-standalone standalone.properties local-file-sink.properties
```

`standalone.properties`μ—λŠ” Kafka Server에 μ ‘μ†ν•˜κΈ° μœ„ν•œ 정보가 λ‹΄κ²¨μžˆμ—ˆκ³ , `local-file-sink.properties`μ—λŠ” File Sink Connectorλ₯Ό λ™μž‘ν•˜κΈ° μœ„ν•œ 정보가 담겨 μžˆμ—ˆμŠ΅λ‹ˆλ‹€.

Distributed λͺ¨λ“œμ—μ„œλŠ” λ’€μ˜ Connector Plugin ꡬ성이 파일이 μ•„λ‹ˆλΌ Kafka Configuration Topic에 κΈ°λ‘λ©λ‹ˆλ‹€.

![](https://images.ctfassets.net/gt6dp23g0g38/6SpP65mFNZLSdngL4Gf4XD/9df532fc6b1b3d2c9385d443c17770e1/kafka-connect-distributed-mode.jpg)
{: .align-center style="width: 100%" }
developer.confluent.io
{: .small .gray .text-center }

Distributed λͺ¨λ“œλŠ” Standaloneκ³Ό 달리 3가지 토픽을 ν•„μš”λ‘œ ν•©λ‹ˆλ‹€.

- `config.storage.topic`
- Kafka Connectorμ—μ„œ μ‹€ν–‰ν•˜λŠ” μž‘μ—…(task)에 λŒ€ν•œ ꡬ성 정보
- `local-file-sink.properties` νŒŒμΌμ— 있던 정보가 μš” 토픽에 λ‹΄κΈ΄λ‹€.
- `offset.storage.topic`
- Kafka Connector의 μž‘μ—…μ΄ μ–΄λ””κΉŒμ§€ 처리 ν–ˆλŠ”μ§€ κΈ°λ‘ν•œ 정보
- Standaloneμ—μ„œλŠ” `offset.storage.file.filename`에 λͺ…μ‹œν•œ νŒŒμΌμ— ν•΄λ‹Ή 정보가 λ‹΄κ²Όλ‹€.
- `status.storage.topic`
- Kafka Connector μœ„μ—μ„œ λ™μž‘ν•˜λŠ” κ°œλ³„ μž‘μ—…(task)의 μƒνƒœλ₯Ό μ €μž₯ν•˜λŠ” 정보
- Distributed λͺ¨λ“œμ—μ„œλŠ” Fault Toleranceλ₯Ό μœ„ν•΄ 각 Taskκ°€ μ„œλ‘œ μƒνƒœλ₯Ό μ²΄ν¬ν•œλ‹€.

μ΄λ ‡κ²Œ ꡬ성 정보가 Kafka Connect λ‚΄λΆ€κ°€ μ•„λ‹ˆλΌ, μ™ΈλΆ€(remote)인 Topic에 기둝되기 λ•Œλ¬Έμ— λͺ¨λ“  Pod이 μ€‘λ‹¨λ˜κ±°λ‚˜ μœ μ‹€ λ˜λ”λΌλ„, Topic에 κΈ°λ‘ν•΄λ‘μ—ˆλ˜ 정보λ₯Ό λ°”νƒ•μœΌλ‘œ Kafka Connectλ₯Ό μ•ˆμ „ν•˜κ²Œ λ‹€μ‹œ μ‹€ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€ 😌


# Standalone β†’ Distributed

제 κ²½μš°λŠ” Standalone λͺ¨λ“œλ‘œ λ””ν”Œλ‘œμ΄ ν•œ Kafka Connectλ₯Ό Distributed λͺ¨λ“œλ‘œ μ „ν™˜ν•˜λŠ” 경우 μ˜€μŠ΅λ‹ˆλ‹€. μ–΄λ–€ propertiesλ₯Ό λ³€κ²½ ν–ˆλŠ”μ§€ μœ„μ£Όλ‘œ μ‚΄νŽ΄λ³΄λ©΄

```diff
- offset.storage.file.filename=/tmp/connect.offsets
+ group.id=local-file-sink
+ config.storage.topic=_local_file_sink.config
+ offset.storage.topic=_local_file_sink.offset
+ status.storage.topic=_local_file_sink.status
```

μš°μ„  더이상 offset 정보λ₯Ό Kafka Connect의 λ‘œμ»¬μ— μ €μž₯ν•˜μ§€ μ•ŠκΈ° λ•Œλ¬Έμ— `offset.storage.file.filename` 값이 ν•„μš” μ—†μŠ΅λ‹ˆλ‹€.

그리고 Distributed λͺ¨λ“œλ‘œ λ™μž‘ν•˜κΈ° μœ„ν•΄ 각 νƒœμŠ€ν¬μ˜ 정보λ₯Ό μ €μž₯ν•  Topic 3가지λ₯Ό 지정 ν•©λ‹ˆλ‹€.

- `config.storage.topic`
- `offset.storage.topic`
- `status.storage.topic`

Distributed λͺ¨λ“œλ‘œ Kafka Connectλ₯Ό λ””ν”Œλ‘œμ΄ ν•˜κ²Œ 되면, Kafka Connectλ₯Ό ν΄λŸ¬μŠ€ν„°(cluster) ν˜•μ‹μœΌλ‘œ μš΄μ˜ν•˜κ²Œ λ©λ‹ˆλ‹€. κ·Έλž˜μ„œ 이 ν΄λŸ¬μŠ€ν„° 이름을 `group.id`둜 μ§€μ •ν•΄μ€λ‹ˆλ‹€.

## Update Pod Yaml

Pod Yaml도 μ•„λž˜μ™€ 같이 λ³€κ²½ν•©λ‹ˆλ‹€.

```diff
- command:
- - "connect-standalone"
- - "/etc/kafka-connect-properties/standalone/standalone.properties"
- - "/etc/kafka-connect-properties/file-sink-connector/local-file-sink.properties"
+ command:
+ - "connect-distributed"
+ - "/etc/kafka-connect-properties/distributed/distributed.properties"
```

크게 λ³€κ²½λ˜λŠ” 점은 μ—†κ³ , containerλ₯Ό 돌릴 λ•Œ, `connect-distributed`와 μœ„μ˜ μš”κ΅¬μ‚¬ν•­μ„ λ°˜μ˜ν•œ `distributed.properties`둜 μ‹€ν–‰ν•˜λ„λ‘ λ³€κ²½ν•©λ‹ˆλ‹€.

## Registry Task using REST API

Standalone λͺ¨λ“œμ—μ„œλŠ” μ–΄λ–€ μž‘μ—…(task)λ₯Ό λŒλ¦΄μ§€ `.properties` νŒŒμΌμ„ μž‘μ„±ν•˜κ³  이λ₯Ό `connect-standalone`에 λ„˜κ²¨μ£Όμ—ˆμŠ΅λ‹ˆλ‹€.

Distributed λͺ¨λ“œμ—μ„œλŠ” μž‘μ—…(task)을 등둝할 λ•Œ Kafka Connect의 REST APIλ₯Ό μ‚¬μš©ν•©λ‹ˆλ‹€!

```bash
$ curl -X POST -H "Content-Type: application/json" \
http://localhost:8083/connectors \
--data "@/etc/kafka-connect-properties/file-sink-connector/local-file-sink.json"
```

μœ„μ˜ `curl` λͺ…λ Ήμ–΄μ—μ„œ `local-file-sink.json`으둜 μ „λ‹¬ν•˜λŠ”λ°, 이λ₯Ό μœ„ν•΄ `local-file-sink.json`을 μ•„λž˜μ™€ 같이 μž‘μ„±ν•œ ν›„, Standalone λͺ¨λ“œμ—μ„œ ν–ˆλ˜ κ²ƒμ²˜λŸΌ K8s Secret으둜 λ§Œλ“€μ–΄ Pod에 Volume Mount둜 μ£Όμž…ν•©λ‹ˆλ‹€.

```json
// @./local-file-sink.json
{
"name": "local-file-sink",
"config":
{
"connector.class": "FileStreamSink",
"tasks.max": "1",
"topics": "szcode2.qa.avro.server",

"file": "/tmp/test.sink.txt"
}
}
```

`curl`을 ν†΅ν•œ μž‘μ—… 등둝은 Kafka Connector Pod이 λ””ν”Œλ‘œμ΄ 되고, λͺ‡μ΄ˆκ°„μ˜ λžœλ”© ν›„ 등둝이 κ°€λŠ₯ν•©λ‹ˆλ‹€.

그리고, μž‘μ—… 등둝 후에 또 1λΆ„~3λΆ„ 정도 기닀리면, Confluentμ—μ„œλ„ Distributed λͺ¨λ“œμ˜ Connectorκ°€ λ“±λ‘λœ 것을 확인할 수 μžˆμŠ΅λ‹ˆλ‹€ 😌


# 맺음말

λ­”κ°€ Kafkaλ₯Ό 처음 곡뢀할 λ•Œ 봀던 기얡이 어렴풋이 λ‚˜λŠ” 것 같은데, 직접 λ„μ›Œλ³΄λ‹ˆ μ™œ Standalone λͺ¨λ“œμ™€ Distributed λͺ¨λ“œ, 두 방식이 μ‘΄μž¬ν•˜λŠ”μ§€ 잘 μ™€λ‹ΏλŠ” 것 κ°™μŠ΅λ‹ˆλ‹€ γ…Žγ…Ž (μ—­μ‹œ 직접 해봐야 λŠ˜μ–΄)

Distributed Mode 속성 쀑에 `rest.advertised.host.name` μͺ½μ€ 아직 μ œλŒ€λ‘œ λͺ» λ΄€λŠ”λ°, λ‚˜μ€‘μ— μ‹œκ°„μ΄ λ‚˜λ©΄ 쒀더 μ‚΄νŽ΄λ³΄κ³ μž ν•©λ‹ˆλ‹€. (μ λ‹Ήνžˆ λŠμ–΄μ£ΌλŠ” 것도 ν•„μš” γ…Žγ…Ž)

μ΄λ²ˆμ— νšŒμ‚¬ μ—…λ¬΄λ‘œ Kafka μž‘μ—…μ„ κ½€ 많이 ν•΄λ³΄κ²Œ λ˜μ–΄μ„œ λ‹€μŒ 자격증으둜 Confluent Certificateλ₯Ό λͺ©ν‘œλ‘œ μ„€μ • ν–ˆμŠ΅λ‹ˆλ‹€! 이μͺ½ μƒνƒœκ³„μ— λŒ€ν•΄μ„œλ„ 더더더 많이 μ•Œ 수 있게 되길 γ…Žγ…Ž 그럼 μ•žμœΌλ‘œλ„ μ•„μ’Œμž£! πŸ‘Š
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit d2024a6

Please sign in to comment.