Fixes #3799: Apache Kafka procedures (#4172)
* Fixes #3799: Apache Kafka procedures

* removed unused stuff from procedures

* code cleanup

* removed code unused by procedures

* Added docs and cleanup

* fix tests

* fix other tests

* changed deps

* added dep

* upgraded deps

* removed extradep

* added procs in extended*.txt

* cleanup

* Revert "cleanup"

This reverts commit 6a07e3f.

* restored stuff

* fixed tests

* cleanup

* resolved deps conflict error
vga91 committed Feb 14, 2025
1 parent c5436ca commit c5bc7ab
Showing 85 changed files with 10,271 additions and 21 deletions.
1 change: 1 addition & 0 deletions docs/asciidoc/modules/ROOT/nav.adoc
@@ -46,6 +46,7 @@ include::partial$generated-documentation/nav.adoc[]
** xref::database-integration/load-ldap.adoc[]
** xref::database-integration/redis.adoc[]
** xref::database-integration/vectordb/index.adoc[]
** xref::database-integration/kafka/index.adoc[]
* xref:graph-updates/index.adoc[]
** xref::graph-updates/uuid.adoc[]
@@ -0,0 +1,28 @@

[[confluent_cloud]]
= Confluent Cloud

Configuring a connection to a Confluent Cloud instance should follow
link:{url-confluent-java-client}[Confluent's Java Client] configuration advice.
At a minimum, to configure this, you will need:

* `BOOTSTRAP_SERVER_URL`
* `API_KEY`
* `API_SECRET`

More specifically, the procedures must be configured as follows:

.neo4j.conf
[source,ini]
----
apoc.kafka.bootstrap.servers=${BOOTSTRAP_SERVER_URL}
apoc.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${API_KEY}" password="${API_SECRET}";
apoc.kafka.ssl.endpoint.identification.algorithm=https
apoc.kafka.security.protocol=SASL_SSL
apoc.kafka.sasl.mechanism=PLAIN
apoc.kafka.request.timeout.ms=20000
apoc.kafka.retry.backoff.ms=500
----

Make sure to replace `BOOTSTRAP_SERVER_URL`, `API_SECRET`, and `API_KEY` with the values that Confluent Cloud
gives you when you generate an API access key.
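For illustration, a filled-in configuration might look like the following (the server address, key, and secret below are placeholders, not real credentials):

[source,ini]
----
apoc.kafka.bootstrap.servers=pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
apoc.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="ABC123XYZ" password="dummy-secret";
apoc.kafka.ssl.endpoint.identification.algorithm=https
apoc.kafka.security.protocol=SASL_SSL
apoc.kafka.sasl.mechanism=PLAIN
----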
@@ -0,0 +1,33 @@
=== Configuration summary

You can set the following Kafka configuration values in your `neo4j.conf`; the defaults are shown below.

.neo4j.conf
[source,subs="verbatim,attributes"]
----
apoc.kafka.bootstrap.servers=localhost:9092
apoc.kafka.auto.offset.reset=earliest
apoc.kafka.group.id=neo4j
apoc.kafka.enable.auto.commit=true
apoc.kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
apoc.kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
{environment}.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>
{environment}.topic.cdc.sourceId=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
{environment}.topic.cdc.schema=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
{environment}.topic.cud=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
{environment}.topic.pattern.node.<TOPIC_NAME>=<NODE_EXTRACTION_PATTERN>
{environment}.topic.pattern.relationship.<TOPIC_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>
{environment}.enabled=<true/false, default=false>
----

See the https://kafka.apache.org/documentation/#brokerconfigs[Apache Kafka documentation] for details on these settings.
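As a sketch, assuming the environment prefix resolves to `apoc.kafka.sink` (an assumption; check the generated documentation for the exact prefix) and that each consumed record is bound to an `event` variable in the Cypher template, a topic-to-Cypher mapping could look like:

[source,ini]
----
apoc.kafka.sink.enabled=true
apoc.kafka.sink.topic.cypher.people=MERGE (p:Person {id: event.id}) SET p.name = event.name
----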

[NOTE]
====
If `apoc.kafka.cluster.only` is set to `true`, APOC Kafka will refuse to start in single-instance mode,
or when run in the context of a backup operation. This is an important safety guard to ensure that
operations do not occur in unexpected situations in production deployments.
====

@@ -0,0 +1,73 @@
[[kafka]]
= Kafka


[[apoc_neo4j_plugin_quickstart]]
== APOC Kafka Procedures

NOTE: To enable the Kafka dependencies, set the APOC configuration `apoc.kafka.enabled=true`.

Any configuration option that starts with `apoc.kafka.` controls how the procedures themselves behave.

=== Install dependencies

The Kafka dependencies are included in https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/{apoc-release}/apoc-kafka-dependencies-{apoc-release}-all.jar[apoc-kafka-dependencies-{apoc-release}-all.jar^], which can be downloaded from the https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/tag/{apoc-release}[releases page^].
Once that file is downloaded, it should be placed in the `plugins` directory and the Neo4j Server restarted.

[[kafka-settings]]
=== Kafka settings

Any configuration option that starts with `apoc.kafka.` will be passed to the underlying Kafka driver. The Neo4j
Kafka procedures use the official Confluent Kafka producer and consumer Java clients.
Configuration settings which are valid for those clients will also work for APOC Kafka.

For example, in the Kafka documentation linked below, the configuration setting named `batch.size` should be stated as
`apoc.kafka.batch.size` in APOC Kafka.
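
For instance, the standard Kafka producer options `batch.size` and `linger.ms` would be passed through as (values are illustrative):

[source,ini]
----
apoc.kafka.batch.size=32768
apoc.kafka.linger.ms=5
----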

The following are common configuration settings you may wish to use.

.Most common configuration settings
|===
|Setting Name |Description |Default Value

|apoc.kafka.max.poll.records
|The maximum number of records to pull per batch from Kafka. Increasing this number will mean
larger transactions in Neo4j memory and may improve throughput.
|500

|apoc.kafka.buffer.memory
|The total bytes of memory the producer can use to buffer records waiting. Use this to adjust
how much memory the procedures may require to hold messages not yet delivered to Neo4j.
|33554432

|apoc.kafka.batch.size
|(Producer only) The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
|16384

|apoc.kafka.max.partition.fetch.bytes
|(Consumer only) The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress.
|1048576

|apoc.kafka.group.id
|A unique string that identifies the consumer group this consumer belongs to.
|N/A
|===
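
For example, a consumer tuned for larger batches could combine these settings as follows (values are illustrative, not recommendations):

[source,ini]
----
apoc.kafka.max.poll.records=1000
apoc.kafka.max.partition.fetch.bytes=2097152
apoc.kafka.group.id=neo4j-ingest
----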

=== Configure Kafka Connection

If you are running locally or against a standalone machine, configure `neo4j.conf` to point to that server:

.neo4j.conf
[source,ini]
----
apoc.kafka.bootstrap.servers=localhost:9092
----
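
For a standalone broker on another machine, point the same setting at its advertised listener (the hostname below is illustrative):

[source,ini]
----
apoc.kafka.bootstrap.servers=kafka-broker.internal.example.com:9092
----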

If you are using Confluent Cloud (managed Kafka), you can connect to Kafka as described in
the xref:database-integration/kafka/cloud.adoc#confluent_cloud[Confluent Cloud] section.


==== Restart Neo4j

Once the plugin is installed and configured, restarting the database will make it active.
If you have configured Neo4j to consume from Kafka, it will begin processing messages immediately.
