Defer FPRule counter updates during normalization
sevein committed Nov 1, 2024
1 parent bade7ad commit 6310d85
Showing 3 changed files with 97 additions and 21 deletions.
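
This commit moves the per-rule FPRule.save() calls out of transcoder.CommandLinker and into an in-memory DeferredFPRuleCounter that aggregates attempt/success/failure counts and flushes them once per batch. A minimal sketch of that pattern follows; it is not part of the diff, and the import path and the execute_rule callable are placeholders for illustration only.

    # Sketch of the deferred-counter pattern introduced here; `execute_rule`
    # stands in for whatever runs a normalization command and returns its
    # exit status. Not part of the committed diff.
    from clientScripts.normalize import DeferredFPRuleCounter


    def run_batch(rules, execute_rule):
        with DeferredFPRuleCounter() as counter:
            for rule in rules:
                counter.record_attempt(rule)
                if execute_rule(rule) == 0:
                    counter.record_success(rule)
                else:
                    counter.record_failure(rule)
        # Leaving the context manager calls counter.save(), which applies all
        # increments with F() expressions in a single transaction, so each
        # FPRule row is written once per batch instead of once per file.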
76 changes: 72 additions & 4 deletions src/MCPClient/lib/clientScripts/normalize.py
@@ -8,9 +8,14 @@
import shutil
import traceback
import uuid
from collections import defaultdict
from types import TracebackType
from typing import Callable
from typing import DefaultDict
from typing import Dict
from typing import List
from typing import Optional
from typing import Type

import django
import transcoder
@@ -20,17 +25,21 @@
import databaseFunctions
import fileOperations
from client.job import Job
from custom_handlers import get_script_logger
from dicts import ReplacementDict
from django.conf import settings as mcpclient_settings
from django.core.exceptions import ValidationError
from django.db import transaction
from django.db.models import F
from fpr.models import FPRule
from lib import setup_dicts
from main.models import Derivation
from main.models import File
from main.models import FileFormatVersion
from main.models import FileID

logger = get_script_logger("archivematica.mcp.client.normalize")

# Return codes
SUCCESS = 0
RULE_FAILED = 1
@@ -355,7 +364,59 @@ def get_default_rule(purpose: str) -> FPRule:
return FPRule.active.get(purpose="default_" + purpose)


def main(job: Job, opts: NormalizeArgs) -> int:
class DeferredFPRuleCounter:
"""Deferred counter for FPRule attempts, successes, and failures.
This class postpones database writes to aggregate updates and minimize the
duration of transactions, which is beneficial when dealing with long-running
batches.
"""

def __init__(self) -> None:
self._counters: DefaultDict[uuid.UUID, Dict[str, int]] = defaultdict(
lambda: {"count_attempts": 0, "count_okay": 0, "count_not_okay": 0}
)

def __enter__(self) -> "DeferredFPRuleCounter":
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
try:
self.save()
except Exception as err:
logger.error("Failed to save counters: %s", err, exc_info=True)

def record_attempt(self, fprule: FPRule) -> None:
self._counters[fprule.uuid]["count_attempts"] += 1

def record_success(self, fprule: FPRule) -> None:
self._counters[fprule.uuid]["count_okay"] += 1

def record_failure(self, fprule: FPRule) -> None:
self._counters[fprule.uuid]["count_not_okay"] += 1

def save(self) -> None:
"""Persist all aggregated FPRule counters in a single transaction.
This method updates the success and failure rates of FPRules by
incrementing their respective counters. It uses Django's F() expressions
to ensure atomic updates and prevent race conditions.
"""
with transaction.atomic():
for fprule_id, increments in self._counters.items():
FPRule.objects.filter(uuid=fprule_id).update(
count_attempts=F("count_attempts") + increments["count_attempts"],
count_okay=F("count_okay") + increments["count_okay"],
count_not_okay=F("count_not_okay") + increments["count_not_okay"],
)


def main(job: Job, opts: NormalizeArgs, counter: DeferredFPRuleCounter) -> int:
"""Find and execute normalization commands on input file."""
# TODO fix for maildir working only on attachments

@@ -489,7 +550,13 @@ def main(job: Job, opts: NormalizeArgs) -> int:

replacement_dict = get_replacement_dict(job, opts)
cl = transcoder.CommandLinker(
job, rule, command, replacement_dict, opts, once_normalized_callback(job)
job,
rule,
command,
replacement_dict,
opts,
once_normalized_callback(job),
counter,
)
exitstatus = cl.execute()

@@ -540,6 +607,7 @@ def main(job: Job, opts: NormalizeArgs) -> int:
replacement_dict,
opts,
once_normalized_callback(job),
counter,
)
exitstatus = cl.execute()

@@ -611,7 +679,7 @@ def parse_args(parser: argparse.ArgumentParser, job: Job) -> NormalizeArgs:
def call(jobs: List[Job]) -> None:
parser = get_parser()

with transaction.atomic():
with DeferredFPRuleCounter() as counter, transaction.atomic():
for job in jobs:
with job.JobContext():
opts = parse_args(parser, job)
@@ -625,7 +693,7 @@ def call(jobs: List[Job]) -> None:
continue

try:
job.set_status(main(job, opts))
job.set_status(main(job, opts, counter))
except Exception as e:
job.print_error(str(e))
job.set_status(1)
15 changes: 7 additions & 8 deletions src/MCPClient/lib/clientScripts/transcoder.py
@@ -15,7 +15,6 @@
#
# You should have received a copy of the GNU General Public License
# along with Archivematica. If not, see <http://www.gnu.org/licenses/>.
from django.db.models import F
from executeOrRunSubProcess import executeOrRun


@@ -109,7 +108,9 @@ def execute(self, skip_on_success=False):


class CommandLinker:
def __init__(self, job, fprule, command, replacement_dict, opts, on_success):
def __init__(
self, job, fprule, command, replacement_dict, opts, on_success, counter
):
self.fprule = fprule
self.command = command
self.replacement_dict = replacement_dict
@@ -118,6 +119,7 @@ def __init__(self, job, fprule, command, replacement_dict, opts, on_success):
self.commandObject = Command(
job, self.command, replacement_dict, self.on_success, opts
)
self.counter = counter

def __str__(self):
return (
@@ -128,13 +130,10 @@ def execute(self):
"""Execute the command, and track the success statistics.
Returns 0 on success, non-0 on failure."""
# Track success/failure rates of FP Rules
# Use Django's F() to prevent race condition updating the counts
self.fprule.count_attempts = F("count_attempts") + 1
self.counter.record_attempt(self.fprule)
ret = self.commandObject.execute()
if ret:
self.fprule.count_not_okay = F("count_not_okay") + 1
self.counter.record_failure(self.fprule)
else:
self.fprule.count_okay = F("count_okay") + 1
self.fprule.save()
self.counter.record_success(self.fprule)
return ret
27 changes: 18 additions & 9 deletions tests/MCPClient/test_normalize.py
@@ -45,7 +45,8 @@ def test_normalization_fails_if_original_file_does_not_exist() -> None:
job = mock.Mock(spec=Job)
opts = mock.Mock(file_uuid=file_uuid)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.NO_RULE_FOUND
job.print_error.assert_called_once_with(
@@ -67,7 +68,8 @@ def test_normalization_skips_submission_documentation_file_if_group_use_does_not
normalize_file_grp_use="original",
)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.SUCCESS
assert job.print_output.mock_calls == [
@@ -93,7 +95,8 @@ def test_normalization_skips_file_if_group_use_does_not_match(
normalize_file_grp_use="access",
)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.SUCCESS
assert job.print_output.mock_calls == [
@@ -178,7 +181,8 @@ def test_manual_normalization_creates_event_and_derivation(
normalize_file_grp_use="original",
)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.SUCCESS
assert job.print_output.mock_calls == [
@@ -250,7 +254,8 @@ def test_manual_normalization_fails_with_invalid_normalization_csv(
normalize_file_grp_use="original",
)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.NO_RULE_FOUND
assert job.print_error.mock_calls == [
@@ -297,7 +302,8 @@ def test_manual_normalization_matches_by_filename_instead_of_normalization_csv(
normalize_file_grp_use="original",
)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.SUCCESS
assert job.print_error.mock_calls == []
@@ -350,7 +356,8 @@ def test_manual_normalization_matches_from_multiple_filenames(
normalize_file_grp_use="original",
)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.SUCCESS
assert job.print_error.mock_calls == []
@@ -413,7 +420,8 @@ def test_normalization_falls_back_to_default_rule(
normalize_file_grp_use="original",
)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.SUCCESS
command_linker.assert_called_once()
@@ -474,7 +482,8 @@ def test_normalization_finds_rule_by_file_format_version(
normalize_file_grp_use="original",
)

result = normalize.main(job, opts)
with normalize.DeferredFPRuleCounter() as counter:
result = normalize.main(job, opts, counter)

assert result == normalize.SUCCESS
command_linker.assert_called_once()