Skip to content

Commit

Permalink
Merge pull request #90 from openedx/cag/tag-txn
Browse files Browse the repository at this point in the history
fix: wait for transaction commit before trying to sink models
  • Loading branch information
Cristhian Garcia authored Oct 4, 2024
2 parents 9867200 + 1fbe85c commit 0775832
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 94 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ Change Log
Unreleased
**********

0.11.2 - 2024-10-04
*******************

Fixes
=====

* Wait for transaction commit before trying to sink models.

0.11.1 - 2024-09-06
*******************

Expand Down
2 changes: 1 addition & 1 deletion platform_plugin_aspects/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
import os
from pathlib import Path

__version__ = "0.11.1"
__version__ = "0.11.2"

ROOT_DIRECTORY = Path(os.path.dirname(os.path.abspath(__file__)))
210 changes: 129 additions & 81 deletions platform_plugin_aspects/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.db import transaction
from django.db.models.signals import post_delete, post_save
from django.dispatch import Signal, receiver
from opaque_keys import InvalidKeyError

from platform_plugin_aspects.sinks import (
CourseEnrollmentSink,
Expand Down Expand Up @@ -63,34 +64,33 @@ def receive_course_enrollment_changed( # pylint: disable=unused-argument # pra
)


def on_user_profile_updated(instance):
"""
Queues the UserProfile dump job when the parent transaction is committed.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = UserProfileSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)


def on_user_profile_updated_txn(**kwargs):
def on_user_profile_updated_txn(*args, **kwargs):
"""
Handle user_profile saves in the middle of a transaction.
Handle saves in the middle of a transaction.
If this gets fired before the transaction commits, the task may try to
query an id that doesn't exist yet and throw an error. This should postpone
queuing the Celery task until after the transaction is committed.
"""
transaction.on_commit(
lambda: on_user_profile_updated(kwargs["instance"])
) # pragma: no cover

def on_user_profile_updated(instance, **kwargs):
"""
Queues the UserProfile dump job when the parent transaction is committed.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = UserProfileSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)

transaction.on_commit(lambda: on_user_profile_updated(*args, **kwargs))


# Connect the UserProfile.post_save signal handler only if we have a model to attach to.
Expand All @@ -102,30 +102,42 @@ def on_user_profile_updated_txn(**kwargs):
) # pragma: no cover


def on_externalid_saved( # pylint: disable=unused-argument # pragma: no cover
sender, instance, **kwargs
):
def on_externalid_saved_txn(*args, **kwargs):
"""
Receives post save signal and queues the dump job.
Handle external_id saves in the middle of a transaction.
Handle saves in the middle of a transaction.
If this gets fired before the transaction commits, the task may try to
query an id that doesn't exist yet and throw an error. This should postpone
queuing the Celery task until after the transaction is committed.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = ExternalIdSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)
def on_externalid_saved( # pylint: disable=unused-argument # pragma: no cover
sender, instance, **kwargs
):
"""
Receives post save signal and queues the dump job.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = ExternalIdSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)

transaction.on_commit(lambda: on_externalid_saved(*args, **kwargs))


# Connect the ExternalId.post_save signal handler only if we have a model to attach to.
# (prevents celery errors during tests)
_external_id = get_model("external_id")
if _external_id:
post_save.connect(on_externalid_saved, sender=_external_id) # pragma: no cover
post_save.connect(on_externalid_saved_txn, sender=_external_id) # pragma: no cover


@receiver(USER_RETIRE_LMS_MISC)
Expand All @@ -148,75 +160,111 @@ def on_user_retirement( # pylint: disable=unused-argument # pragma: no cover
)


def on_tag_saved( # pylint: disable=unused-argument # pragma: no cover
sender, instance, **kwargs
):
def on_tag_saved_txn(*args, **kwargs):
"""
Receives post save signal and queues the dump job.
Handle external_id saves in the middle of a transaction.
Handle saves in the middle of a transaction.
If this gets fired before the transaction commits, the task may try to
query an id that doesn't exist yet and throw an error. This should postpone
queuing the Celery task until after the transaction is committed.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = TagSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)
def on_tag_saved( # pylint: disable=unused-argument # pragma: no cover
sender, instance, **kwargs
):
"""
Receives post save signal and queues the dump job.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = TagSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)

transaction.on_commit(lambda: on_tag_saved(*args, **kwargs))


# Connect the Tag.post_save signal handler only if we have a model to attach to.
# (prevents celery errors during tests)
_tag = get_model("tag")
if _tag:
post_save.connect(on_tag_saved, sender=_tag) # pragma: no cover
post_save.connect(on_tag_saved_txn, sender=_tag) # pragma: no cover


