Skip to content

Commit

Permalink
[#3574] Fix wrong context used in PubSubPublisherClient.publish().
Browse files Browse the repository at this point in the history
  • Loading branch information
calohmn committed Oct 27, 2023
1 parent e8c8c5c commit 8351c95
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -118,18 +119,22 @@ public void close() {
* Otherwise, the future will be failed with a {@link ServiceInvocationException}
* indicating the reason for the failure.
*/
@Override
public Future<String> publish(final PubsubMessage pubsubMessage) {
final Promise<String> result = Promise.promise();
final Context context = vertx.getOrCreateContext();
final ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<>() {

@Override
public void onSuccess(final String messageId) {
// handle result on vert.x event loop instead of Publisher's Thread pool
vertx.runOnContext(ok -> result.complete(messageId));
// handle result on original vert.x context instead of Publisher's Thread pool
context.runOnContext(ok -> result.complete(messageId));
}

@Override
public void onFailure(final Throwable t) {
vertx.runOnContext(ok -> {
context.runOnContext(ok -> {
LOG.debug("error publishing messages to Pub/Sub", t);
result.fail(new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE, t));
});
Expand Down
3 changes: 3 additions & 0 deletions site/homepage/content/release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ description = "Information about changes in recent Hono releases. Includes new f
* When running in a Kubernetes cluster with nodes using cgroups v2, the 'hono.command_internal.*' Kafka topics were not
being cleaned up. This has been fixed. Note that the solution requires the Hono protocol adapter pods to have
a service account with an assigned RBAC role that allows to perform "get" on the "pods" resource.
* When using Pub/Sub messaging, there were potentially issues concerning the AMQP connection between protocol adapter
and command router, leading for example to timeouts when MQTT devices subscribed/unsubscribed to the command topic.
This has been fixed.

### Deprecations

Expand Down

0 comments on commit 8351c95

Please sign in to comment.