Skip to content

Commit

Permalink
Merge branch 'axon-3.0.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
abuijze committed Mar 10, 2017
2 parents a6f2104 + 64a73aa commit 82e1347
Show file tree
Hide file tree
Showing 12 changed files with 297 additions and 68 deletions.
40 changes: 33 additions & 7 deletions core/src/main/java/org/axonframework/config/SagaConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.SubscribingEventProcessor;
import org.axonframework.eventhandling.TrackingEventProcessor;
Expand All @@ -12,7 +13,11 @@
import org.axonframework.eventhandling.saga.repository.inmemory.InMemorySagaStore;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.interceptors.CorrelationDataInterceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
Expand All @@ -25,6 +30,7 @@ public class SagaConfiguration<S> implements ModuleConfiguration {
private final Component<AnnotatedSagaManager<S>> sagaManager;
private final Component<SagaRepository<S>> sagaRepository;
private final Component<SagaStore<? super S>> sagaStore;
private final List<Function<Configuration, MessageHandlerInterceptor<? super EventMessage<?>>>> handlerInterceptors = new ArrayList<>();
private Configuration config;

/**
Expand All @@ -51,12 +57,16 @@ public static <S> SagaConfiguration<S> subscribingSagaManager(Class<S> sagaType)
*/
public static <S> SagaConfiguration<S> trackingSagaManager(Class<S> sagaType) {
SagaConfiguration<S> configuration = new SagaConfiguration<>(sagaType);
configuration.processor.update(c -> new TrackingEventProcessor(
sagaType.getSimpleName() + "Processor",
configuration.sagaManager.get(),
c.eventBus(),
c.getComponent(TokenStore.class, InMemoryTokenStore::new),
c.getComponent(TransactionManager.class, NoTransactionManager::instance)));
configuration.processor.update(c -> {
TrackingEventProcessor processor = new TrackingEventProcessor(
sagaType.getSimpleName() + "Processor",
configuration.sagaManager.get(),
c.eventBus(),
c.getComponent(TokenStore.class, InMemoryTokenStore::new),
c.getComponent(TransactionManager.class, NoTransactionManager::instance));
processor.registerInterceptor(new CorrelationDataInterceptor<>(c.correlationDataProviders()));
return processor;
});
return configuration;
}

Expand All @@ -72,7 +82,11 @@ private SagaConfiguration(Class<S> sagaType) {
sagaManager = new Component<>(() -> config, managerName, c -> new AnnotatedSagaManager<>(sagaType, sagaRepository.get(),
c.parameterResolverFactory()));
processor = new Component<>(() -> config, processorName,
c -> new SubscribingEventProcessor(managerName, sagaManager.get(), c.eventBus()));
c -> {
SubscribingEventProcessor processor = new SubscribingEventProcessor(managerName, sagaManager.get(), c.eventBus());
processor.registerInterceptor(new CorrelationDataInterceptor<>(c.correlationDataProviders()));
return processor;
});
}

