Skip to content

Commit

Permalink
Added option to configure MessageHandlerInterceptors on EventProcesso…
Browse files Browse the repository at this point in the history
…rs via Configuration API

Until now, the only way to do so, was by configuring a custom EventProcessor builder. The two newly added methods on EventHandlingConfiguration allow for easier configuration of the interceptors.

Fixes issue AxonFramework#243
  • Loading branch information
abuijze committed Jan 23, 2017
1 parent a4d4e9d commit 7b79fc4
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.axonframework.eventhandling.*;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.interceptors.CorrelationDataInterceptor;

import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

Expand All @@ -40,6 +42,8 @@
public class EventHandlingConfiguration implements ModuleConfiguration {

private final List<Component<Object>> eventHandlers = new ArrayList<>();
private final List<BiFunction<Configuration, String, MessageHandlerInterceptor<? super EventMessage<?>>>> defaultHandlerInterceptors = new ArrayList<>();
private final Map<String, List<Function<Configuration, MessageHandlerInterceptor<? super EventMessage<?>>>>> handlerInterceptors = new HashMap<>();
private final Map<String, EventProcessorBuilder> eventProcessors = new HashMap<>();
private final List<ProcessorSelector> selectors = new ArrayList<>();
private final List<EventProcessor> initializedProcessors = new ArrayList<>();
Expand Down Expand Up @@ -71,19 +75,42 @@ private SubscribingEventProcessor defaultEventProcessor(Configuration conf, Stri

private SubscribingEventProcessor subscribingEventProcessor(Configuration conf, String name, List<?> eh,
Function<Configuration, SubscribableMessageSource<? extends EventMessage<?>>> messageSource) {
SubscribingEventProcessor processor = new SubscribingEventProcessor(name,
new SimpleEventHandlerInvoker(eh,
conf.parameterResolverFactory(),
conf.getComponent(
ListenerInvocationErrorHandler.class,
LoggingErrorHandler::new)),
messageSource.apply(conf),
DirectEventProcessingStrategy.INSTANCE,
PropagatingErrorHandler.INSTANCE,
conf.messageMonitor(SubscribingEventProcessor.class,
name));
processor.registerInterceptor(new CorrelationDataInterceptor<>(conf.correlationDataProviders()));
return processor;
return new SubscribingEventProcessor(name,
new SimpleEventHandlerInvoker(eh,
conf.parameterResolverFactory(),
conf.getComponent(
ListenerInvocationErrorHandler.class,
LoggingErrorHandler::new)),
messageSource.apply(conf),
DirectEventProcessingStrategy.INSTANCE,
PropagatingErrorHandler.INSTANCE,
conf.messageMonitor(SubscribingEventProcessor.class,
name));
}

/**
* Returns the list of Message Handler Interceptors registered for the given {@code processorName}.
*
* @param configuration The main configuration
* @param processorName The name of the processor to retrieve interceptors for
* @return a list of Interceptors
* @see EventHandlingConfiguration#registerHandlerInterceptor(BiFunction)
* @see EventHandlingConfiguration#registerHandlerInterceptor(String, Function)
*/
public List<MessageHandlerInterceptor<? super EventMessage<?>>> interceptorsFor(Configuration configuration,
String processorName) {
List<MessageHandlerInterceptor<? super EventMessage<?>>> interceptors = new ArrayList<>();
defaultHandlerInterceptors.stream()
.map(f -> f.apply(configuration, processorName))
.filter(Objects::nonNull)
.forEach(interceptors::add);
handlerInterceptors.getOrDefault(processorName, Collections.emptyList())
.stream()
.map(f -> f.apply(configuration))
.filter(Objects::nonNull)
.forEach(interceptors::add);
interceptors.add(new CorrelationDataInterceptor<>(configuration.correlationDataProviders()));
return interceptors;
}

/**
Expand Down Expand Up @@ -129,21 +156,15 @@ public EventHandlingConfiguration registerTrackingProcessor(String name, Functio

private EventProcessor buildTrackingEventProcessor(Configuration conf, String name, List<?> handlers,
Function<Configuration, StreamableMessageSource<TrackedEventMessage<?>>> source) {
TrackingEventProcessor processor = new TrackingEventProcessor(name, new SimpleEventHandlerInvoker(handlers,
conf.parameterResolverFactory(),
conf.getComponent(
ListenerInvocationErrorHandler.class,
LoggingErrorHandler::new)),
source.apply(conf),
conf.getComponent(TokenStore.class,
InMemoryTokenStore::new),
conf.getComponent(TransactionManager.class,
NoTransactionManager::instance),
conf.messageMonitor(EventProcessor.class,
name));
CorrelationDataInterceptor<EventMessage<?>> interceptor = new CorrelationDataInterceptor<>(conf.correlationDataProviders());
processor.registerInterceptor(interceptor);
return processor;
return new TrackingEventProcessor(name, new SimpleEventHandlerInvoker(handlers,
conf.parameterResolverFactory(),
conf.getComponent(
ListenerInvocationErrorHandler.class,
LoggingErrorHandler::new)),
source.apply(conf),
conf.getComponent(TokenStore.class, InMemoryTokenStore::new),
conf.getComponent(TransactionManager.class, NoTransactionManager::instance),
conf.messageMonitor(EventProcessor.class, name));
}

/**
Expand Down Expand Up @@ -195,6 +216,45 @@ public EventHandlingConfiguration byDefaultAssignTo(String name) {
return this;
}

/**
* Register the given {@code interceptorBuilder} to build an Message Handling Interceptor for the Event Processor
* with given {@code processorName}.
* <p>
* The {@code interceptorBuilder} may return {@code null}, in which case the return value is ignored.
* <p>
* Note that a CorrelationDataInterceptor is registered by default. To change correlation data attached to messages,
* see {@link Configurer#configureCorrelationDataProviders(Function)}.
*
* @param processorName The name of the processor to register the interceptor on
* @param interceptorBuilder The function providing the interceptor to register, or {@code null}
* @return this EventHandlingConfiguration instance for further configuration
*/
public EventHandlingConfiguration registerHandlerInterceptor(String processorName,
Function<Configuration, MessageHandlerInterceptor<? super EventMessage<?>>> interceptorBuilder) {
handlerInterceptors
.computeIfAbsent(processorName, k -> new ArrayList<>())
.add(interceptorBuilder);
return this;
}

/**
* Register the given {@code interceptorBuilder} to build an Message Handling Interceptor for Event Processors
* created in this configuration.
* <p>
* The {@code interceptorBuilder} is invoked once for each processor created, and may return {@code null}, in which
* case the return value is ignored.
* <p>
* Note that a CorrelationDataInterceptor is registered by default. To change correlation data attached to messages,
* see {@link Configurer#configureCorrelationDataProviders(Function)}.
*
* @param interceptorBuilder The builder function that provides an interceptor for each available processor
* @return this EventHandlingConfiguration instance for further configuration
*/
public EventHandlingConfiguration registerHandlerInterceptor(BiFunction<Configuration, String, MessageHandlerInterceptor<? super EventMessage<?>>> interceptorBuilder) {
defaultHandlerInterceptors.add(interceptorBuilder);
return this;
}

/**
* Registers a function that defines the Event Processor name to assign Event Handler beans to when no other rule
* matches.
Expand Down Expand Up @@ -268,9 +328,12 @@ public void initialize(Configuration config) {
assignments.computeIfAbsent(processor, k -> new ArrayList<>()).add(handler);
});

assignments.forEach((name, handlers) -> initializedProcessors
.add(eventProcessors.getOrDefault(name, defaultEventProcessorBuilder)
.createEventProcessor(config, name, handlers)));
assignments.forEach((name, handlers) -> {
EventProcessor eventProcessor = eventProcessors.getOrDefault(name, defaultEventProcessorBuilder)
.createEventProcessor(config, name, handlers);
interceptorsFor(config, name).forEach(eventProcessor::registerInterceptor);
initializedProcessors.add(eventProcessor);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
public abstract class AbstractEventProcessor implements EventProcessor {

private final Logger logger = LoggerFactory.getLogger(getClass());
private final Set<MessageHandlerInterceptor<EventMessage<?>>> interceptors = new CopyOnWriteArraySet<>();
private final Set<MessageHandlerInterceptor<? super EventMessage<?>>> interceptors = new CopyOnWriteArraySet<>();
private final String name;
private final EventHandlerInvoker eventHandlerInvoker;
private final RollbackConfiguration rollbackConfiguration;
Expand Down Expand Up @@ -90,7 +90,7 @@ public String getName() {
}

@Override
public Registration registerInterceptor(MessageHandlerInterceptor<EventMessage<?>> interceptor) {
public Registration registerInterceptor(MessageHandlerInterceptor<? super EventMessage<?>> interceptor) {
interceptors.add(interceptor);
return () -> interceptors.remove(interceptor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public interface EventProcessor {
* @return a handle to unregister the {@code interceptor}. When unregistered the {@code interceptor} will
* no longer receive events from this event processor.
*/
Registration registerInterceptor(MessageHandlerInterceptor<EventMessage<?>> interceptor);
Registration registerInterceptor(MessageHandlerInterceptor<? super EventMessage<?>> interceptor);

/**
* Start processing events.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,29 @@
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.messaging.InterceptorChain;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.interceptors.CorrelationDataInterceptor;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

public class EventHandlingConfigurationTest {

private Configuration configuration;

@Before
public void setUp() throws Exception {
configuration = mock(Configuration.class);
configuration = DefaultConfigurer.defaultConfiguration().buildConfiguration();
}

@Test
Expand Down Expand Up @@ -92,14 +95,43 @@ public void testAssignmentRulesOverrideThoseWithLowerPriority() {

assertEquals(3, processors.size());
assertTrue(processors.get("java.util.concurrent2").getEventHandlers().contains("concurrent"));
assertTrue(processors.get("java.util.concurrent2").getInterceptors().get(0) instanceof CorrelationDataInterceptor);
assertTrue(processors.get("java.util.concurrent").getEventHandlers().contains(map));
assertTrue(processors.get("java.util.concurrent").getInterceptors().get(0) instanceof CorrelationDataInterceptor);
assertTrue(processors.get("java.lang").getEventHandlers().contains(""));
assertTrue(processors.get("java.lang").getInterceptors().get(0) instanceof CorrelationDataInterceptor);
}

@Test
public void testAssignInterceptors() {
Map<String, StubEventProcessor> processors = new HashMap<>();
EventHandlingConfiguration module = new EventHandlingConfiguration()
.usingTrackingProcessors()
.registerEventProcessor("default", (config, name, handlers) -> {
StubEventProcessor processor = new StubEventProcessor(name, handlers);
processors.put(name, processor);
return processor;
});
module.byDefaultAssignTo("default");
module.assignHandlersMatching("concurrent", 1, "concurrent"::equals);
module.registerEventHandler(c -> new Object()); // --> java.lang
module.registerEventHandler(c -> "concurrent"); // --> java.util.concurrent2

StubInterceptor interceptor1 = new StubInterceptor();
StubInterceptor interceptor2 = new StubInterceptor();
module.registerHandlerInterceptor("default", c -> interceptor1);
module.registerHandlerInterceptor((c, n) -> interceptor2);
module.initialize(configuration);

// CorrelationDataInterceptor is automatically configured
assertEquals(3, processors.get("default").getInterceptors().size());
}

private static class StubEventProcessor implements EventProcessor {

private final String name;
private final List<?> eventHandlers;
private final List<MessageHandlerInterceptor<? super EventMessage<?>>> interceptors = new ArrayList<>();

public StubEventProcessor(String name, List<?> eventHandlers) {
this.name = name;
Expand All @@ -116,8 +148,9 @@ public List<?> getEventHandlers() {
}

@Override
public Registration registerInterceptor(MessageHandlerInterceptor<EventMessage<?>> interceptor) {
return () -> true;
public Registration registerInterceptor(MessageHandlerInterceptor<? super EventMessage<?>> interceptor) {
interceptors.add(interceptor);
return () -> interceptors.remove(interceptor);
}

@Override
Expand All @@ -129,6 +162,10 @@ public void start() {
public void shutDown() {

}

public List<MessageHandlerInterceptor<? super EventMessage<?>>> getInterceptors() {
return interceptors;
}
}

@ProcessingGroup("processingGroup")
Expand All @@ -138,4 +175,11 @@ public static class AnnotatedBean {
public static class AnnotatedBeanSubclass extends AnnotatedBean {

}

private static class StubInterceptor implements MessageHandlerInterceptor<EventMessage<?>> {
@Override
public Object handle(UnitOfWork<? extends EventMessage<?>> unitOfWork, InterceptorChain interceptorChain) throws Exception {
return interceptorChain.proceed();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class EventHandlerRegistrar implements InitializingBean, SmartLifecycle {
private final AxonConfiguration axonConfiguration;
private final EventHandlingConfiguration delegate;
private volatile boolean running = false;
private volatile boolean initialized;

/**
* Initialize the registrar to register beans discovered with the given {@code eventHandlingConfiguration}.
Expand Down Expand Up @@ -62,6 +63,10 @@ public void stop(Runnable callback) {

@Override
public void start() {
if (!initialized) {
initialized = true;
delegate.initialize(axonConfiguration);
}
delegate.start();
running = true;
}
Expand All @@ -84,6 +89,5 @@ public int getPhase() {

@Override
public void afterPropertiesSet() throws Exception {
delegate.initialize(axonConfiguration);
}
}

0 comments on commit 7b79fc4

Please sign in to comment.