From a465b85b555e07123ad6ce08696908ed76d4fbcc Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Wed, 11 Sep 2024 15:35:56 -0500 Subject: [PATCH 1/8] fix: tagging xblocks triggers InvalidKeyError --- platform_plugin_aspects/signals.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/platform_plugin_aspects/signals.py b/platform_plugin_aspects/signals.py index fc630f8..f03ec07 100644 --- a/platform_plugin_aspects/signals.py +++ b/platform_plugin_aspects/signals.py @@ -5,6 +5,7 @@ from django.db import transaction from django.db.models.signals import post_delete, post_save from django.dispatch import Signal, receiver +from opaque_keys import InvalidKeyError from platform_plugin_aspects.sinks import ( CourseEnrollmentSink, @@ -235,7 +236,7 @@ def on_object_tag_deleted( # pylint: disable=unused-argument # pragma: no cove try: CourseOverview.objects.get(id=instance.object_id) dump_course_to_clickhouse.delay(instance.object_id) - except CourseOverview.DoesNotExist: + except (CourseOverview.DoesNotExist, InvalidKeyError): pass From 1d25e86446efe5b13b74202f9df05ffe5a1c95de Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Wed, 11 Sep 2024 17:00:47 -0500 Subject: [PATCH 2/8] fix: wait for transaction commit before trying to sink models --- platform_plugin_aspects/signals.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/platform_plugin_aspects/signals.py b/platform_plugin_aspects/signals.py index f03ec07..00fd131 100644 --- a/platform_plugin_aspects/signals.py +++ b/platform_plugin_aspects/signals.py @@ -81,17 +81,19 @@ def on_user_profile_updated(instance): ) -def on_user_profile_updated_txn(**kwargs): +def on_txn_wrapper(func, *args, **kwargs): """ - Handle user_profile saves in the middle of a transaction. + Handle saves in the middle of a transaction. If this gets fired before the transaction commits, the task may try to query an id that doesn't exist yet and throw an error. 
This should postpone queuing the Celery task until after the transaction is committed. """ - transaction.on_commit( - lambda: on_user_profile_updated(kwargs["instance"]) - ) # pragma: no cover + + def on_txn(**kwargs): + transaction.on_commit(lambda: func(*args, **kwargs)) + + return on_txn # Connect the UserProfile.post_save signal handler only if we have a model to attach to. @@ -99,7 +101,7 @@ def on_user_profile_updated_txn(**kwargs): _user_profile = get_model("user_profile") if _user_profile: post_save.connect( - on_user_profile_updated_txn, sender=_user_profile + on_txn_wrapper(on_user_profile_updated), sender=_user_profile ) # pragma: no cover @@ -126,7 +128,9 @@ def on_externalid_saved( # pylint: disable=unused-argument # pragma: no cover # (prevents celery errors during tests) _external_id = get_model("external_id") if _external_id: - post_save.connect(on_externalid_saved, sender=_external_id) # pragma: no cover + post_save.connect( + on_txn_wrapper(on_externalid_saved), sender=_external_id + ) # pragma: no cover @receiver(USER_RETIRE_LMS_MISC) @@ -172,7 +176,7 @@ def on_tag_saved( # pylint: disable=unused-argument # pragma: no cover # (prevents celery errors during tests) _tag = get_model("tag") if _tag: - post_save.connect(on_tag_saved, sender=_tag) # pragma: no cover + post_save.connect(on_txn_wrapper(on_tag_saved), sender=_tag) # pragma: no cover def on_taxonomy_saved( # pylint: disable=unused-argument # pragma: no cover @@ -198,7 +202,9 @@ def on_taxonomy_saved( # pylint: disable=unused-argument # pragma: no cover # (prevents celery errors during tests) _taxonomy = get_model("taxonomy") if _taxonomy: - post_save.connect(on_taxonomy_saved, sender=_taxonomy) # pragma: no cover + post_save.connect( + on_txn_wrapper(on_taxonomy_saved), sender=_taxonomy + ) # pragma: no cover def on_object_tag_saved(sender, instance, **kwargs): # pragma: no cover @@ -244,5 +250,5 @@ def on_object_tag_deleted( # pylint: disable=unused-argument # pragma: no cove # (prevents 
celery errors during tests) _object_tag = get_model("object_tag") if _object_tag: # pragma: no cover - post_save.connect(on_object_tag_saved, sender=_object_tag) - post_delete.connect(on_object_tag_deleted, sender=_object_tag) + post_save.connect(on_txn_wrapper(on_object_tag_saved), sender=_object_tag) + post_delete.connect(on_txn_wrapper(on_object_tag_deleted), sender=_object_tag) From d7c08b08d9e1db69f8101ff8531a19933b3e827c Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Mon, 23 Sep 2024 15:32:29 -0500 Subject: [PATCH 3/8] fix: accept kwargs on_user_profile_updated --- platform_plugin_aspects/signals.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/platform_plugin_aspects/signals.py b/platform_plugin_aspects/signals.py index 00fd131..423ad3f 100644 --- a/platform_plugin_aspects/signals.py +++ b/platform_plugin_aspects/signals.py @@ -64,7 +64,7 @@ def receive_course_enrollment_changed( # pylint: disable=unused-argument # pra ) -def on_user_profile_updated(instance): +def on_user_profile_updated(instance, **kwargs): """ Queues the UserProfile dump job when the parent transaction is committed. """ From 21cb02104a17d2354ba985365cd0db8c1fe43587 Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Thu, 26 Sep 2024 16:02:18 -0500 Subject: [PATCH 4/8] refactor(sink): move signals from generic txn wrapper to function in functions --- platform_plugin_aspects/signals.py | 210 ++++++++++-------- platform_plugin_aspects/tests/test_signals.py | 17 +- 2 files changed, 128 insertions(+), 99 deletions(-) diff --git a/platform_plugin_aspects/signals.py b/platform_plugin_aspects/signals.py index 423ad3f..9f80b1d 100644 --- a/platform_plugin_aspects/signals.py +++ b/platform_plugin_aspects/signals.py @@ -64,36 +64,32 @@ def receive_course_enrollment_changed( # pylint: disable=unused-argument # pra ) -def on_user_profile_updated(instance, **kwargs): - """ - Queues the UserProfile dump job when the parent transaction is committed. 
- """ - # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded - from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel - dump_data_to_clickhouse, - ) - - sink = UserProfileSink(None, None) - dump_data_to_clickhouse.delay( - sink_module=sink.__module__, - sink_name=sink.__class__.__name__, - object_id=str(instance.id), - ) - - -def on_txn_wrapper(func, *args, **kwargs): +def on_user_profile_updated_txn(**kwargs): """ + Handle user_profile saves in the middle of a transaction. Handle saves in the middle of a transaction. - If this gets fired before the transaction commits, the task may try to query an id that doesn't exist yet and throw an error. This should postpone queuing the Celery task until after the transaction is committed. """ - def on_txn(**kwargs): - transaction.on_commit(lambda: func(*args, **kwargs)) + def on_user_profile_updated(instance): + """ + Queues the UserProfile dump job when the parent transaction is committed. + """ + # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded + from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel + dump_data_to_clickhouse, + ) - return on_txn + sink = UserProfileSink(None, None) + dump_data_to_clickhouse.delay( + sink_module=sink.__module__, + sink_name=sink.__class__.__name__, + object_id=str(instance.id), + ) + + transaction.on_commit(lambda: on_user_profile_updated(**kwargs)) # Connect the UserProfile.post_save signal handler only if we have a model to attach to. 
@@ -101,36 +97,45 @@ def on_txn(**kwargs): _user_profile = get_model("user_profile") if _user_profile: post_save.connect( - on_txn_wrapper(on_user_profile_updated), sender=_user_profile + on_user_profile_updated_txn, sender=_user_profile ) # pragma: no cover -def on_externalid_saved( # pylint: disable=unused-argument # pragma: no cover - sender, instance, **kwargs -): +def on_externalid_saved_txn(*args, **kwargs): """ - Receives post save signal and queues the dump job. + Handle external_id saves in the middle of a transaction. + Handle saves in the middle of a transaction. + If this gets fired before the transaction commits, the task may try to + query an id that doesn't exist yet and throw an error. This should postpone + queuing the Celery task until after the transaction is committed. """ - # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded - from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel - dump_data_to_clickhouse, - ) - sink = ExternalIdSink(None, None) - dump_data_to_clickhouse.delay( - sink_module=sink.__module__, - sink_name=sink.__class__.__name__, - object_id=str(instance.id), - ) + def on_externalid_saved( # pylint: disable=unused-argument # pragma: no cover + sender, instance, **kwargs + ): + """ + Receives post save signal and queues the dump job. + """ + # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded + from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel + dump_data_to_clickhouse, + ) + + sink = ExternalIdSink(None, None) + dump_data_to_clickhouse.delay( + sink_module=sink.__module__, + sink_name=sink.__class__.__name__, + object_id=str(instance.id), + ) + + transaction.on_commit(lambda: on_externalid_saved(*args, **kwargs)) # Connect the ExternalId.post_save signal handler only if we have a model to attach to. 
# (prevents celery errors during tests) _external_id = get_model("external_id") if _external_id: - post_save.connect( - on_txn_wrapper(on_externalid_saved), sender=_external_id - ) # pragma: no cover + post_save.connect(on_externalid_saved_txn, sender=_external_id) # pragma: no cover @receiver(USER_RETIRE_LMS_MISC) @@ -153,77 +158,108 @@ def on_user_retirement( # pylint: disable=unused-argument # pragma: no cover ) -def on_tag_saved( # pylint: disable=unused-argument # pragma: no cover - sender, instance, **kwargs -): +def on_tag_saved_txn(*args, **kwargs): """ - Receives post save signal and queues the dump job. + Handle external_id saves in the middle of a transaction. + Handle saves in the middle of a transaction. + If this gets fired before the transaction commits, the task may try to + query an id that doesn't exist yet and throw an error. This should postpone + queuing the Celery task until after the transaction is committed. """ - # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded - from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel - dump_data_to_clickhouse, - ) - sink = TagSink(None, None) - dump_data_to_clickhouse.delay( - sink_module=sink.__module__, - sink_name=sink.__class__.__name__, - object_id=str(instance.id), - ) + def on_tag_saved( # pylint: disable=unused-argument # pragma: no cover + sender, instance, **kwargs + ): + """ + Receives post save signal and queues the dump job. 
+ """ + # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded + from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel + dump_data_to_clickhouse, + ) + + sink = TagSink(None, None) + dump_data_to_clickhouse.delay( + sink_module=sink.__module__, + sink_name=sink.__class__.__name__, + object_id=str(instance.id), + ) + + transaction.on_commit(lambda: on_tag_saved(*args, **kwargs)) # Connect the ExternalId.post_save signal handler only if we have a model to attach to. # (prevents celery errors during tests) _tag = get_model("tag") if _tag: - post_save.connect(on_txn_wrapper(on_tag_saved), sender=_tag) # pragma: no cover + post_save.connect(on_tag_saved_txn, sender=_tag) # pragma: no cover -def on_taxonomy_saved( # pylint: disable=unused-argument # pragma: no cover - sender, instance, **kwargs -): +def on_taxonomy_saved_txn(*args, **kwargs): """ - Receives post save signal and queues the dump job. + Handle external_id saves in the middle of a transaction. + Handle saves in the middle of a transaction. + If this gets fired before the transaction commits, the task may try to + query an id that doesn't exist yet and throw an error. This should postpone + queuing the Celery task until after the transaction is committed. """ - # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded - from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel - dump_data_to_clickhouse, - ) - sink = TaxonomySink(None, None) - dump_data_to_clickhouse.delay( - sink_module=sink.__module__, - sink_name=sink.__class__.__name__, - object_id=str(instance.id), - ) + def on_taxonomy_saved( # pylint: disable=unused-argument # pragma: no cover + sender, instance, **kwargs + ): + """ + Receives post save signal and queues the dump job. 
+ """ + # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded + from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel + dump_data_to_clickhouse, + ) + + sink = TaxonomySink(None, None) + dump_data_to_clickhouse.delay( + sink_module=sink.__module__, + sink_name=sink.__class__.__name__, + object_id=str(instance.id), + ) + + transaction.on_commit(lambda: on_taxonomy_saved(*args, **kwargs)) # Connect the ExternalId.post_save signal handler only if we have a model to attach to. # (prevents celery errors during tests) _taxonomy = get_model("taxonomy") if _taxonomy: - post_save.connect( - on_txn_wrapper(on_taxonomy_saved), sender=_taxonomy - ) # pragma: no cover + post_save.connect(on_taxonomy_saved_txn, sender=_taxonomy) # pragma: no cover -def on_object_tag_saved(sender, instance, **kwargs): # pragma: no cover +def on_object_tag_saved_txn(*args, **kwargs): """ - Receives post save signal and queues the dump job. + Handle external_id saves in the middle of a transaction. + Handle saves in the middle of a transaction. + If this gets fired before the transaction commits, the task may try to + query an id that doesn't exist yet and throw an error. This should postpone + queuing the Celery task until after the transaction is committed. """ - # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded - from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel - dump_data_to_clickhouse, - ) - sink = ObjectTagSink(None, None) - dump_data_to_clickhouse.delay( - sink_module=sink.__module__, - sink_name=sink.__class__.__name__, - object_id=str(instance.id), - ) + def on_object_tag_saved(sender, instance, **kwargs): # pragma: no cover + """ + Receives post save signal and queues the dump job. 
+ """ + # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded + from platform_plugin_aspects.tasks import ( # pylint: disable=import-outside-toplevel + dump_data_to_clickhouse, + ) + + sink = ObjectTagSink(None, None) + dump_data_to_clickhouse.delay( + sink_module=sink.__module__, + sink_name=sink.__class__.__name__, + object_id=str(instance.id), + ) + + on_object_tag_deleted(sender, instance, **kwargs) - on_object_tag_deleted(sender, instance, **kwargs) + transaction.on_commit(lambda: on_object_tag_saved(*args, **kwargs)) def on_object_tag_deleted( # pylint: disable=unused-argument # pragma: no cover @@ -250,5 +286,5 @@ def on_object_tag_deleted( # pylint: disable=unused-argument # pragma: no cove # (prevents celery errors during tests) _object_tag = get_model("object_tag") if _object_tag: # pragma: no cover - post_save.connect(on_txn_wrapper(on_object_tag_saved), sender=_object_tag) - post_delete.connect(on_txn_wrapper(on_object_tag_deleted), sender=_object_tag) + post_save.connect(on_object_tag_saved_txn, sender=_object_tag) + post_delete.connect(on_object_tag_deleted, sender=_object_tag) diff --git a/platform_plugin_aspects/tests/test_signals.py b/platform_plugin_aspects/tests/test_signals.py index a720ae1..5e1fe66 100644 --- a/platform_plugin_aspects/tests/test_signals.py +++ b/platform_plugin_aspects/tests/test_signals.py @@ -7,11 +7,10 @@ from django.test import TestCase from platform_plugin_aspects.signals import ( - on_externalid_saved, + on_externalid_saved_txn, on_user_retirement, receive_course_publish, ) -from platform_plugin_aspects.sinks.external_id_sink import ExternalIdSink from platform_plugin_aspects.sinks.user_retire_sink import UserRetirementSink @@ -31,22 +30,16 @@ def test_receive_course_publish(self, mock_dump_task): mock_dump_task.delay.assert_called_once_with(course_key) - @patch("platform_plugin_aspects.tasks.dump_data_to_clickhouse") - def test_on_externalid_saved(self, mock_dump_task): 
+ @patch("platform_plugin_aspects.signals.transaction") + def test_on_externalid_saved(self, mock_transaction): """ Test that on_externalid_saved calls dump_data_to_clickhouse. """ instance = Mock() sender = Mock() - on_externalid_saved(sender, instance) - - sink = ExternalIdSink(None, None) + on_externalid_saved_txn(sender, instance) - mock_dump_task.delay.assert_called_once_with( - sink_module=sink.__module__, - sink_name=sink.__class__.__name__, - object_id=str(instance.id), - ) + mock_transaction.on_commit.assert_called_once() @patch("platform_plugin_aspects.tasks.dump_data_to_clickhouse") def test_on_user_retirement(self, mock_dump_task): From 6ee1813bcc8235863003b709cbb4db7cac01437e Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Mon, 30 Sep 2024 13:30:15 -0500 Subject: [PATCH 5/8] chore: quality fixes --- platform_plugin_aspects/signals.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/platform_plugin_aspects/signals.py b/platform_plugin_aspects/signals.py index 9f80b1d..6a350f6 100644 --- a/platform_plugin_aspects/signals.py +++ b/platform_plugin_aspects/signals.py @@ -67,6 +67,7 @@ def receive_course_enrollment_changed( # pylint: disable=unused-argument # pra def on_user_profile_updated_txn(**kwargs): """ Handle user_profile saves in the middle of a transaction. + Handle saves in the middle of a transaction. If this gets fired before the transaction commits, the task may try to query an id that doesn't exist yet and throw an error. This should postpone @@ -104,6 +105,7 @@ def on_user_profile_updated(instance): def on_externalid_saved_txn(*args, **kwargs): """ Handle external_id saves in the middle of a transaction. + Handle saves in the middle of a transaction. If this gets fired before the transaction commits, the task may try to query an id that doesn't exist yet and throw an error. 
This should postpone @@ -161,6 +163,7 @@ def on_user_retirement( # pylint: disable=unused-argument # pragma: no cover def on_tag_saved_txn(*args, **kwargs): """ Handle external_id saves in the middle of a transaction. + Handle saves in the middle of a transaction. If this gets fired before the transaction commits, the task may try to query an id that doesn't exist yet and throw an error. This should postpone @@ -198,6 +201,7 @@ def on_tag_saved( # pylint: disable=unused-argument # pragma: no cover def on_taxonomy_saved_txn(*args, **kwargs): """ Handle external_id saves in the middle of a transaction. + Handle saves in the middle of a transaction. If this gets fired before the transaction commits, the task may try to query an id that doesn't exist yet and throw an error. This should postpone @@ -235,6 +239,7 @@ def on_taxonomy_saved( # pylint: disable=unused-argument # pragma: no cover def on_object_tag_saved_txn(*args, **kwargs): """ Handle external_id saves in the middle of a transaction. + Handle saves in the middle of a transaction. If this gets fired before the transaction commits, the task may try to query an id that doesn't exist yet and throw an error. This should postpone From 4c94b559b3c3eaf3e04895466c7f920fdc2fe549 Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Tue, 1 Oct 2024 10:57:34 -0500 Subject: [PATCH 6/8] fix(sink): send args and kwargs --- platform_plugin_aspects/signals.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/platform_plugin_aspects/signals.py b/platform_plugin_aspects/signals.py index 6a350f6..6c528f6 100644 --- a/platform_plugin_aspects/signals.py +++ b/platform_plugin_aspects/signals.py @@ -64,7 +64,7 @@ def receive_course_enrollment_changed( # pylint: disable=unused-argument # pra ) -def on_user_profile_updated_txn(**kwargs): +def on_user_profile_updated_txn(*args, **kwargs): """ Handle user_profile saves in the middle of a transaction. 
@@ -74,7 +74,7 @@ def on_user_profile_updated_txn(**kwargs): queuing the Celery task until after the transaction is committed. """ - def on_user_profile_updated(instance): + def on_user_profile_updated(sender, instance, **kwargs): """ Queues the UserProfile dump job when the parent transaction is committed. """ @@ -90,7 +90,7 @@ def on_user_profile_updated(instance): object_id=str(instance.id), ) - transaction.on_commit(lambda: on_user_profile_updated(**kwargs)) + transaction.on_commit(lambda: on_user_profile_updated(*args, **kwargs)) # Connect the UserProfile.post_save signal handler only if we have a model to attach to. From 76b480f6996b49f55246392bced0a1bd3d3d6266 Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Tue, 1 Oct 2024 11:04:32 -0500 Subject: [PATCH 7/8] chore: remove unused parameter --- platform_plugin_aspects/signals.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/platform_plugin_aspects/signals.py b/platform_plugin_aspects/signals.py index 6c528f6..3dba75b 100644 --- a/platform_plugin_aspects/signals.py +++ b/platform_plugin_aspects/signals.py @@ -74,7 +74,7 @@ def on_user_profile_updated_txn(*args, **kwargs): queuing the Celery task until after the transaction is committed. """ - def on_user_profile_updated(sender, instance, **kwargs): + def on_user_profile_updated(instance, **kwargs): """ Queues the UserProfile dump job when the parent transaction is committed. 
""" From 1fbe85c803356e338a05742b03724e503ddfb2da Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Fri, 4 Oct 2024 13:52:03 -0500 Subject: [PATCH 8/8] chore: bump version to v0.11.2 --- CHANGELOG.rst | 8 ++++++++ platform_plugin_aspects/__init__.py | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 983647a..48f616a 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,14 @@ Change Log Unreleased ********** +0.11.2 - 2024-10-04 +******************* + +Fixes +===== + +* Wait for the database transaction to commit before trying to sink models. + 0.11.1 - 2024-09-06 ******************* diff --git a/platform_plugin_aspects/__init__.py b/platform_plugin_aspects/__init__.py index 1c3b1a0..61d8f8b 100644 --- a/platform_plugin_aspects/__init__.py +++ b/platform_plugin_aspects/__init__.py @@ -5,6 +5,6 @@ import os from pathlib import Path -__version__ = "0.11.1" +__version__ = "0.11.2" ROOT_DIRECTORY = Path(os.path.dirname(os.path.abspath(__file__)))