-
Notifications
You must be signed in to change notification settings - Fork 137
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Apache Kafka as alternative to AMQP 1.0 Messaging Network #8
Comments
We have an important point : AMQP 1.0 |
That's true, converting from AMQP 1.0 messages to a Kafka producer will cost some cycles. However, my expectation would be that this will be made up by the fact that Kafka can ingest and deliver the messages with a high degree of parallelism. So this might introduce some latency to the transmission of an individual message but will (hopefully) still improve the throughput of the overall system. This, of course, requires that we run multiple Hono endpoints that can serve as parallel Kafka producers ... |
Considering that Kafka doesn't support AMQP 1.0 natively we can think it as a service for our IoT Connector. Of course, it won't be the primary ingestion system but a service which connects to the IoT Connector to receive data via AMQP 1.0 and provides them in a Kafka fashion to other services like for example Apache Storm. |
We could try to use this AMQP Kafka bridge (https://github.com/rhiot/amqp-kafka-bridge) for this issue. |
Hi i would like to contribute more in the Hono project. I work in my freetime a lot with kafka. So i am interested to have a deeper look if it is possible to use kafka as a underlying messaging infrastructure for hono. I would start to look at this AMQP Kafka bridge. |
Great 👍 |
@danielbohn that's great ! Consider it as a POC for now and that the Vert.x community is working on a Vert.x client for Kafka so it could be possible to use it inside the bridge instead using directly the official Kafka client. |
@sophokles73 and @ppatierno i have an example with Kafka and the AMQP-Kafka bridge running on my local machine. I would now integrate the Kafka messaging infrastructure in the hono architecture behind the Qpid dispatch router for the telemetry endpoint. |
@ppatierno is a docker image für the Kafka-AMQP bridge available ? |
this sounds interesting but I wonder what that would actually entail. Is this merely a config file for dispatch router? |
@sophokles73 yes it would be a config file for the dispatch router and a new docker-compose file in the example folder. Later on we can also use the Kafka messaging infrastructure for the event endpoint. |
@sophokles73 @danielbohn on the top of my head should be enabling an outgoing connection from the router to the AMQP bridge (which listens as a server). @danielbohn there is no Docker image available online on Docker Hub but there is the Docker file for building that. In order to avoid a waste of time for you let me try the connection from router to bridge in a configuration outside of Hono scenario. I'll come back to you soon. ;) |
@ppatierno could you please explain "on the top of my head should be enabling an outgoing connection ..." a little bit more. I have a example running with your bridge for sending and receiving messages via AMQP 1.0 and kafka |
Before we spend more effort on this I would like to better understand what we are trying to achieve here. My understanding is that we can easily pipe messages into a downstream Kafka server from Dipatch Router. But downstream receivers would then directly attach to Kafka in order to process the messages, or wouldn't they? |
Using the bridge you should be able to put message to Kafka using AMQP 1.0 so through the router network. |
@sophokles73 With this approach we would have yet another place where we need to enforce security. We already have problems to do this for two interfaces (Hono and QDR). Or do you think Kafka is not part of Hono platform in this case? I think in a shared cloud environment this is not feasible. |
one could argue that Kafka is a means for (optionally) persisting telemetry data that is specific to the application. In that case I would consider it NOT part of Hono. @ppatierno: I have a hard time imagining how data can be consumed through an AMQP 1.0 bridge given the different concepts of how a client works (Kafka vs. AMQP). But maybe you have found a good way to map AMQP to Kafka's consumer protocol ... |
@sophokles73 yes it's what I tried to do with the bridge ... but feedback are welcome ... you can see more information on mapping on the project landing page and the Wiki section ;) |
Similar implementation: |
Our friends from the Eclipse Ditto project have added support for forwarding messages received from Hono to a user provided Kafka instance. So, people who want the messages end up in a Kafka cluster should use Hono together with Ditto and follow the instructions provided by the Ditto project. |
In my view, replacing the AMQP messaging network with a Kafka cluster could still be an interesting option. I think it makes sense to keep the discussion in this issue, even though other goals and strategies have been discussed here. |
@b-abel sure |
…on processing error. When the creation of a message in the class implementing AbstractAtLeastOnceKafkaConsumer or the message handler throw an unexpected exception, the close handler is invoked with that exception. Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
…ility. Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
…ender. Use KafkaMessageHelper.createKafkaHeader() for the encoding of Kafka headers in AbstractKafkaBasedMessageSender. Encoding is error-prone, so use a single place where this is implemented. Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
…tionClientFactory. Document that (a) the message handler passed into KafkaApplicationClientFactory, respectively DownstreamMessageConsumer should not throw exceptions and (b) that the close handler is invoked if an exception is thrown in the message handler. Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
Use KafkaMessageHelper.createKafkaHeader() for the encoding of Kafka headers in AbstractKafkaBasedMessageSender. Encoding is error-prone, so use a single place where this is implemented. Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
…the processed offsets. Instead of committing the collected offsets only in case of an error and calling the parameterless commit method otherwise, AbstractAtLeastOnceKafkaConsumer now always use the collected offsets when committing. Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
Mocked handlers that are invoked immediately could cause a stack overflow when polling too often. Changed the tests to prevent this. Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
The message handler passed into AbstractAtLeastOnceKafkaConsumer may throw a ServerErrorException to indicate a transient error. In contrast to other exceptions thrown by the message handler, the consumer will not be closed in this case. Instead, the current offsets will be committed and the failed message will be fetched again with the next poll operation. Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
If the message handler passed into AbstractAtLeastOnceKafkaConsumer throws a runtime exception, the current offsets will be committed and the failed message will be again in the next batch to be polled. The consumer will not be closed. Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
…throws an exception. Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
If an error occurs while processing a message or when the consumer is stopped, the offsets for the already processed messages from the current batch are now committed. This is to prevent all messages from being processed again. Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
Hi @sophokles73 |
We are planning to release 1.7.0 this week. Note that support for Kafka is still considered experimental, though. |
@calohmn @b-abel @kaniyan @fkaltner There seem to be no more related issues/PRs pending for this, so I wonder if we can close this issue for the time being. I am sure that there will be additions/improvements being made in the (near) future. But I guess that the ground work has been done by you already 👍 |
Yes, issue can be closed FMPOV. |
FMPOV too this issue can be closed. |
Yes, let's close it. |
Hono does not strive to implement a new message broker but instead tries to leverage existing messaging infrastructure to meet its quality goals. It seems reasonable to assume that the amount of telemetry data flowing upstream from devices to back end applications will be orders of magnitude larger than the data being sent to devices from back end applications in order to invoke operations or configure properties on the devices.
It seems therefore feasible to leverage Apache Kafka (at least) for transporting telemetry data downstream taking advantage of Kafka's qualities regarding horizontal scalability, low latency and fault tolerance.
We will need to come up with ideas of how to map downstream data to Kafka's topics and how to allow for parallel message production/consumption in the context of Hono.
The text was updated successfully, but these errors were encountered: