Skip to content

Commit

Permalink
Add retry logic for concurrency sensitive database transactions
Browse files Browse the repository at this point in the history
When the MCP Server inserts rows into the `Tasks` table, it holds a
lock on the table primary key index.  When the MCP Client updates
existing tasks to set their start time, it does the same.  If these
transactions overlap, MySQL may detect this condition as a deadlock
and abort one of the transactions with a retryable
exception.

To handle this, we can retry the failed transaction after a short
wait.  It should always succeed eventually.

Addresses issue #1198
  • Loading branch information
marktriggs authored and sevein committed Jul 11, 2018
1 parent d3fc3b5 commit 98052fb
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 39 deletions.
77 changes: 44 additions & 33 deletions src/MCPClient/lib/archivematicaClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import gearman

from main.models import Task
from databaseFunctions import getUTCDate
from databaseFunctions import getUTCDate, retryOnFailure

from django.db import transaction
import shlex
Expand Down Expand Up @@ -124,9 +124,13 @@ def handle_batch_task(gearman_job):
caller_wants_output=task_data['wants_output'])
jobs.append(job)

# Set their start times...
Task.objects.filter(
taskuuid__in=[item.UUID for item in jobs]).update(starttime=utc_date)
# Set their start times. If we collide with the MCP Server inserting new
# Tasks (which can happen under heavy concurrent load), retry as needed.
def set_start_times():
Task.objects.filter(
taskuuid__in=[item.UUID for item in jobs]).update(starttime=utc_date)

retryOnFailure("Set task start times", set_start_times)

module = importlib.import_module("clientScripts." + module_name)

Expand All @@ -148,10 +152,14 @@ def fail_all_tasks(gearman_job, reason):
# we got to this point because the DB is unavailable this isn't going to
# work...
try:
for task_uuid in gearman_data['tasks']:
Task.objects.filter(taskuuid=task_uuid).update(stderror=str(reason),
exitcode=1,
endtime=getUTCDate())
def fail_all_tasks_callback():
for task_uuid in gearman_data['tasks']:
Task.objects.filter(taskuuid=task_uuid).update(stderror=str(reason),
exitcode=1,
endtime=getUTCDate())

retryOnFailure("Fail all tasks", fail_all_tasks_callback)

except Exception as e:
logger.exception("Failed to update tasks in DB: %s", e)

Expand All @@ -172,31 +180,34 @@ def execute_command(gearman_worker, gearman_job):
jobs = handle_batch_task(gearman_job)
results = {}

with transaction.atomic():
for job in jobs:
logger.info("\n\n*** Completed job: %s" % (job.dump()))

kwargs = {
'exitcode': job.get_exit_code(),
'endtime': getUTCDate(),
}
if django_settings.CAPTURE_CLIENT_SCRIPT_OUTPUT:
kwargs.update({
'stdout': job.get_stdout(),
'stderror': job.get_stderr(),
})
Task.objects.filter(taskuuid=job.UUID).update(**kwargs)

results[job.UUID] = {'exitCode': job.get_exit_code()}

if job.caller_wants_output:
# Send back stdout/stderr so it can be written to files.
# Most cases don't require this (logging to the database is
# enough), but the ones that do are coordinated through the
# MCP Server so that multiple MCP Client instances don't try
# to write the same file at the same time.
results[job.UUID]['stdout'] = job.get_stdout()
results[job.UUID]['stderror'] = job.get_stderr()
def write_task_results_callback():
with transaction.atomic():
for job in jobs:
logger.info("\n\n*** Completed job: %s" % (job.dump()))

kwargs = {
'exitcode': job.get_exit_code(),
'endtime': getUTCDate(),
}
if django_settings.CAPTURE_CLIENT_SCRIPT_OUTPUT:
kwargs.update({
'stdout': job.get_stdout(),
'stderror': job.get_stderr(),
})
Task.objects.filter(taskuuid=job.UUID).update(**kwargs)

results[job.UUID] = {'exitCode': job.get_exit_code()}

if job.caller_wants_output:
# Send back stdout/stderr so it can be written to files.
# Most cases don't require this (logging to the database is
# enough), but the ones that do are coordinated through the
# MCP Server so that multiple MCP Client instances don't try
# to write the same file at the same time.
results[job.UUID]['stdout'] = job.get_stdout()
results[job.UUID]['stderror'] = job.get_stderr()

retryOnFailure("Write task results", write_task_results_callback)

return cPickle.dumps({'task_results': results})
except SystemExit:
Expand Down
15 changes: 9 additions & 6 deletions src/MCPServer/lib/taskGroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,15 @@ def logTaskCreatedSQL(self):
with self.groupTasksLock:
self.finalised = True

with transaction.atomic():
for task in self.groupTasks:
databaseFunctions.logTaskCreatedSQL(self.linkTaskManager,
task.commandReplacementDic,
task.UUID,
task.arguments)
def insertTasks():
with transaction.atomic():
for task in self.groupTasks:
databaseFunctions.logTaskCreatedSQL(self.linkTaskManager,
task.commandReplacementDic,
task.UUID,
task.arguments)

databaseFunctions.retryOnFailure("Insert tasks", insertTasks)

def calculateExitCode(self):
"""
Expand Down
18 changes: 18 additions & 0 deletions src/archivematicaCommon/lib/databaseFunctions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import os
import string
import sys
import random
import time
import uuid

from django.db import close_old_connections
Expand Down Expand Up @@ -353,3 +355,19 @@ def deUnicode(str):
if str is None:
return None
return unicode(str).encode('utf-8')


def retryOnFailure(description, callback, retries=10):
    """Run `callback`, retrying up to `retries` times on any exception.

    Intended for database transactions that MySQL may abort with a
    retryable error (e.g. deadlock detection under concurrent load);
    such transactions are safe to simply run again.

    :param str description: human-readable label used in log messages.
    :param callback: zero-argument callable performing the transaction.
    :param int retries: maximum number of retries after the first attempt.
    :raises Exception: re-raises the last exception once all attempts fail.
    """
    for retry in range(retries + 1):
        try:
            callback()
            break
        except Exception as e:
            if retry == retries:
                LOGGER.error('Failed to complete transaction "%s" after %s retries',
                             description, retries)
                # Bare `raise` preserves the original traceback; `raise e`
                # would reset it under Python 2.
                raise
            LOGGER.debug('Retrying "%s" transaction after caught exception (retry %d): %s',
                         description, retry + 1, e)
            # Short random backoff so competing clients desynchronise
            # instead of colliding again immediately.
            time.sleep(random.uniform(0, 2))

0 comments on commit 98052fb

Please sign in to comment.