def on_taxonomy_saved( # pylint: disable=unused-argument # pragma: no cover
sender, instance, **kwargs
):
def on_taxonomy_saved_txn(*args, **kwargs):
"""
Receives post save signal and queues the dump job.
Handle external_id saves in the middle of a transaction.
Handle saves in the middle of a transaction.
If this gets fired before the transaction commits, the task may try to
query an id that doesn't exist yet and throw an error. This should postpone
queuing the Celery task until after the transaction is committed.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = TaxonomySink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)
def on_taxonomy_saved( # pylint: disable=unused-argument # pragma: no cover
sender, instance, **kwargs
):
"""
Receives post save signal and queues the dump job.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = TaxonomySink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)

transaction.on_commit(lambda: on_taxonomy_saved(*args, **kwargs))


# Connect the Taxonomy.post_save signal handler only if we have a model to attach to.
# (prevents celery errors during tests)
_taxonomy = get_model("taxonomy")
if _taxonomy:
post_save.connect(on_taxonomy_saved, sender=_taxonomy) # pragma: no cover
post_save.connect(on_taxonomy_saved_txn, sender=_taxonomy) # pragma: no cover


def on_object_tag_saved(sender, instance, **kwargs): # pragma: no cover
def on_object_tag_saved_txn(*args, **kwargs):
"""
Receives post save signal and queues the dump job.
Handle external_id saves in the middle of a transaction.
Handle saves in the middle of a transaction.
If this gets fired before the transaction commits, the task may try to
query an id that doesn't exist yet and throw an error. This should postpone
queuing the Celery task until after the transaction is committed.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = ObjectTagSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)
def on_object_tag_saved(sender, instance, **kwargs): # pragma: no cover
"""
Receives post save signal and queues the dump job.
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel
dump_data_to_clickhouse,
)

sink = ObjectTagSink(None, None)
dump_data_to_clickhouse.delay(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)

on_object_tag_deleted(sender, instance, **kwargs)

on_object_tag_deleted(sender, instance, **kwargs)
transaction.on_commit(lambda: on_object_tag_saved(*args, **kwargs))


def on_object_tag_deleted( # pylint: disable=unused-argument # pragma: no cover
Expand All @@ -235,13 +283,13 @@ def on_object_tag_deleted( # pylint: disable=unused-argument # pragma: no cove
try:
CourseOverview.objects.get(id=instance.object_id)
dump_course_to_clickhouse.delay(instance.object_id)
except CourseOverview.DoesNotExist:
except (CourseOverview.DoesNotExist, InvalidKeyError):
pass


# Connect the ObjectTag post_save/post_delete signal handlers only if we have a model to attach to.
# (prevents celery errors during tests)
_object_tag = get_model("object_tag")
if _object_tag: # pragma: no cover
post_save.connect(on_object_tag_saved, sender=_object_tag)
post_save.connect(on_object_tag_saved_txn, sender=_object_tag)
post_delete.connect(on_object_tag_deleted, sender=_object_tag)
17 changes: 5 additions & 12 deletions platform_plugin_aspects/tests/test_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
from django.test import TestCase

from platform_plugin_aspects.signals import (
on_externalid_saved,
on_externalid_saved_txn,
on_user_retirement,
receive_course_publish,
)
from platform_plugin_aspects.sinks.external_id_sink import ExternalIdSink
from platform_plugin_aspects.sinks.user_retire_sink import UserRetirementSink


Expand All @@ -31,22 +30,16 @@ def test_receive_course_publish(self, mock_dump_task):

mock_dump_task.delay.assert_called_once_with(course_key)

@patch("platform_plugin_aspects.tasks.dump_data_to_clickhouse")
def test_on_externalid_saved(self, mock_dump_task):
@patch("platform_plugin_aspects.signals.transaction")
def test_on_externalid_saved(self, mock_transaction):
"""
Test that on_externalid_saved calls dump_data_to_clickhouse.
"""
instance = Mock()
sender = Mock()
on_externalid_saved(sender, instance)

sink = ExternalIdSink(None, None)
on_externalid_saved_txn(sender, instance)

mock_dump_task.delay.assert_called_once_with(
sink_module=sink.__module__,
sink_name=sink.__class__.__name__,
object_id=str(instance.id),
)
mock_transaction.on_commit.assert_called_once()

@patch("platform_plugin_aspects.tasks.dump_data_to_clickhouse")
def test_on_user_retirement(self, mock_dump_task):
Expand Down

0 comments on commit 0775832

Please sign in to comment.