/**
Expand All @@ -89,9 +103,21 @@ public SagaConfiguration<S> configureSagaStore(Function<Configuration, SagaStore
return this;
}

public SagaConfiguration<S> registerHandlerInterceptor(Function<Configuration, MessageHandlerInterceptor<? super EventMessage<?>>> handlerInterceptor) {
if (config != null) {
processor.get().registerInterceptor(handlerInterceptor.apply(config));
} else {
handlerInterceptors.add(handlerInterceptor);
}
return this;
}

@Override
public void initialize(Configuration config) {
this.config = config;
for (Function<Configuration, MessageHandlerInterceptor<? super EventMessage<?>>> handlerInterceptor : handlerInterceptors) {
processor.get().registerInterceptor(handlerInterceptor.apply(config));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,10 @@ protected List<? extends DomainEventData<?>> fetchDomainEvents(String aggregateI
int batchSize) {
Transaction tx = transactionManager.startTransaction();
try {
return executeQuery(getConnection(), connection -> {
PreparedStatement statement = readEventData(connection, aggregateIdentifier, firstSequenceNumber);
statement.setMaxRows(batchSize);
return statement;
}, listResults(this::getDomainEventData), e -> new EventStoreException(
format("Failed to read events for aggregate [%s]", aggregateIdentifier), e));
return executeQuery(getConnection(),
connection -> readEventData(connection, aggregateIdentifier, firstSequenceNumber, batchSize),
listResults(this::getDomainEventData), e -> new EventStoreException(
format("Failed to read events for aggregate [%s]", aggregateIdentifier), e));
} finally {
tx.commit();
}
Expand All @@ -264,20 +262,18 @@ protected List<? extends DomainEventData<?>> fetchDomainEvents(String aggregateI
protected List<? extends TrackedEventData<?>> fetchTrackedEvents(TrackingToken lastToken, int batchSize) {
Transaction tx = transactionManager.startTransaction();
try {
return executeQuery(getConnection(), connection -> {
PreparedStatement statement = readEventData(connection, lastToken);
statement.setMaxRows(batchSize);
return statement;
}, resultSet -> {
TrackingToken previousToken = lastToken;
List<TrackedEventData<?>> results = new ArrayList<>();
while (resultSet.next()) {
TrackedEventData<?> next = getTrackedEventData(resultSet, previousToken);
results.add(next);
previousToken = next.trackingToken();
}
return results;
}, e -> new EventStoreException(format("Failed to read events from token [%s]", lastToken), e));
return executeQuery(getConnection(),
connection -> readEventData(connection, lastToken, batchSize),
resultSet -> {
TrackingToken previousToken = lastToken;
List<TrackedEventData<?>> results = new ArrayList<>();
while (resultSet.next()) {
TrackedEventData<?> next = getTrackedEventData(resultSet, previousToken);
results.add(next);
previousToken = next.trackingToken();
}
return results;
}, e -> new EventStoreException(format("Failed to read events from token [%s]", lastToken), e));
} finally {
tx.commit();
}
Expand Down Expand Up @@ -309,15 +305,16 @@ protected Optional<? extends DomainEventData<?>> readSnapshotData(String aggrega
*/

