diff --git a/architecture-decisions/0004-deleting-records.md b/architecture-decisions/0004-deleting-records.md new file mode 100644 index 00000000..f3efe59a --- /dev/null +++ b/architecture-decisions/0004-deleting-records.md @@ -0,0 +1,60 @@ +# 4. Deleting Records + +Date: 2025-02-18 + +## Status + +Accepted + +## Context + +When resources are deleted in Figgy, a DeletionMarker resource is created at the +same time. The DeletionMarker stores the deleted resource's identifier, +resource type, and a serialized copy of the metadata (in the `deleted_object` +field). We need a method for processing DeletionMarkers in DPUL-C to remove the +corresponding record from the Solr index. + +## Decision + +#### Hydration Consumer + +1. We will process DeletionMarkers that reference a deleted resource with a +resource type that we currently index into DPUL-C. In addition, we will check +if a hydration cache entry exists for the deleted resource and discard the +DeletionMarker if not. +1. A special CacheMarker is created from the DeletionMarker that uses the + deleted resource's id as the id and the updated_at value from the + DeletionMarker as the timestamp. +1. Special hydration cache entry attributes are generated. The hydration cache + entry created from these attributes will replace the hydration cache entry of + the deleted resource. + - Existing metadata is replaced with a simple deleted => true kv pair + - The entry id is set to the deleted resource's id + - The entry internal_resource type is set to that of the deleted resource + +#### Transformation Consumer + +1. A special solr document is generated from the deleted object hydration cache +entry with the following structure. + ``` + %{ id: "id", deleted: true } + ``` + +#### Indexing Consumer + +1. Messages with the `deleted: true` field are handled separately and assigned to + the `delete` batcher. +1. 
The delete batcher sends the deleted record ids to the Solr.delete_batch + function which iterates over them, deletes each record, and then commits the + batch of deletes. The additional batcher doesn't create a potential race + condition because there is only one entry for the resource earlier in the + pipeline. + +## Consequences + +- DeletionMarkers will stay in the Figgy database unless the resource is +restored. This means that DPUL-C will have to triage an ever increasing number +of DeletionMarkers over time. + +- Deleted resource hydration and transformation cache entries will stay in the +cache after the resource is removed from Solr until the next full reindex. diff --git a/lib/dpul_collections/indexing_pipeline/database_producer/cache_entry_marker.ex b/lib/dpul_collections/indexing_pipeline/database_producer/cache_entry_marker.ex index 8222521f..a3306e3f 100644 --- a/lib/dpul_collections/indexing_pipeline/database_producer/cache_entry_marker.ex +++ b/lib/dpul_collections/indexing_pipeline/database_producer/cache_entry_marker.ex @@ -29,6 +29,16 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer.CacheEntryMarker do end @spec from(%Resource{}) :: t() + def from(%Resource{ + updated_at: updated_at, + internal_resource: "DeletionMarker", + metadata: %{"resource_id" => [%{"id" => id}]} + }) do + # A CacheMarker for a DeletionMarker resource has a standard timestamp, but + # the id is set from the deleted resource id. 
+ %__MODULE__{timestamp: updated_at, id: id} + end + def from(%Resource{updated_at: updated_at, id: id}) do %__MODULE__{timestamp: updated_at, id: id} end diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry.ex index c592a65c..46e956e1 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry.ex @@ -21,10 +21,25 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationCacheEntry do end @spec to_solr_document(%__MODULE__{}) :: %{} - def to_solr_document(hydration_cache_entry) do - %{record_id: id} = hydration_cache_entry - %{data: data = %{"metadata" => metadata}, related_data: related_data} = hydration_cache_entry + def to_solr_document(%{ + record_id: id, + data: %{ + "metadata" => %{"deleted" => true} + } + }) do + # Generate a small json document for deleted resources that indicates that + # the Solr record with that id should be deleted from the index. 
+ %{ + id: id, + deleted: true + } + end + def to_solr_document(%{ + record_id: id, + data: data = %{"metadata" => metadata}, + related_data: related_data + }) do %{ id: id, title_txtm: extract_title(metadata), diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex index 521ef2dd..78ed803e 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex @@ -88,6 +88,44 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do |> Broadway.Message.put_data(message_map) end + def handle_message( + _processor, + message = %Broadway.Message{ + data: %{ + internal_resource: internal_resource, + metadata: %{ + "resource_id" => [%{"id" => resource_id}], + "resource_type" => [resource_type] + } + } + }, + %{cache_version: cache_version} + ) + when internal_resource in ["DeletionMarker"] and + resource_type in ["EphemeraFolder", "EphemeraTerm"] do + # Only process messages where the deleted resource has an existing + # hydration cache entry. If one does not exist, it means that the resource + # has not been indexed into DPUL-C. + hydration_cache_entry = + IndexingPipeline.get_hydration_cache_entry!(resource_id, cache_version) + + cond do + hydration_cache_entry -> + marker = CacheEntryMarker.from(message) + + message_map = + %{marker: marker, incoming_message_data: message.data} + |> Map.merge(Figgy.Resource.to_hydration_cache_attrs(message.data)) + + message + |> Broadway.Message.put_data(message_map) + + true -> + message + |> Broadway.Message.put_batcher(:noop) + end + end + # If it's not selected above, ack the message but don't do anything with it. 
def handle_message(_processor, message, _state) do message diff --git a/lib/dpul_collections/indexing_pipeline/figgy/indexing_consumer.ex b/lib/dpul_collections/indexing_pipeline/figgy/indexing_consumer.ex index c6182d20..d03ec3c5 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/indexing_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/indexing_consumer.ex @@ -42,7 +42,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingConsumer do default: [] ], batchers: [ - default: [batch_size: options[:batch_size]] + default: [batch_size: options[:batch_size]], + delete: [batch_size: options[:batch_size]] ], context: %{ cache_version: options[:cache_version], @@ -55,10 +56,29 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingConsumer do # return the message so we can handle it in the batch @spec handle_message(any(), Broadway.Message.t(), %{required(:cache_version) => integer()}) :: Broadway.Message.t() + def handle_message(_processor, %{data: %{data: %{"deleted" => true}}} = message, %{ + cache_version: _cache_version + }) do + message + |> Broadway.Message.put_batcher(:delete) + end + def handle_message(_processor, message, %{cache_version: _cache_version}) do message end + @impl Broadway + @spec handle_batch(any(), list(Broadway.Message.t()), any(), any()) :: + list(Broadway.Message.t()) + def handle_batch(:delete, messages, _batch_info, context) do + messages + |> Enum.map(&unwrap/1) + |> Enum.map(& &1["id"]) + |> Solr.delete_batch(context[:write_collection]) + + messages + end + @impl Broadway @spec handle_batch(any(), list(Broadway.Message.t()), any(), any()) :: list(Broadway.Message.t()) diff --git a/lib/dpul_collections/indexing_pipeline/figgy/resource.ex b/lib/dpul_collections/indexing_pipeline/figgy/resource.ex index a59ee872..7a652d20 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/resource.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/resource.ex @@ -22,6 +22,13 @@ defmodule 
DpulCollections.IndexingPipeline.Figgy.Resource do handled_data: map(), related_data: related_data() } + def to_hydration_cache_attrs(resource = %__MODULE__{internal_resource: "DeletionMarker"}) do + %{ + handled_data: resource |> to_map, + related_data: %{} + } + end + def to_hydration_cache_attrs(resource = %__MODULE__{internal_resource: "EphemeraTerm"}) do %{ handled_data: resource |> to_map, @@ -59,6 +66,22 @@ defmodule DpulCollections.IndexingPipeline.Figgy.Resource do end @spec to_map(resource :: %__MODULE__{}) :: map() + defp to_map(resource = %__MODULE__{internal_resource: "DeletionMarker"}) do + %{ + "resource_id" => [%{"id" => deleted_resource_id}], + "resource_type" => [deleted_resource_type] + } = resource.metadata + + %{ + id: deleted_resource_id, + internal_resource: deleted_resource_type, + lock_version: resource.lock_version, + created_at: resource.created_at, + updated_at: resource.updated_at, + metadata: %{"deleted" => true} + } + end + defp to_map(resource = %__MODULE__{}) do resource |> Map.from_struct() diff --git a/lib/dpul_collections/solr.ex b/lib/dpul_collections/solr.ex index 2a4eb1a8..074907ff 100644 --- a/lib/dpul_collections/solr.ex +++ b/lib/dpul_collections/solr.ex @@ -156,6 +156,14 @@ defmodule DpulCollections.Solr do ) end + @spec soft_commit(String.t()) :: {:ok, Req.Response.t()} | {:error, Exception.t()} + def soft_commit(collection \\ read_collection()) do + Req.get( + update_url(collection), + params: [commit: true, softCommit: true] + ) + end + @spec delete_all(String.t()) :: {:ok, Req.Response.t()} | {:error, Exception.t()} | Exception.t() def delete_all(collection \\ read_collection()) do @@ -167,6 +175,20 @@ defmodule DpulCollections.Solr do commit(collection) end + @spec delete_batch(list(), String.t()) :: + {:ok, Req.Response.t()} | {:error, Exception.t()} | Exception.t() + def delete_batch(ids, collection \\ read_collection()) do + ids + |> Enum.each(fn id -> + Req.post!( + update_url(collection), + json: %{delete: 
%{query: "id:#{id}"}} + ) + end) + + soft_commit(collection) + end + defp select_url(collection) do client() |> Req.merge(url: "/solr/#{collection}/select") diff --git a/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs b/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs index 0dc908e5..4d8210d5 100644 --- a/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs +++ b/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs @@ -1,6 +1,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumerTest do use DpulCollections.DataCase + alias DpulCollections.IndexingPipeline alias DpulCollections.IndexingPipeline.Figgy describe "Figgy.HydrationConsumer" do @@ -62,18 +63,129 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumerTest do } } + file_set_deletion_marker_message = %Broadway.Message{ + acknowledger: nil, + data: %Figgy.Resource{ + id: "9773417d-1c36-4692-bf81-f387be688460", + updated_at: ~U[2025-01-02 19:47:21.726083Z], + internal_resource: "DeletionMarker", + metadata: %{ + "resource_id" => [%{"id" => "a521113e-e77a-4000-b00a-17c09b3aa757"}], + "resource_type" => ["FileSet"] + } + } + } + transformed_messages = [ ephemera_folder_message, pending_ephemera_folder_message, restricted_ephemera_folder_message, ephemera_term_message, - scanned_resource_message + scanned_resource_message, + file_set_deletion_marker_message ] |> Enum.map(&Figgy.HydrationConsumer.handle_message(nil, &1, %{cache_version: 1})) |> Enum.map(&Map.get(&1, :batcher)) - assert transformed_messages == [:default, :noop, :noop, :default, :noop] + assert transformed_messages == [:default, :noop, :noop, :default, :noop, :noop] + end + + test "handle_batch/3 only processes deletion markers with related resources in the HydrationCache" do + ephemera_folder_message = %Broadway.Message{ + acknowledger: nil, + data: %Figgy.Resource{ + id: "47276197-e223-471c-99d7-405c5f6c5285", + updated_at: 
~U[2018-03-09 20:19:34.486004Z], + internal_resource: "EphemeraFolder", + metadata: %{ + "state" => ["complete"], + "visibility" => ["open"] + } + } + } + + # DeletionMarker that corresponds to a resource with a hydration cache entry + ephemera_folder_deletion_marker_message = %Broadway.Message{ + acknowledger: nil, + data: %Figgy.Resource{ + id: "dced9109-6980-4035-9764-84c08ed5d7db", + updated_at: ~U[2025-01-02 19:47:21.726083Z], + internal_resource: "DeletionMarker", + metadata: %{ + "resource_id" => [%{"id" => "47276197-e223-471c-99d7-405c5f6c5285"}], + "resource_type" => ["EphemeraFolder"] + } + } + } + + # DeletionMarker that does not correspond to a resource with a hydration cache entry + orphaned_deletion_marker_message1 = %Broadway.Message{ + acknowledger: nil, + data: %Figgy.Resource{ + id: "f8f62bdf-9d7b-438f-9870-1793358e5fe1", + updated_at: ~U[2025-01-02 19:47:21.726083Z], + internal_resource: "DeletionMarker", + metadata: %{ + "resource_id" => [%{"id" => "fc8d345b-6e87-461e-9182-41eaede1fab6"}], + "resource_type" => ["EphemeraFolder"] + } + } + } + + # DeletionMarker that does not correspond to a resource with a hydration cache entry + orphaned_deletion_marker_message2 = %Broadway.Message{ + acknowledger: nil, + data: %Figgy.Resource{ + id: "1a2e9bef-e50d-4a9c-81f1-8d8e82f3a8e4", + updated_at: ~U[2025-01-02 19:47:21.726083Z], + internal_resource: "DeletionMarker", + metadata: %{ + "resource_id" => [%{"id" => "e0d4e6f6-29f2-4fd7-9c8a-7293ae0d7689"}], + "resource_type" => ["EphemeraFolder"] + } + } + } + + # Create a hydration cache entry from an ephemera folder message + create_messages = + [ephemera_folder_message] + |> Enum.map(&Figgy.HydrationConsumer.handle_message(nil, &1, %{cache_version: 1})) + + Figgy.HydrationConsumer.handle_batch(:default, create_messages, nil, %{cache_version: 1}) + + # Process deletion marker messages + delete_messages = + [ + ephemera_folder_deletion_marker_message, + orphaned_deletion_marker_message1, + 
orphaned_deletion_marker_message2 + ] + |> Enum.map(&Figgy.HydrationConsumer.handle_message(nil, &1, %{cache_version: 1})) + + # Only the deletion marker message that has a corresponding resource with + # an existing hydration cache entry is handled by the default batcher. + batchers = delete_messages |> Enum.map(&Map.get(&1, :batcher)) + assert batchers == [:default, :noop, :noop] + + # Send the one message with corresponding resource to the default batch handler. + Figgy.HydrationConsumer.handle_batch(:default, [delete_messages |> hd], nil, %{ + cache_version: 1 + }) + + # A transformed hydration cache entry is created which replaces the + # existing ephemera folder hydration cache entry. Its metadata field + # only has the value `deleted` set to true. This signals the + # transformation consumer to create a transformation cache entry with a + # solr record that indicates it should be deleted from the index. + hydration_cache_entries = IndexingPipeline.list_hydration_cache_entries() + assert hydration_cache_entries |> length == 1 + + hydration_cache_entry = hydration_cache_entries |> hd + assert hydration_cache_entry.data["internal_resource"] == "EphemeraFolder" + assert hydration_cache_entry.record_id == ephemera_folder_message.data.id + assert hydration_cache_entry.data["id"] == ephemera_folder_message.data.id + assert hydration_cache_entry.data["metadata"]["deleted"] == true end end end diff --git a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs index e86abbdc..0032edf9 100644 --- a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs @@ -16,11 +16,19 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do transformation_cache_entries = IndexingPipeline.list_transformation_cache_entries() |> length ephemera_folder_count = 
FiggyTestSupport.ephemera_folder_count() + # Count of resources to be deleted. In this test there is one added per + # DeletionMarker. A transformation cache entry with the key `deleted: true` + # stays in the transformation cache so the total number of records must take + # this into account. + deleted_resource_count = FiggyTestSupport.deletion_marker_count() + total_records = ephemera_folder_count + deleted_resource_count + continue = - if transformation_cache_entries == ephemera_folder_count do + if transformation_cache_entries == total_records do DpulCollections.Solr.commit(active_collection()) - if DpulCollections.Solr.document_count() == transformation_cache_entries do + if DpulCollections.Solr.document_count() == + transformation_cache_entries - deleted_resource_count do true end end @@ -44,6 +52,14 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do # dev and prod (slightly simplified) cache_version = 1 + # Pre-index records for testing deletes. DeletionMarkers in the test Figgy + # database do not have related resources. We need to add the resources so we + # can test that they get deleted. + records_to_be_deleted = + FiggyTestSupport.deletion_markers() + |> FiggyTestFixtures.resources_from_deletion_markers() + |> Enum.map(&FiggyTestSupport.index_record/1) + children = [ {Figgy.IndexingConsumer, cache_version: cache_version, batch_size: 50, write_collection: active_collection()}, @@ -70,16 +86,27 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do Task.await(task, 15000) - # the hydrator pulled all ephemera folders and terms + # The hydrator pulled all ephemera folders, terms, deletion markers and + # removed the hydration cache markers for the deletion marker deleted resource. 
entry_count = Repo.aggregate(Figgy.HydrationCacheEntry, :count) assert FiggyTestSupport.total_resource_count() == entry_count - # the transformer only processes ephemera folders + # The transformer processed ephemera folders and deletion markers + # removed the transformation cache markers for the deletion marker deleted resource. transformation_cache_entry_count = Repo.aggregate(Figgy.TransformationCacheEntry, :count) - assert FiggyTestSupport.ephemera_folder_count() == transformation_cache_entry_count - # indexed all the documents - assert Solr.document_count() == transformation_cache_entry_count + deletion_marker_count = FiggyTestSupport.deletion_marker_count() + total_transformed_count = FiggyTestSupport.ephemera_folder_count() + deletion_marker_count + + assert total_transformed_count == transformation_cache_entry_count + + # indexed all the documents and deleted the extra record solr doc + assert Solr.document_count() == transformation_cache_entry_count - deletion_marker_count + + # Ensure that deleted records from deletion markers are removed from Solr + Enum.each(records_to_be_deleted, fn record -> + assert Solr.find_by_id(record[:id]) == nil + end) # Ensure that the processor markers have the correct cache version hydration_processor_marker = IndexingPipeline.get_processor_marker!("figgy_hydrator", 1) @@ -108,7 +135,7 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do # Make sure it got reindexed assert latest_document["_version_"] != latest_document_again["_version_"] # Make sure we didn't add another one - assert Solr.document_count() == transformation_cache_entry_count + assert Solr.document_count() == transformation_cache_entry_count - deletion_marker_count # transformation entries weren't updated transformation_entry_again = Repo.get_by(Figgy.TransformationCacheEntry, record_id: latest_document["id"]) diff --git a/test/support/figgy_test_support.ex b/test/support/figgy_test_support.ex index 3f992a6a..3f1de525 100644 --- 
a/test/support/figgy_test_support.ex +++ b/test/support/figgy_test_support.ex @@ -9,7 +9,9 @@ defmodule FiggyTestSupport do def total_resource_count do query = from r in Figgy.Resource, - where: r.internal_resource == "EphemeraFolder" or r.internal_resource == "EphemeraTerm" + where: + r.internal_resource == "EphemeraFolder" or r.internal_resource == "EphemeraTerm" or + r.internal_resource == "DeletionMarker" FiggyRepo.aggregate(query, :count) end @@ -31,6 +33,51 @@ defmodule FiggyTestSupport do FiggyRepo.aggregate(query, :count) end + def deletion_marker_count do + query = + from r in Figgy.Resource, + where: r.internal_resource == "DeletionMarker" + + FiggyRepo.aggregate(query, :count) + end + + def deletion_markers do + query = + from r in Figgy.Resource, + where: r.internal_resource == "DeletionMarker" + + FiggyRepo.all(query) + end + + def index_record(record, cache_version \\ 1) do + # Write a current hydration marker right before that marker. + marker = IndexingPipeline.DatabaseProducer.CacheEntryMarker.from(record) + cache_attrs = Figgy.Resource.to_hydration_cache_attrs(record) + + {:ok, cache_entry} = + IndexingPipeline.write_hydration_cache_entry(%{ + cache_version: cache_version, + record_id: marker.id, + source_cache_order: marker.timestamp, + data: cache_attrs.handled_data + }) + + hydration_cache_entry = IndexingPipeline.get_hydration_cache_entry!(cache_entry.id) + solr_doc = Figgy.HydrationCacheEntry.to_solr_document(hydration_cache_entry) + + IndexingPipeline.write_transformation_cache_entry(%{ + cache_version: cache_version, + record_id: hydration_cache_entry |> Map.get(:record_id), + source_cache_order: hydration_cache_entry |> Map.get(:cache_order), + data: solr_doc + }) + + Solr.add(solr_doc) + Solr.commit() + + solr_doc + end + def index_record_id(id) do cache_version = 1 # Get record with a description. 
diff --git a/test/support/fixtures/figgy_test_fixtures.ex b/test/support/fixtures/figgy_test_fixtures.ex index 71443e00..6a3032d6 100644 --- a/test/support/fixtures/figgy_test_fixtures.ex +++ b/test/support/fixtures/figgy_test_fixtures.ex @@ -1,6 +1,7 @@ defmodule FiggyTestFixtures do alias DpulCollections.IndexingPipeline.DatabaseProducer.CacheEntryMarker alias DpulCollections.IndexingPipeline + alias DpulCollections.IndexingPipeline.Figgy # These are the first three known resource markers in the test database. # They're here so that if they change, we don't have to change them in the @@ -30,6 +31,34 @@ defmodule FiggyTestFixtures do |> CacheEntryMarker.from() end + def resources_from_deletion_markers(deletion_markers) do + deletion_markers + |> Enum.map(fn marker -> + %{ + "resource_id" => [%{"id" => id}], + "resource_type" => [resource_type], + "deleted_object" => [deleted_object] + } = marker.metadata + + %{"title" => title, "created_at" => created_at, "updated_at" => updated_at} = deleted_object + {:ok, utc_created_at, _} = DateTime.from_iso8601(created_at) + {:ok, utc_updated_at, _} = DateTime.from_iso8601(updated_at) + + %Figgy.Resource{ + id: id, + internal_resource: resource_type, + metadata: %{ + "title" => title, + "visibility" => ["open"], + "state" => ["complete"], + "member_ids" => [] + }, + created_at: utc_created_at |> DateTime.add(-365, :day), + updated_at: utc_updated_at |> DateTime.add(-365, :day) + } + end) + end + def hydration_cache_entries(cache_version \\ 0) do # This id actually corresponds to an EphemeraTerm # description, date_created and date range taken from diff --git a/test/support/mock_figgy_hydration_producer.ex b/test/support/mock_figgy_hydration_producer.ex index 20bd5494..67268a12 100644 --- a/test/support/mock_figgy_hydration_producer.ex +++ b/test/support/mock_figgy_hydration_producer.ex @@ -65,6 +65,7 @@ defmodule MockFiggyHydrationProducer do def process(demand, cache_version \\ 0) do # Get the PID for TestFiggyProducer 
GenServer, # then cast fulfill message to itself + Broadway.producer_names( String.to_existing_atom("#{Figgy.HydrationConsumer}_#{cache_version}") )