Skip to content

Commit

Permalink
JdbcTokenStore should only commit connections when they are not in au…
Browse files Browse the repository at this point in the history
…to-commit mode

Before committing a transaction on a connection, the JdbcTokenStore now checks if the connection is in auto-commit mode. If so, the commit() is skipped.

Fixes issue AxonFramework#250
  • Loading branch information
abuijze committed Jan 23, 2017
1 parent ba2966a commit 4fb23a7
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@
import java.time.temporal.TemporalAmount;

import static java.lang.String.format;
import static org.axonframework.common.jdbc.JdbcUtils.executeQuery;
import static org.axonframework.common.jdbc.JdbcUtils.executeUpdates;
import static org.axonframework.common.jdbc.JdbcUtils.*;

/**
* Implementation of a token store that uses JDBC to save and load tokens. Before using this store make sure the
Expand Down Expand Up @@ -96,42 +95,75 @@ public JdbcTokenStore(ConnectionProvider connectionProvider, Serializer serializ
* @throws EventStoreException when an error occurs executing SQL statements
*/
public void createSchema(TokenTableFactory schemaFactory) {
executeUpdates(getConnection(), e -> {
throw new JdbcException("Failed to create token tables", e);
}, connection -> schemaFactory.createTable(connection, schema));
Connection c = getConnection();
try {
executeUpdates(c, e -> {
throw new JdbcException("Failed to create token tables", e);
}, connection -> schemaFactory.createTable(connection, schema));
} finally {
closeQuietly(c);
}
}

@Override
public void storeToken(TrackingToken token, String processorName, int segment) throws UnableToClaimTokenException {
Connection connection = getConnection();
executeQuery(connection, c -> selectForUpdate(c, processorName, segment), resultSet -> {
insertOrUpdateToken(resultSet, token, processorName, segment);
connection.commit();
return null;
}, e -> new JdbcException(
format("Could not store token [%s] for processor [%s] and segment [%d]", token, processorName, segment),
e));
try {
executeQuery(connection,
c -> selectForUpdate(c, processorName, segment),
resultSet -> {
insertOrUpdateToken(resultSet, token, processorName, segment);
if (!connection.getAutoCommit()) {
connection.commit();
}
return null;
},
e -> new JdbcException(format("Could not store token [%s] for processor [%s] and segment [%d]",
token, processorName, segment), e));
} finally {
closeQuietly(connection);
}
}

@Override
public TrackingToken fetchToken(String processorName, int segment) throws UnableToClaimTokenException {
Connection connection = getConnection();
return executeQuery(connection, c -> selectForUpdate(c, processorName, segment), resultSet -> {
TrackingToken result = loadOrInsertToken(resultSet, processorName, segment);
connection.commit();
return result;
}, e -> new JdbcException(
format("Could not load token for processor [%s] and segment [%d]", processorName, segment), e));
try {
return executeQuery(connection, c -> selectForUpdate(c, processorName, segment), resultSet -> {
TrackingToken result = loadOrInsertToken(resultSet, processorName, segment);
if (!connection.getAutoCommit()) {
connection.commit();
}
return result;
}, e -> new JdbcException(
format("Could not load token for processor [%s] and segment [%d]", processorName, segment), e));
} finally {
closeQuietly(connection);
}
}

@Override
public void releaseClaim(String processorName, int segment) {
int[] result = executeUpdates(getConnection(), e -> new JdbcException(
format("Could not load token for processor [%s] and segment " + "[%d]",
processorName, segment), e),
connection -> releaseClaim(connection, processorName, segment));
if (result[0] < 1) {
logger.warn("Releasing claim of token {}/{} failed. It was owned by another node.", processorName, segment);
Connection connection = getConnection();
try {
int[] result = executeUpdates(connection, e -> {
throw new JdbcException(
format("Could not load token for processor [%s] and segment " + "[%d]",
processorName, segment), e);
},
c -> releaseClaim(c, processorName, segment));
try {
if (!connection.isClosed() && !connection.getAutoCommit()) {
connection.commit();
}
} catch (SQLException e) {
// ignore
}
if (result[0] < 1) {
logger.warn("Releasing claim of token {}/{} failed. It was owned by another node.", processorName, segment);
}
} finally {
closeQuietly(connection);
}
}

Expand Down Expand Up @@ -275,9 +307,9 @@ protected AbstractTokenEntry<?> readTokenEntry(ResultSet resultSet) throws SQLEx
* Creates a new {@link PreparedStatement} to release the current claim this node has on a token belonging to a
* processor with given {@code processorName} and {@code segment}.
*
* @param connection the connection that should be used to create a {@link PreparedStatement}
* @param connection the connection that should be used to create a {@link PreparedStatement}
* @param processorName the name of the processor for which to release this node's claim
* @param segment the segment of the processor for which to release this node's claim
* @param segment the segment of the processor for which to release this node's claim
* @return a {@link PreparedStatement} that will release the claim this node has on the token entry
* @throws SQLException if the statement to release a claim cannot be created
*/
Expand All @@ -293,21 +325,20 @@ protected PreparedStatement releaseClaim(Connection connection, String processor
preparedStatement.setString(3, processorName);
preparedStatement.setInt(4, segment);
preparedStatement.setString(5, nodeId);
connection.commit();
return preparedStatement;
}

/**
* Returns the serialized token data from the given {@code resultSet} at given {@code columnName}.
*
* @param resultSet the result set to get serialized data from
* @param resultSet the result set to get serialized data from
* @param columnName the name of the column containing the serialized token
* @param <T> the type of data to return
* @param <T> the type of data to return
* @return the serialized data of the token
* @throws SQLException if the token cannot be read from the entry
*/
@SuppressWarnings("unchecked")
protected <T> T readSerializedData(ResultSet resultSet, String columnName) throws SQLException {
protected <T> T readSerializedData(ResultSet resultSet, String columnName) throws SQLException {
if (byte[].class.equals(contentType)) {
return (T) resultSet.getBytes(columnName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ public void testClaimAndUpdateToken() throws Exception {
transactionManager.executeInTransaction(() -> assertEquals(token, tokenStore.fetchToken("test", 0)));
}

@Test
public void testClaimAndUpdateTokenWithoutTransaction() throws Exception {
assertNull(tokenStore.fetchToken("test", 0));
TrackingToken token = new GlobalSequenceTrackingToken(1L);
tokenStore.storeToken(token, "test", 0);
assertEquals(token, tokenStore.fetchToken("test", 0));
}

@Test
public void testClaimTokenConcurrently() {
transactionManager.executeInTransaction(() -> assertNull(tokenStore.fetchToken("concurrent", 0)));
Expand Down

0 comments on commit 4fb23a7

Please sign in to comment.