protected PreparedStatement readEventData(Connection connection, String identifier,
long firstSequenceNumber) throws SQLException {
long firstSequenceNumber, int batchSize) throws SQLException {
Transaction tx = transactionManager.startTransaction();
try {
final String sql = "SELECT " + trackedEventFields() + " FROM " + schema.domainEventTable() + " WHERE " +
schema.aggregateIdentifierColumn() + " = ? AND " + schema.sequenceNumberColumn() + " >= ? ORDER BY " +
schema.sequenceNumberColumn() + " ASC";
schema.aggregateIdentifierColumn() + " = ? AND " + schema.sequenceNumberColumn() + " >= ? AND " +
schema.sequenceNumberColumn() + " < ? ORDER BY " + schema.sequenceNumberColumn() + " ASC";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
preparedStatement.setString(1, identifier);
preparedStatement.setLong(2, firstSequenceNumber);
preparedStatement.setLong(3, firstSequenceNumber + batchSize);
return preparedStatement;
} finally {
tx.commit();
Expand All @@ -334,12 +331,13 @@ protected PreparedStatement readEventData(Connection connection, String identifi
* @return A {@link PreparedStatement} that returns event entries for the given query when executed
* @throws SQLException when an exception occurs while creating the prepared statement
*/
protected PreparedStatement readEventData(Connection connection, TrackingToken lastToken) throws SQLException {
protected PreparedStatement readEventData(Connection connection, TrackingToken lastToken,
int batchSize) throws SQLException {
Assert.isTrue(lastToken == null || lastToken instanceof GapAwareTrackingToken,
() -> format("Token [%s] is of the wrong type", lastToken));
GapAwareTrackingToken previousToken = (GapAwareTrackingToken) lastToken;
String sql = "SELECT " + trackedEventFields() + " FROM " + schema.domainEventTable() + " WHERE " +
schema.globalIndexColumn() + " > ? ";
String sql = "SELECT " + trackedEventFields() + " FROM " + schema.domainEventTable() +
" WHERE (" + schema.globalIndexColumn() + " > ? AND " + schema.globalIndexColumn() + " <= ?) ";
List<Long> gaps;
if (previousToken != null) {
gaps = previousToken.getGaps().stream().collect(Collectors.toList());
Expand All @@ -352,9 +350,11 @@ protected PreparedStatement readEventData(Connection connection, TrackingToken l
}
sql += "ORDER BY " + schema.globalIndexColumn() + " ASC";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
preparedStatement.setLong(1, previousToken == null ? -1 : previousToken.getIndex());
long globalIndex = previousToken == null ? -1 : previousToken.getIndex();
preparedStatement.setLong(1, globalIndex);
preparedStatement.setLong(2, globalIndex + batchSize);
for (int i = 0; i < gaps.size(); i++) {
preparedStatement.setLong(i + 2, gaps.get(i));
preparedStatement.setLong(i + 3, gaps.get(i));
}
return preparedStatement;
}
Expand Down Expand Up @@ -413,7 +413,15 @@ protected TrackedEventData<?> getTrackedEventData(ResultSet resultSet,
* @throws SQLException when an exception occurs while creating the event data
*/
protected DomainEventData<?> getDomainEventData(ResultSet resultSet) throws SQLException {
return (DomainEventData<?>) getTrackedEventData(resultSet, null);
return new GenericDomainEventEntry<>(resultSet.getString(schema.typeColumn()),
resultSet.getString(schema.aggregateIdentifierColumn()),
resultSet.getLong(schema.sequenceNumberColumn()),
resultSet.getString(schema.eventIdentifierColumn()),
readTimeStamp(resultSet, schema.timestampColumn()),
resultSet.getString(schema.payloadTypeColumn()),
resultSet.getString(schema.payloadRevisionColumn()),
readPayload(resultSet, schema.payloadColumn()),
readPayload(resultSet, schema.metaDataColumn()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public class SpringCloudCommandRouter implements CommandRouter {
private static final String SERIALIZED_COMMAND_FILTER = "serializedCommandFilter";
private static final String SERIALIZED_COMMAND_FILTER_CLASS_NAME = "serializedCommandFilterClassName";

private static final boolean OVERWRITE_MEMBERS = true;
private static final boolean DO_NOT_OVERWRITE_MEMBERS = false;

private final DiscoveryClient discoveryClient;
private final RoutingStrategy routingStrategy;
private final XStreamSerializer serializer = new XStreamSerializer();
Expand Down Expand Up @@ -83,7 +86,7 @@ public void updateMembership(int loadFactor, Predicate<CommandMessage<?>> comman
localServiceInstanceMetadata.put(SERIALIZED_COMMAND_FILTER, serializedCommandFilter.getData());
localServiceInstanceMetadata.put(SERIALIZED_COMMAND_FILTER_CLASS_NAME, serializedCommandFilter.getType().getName());

updateMemberships(Collections.singleton(localServiceInstance));
updateMemberships(Collections.singleton(localServiceInstance), DO_NOT_OVERWRITE_MEMBERS);
}

@EventListener
Expand All @@ -96,10 +99,23 @@ public void updateMemberships(HeartbeatEvent event) {
serviceInstance.getMetadata().containsKey(SERIALIZED_COMMAND_FILTER) &&
serviceInstance.getMetadata().containsKey(SERIALIZED_COMMAND_FILTER_CLASS_NAME))
.collect(Collectors.toSet());
updateMemberships(allServiceInstances);
updateMemberships(allServiceInstances, OVERWRITE_MEMBERS);
}

private void updateMemberships(Set<ServiceInstance> serviceInstances) {
/**
* Update the router memberships.
*
* @param serviceInstances Services instances to add
* @param overwrite True to evict members absent from serviceInstances
*/
private void updateMemberships(Set<ServiceInstance> serviceInstances, boolean overwrite) {
AtomicReference<ConsistentHash> updatedConsistentHash;
if (overwrite) {
updatedConsistentHash = new AtomicReference<>(new ConsistentHash());
} else {
updatedConsistentHash = atomicConsistentHash;
}

ServiceInstance localServiceInstance = discoveryClient.getLocalServiceInstance();
String localServiceId = localServiceInstance.getServiceId();
URI localServiceUri = localServiceInstance.getUri();
Expand All @@ -110,7 +126,7 @@ private void updateMemberships(Set<ServiceInstance> serviceInstances) {
boolean local = localServiceId.equals(serviceId) && localServiceUri.equals(serviceUri);

SimpleMember<URI> simpleMember = new SimpleMember<>(
serviceId.toUpperCase(),
serviceId.toUpperCase() + "[" + serviceUri + "]",
serviceUri,
local,
member -> atomicConsistentHash.updateAndGet(consistentHash -> consistentHash.without(member))
Expand All @@ -124,10 +140,12 @@ private void updateMemberships(Set<ServiceInstance> serviceInstances) {
serviceInstanceMetadata.get(SERIALIZED_COMMAND_FILTER_CLASS_NAME), null);
CommandNameFilter commandNameFilter = serializer.deserialize(serializedObject);

atomicConsistentHash.updateAndGet(
updatedConsistentHash.updateAndGet(
consistentHash -> consistentHash.with(simpleMember, loadFactor, commandNameFilter)
);
});

atomicConsistentHash.set(updatedConsistentHash.get());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.client.RestOperations;

@RestController
@RequestMapping("/spring-command-bus-connector")
Expand All @@ -37,12 +37,12 @@ public class SpringHttpCommandBusConnector implements CommandBusConnector {
private static final String COMMAND_BUS_CONNECTOR_PATH = "/spring-command-bus-connector/command";

private final CommandBus localCommandBus;
private final RestTemplate restTemplate;
private final RestOperations restOperations;
private final Serializer serializer;

public SpringHttpCommandBusConnector(CommandBus localCommandBus, RestTemplate restTemplate, Serializer serializer) {
public SpringHttpCommandBusConnector(CommandBus localCommandBus, RestOperations restOperations, Serializer serializer) {
this.localCommandBus = localCommandBus;
this.restTemplate = restTemplate;
this.restOperations = restOperations;
this.serializer = serializer;
}

Expand Down Expand Up @@ -86,11 +86,11 @@ private <C, R> ResponseEntity<SpringHttpReplyMessage<R>> sendRemotely(Member des
if (optionalEndpoint.isPresent()) {
URI endpointUri = optionalEndpoint.get();
URI destinationUri = buildURIForPath(endpointUri.getScheme(), endpointUri.getUserInfo(),
endpointUri.getHost(), endpointUri.getPort());
endpointUri.getHost(), endpointUri.getPort(), endpointUri.getPath());

SpringHttpDispatchMessage<C> dispatchMessage =
new SpringHttpDispatchMessage<>(commandMessage, serializer, expectReply);
return restTemplate.exchange(destinationUri, HttpMethod.POST, new HttpEntity<>(dispatchMessage),
return restOperations.exchange(destinationUri, HttpMethod.POST, new HttpEntity<>(dispatchMessage),
new ParameterizedTypeReference<SpringHttpReplyMessage<R>>(){});
} else {
String errorMessage = String.format("No Connection Endpoint found in Member [%s] for protocol [%s] " +
Expand All @@ -100,9 +100,9 @@ private <C, R> ResponseEntity<SpringHttpReplyMessage<R>> sendRemotely(Member des
}
}

private URI buildURIForPath(String scheme, String userInfo, String host, int port) {
private URI buildURIForPath(String scheme, String userInfo, String host, int port, String path) {
try {
return new URI(scheme, userInfo, host, port, COMMAND_BUS_CONNECTOR_PATH, null, null);
return new URI(scheme, userInfo, host, port, path + COMMAND_BUS_CONNECTOR_PATH, null, null);
} catch (URISyntaxException e) {
LOGGER.error("Failed to build URI for [{}{}{}], with user info [{}] and path [{}]",
scheme, host, port, userInfo, COMMAND_BUS_CONNECTOR_PATH, e);
Expand Down
Loading

0 comments on commit 82e1347

Please sign in to comment.