From a7044d04807968e98e12e6b18d18f4768bf21d1b Mon Sep 17 00:00:00 2001
From: Francisco Javier Tirado Sarti
 <65240126+fjtirado@users.noreply.github.com>
Date: Fri, 4 Oct 2024 17:44:55 +0200
Subject: [PATCH] [incubator-kie-issues#1457] Allow grouping of events (#3654)

* [Fix apache/incubator-kie-issues#1457] Allow grouping of events

* Add test for coverage

* [Fix apache/incubator-kie-issues#1457] Grouping of event serialization

* [Fix apache/incubator-kie-issues#1457] Adding default constructor

* [Fix apache/incubator-kie-issues#1457] Adding TYPE constant

---------

Co-authored-by: gmunozfe <gmunozfe@redhat.com>
---
 .../kie/kogito/event/AbstractDataEvent.java   |  16 +-
 .../MultipleProcessInstanceDataEvent.java     |  34 ++
 .../process/ProcessInstanceDataEvent.java     |   6 +
 .../MultipleUserTaskInstanceDataEvent.java    |  34 ++
 .../usertask/UserTaskInstanceDataEvent.java   |   5 +
 kogito-build/kogito-dependencies-bom/pom.xml  |   2 +-
 quarkus/addons/events/process/runtime/pom.xml |  11 +
 .../AbstractMessagingEventPublisher.java      | 188 +++++++++
 .../GroupingMessagingEventPublisher.java      |  72 ++++
 .../ReactiveMessagingEventPublisher.java      | 135 +-----
 .../GroupingMessagingEventPublisherTest.java  | 392 ++++++++++++++++++
 11 files changed, 757 insertions(+), 138 deletions(-)
 create mode 100644 api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
 create mode 100644 api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/MultipleUserTaskInstanceDataEvent.java
 create mode 100644 quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/AbstractMessagingEventPublisher.java
 create mode 100644 quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java
 create mode 100644 quarkus/addons/events/process/runtime/src/test/java/org/kie/kogito/events/process/GroupingMessagingEventPublisherTest.java

diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/AbstractDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/AbstractDataEvent.java
index 2fcdd704196..065111c8519 100644
--- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/AbstractDataEvent.java
+++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/AbstractDataEvent.java
@@ -177,6 +177,15 @@ public abstract class AbstractDataEvent<T> implements DataEvent<T> {
     protected AbstractDataEvent() {
     }
 
+    protected AbstractDataEvent(String type, URI source, T body) {
+        this.specVersion = SpecVersion.parse(SPEC_VERSION);
+        this.id = UUID.randomUUID().toString();
+        this.source = source;
+        this.type = type;
+        this.time = ZonedDateTime.now().toOffsetDateTime();
+        this.data = body;
+    }
+
     protected AbstractDataEvent(String type,
             String source,
             T body,
@@ -201,12 +210,7 @@ protected AbstractDataEvent(String type,
             String subject,
             String dataContentType,
             String dataSchema) {
-        this.specVersion = SpecVersion.parse(SPEC_VERSION);
-        this.id = UUID.randomUUID().toString();
-        this.source = Optional.ofNullable(source).map(URI::create).orElse(null);
-        this.type = type;
-        this.time = ZonedDateTime.now().toOffsetDateTime();
-        this.data = body;
+        this(type, Optional.ofNullable(source).map(URI::create).orElse(null), body);
         setKogitoProcessInstanceId(kogitoProcessInstanceId);
         setKogitoRootProcessInstanceId(kogitoRootProcessInstanceId);
         setKogitoProcessId(kogitoProcessId);
diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
new file mode 100644
index 00000000000..7db8c0e7659
--- /dev/null
+++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.process;
+
+import java.net.URI;
+import java.util.Collection;
+
+public class MultipleProcessInstanceDataEvent extends ProcessInstanceDataEvent<Collection<ProcessInstanceDataEvent<?>>> {
+
+    public static final String TYPE = "MultipleProcessInstanceDataEvent";
+
+    public MultipleProcessInstanceDataEvent() {
+    }
+
+    public MultipleProcessInstanceDataEvent(URI source, Collection<ProcessInstanceDataEvent<?>> body) {
+        super(TYPE, source, body);
+    }
+}
diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceDataEvent.java
index 8069df7ee39..31131563dcc 100644
--- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceDataEvent.java
+++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceDataEvent.java
@@ -18,6 +18,8 @@
  */
 package org.kie.kogito.event.process;
 
+import java.net.URI;
+
 import org.kie.kogito.event.AbstractDataEvent;
 
 public class ProcessInstanceDataEvent<T> extends AbstractDataEvent<T> {
@@ -29,6 +31,10 @@ public ProcessInstanceDataEvent(T body) {
         setData(body);
     }
 
+    protected ProcessInstanceDataEvent(String type, URI source, T body) {
+        super(type, source, body);
+    }
+
     public ProcessInstanceDataEvent(String type,
             String source,
             T body,
diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/MultipleUserTaskInstanceDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/MultipleUserTaskInstanceDataEvent.java
new file mode 100644
index 00000000000..b2b62c61d83
--- /dev/null
+++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/MultipleUserTaskInstanceDataEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.event.usertask;
+
+import java.net.URI;
+import java.util.Collection;
+
+public class MultipleUserTaskInstanceDataEvent extends UserTaskInstanceDataEvent<Collection<UserTaskInstanceDataEvent<?>>> {
+
+    public static final String TYPE = "MultipleUserTaskInstanceDataEvent";
+
+    public MultipleUserTaskInstanceDataEvent() {
+    }
+
+    public MultipleUserTaskInstanceDataEvent(URI source, Collection<UserTaskInstanceDataEvent<?>> body) {
+        super(TYPE, source, body);
+    }
+}
diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/UserTaskInstanceDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/UserTaskInstanceDataEvent.java
index 98fc6528094..c4b3e0af5c9 100644
--- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/UserTaskInstanceDataEvent.java
+++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/UserTaskInstanceDataEvent.java
@@ -18,6 +18,7 @@
  */
 package org.kie.kogito.event.usertask;
 
+import java.net.URI;
 import java.util.Set;
 
 import org.kie.kogito.event.AbstractDataEvent;
@@ -48,6 +49,10 @@ public UserTaskInstanceDataEvent(T body) {
         setData(body);
     }
 
+    protected UserTaskInstanceDataEvent(String type, URI source, T body) {
+        super(type, source, body);
+    }
+
     public UserTaskInstanceDataEvent(String type,
             String source,
             T body,
diff --git a/kogito-build/kogito-dependencies-bom/pom.xml b/kogito-build/kogito-dependencies-bom/pom.xml
index 0c815fc2353..26fae00aa0d 100644
--- a/kogito-build/kogito-dependencies-bom/pom.xml
+++ b/kogito-build/kogito-dependencies-bom/pom.xml
@@ -52,7 +52,7 @@
     <version.io.quarkiverse.jackson-jq>2.0.2</version.io.quarkiverse.jackson-jq>
     <version.io.quarkiverse.openapi.generator>2.4.1</version.io.quarkiverse.openapi.generator>
     <version.io.quarkiverse.asyncapi>0.3.0</version.io.quarkiverse.asyncapi>
-    <version.io.quarkiverse.reactivemessaging.http>2.2.0</version.io.quarkiverse.reactivemessaging.http>
+    <version.io.quarkiverse.reactivemessaging.http>2.4.1</version.io.quarkiverse.reactivemessaging.http>
     <version.io.quarkiverse.embedded.postgresql>0.2.3</version.io.quarkiverse.embedded.postgresql>
     <version.com.github.haifengl.smile>1.5.2</version.com.github.haifengl.smile>
     <version.com.github.javaparser>3.25.8</version.com.github.javaparser>
diff --git a/quarkus/addons/events/process/runtime/pom.xml b/quarkus/addons/events/process/runtime/pom.xml
index dc5d417218d..50ef92684c1 100644
--- a/quarkus/addons/events/process/runtime/pom.xml
+++ b/quarkus/addons/events/process/runtime/pom.xml
@@ -78,6 +78,17 @@
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
+    
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.quarkus</groupId>
+      <artifactId>quarkus-junit5-mockito</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/AbstractMessagingEventPublisher.java b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/AbstractMessagingEventPublisher.java
new file mode 100644
index 00000000000..f8092c83575
--- /dev/null
+++ b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/AbstractMessagingEventPublisher.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.events.process;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Consumer;
+
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Message;
+import org.eclipse.microprofile.reactive.messaging.OnOverflow;
+import org.eclipse.microprofile.reactive.messaging.OnOverflow.Strategy;
+import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider;
+import org.kie.kogito.event.DataEvent;
+import org.kie.kogito.event.EventPublisher;
+import org.kie.kogito.events.config.EventsRuntimeConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.smallrye.reactive.messaging.MutinyEmitter;
+import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
+
+import jakarta.annotation.PostConstruct;
+import jakarta.enterprise.inject.Instance;
+import jakarta.inject.Inject;
+
+public abstract class AbstractMessagingEventPublisher implements EventPublisher {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractMessagingEventPublisher.class);
+
+    @Inject
+    ObjectMapper json;
+
+    @Inject
+    @Channel(PROCESS_INSTANCES_TOPIC_NAME)
+    @OnOverflow(Strategy.UNBOUNDED_BUFFER)
+    MutinyEmitter<String> processInstancesEventsEmitter;
+    private AbstractMessageEmitter processInstanceConsumer;
+
+    @Inject
+    @Channel(PROCESS_DEFINITIONS_TOPIC_NAME)
+    MutinyEmitter<String> processDefinitionEventsEmitter;
+    private AbstractMessageEmitter processDefinitionConsumer;
+
+    @Inject
+    @Channel(USER_TASK_INSTANCES_TOPIC_NAME)
+    MutinyEmitter<String> userTasksEventsEmitter;
+    private AbstractMessageEmitter userTaskConsumer;
+    @Inject
+    EventsRuntimeConfig eventsRuntimeConfig;
+
+    @Inject
+    Instance<MessageDecoratorProvider> decoratorProviderInstance;
+
+    private MessageDecoratorProvider decoratorProvider;
+
+    @PostConstruct
+    public void init() {
+        decoratorProvider = decoratorProviderInstance.isResolvable() ? decoratorProviderInstance.get() : null;
+        processDefinitionConsumer = eventsRuntimeConfig.isProcessInstancesPropagateError() ? new BlockingMessageEmitter(processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME)
+                : new ReactiveMessageEmitter(processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME);
+        processInstanceConsumer = eventsRuntimeConfig.isProcessDefinitionPropagateError() ? new BlockingMessageEmitter(processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME)
+                : new ReactiveMessageEmitter(processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME);
+        userTaskConsumer = eventsRuntimeConfig.isUserTasksPropagateError() ? new BlockingMessageEmitter(userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME)
+                : new ReactiveMessageEmitter(userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME);
+    }
+
+    protected Optional<AbstractMessageEmitter> getConsumer(DataEvent<?> event) {
+        if (event == null) {
+            return Optional.empty();
+        }
+        switch (event.getType()) {
+            case "ProcessDefinitionEvent":
+                return eventsRuntimeConfig.isProcessDefinitionEventsEnabled() ? Optional.of(processDefinitionConsumer) : Optional.empty();
+
+            case "ProcessInstanceErrorDataEvent":
+            case "ProcessInstanceNodeDataEvent":
+            case "ProcessInstanceSLADataEvent":
+            case "ProcessInstanceStateDataEvent":
+            case "ProcessInstanceVariableDataEvent":
+                return eventsRuntimeConfig.isProcessInstancesEventsEnabled() ? Optional.of(processInstanceConsumer) : Optional.empty();
+
+            case "UserTaskInstanceAssignmentDataEvent":
+            case "UserTaskInstanceAttachmentDataEvent":
+            case "UserTaskInstanceCommentDataEvent":
+            case "UserTaskInstanceDeadlineDataEvent":
+            case "UserTaskInstanceStateDataEvent":
+            case "UserTaskInstanceVariableDataEvent":
+                return eventsRuntimeConfig.isUserTasksEventsEnabled() ? Optional.of(userTaskConsumer) : Optional.empty();
+
+            default:
+                return Optional.empty();
+        }
+    }
+
+    @Override
+    public void publish(Collection<DataEvent<?>> events) {
+        for (DataEvent<?> event : events) {
+            publish(event);
+        }
+    }
+
+    protected void publishToTopic(AbstractMessageEmitter emitter, Object event) {
+        logger.debug("About to publish event {} to topic {}", event, emitter.topic);
+        Message<String> message = null;
+        try {
+            String eventString = json.writeValueAsString(event);
+            logger.debug("Event payload '{}'", eventString);
+            message = decorateMessage(ContextAwareMessage.of(eventString));
+        } catch (Exception e) {
+            logger.error("Error while creating event to topic {} for event {}", emitter.topic, event);
+        }
+        if (message != null) {
+            emitter.accept(message);
+        }
+    }
+
+    protected Message<String> decorateMessage(Message<String> message) {
+        return decoratorProvider != null ? decoratorProvider.decorate(message) : message;
+    }
+
+    protected static abstract class AbstractMessageEmitter implements Consumer<Message<String>> {
+
+        protected final String topic;
+        protected final MutinyEmitter<String> emitter;
+
+        protected AbstractMessageEmitter(MutinyEmitter<String> emitter, String topic) {
+            this.emitter = emitter;
+            this.topic = topic;
+        }
+    }
+
+    private static class BlockingMessageEmitter extends AbstractMessageEmitter {
+        protected BlockingMessageEmitter(MutinyEmitter<String> emitter, String topic) {
+            super(emitter, topic);
+        }
+
+        @Override
+        public void accept(Message<String> message) {
+            emitter.sendMessageAndAwait(message);
+            logger.debug("Successfully published message {}", message.getPayload());
+        }
+    }
+
+    private static class ReactiveMessageEmitter extends AbstractMessageEmitter {
+        protected ReactiveMessageEmitter(MutinyEmitter<String> emitter, String topic) {
+            super(emitter, topic);
+        }
+
+        @Override
+        public void accept(Message<String> message) {
+            emitter.sendMessageAndForget(message
+                    .withAck(() -> onAck(message))
+                    .withNack(reason -> onNack(reason, message)));
+        }
+
+        private CompletionStage<Void> onAck(Message<String> message) {
+            logger.debug("Successfully published message {}", message.getPayload());
+            return CompletableFuture.completedFuture(null);
+        }
+
+        private CompletionStage<Void> onNack(Throwable reason, Message<String> message) {
+            logger.error("Error while publishing message {}", message, reason);
+            return CompletableFuture.completedFuture(null);
+        }
+
+    }
+}
diff --git a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java
new file mode 100644
index 00000000000..ee1f284b3a8
--- /dev/null
+++ b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.events.process;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.kie.kogito.event.DataEvent;
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
+
+import io.quarkus.arc.properties.IfBuildProperty;
+
+import jakarta.inject.Singleton;
+
+@Singleton
+@IfBuildProperty(name = "kogito.events.grouping", stringValue = "true")
+public class GroupingMessagingEventPublisher extends AbstractMessagingEventPublisher {
+
+    @Override
+    public void publish(DataEvent<?> event) {
+        publish(Collections.singletonList(event));
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public void publish(Collection<DataEvent<?>> events) {
+        Map<AbstractMessageEmitter, Collection> eventsByChannel = new HashMap<>();
+        for (DataEvent<?> event : events) {
+            getConsumer(event).ifPresent(c -> eventsByChannel.computeIfAbsent(c, k -> new ArrayList<>()).add(event));
+        }
+        eventsByChannel.entrySet().forEach(this::publishEvents);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private void publishEvents(Map.Entry<AbstractMessageEmitter, Collection> entry) {
+        DataEvent<?> firstEvent = (DataEvent<?>) entry.getValue().iterator().next();
+        URI source = firstEvent.getSource();
+        if (firstEvent instanceof UserTaskInstanceDataEvent) {
+            publishToTopic(entry.getKey(), new MultipleUserTaskInstanceDataEvent(source, (Collection<UserTaskInstanceDataEvent<?>>) entry.getValue()));
+        } else if (firstEvent instanceof ProcessInstanceDataEvent) {
+            publishToTopic(entry.getKey(), new MultipleProcessInstanceDataEvent(source, (Collection<ProcessInstanceDataEvent<?>>) entry.getValue()));
+        } else {
+            for (DataEvent<?> event : (Collection<DataEvent<?>>) entry.getValue()) {
+                publishToTopic(entry.getKey(), event);
+            }
+        }
+    }
+}
diff --git a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java
index c232a1eb471..c6aa4424f0c 100644
--- a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java
+++ b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java
@@ -19,102 +19,20 @@
 package org.kie.kogito.events.process;
 
 import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.function.BiConsumer;
 
-import org.eclipse.microprofile.reactive.messaging.Channel;
-import org.eclipse.microprofile.reactive.messaging.Message;
-import org.eclipse.microprofile.reactive.messaging.OnOverflow;
-import org.eclipse.microprofile.reactive.messaging.OnOverflow.Strategy;
-import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider;
 import org.kie.kogito.event.DataEvent;
-import org.kie.kogito.event.EventPublisher;
-import org.kie.kogito.events.config.EventsRuntimeConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import io.quarkus.arc.properties.UnlessBuildProperty;
 
-import io.smallrye.reactive.messaging.MutinyEmitter;
-import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
-
-import jakarta.annotation.PostConstruct;
-import jakarta.enterprise.inject.Instance;
-import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
 
 @Singleton
-public class ReactiveMessagingEventPublisher implements EventPublisher {
-
-    private static final Logger logger = LoggerFactory.getLogger(ReactiveMessagingEventPublisher.class);
-
-    @Inject
-    ObjectMapper json;
-
-    @Inject
-    @Channel(PROCESS_INSTANCES_TOPIC_NAME)
-    @OnOverflow(Strategy.UNBOUNDED_BUFFER)
-    MutinyEmitter<String> processInstancesEventsEmitter;
-    private BiConsumer<MutinyEmitter<String>, Message<String>> processInstanceConsumer;
-
-    @Inject
-    @Channel(PROCESS_DEFINITIONS_TOPIC_NAME)
-    MutinyEmitter<String> processDefinitionEventsEmitter;
-    private BiConsumer<MutinyEmitter<String>, Message<String>> processDefinitionConsumer;
-
-    @Inject
-    @Channel(USER_TASK_INSTANCES_TOPIC_NAME)
-    MutinyEmitter<String> userTasksEventsEmitter;
-    private BiConsumer<MutinyEmitter<String>, Message<String>> userTaskConsumer;
-    @Inject
-    EventsRuntimeConfig eventsRuntimeConfig;
-
-    @Inject
-    Instance<MessageDecoratorProvider> decoratorProviderInstance;
-
-    private MessageDecoratorProvider decoratorProvider;
-
-    @PostConstruct
-    public void init() {
-        decoratorProvider = decoratorProviderInstance.isResolvable() ? decoratorProviderInstance.get() : null;
-        processInstanceConsumer = eventsRuntimeConfig.isProcessInstancesPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter();
-        processDefinitionConsumer = eventsRuntimeConfig.isProcessDefinitionPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter();
-        userTaskConsumer = eventsRuntimeConfig.isUserTasksPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter();
-    }
+@UnlessBuildProperty(name = "kogito.events.grouping", stringValue = "true", enableIfMissing = true)
+public class ReactiveMessagingEventPublisher extends AbstractMessagingEventPublisher {
 
     @Override
     public void publish(DataEvent<?> event) {
-
-        switch (event.getType()) {
-            case "ProcessDefinitionEvent":
-                if (eventsRuntimeConfig.isProcessDefinitionEventsEnabled()) {
-                    publishToTopic(processDefinitionConsumer, event, processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME);
-                }
-                break;
-            case "ProcessInstanceErrorDataEvent":
-            case "ProcessInstanceNodeDataEvent":
-            case "ProcessInstanceSLADataEvent":
-            case "ProcessInstanceStateDataEvent":
-            case "ProcessInstanceVariableDataEvent":
-                if (eventsRuntimeConfig.isProcessInstancesEventsEnabled()) {
-                    publishToTopic(processInstanceConsumer, event, processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME);
-                }
-                break;
-
-            case "UserTaskInstanceAssignmentDataEvent":
-            case "UserTaskInstanceAttachmentDataEvent":
-            case "UserTaskInstanceCommentDataEvent":
-            case "UserTaskInstanceDeadlineDataEvent":
-            case "UserTaskInstanceStateDataEvent":
-            case "UserTaskInstanceVariableDataEvent":
-                if (eventsRuntimeConfig.isUserTasksEventsEnabled()) {
-                    publishToTopic(userTaskConsumer, event, userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME);
-                }
-                break;
-            default:
-                logger.debug("Unknown type of event '{}', ignoring for this publisher", event.getType());
-        }
+        getConsumer(event).ifPresent(emitter -> publishToTopic(emitter, event));
     }
 
     @Override
@@ -124,49 +42,4 @@ public void publish(Collection<DataEvent<?>> events) {
         }
     }
 
-    protected void publishToTopic(BiConsumer<MutinyEmitter<String>, Message<String>> consumer, DataEvent<?> event, MutinyEmitter<String> emitter, String topic) {
-        logger.debug("About to publish event {} to topic {}", event, topic);
-        Message<String> message = null;
-        try {
-            String eventString = json.writeValueAsString(event);
-            logger.debug("Event payload '{}'", eventString);
-            message = decorateMessage(ContextAwareMessage.of(eventString));
-        } catch (Exception e) {
-            logger.error("Error while creating event to topic {} for event {}", topic, event);
-        }
-        if (message != null) {
-            consumer.accept(emitter, message);
-        }
-    }
-
-    protected CompletionStage<Void> onAck(Message<String> message) {
-        logger.debug("Successfully published message {}", message.getPayload());
-        return CompletableFuture.completedFuture(null);
-    }
-
-    protected CompletionStage<Void> onNack(Throwable reason, Message<String> message) {
-        logger.error("Error while publishing message {}", message, reason);
-        return CompletableFuture.completedFuture(null);
-    }
-
-    protected Message<String> decorateMessage(Message<String> message) {
-        return decoratorProvider != null ? decoratorProvider.decorate(message) : message;
-    }
-
-    private class BlockingMessageEmitter implements BiConsumer<MutinyEmitter<String>, Message<String>> {
-        @Override
-        public void accept(MutinyEmitter<String> emitter, Message<String> message) {
-            emitter.sendMessageAndAwait(message);
-            logger.debug("Successfully published message {}", message.getPayload());
-        }
-    }
-
-    private class ReactiveMessageEmitter implements BiConsumer<MutinyEmitter<String>, Message<String>> {
-        @Override
-        public void accept(MutinyEmitter<String> emitter, Message<String> message) {
-            emitter.sendMessageAndForget(message
-                    .withAck(() -> onAck(message))
-                    .withNack(reason -> onNack(reason, message)));
-        }
-    }
 }
diff --git a/quarkus/addons/events/process/runtime/src/test/java/org/kie/kogito/events/process/GroupingMessagingEventPublisherTest.java b/quarkus/addons/events/process/runtime/src/test/java/org/kie/kogito/events/process/GroupingMessagingEventPublisherTest.java
new file mode 100644
index 00000000000..0d784d3a53b
--- /dev/null
+++ b/quarkus/addons/events/process/runtime/src/test/java/org/kie/kogito/events/process/GroupingMessagingEventPublisherTest.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kie.kogito.events.process;
+
+import java.util.*;
+
+import org.eclipse.microprofile.reactive.messaging.Message;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider;
+import org.kie.kogito.event.DataEvent;
+import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
+import org.kie.kogito.event.process.ProcessInstanceDataEvent;
+import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
+import org.kie.kogito.events.config.EventsRuntimeConfig;
+import org.kie.kogito.events.process.AbstractMessagingEventPublisher.AbstractMessageEmitter;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.smallrye.reactive.messaging.MutinyEmitter;
+
+import jakarta.enterprise.inject.Instance;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+@SuppressWarnings("unchecked")
+public class GroupingMessagingEventPublisherTest {
+
+    @Mock
+    private ObjectMapper json;
+
+    @Mock
+    private MutinyEmitter<String> processInstancesEventsEmitter;
+
+    @Mock
+    private MutinyEmitter<String> processDefinitionEventsEmitter;
+
+    @Mock
+    private MutinyEmitter<String> userTasksEventsEmitter;
+
+    @Mock
+    private EventsRuntimeConfig eventsRuntimeConfig;
+
+    @Mock
+    private MessageDecoratorProvider decoratorProvider;
+
+    @Mock
+    private Message<String> decoratedMessage;
+
+    @Mock
+    private Instance<MessageDecoratorProvider> decoratorProviderInstance;
+
+    @Mock
+    private AbstractMessagingEventPublisher.AbstractMessageEmitter processInstanceConsumer;
+
+    @Mock
+    private AbstractMessagingEventPublisher.AbstractMessageEmitter userTaskConsumer;
+
+    @Mock
+    private AbstractMessagingEventPublisher.AbstractMessageEmitter processDefinitionConsumer;
+
+    @Spy
+    @InjectMocks
+    private GroupingMessagingEventPublisher groupingMessagingEventPublisher;
+
+    @Spy
+    @InjectMocks
+    private ReactiveMessagingEventPublisher reactiveMessagingEventPublisher;
+
+    @BeforeEach
+    void setUp() {
+        MockitoAnnotations.openMocks(this);
+
+        when(decoratorProviderInstance.isResolvable()).thenReturn(true);
+        when(decoratorProviderInstance.get()).thenReturn(decoratorProvider);
+
+        when(eventsRuntimeConfig.isProcessInstancesPropagateError()).thenReturn(false);
+        when(eventsRuntimeConfig.isProcessDefinitionPropagateError()).thenReturn(false);
+        when(eventsRuntimeConfig.isUserTasksPropagateError()).thenReturn(false);
+
+        when(eventsRuntimeConfig.isProcessInstancesEventsEnabled()).thenReturn(true);
+        when(eventsRuntimeConfig.isUserTasksEventsEnabled()).thenReturn(true);
+    }
+
+    @Test
+    public void testGroupingMessagingEventPublisher_publish() throws Exception {
+        DataEvent<String> event = mock(DataEvent.class);
+        when(event.getType()).thenReturn("ProcessInstanceErrorDataEvent");
+
+        // Test initialization
+        groupingMessagingEventPublisher.init();
+        when(decoratorProvider.decorate(any(Message.class))).thenReturn(decoratedMessage);
+
+        // Mock the message behavior
+        mockMessageForBothAckNack(decoratedMessage);
+
+        // Call method
+        groupingMessagingEventPublisher.publish(event);
+
+        // Verify that the consumer has been invoked
+        verify(processInstancesEventsEmitter).sendMessageAndForget(any());
+    }
+
+    @Test
+    public void testReactiveMessagingEventPublisher_publish() throws Exception {
+        DataEvent<String> event = mock(DataEvent.class);
+        when(event.getType()).thenReturn("ProcessInstanceErrorDataEvent");
+
+        // Test initialization
+        reactiveMessagingEventPublisher.init();
+        when(decoratorProvider.decorate(any(Message.class))).thenReturn(decoratedMessage);
+
+        // Mock the message behavior
+        mockMessageForBothAckNack(decoratedMessage);
+
+        // Call method
+        reactiveMessagingEventPublisher.publish(event);
+
+        // Verify that the consumer has been invoked
+        verify(processInstancesEventsEmitter).sendMessageAndForget(any());
+    }
+
+    @Test
+    public void testPublishGroupingByChannel() {
+        // Create mock events
+        DataEvent<String> processInstanceEvent = mock(ProcessInstanceDataEvent.class);
+        when(processInstanceEvent.getType()).thenReturn("ProcessInstanceStateDataEvent");
+
+        DataEvent<String> userTaskEvent = mock(UserTaskInstanceDataEvent.class);
+        when(userTaskEvent.getType()).thenReturn("UserTaskInstanceStateDataEvent");
+
+        // Mock getConsumer() to return different emitters based on event type
+        doReturn(Optional.of(processInstanceConsumer)).when(groupingMessagingEventPublisher).getConsumer(processInstanceEvent);
+        doReturn(Optional.of(userTaskConsumer)).when(groupingMessagingEventPublisher).getConsumer(userTaskEvent);
+
+        // Create a collection of events with different types (ProcessInstance and UserTask)
+        Collection<DataEvent<?>> events = Arrays.asList(processInstanceEvent, userTaskEvent);
+
+        // Spy on the publisher's internal method to verify the calls
+        doNothing().when(groupingMessagingEventPublisher).publishToTopic(any(), any());
+
+        // Invoke the method to test
+        groupingMessagingEventPublisher.publish(events);
+
+        // Capture and verify that the correct emitter was used for each event
+        verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), any(MultipleProcessInstanceDataEvent.class));
+        verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), any(MultipleUserTaskInstanceDataEvent.class));
+    }
+
+    @Test
+    public void testPublishMultipleEventsGroupedByChannel() {
+        // Create multiple events of different types
+        DataEvent<String> processInstanceEvent1 = mock(ProcessInstanceDataEvent.class);
+        DataEvent<String> processInstanceEvent2 = mock(ProcessInstanceDataEvent.class);
+        DataEvent<String> userTaskEvent1 = mock(UserTaskInstanceDataEvent.class);
+        DataEvent<String> userTaskEvent2 = mock(UserTaskInstanceDataEvent.class);
+
+        when(processInstanceEvent1.getType()).thenReturn("ProcessInstanceStateDataEvent");
+        when(processInstanceEvent2.getType()).thenReturn("ProcessInstanceStateDataEvent");
+        when(userTaskEvent1.getType()).thenReturn("UserTaskInstanceStateDataEvent");
+        when(userTaskEvent2.getType()).thenReturn("UserTaskInstanceStateDataEvent");
+
+        // Mock getConsumer() to return corresponding emitters for event types
+        doReturn(Optional.of(processInstanceConsumer)).when(groupingMessagingEventPublisher).getConsumer(processInstanceEvent1);
+        doReturn(Optional.of(processInstanceConsumer)).when(groupingMessagingEventPublisher).getConsumer(processInstanceEvent2);
+        doReturn(Optional.of(userTaskConsumer)).when(groupingMessagingEventPublisher).getConsumer(userTaskEvent1);
+        doReturn(Optional.of(userTaskConsumer)).when(groupingMessagingEventPublisher).getConsumer(userTaskEvent2);
+
+        // Create a collection of events that would be grouped by channel
+        Collection<DataEvent<?>> events = Arrays.asList(processInstanceEvent1, processInstanceEvent2, userTaskEvent1, userTaskEvent2);
+
+        // Spy on the internal publishToTopic to verify grouping
+        doNothing().when(groupingMessagingEventPublisher).publishToTopic(any(), any());
+
+        // Invoke the method to test
+        groupingMessagingEventPublisher.publish(events);
+
+        // Verify that two grouped publishToTopic calls are made: one for processInstanceConsumer, one for userTaskConsumer
+        verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), any(MultipleProcessInstanceDataEvent.class));
+        verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), any(MultipleUserTaskInstanceDataEvent.class));
+
+        // Verify that the right number of events was grouped and passed to each emitter
+        ArgumentCaptor<MultipleProcessInstanceDataEvent> captorPI = ArgumentCaptor.forClass(MultipleProcessInstanceDataEvent.class);
+
+        verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), captorPI.capture());
+        MultipleProcessInstanceDataEvent groupedProcessInstanceEvents = captorPI.getValue();
+        assertEquals(2, groupedProcessInstanceEvents.getData().size()); // both processInstanceEvents are grouped
+
+        ArgumentCaptor<MultipleUserTaskInstanceDataEvent> captorUT = ArgumentCaptor.forClass(MultipleUserTaskInstanceDataEvent.class);
+
+        verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), captorUT.capture());
+        MultipleUserTaskInstanceDataEvent groupedUserTaskEvents = captorUT.getValue();
+        assertEquals(2, groupedUserTaskEvents.getData().size()); // both userTaskEvents are grouped
+    }
+
+    @Test
+    public void testPublishEmptyEventsCollection() {
+        Collection<DataEvent<?>> events = Collections.emptyList();
+
+        // Spy on the internal publishToTopic to verify no calls are made
+        doNothing().when(groupingMessagingEventPublisher).publishToTopic(any(), any());
+
+        groupingMessagingEventPublisher.publish(events);
+
+        // Verify that publishToTopic is never called
+        verify(groupingMessagingEventPublisher, never()).publishToTopic(any(), anyCollection());
+    }
+
+    @Test
+    public void testNoConsumersFound() {
+        DataEvent<String> processInstanceEvent = mock(DataEvent.class);
+        when(processInstanceEvent.getType()).thenReturn("ProcessInstanceStateDataEvent");
+
+        DataEvent<String> userTaskEvent = mock(DataEvent.class);
+        when(userTaskEvent.getType()).thenReturn("UserTaskInstanceStateDataEvent");
+
+        // Mock getConsumer() to return empty optionals (no consumers found)
+        doReturn(Optional.empty()).when(groupingMessagingEventPublisher).getConsumer(processInstanceEvent);
+        doReturn(Optional.empty()).when(groupingMessagingEventPublisher).getConsumer(userTaskEvent);
+
+        // Create a collection of events
+        Collection<DataEvent<?>> events = Arrays.asList(processInstanceEvent, userTaskEvent);
+
+        // Spy on the publisher's internal method to verify no calls are made
+        doNothing().when(groupingMessagingEventPublisher).publishToTopic(any(), any());
+
+        // Invoke the method to test
+        groupingMessagingEventPublisher.publish(events);
+
+        // Verify that publishToTopic is never called since no consumers were found
+        verify(groupingMessagingEventPublisher, never()).publishToTopic(any(), anyCollection());
+    }
+
+    @Test
+    void testPublishToTopic_ExceptionHandling() throws Exception {
+        DataEvent<String> event = mock(DataEvent.class);
+        when(event.getType()).thenReturn("ProcessInstanceErrorDataEvent");
+
+        groupingMessagingEventPublisher.init();
+        when(decoratorProvider.decorate(any(Message.class))).thenThrow(new RuntimeException("Serialization error"));
+
+        // Mock the message behavior
+        mockMessageForBothAckNack(decoratedMessage);
+
+        // Call method
+        groupingMessagingEventPublisher.publish(event);
+
+        // Check that emitter.sendMessageAndForget was never called
+        verify(processInstancesEventsEmitter, never()).sendMessageAndForget(any());
+    }
+
+    @Test
+    public void testPublishUnsupportedEventType() {
+        DataEvent<String> unsupportedEvent = mock(DataEvent.class);
+        when(unsupportedEvent.getType()).thenReturn("UnsupportedEvent");
+
+        doReturn(Optional.empty()).when(groupingMessagingEventPublisher).getConsumer(unsupportedEvent);
+
+        Collection<DataEvent<?>> events = Collections.singletonList(unsupportedEvent);
+
+        groupingMessagingEventPublisher.publish(events);
+
+        // Verify no publishing occurred since no consumer exists for unsupported event
+        verify(groupingMessagingEventPublisher, never()).publishToTopic(any(), anyCollection());
+    }
+
+    @Test
+    public void testEventsDisabledInConfig() {
+        DataEvent<String> processInstanceEvent = mock(DataEvent.class);
+        when(processInstanceEvent.getType()).thenReturn("ProcessInstanceStateDataEvent");
+
+        DataEvent<String> userTaskEvent = mock(DataEvent.class);
+        when(userTaskEvent.getType()).thenReturn("UserTaskInstanceStateDataEvent");
+
+        // Disable process and user task events in the config
+        when(eventsRuntimeConfig.isProcessInstancesEventsEnabled()).thenReturn(false);
+        when(eventsRuntimeConfig.isUserTasksEventsEnabled()).thenReturn(false);
+
+        Collection<DataEvent<?>> events = Arrays.asList(processInstanceEvent, userTaskEvent);
+
+        groupingMessagingEventPublisher.publish(events);
+
+        // Verify no publishing occurred since events are disabled
+        verify(groupingMessagingEventPublisher, never()).publishToTopic(any(), anyCollection());
+    }
+
+    @Test
+    public void testNullEventInCollection() {
+        DataEvent<String> validEvent = mock(ProcessInstanceDataEvent.class);
+        when(validEvent.getType()).thenReturn("ProcessInstanceStateDataEvent");
+
+        Collection<DataEvent<?>> events = Arrays.asList(validEvent, null); // One valid event and one null event
+
+        // Return a mock consumer for the valid event
+        doReturn(Optional.of(processInstanceConsumer)).when(groupingMessagingEventPublisher).getConsumer(validEvent);
+
+        // Call the method
+        groupingMessagingEventPublisher.publish(events);
+
+        // Verify the valid event is processed
+        verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), any(MultipleProcessInstanceDataEvent.class));
+    }
+
+    @Test
+    public void testDecorateMessage() {
+        Message<String> rawMessage = mock(Message.class);
+        when(decoratorProvider.decorate(rawMessage)).thenReturn(decoratedMessage);
+
+        reactiveMessagingEventPublisher.init();
+
+        Message<String> result = reactiveMessagingEventPublisher.decorateMessage(rawMessage);
+        assertEquals(decoratedMessage, result);
+
+        verify(decoratorProvider).decorate(rawMessage);
+    }
+
+    @Test
+    public void testPublishToTopicWithDecorator() throws Exception {
+        Object event = new Object();
+        when(json.writeValueAsString(event)).thenReturn("eventString");
+
+        reactiveMessagingEventPublisher.init();
+
+        // Mock the message emitter
+        AbstractMessagingEventPublisher.AbstractMessageEmitter mockEmitter = mock(AbstractMessagingEventPublisher.AbstractMessageEmitter.class);
+
+        // Ensure decorated message is used
+        when(decoratorProvider.decorate(any(Message.class))).thenReturn(decoratedMessage);
+
+        // Spy on the reactiveMessagingEventPublisher to allow publishToTopic
+        reactiveMessagingEventPublisher.publishToTopic(mockEmitter, event);
+
+        // Verify that the message was decorated and sent
+        verify(decoratorProvider).decorate(any(Message.class));
+        verify(mockEmitter).accept(decoratedMessage);
+    }
+
+    @Test
+    public void testPublishWithMultipleEventTypesSomeWithoutConsumers() {
+        DataEvent<String> processInstanceEvent = mock(ProcessInstanceDataEvent.class);
+        when(processInstanceEvent.getType()).thenReturn("ProcessInstanceStateDataEvent");
+
+        DataEvent<String> unsupportedEvent = mock(DataEvent.class);
+        when(unsupportedEvent.getType()).thenReturn("UnsupportedEvent");
+
+        doReturn(Optional.of(processInstanceConsumer)).when(groupingMessagingEventPublisher).getConsumer(processInstanceEvent);
+        doReturn(Optional.empty()).when(groupingMessagingEventPublisher).getConsumer(unsupportedEvent);
+
+        Collection<DataEvent<?>> events = Arrays.asList(processInstanceEvent, unsupportedEvent);
+
+        groupingMessagingEventPublisher.publish(events);
+
+        // Ensure that only the supported event was published
+        verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), any(MultipleProcessInstanceDataEvent.class));
+        verify(groupingMessagingEventPublisher, never()).publishToTopic(any(), eq(Collections.singletonList(unsupportedEvent)));
+    }
+
+    private void mockMessageForBothAckNack(Message<String> message) {
+        when(message.withAck(any())).thenReturn(message);
+        when(message.withNack(any())).thenReturn(message);
+    }
+}