Skip to content

Commit

Permalink
Merge pull request #1358 from treasure-data/WM-620-notransaction
Browse files Browse the repository at this point in the history
[Master to v0_10] Guarantee single thread for migration for noTransaction migrations
  • Loading branch information
naritta authored Feb 14, 2020
2 parents 43c1d71 + d446cc5 commit ec9ba0e
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public DatabaseMigrator(DBI dbi, DatabaseConfig config)
this(dbi, config.getType());
}

public DatabaseMigrator(DBI dbi, String databaseType)
DatabaseMigrator(DBI dbi, String databaseType)
{
this.dbi = dbi;
this.databaseType = databaseType;
Expand All @@ -79,70 +79,84 @@ public String getSchemaVersion()
}
}

public void migrateWithRetry()
{
final int maxRetry = 3;
for (int i = 0; i < maxRetry; i++) {
try {
logger.info("Database migration started");
migrate();
logger.info("Database migration successfully finished.");
return;
}
catch (RuntimeException re) {
logger.warn(re.toString());
if (i == maxRetry - 1) {
logger.error("Critical error!!. Database migration failed.");
}
else {
logger.warn("Database migration failed. Retry");
try {
Thread.sleep(30 * 1000);
}
catch (InterruptedException ie) {

}
}
}
}
logger.error("Database migration aborted.");
}

// Call from cli
public int migrate()
{
int numApplied = 0;
MigrationContext context = new MigrationContext(databaseType);
Set<String> appliedSet;
try (Handle handle = dbi.open()) {
boolean isInitial = !existsSchemaMigrationsTable(handle);
if (isInitial) {
createSchemaMigrationsTable(handle, context);
}

for (Migration m : migrations) {
Set<String> appliedSet = getAppliedMigrationNames(handle);
if (appliedSet.add(m.getVersion())) {
logger.info("Applying database migration:" + m.getVersion());
appliedSet = getAppliedMigrationNames(handle);
}
for (Migration m : migrations) {
if (appliedSet.add(m.getVersion())) {
if (applyMigrationIfNotDone(context, m)) {
numApplied++;
if (m.noTransaction()) {
// In no transaction we can't lock schema_migrations table
}
}
}
if (numApplied > 0) {
if (context.isPostgres()) {
logger.info("{} migrations applied.", numApplied);
}
else {
logger.debug("{} migrations applied.", numApplied);
}
}
return numApplied;
}

// add "synchronized" so that multiple threads don't run the same migration on the same database
private synchronized boolean applyMigrationIfNotDone(MigrationContext context, Migration m)
{
// recreate Handle for each time to be able to discard pg_advisory_lock.
try (Handle handle = dbi.open()) {
if (m.noTransaction(context)) {
// Advisory lock if available -> migrate
if (context.isPostgres()) {
handle.select("SELECT pg_advisory_lock(23299, 0)");
// re-check migration status after lock
if (!checkIfMigrationApplied(handle, m.getVersion())) {
logger.info("Applying database migration:" + m.getVersion());
applyMigration(m, handle, context);
return true;
}
else {
handle.inTransaction((h, session) -> {
if (context.isPostgres()) {
// lock tables not to run migration concurrently.
// h2 doesn't support table lock.
h.update("LOCK TABLE schema_migrations IN EXCLUSIVE MODE");
}
return false;
}
else {
// h2 doesn't support table lock and it's unnecessary because of synchronized
logger.debug("Applying database migration:" + m.getVersion());
applyMigration(m, handle, context);
return true;
}

}
else {
// Start transaction -> Lock table -> Re-check migration status -> migrate
return handle.inTransaction((h, session) -> {
if (context.isPostgres()) {
// lock tables not to run migration concurrently.
h.update("LOCK TABLE schema_migrations IN EXCLUSIVE MODE");
// re-check migration status after lock
if (!checkIfMigrationApplied(h, m.getVersion())) {
logger.info("Applying database migration:" + m.getVersion());
applyMigration(m, handle, context);
return true;
});
}
return false;
}
}
else {
// h2 doesn't support table lock and it's unnecessary because of synchronized
logger.debug("Applying database migration:" + m.getVersion());
applyMigration(m, handle, context);
return true;
}
});
}
}
return numApplied;
}

// Called from cli migrate
Expand Down Expand Up @@ -171,12 +185,21 @@ public List<Migration> getApplicableMigration()
return applicableMigrations;
}

Set<String> getAppliedMigrationNames(Handle handle)
private Set<String> getAppliedMigrationNames(Handle handle)
{
return new HashSet<>(
handle.createQuery("select name from schema_migrations")
.mapTo(String.class)
.list());
.mapTo(String.class)
.list());
}

private boolean checkIfMigrationApplied(Handle handle, String name)
{
return handle.createQuery("select name from schema_migrations where name = :name limit 1")
.bind("name", name)
.mapTo(String.class)
.list()
.size() > 0;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public AutoMigrator(DataSource ds, DatabaseConfig config)
public void migrate()
{
if (migrator != null) {
migrator.migrateWithRetry();
migrator.migrate();
migrator = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,19 @@ default String getVersion()

/**
* If true, this Migration apply without transaction.
* This is introduced because 'create index concurrently' cannot run in transaction.
* This is introduced specifically for PostgreSQL's CREATE INDEX CONCURRENTLY statement.
*
* If this method returns true, the migration MUST RUN ONLY ONE DDL STATEMENT.
* Otherwise, a failure in the middle of multiple statements causes inconsistency, and
* retrying the migration may never success because the first DDL statement is already
* applied and next migration attempt will also apply the same DDL statement.
*
* @return
*/
default boolean noTransaction() { return false; }
default boolean noTransaction(MigrationContext context)
{
return false;
}

void migrate(Handle handle, MigrationContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,8 @@ public void migrate(Handle handle, MigrationContext context)
}

@Override
public boolean noTransaction() { return true; }
public boolean noTransaction(MigrationContext context)
{
return context.isPostgres();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static DatabaseFactory setupDatabase(boolean autoAutoCommit)
DBI dbi = new DBI(dsp.get());
TransactionManager tm = new ThreadLocalTransactionManager(dsp.get(), autoAutoCommit);
// FIXME
new DatabaseMigrator(dbi, config).migrateWithRetry();
new DatabaseMigrator(dbi, config).migrate();

cleanDatabase(config.getType(), dbi);

Expand Down

0 comments on commit ec9ba0e

Please sign in to comment.