From 5cc948f1f91b3115cb8ca7091cfa0c30be40376a Mon Sep 17 00:00:00 2001 From: Eliot Jordan Date: Thu, 13 Feb 2025 14:02:58 -0600 Subject: [PATCH 01/12] Delete solr documents linked to deletion markers --- .../figgy/hydration_cache_entry.ex | 20 +++++-- .../figgy/hydration_consumer.ex | 20 +++++++ .../figgy/indexing_consumer.ex | 22 +++++++- .../indexing_pipeline/figgy/resource.ex | 16 ++++++ .../figgy/transformation_consumer.ex | 19 +++++++ lib/dpul_collections/solr.ex | 14 +++++ .../figgy/hydration_consumer_test.exs | 18 ++++++- .../integration/full_integration_test.exs | 54 ++++++++++++++----- test/support/figgy_test_support.ex | 49 ++++++++++++++++- test/support/fixtures/figgy_test_fixtures.ex | 18 +++++++ test/support/mock_figgy_hydration_producer.ex | 1 + 11 files changed, 231 insertions(+), 20 deletions(-) diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry.ex index c592a65c..c302a96f 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry.ex @@ -21,10 +21,24 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationCacheEntry do end @spec to_solr_document(%__MODULE__{}) :: %{} - def to_solr_document(hydration_cache_entry) do - %{record_id: id} = hydration_cache_entry - %{data: data = %{"metadata" => metadata}, related_data: related_data} = hydration_cache_entry + def to_solr_document(%{ + data: %{ + "internal_resource" => internal_resource, + "metadata" => %{"resource_id" => [%{"id" => id}]} + } + }) + when internal_resource in ["DeletionMarker"] do + %{ + id: id, + deleted: true + } + end + def to_solr_document(%{ + record_id: id, + data: data = %{"metadata" => metadata}, + related_data: related_data + }) do %{ id: id, title_txtm: extract_title(metadata), diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex 
b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex index 521ef2dd..62bf70d9 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex @@ -88,6 +88,26 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do |> Broadway.Message.put_data(message_map) end + def handle_message( + _processor, + message = %Broadway.Message{ + data: %{ + internal_resource: internal_resource + } + }, + %{cache_version: _cache_version} + ) + when internal_resource in ["DeletionMarker"] do + marker = CacheEntryMarker.from(message) + + message_map = + %{marker: marker, incoming_message_data: message.data} + |> Map.merge(Figgy.Resource.to_hydration_cache_attrs(message.data)) + + message + |> Broadway.Message.put_data(message_map) + end + # If it's not selected above, ack the message but don't do anything with it. def handle_message(_processor, message, _state) do message diff --git a/lib/dpul_collections/indexing_pipeline/figgy/indexing_consumer.ex b/lib/dpul_collections/indexing_pipeline/figgy/indexing_consumer.ex index c6182d20..d03ec3c5 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/indexing_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/indexing_consumer.ex @@ -42,7 +42,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingConsumer do default: [] ], batchers: [ - default: [batch_size: options[:batch_size]] + default: [batch_size: options[:batch_size]], + delete: [batch_size: options[:batch_size]] ], context: %{ cache_version: options[:cache_version], @@ -55,10 +56,29 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingConsumer do # return the message so we can handle it in the batch @spec handle_message(any(), Broadway.Message.t(), %{required(:cache_version) => integer()}) :: Broadway.Message.t() + def handle_message(_processor, %{data: %{data: %{"deleted" => true}}} = message, %{ + cache_version: _cache_version + }) do 
+ message + |> Broadway.Message.put_batcher(:delete) + end + def handle_message(_processor, message, %{cache_version: _cache_version}) do message end + @impl Broadway + @spec handle_batch(any(), list(Broadway.Message.t()), any(), any()) :: + list(Broadway.Message.t()) + def handle_batch(:delete, messages, _batch_info, context) do + messages + |> Enum.map(&unwrap/1) + |> Enum.map(& &1["id"]) + |> Solr.delete_batch(context[:write_collection]) + + messages + end + @impl Broadway @spec handle_batch(any(), list(Broadway.Message.t()), any(), any()) :: list(Broadway.Message.t()) diff --git a/lib/dpul_collections/indexing_pipeline/figgy/resource.ex b/lib/dpul_collections/indexing_pipeline/figgy/resource.ex index 4118c358..47279ec5 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/resource.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/resource.ex @@ -22,6 +22,13 @@ defmodule DpulCollections.IndexingPipeline.Figgy.Resource do handled_data: map(), related_data: related_data() } + def to_hydration_cache_attrs(resource = %__MODULE__{internal_resource: "DeletionMarker"}) do + %{ + handled_data: resource |> to_map, + related_data: %{} + } + end + def to_hydration_cache_attrs(resource = %__MODULE__{internal_resource: "EphemeraTerm"}) do %{ handled_data: resource |> to_map, @@ -65,6 +72,15 @@ defmodule DpulCollections.IndexingPipeline.Figgy.Resource do end @spec to_map(resource :: %__MODULE__{}) :: map() + defp to_map(resource = %__MODULE__{internal_resource: "DeletionMarker"}) do + %{"resource_id" => [%{"id" => deleted_resource_id}]} = resource.metadata + + resource + |> Map.from_struct() + |> Map.delete(:__meta__) + |> Map.put(:id, deleted_resource_id) + end + defp to_map(resource = %__MODULE__{}) do resource |> Map.from_struct() diff --git a/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex b/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex index c05d3aed..9d16bfb9 100644 --- 
a/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex @@ -68,6 +68,25 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationConsumer do }) end + def handle_message( + _processor, + message = %Broadway.Message{ + data: hydration_cache_entry = %{data: %{"internal_resource" => internal_resource}} + }, + %{cache_version: _cache_version} + ) + when internal_resource in ["DeletionMarker"] do + solr_doc = Figgy.HydrationCacheEntry.to_solr_document(hydration_cache_entry) + marker = CacheEntryMarker.from(message) + + message + |> Message.put_data(%{ + marker: marker, + incoming_message_data: hydration_cache_entry, + handled_data: solr_doc + }) + end + # If it's not matched above, put it in the no-op batcher - we want to ack it # but not save it. def handle_message(_processor, message, _) do diff --git a/lib/dpul_collections/solr.ex b/lib/dpul_collections/solr.ex index dffbb1e5..f5cdbad0 100644 --- a/lib/dpul_collections/solr.ex +++ b/lib/dpul_collections/solr.ex @@ -149,6 +149,20 @@ defmodule DpulCollections.Solr do commit(collection) end + @spec delete_batch(list()) :: + {:ok, Req.Response.t()} | {:error, Exception.t()} | Exception.t() + def delete_batch(ids, collection \\ read_collection()) do + ids + |> Enum.each(fn id -> + Req.post!( + update_url(collection), + json: %{delete: %{query: "id:#{id}"}} + ) + end) + + commit(collection) + end + defp select_url(collection) do client() |> Req.merge(url: "/solr/#{collection}/select") diff --git a/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs b/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs index 0dc908e5..0074876c 100644 --- a/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs +++ b/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs @@ -62,18 +62,32 @@ defmodule 
DpulCollections.IndexingPipeline.Figgy.HydrationConsumerTest do } } + deletion_marker_message = %Broadway.Message{ + acknowledger: nil, + data: %Figgy.Resource{ + id: "f8f62bdf-9d7b-438f-9870-1793358e5fe1", + updated_at: ~U[2025-01-02 19:47:21.726083Z], + internal_resource: "DeletionMarker", + metadata: %{ + "resource_id" => [%{"id" => "fc8d345b-6e87-461e-9182-41eaede1fab6"}], + "resource_type" => ["EphemeraFolder"] + } + } + } + transformed_messages = [ ephemera_folder_message, pending_ephemera_folder_message, restricted_ephemera_folder_message, ephemera_term_message, - scanned_resource_message + scanned_resource_message, + deletion_marker_message ] |> Enum.map(&Figgy.HydrationConsumer.handle_message(nil, &1, %{cache_version: 1})) |> Enum.map(&Map.get(&1, :batcher)) - assert transformed_messages == [:default, :noop, :noop, :default, :noop] + assert transformed_messages == [:default, :noop, :noop, :default, :noop, :default] end end end diff --git a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs index e86abbdc..ff119676 100644 --- a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs @@ -12,20 +12,23 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do on_exit(fn -> Solr.delete_all(active_collection()) end) end - def wait_for_index_completion() do + def wait_for_index_completion(additional_record_count \\ 0) do transformation_cache_entries = IndexingPipeline.list_transformation_cache_entries() |> length ephemera_folder_count = FiggyTestSupport.ephemera_folder_count() + deletion_marker_count = FiggyTestSupport.deletion_marker_count() + total_records = ephemera_folder_count + deletion_marker_count + additional_record_count continue = - if transformation_cache_entries == ephemera_folder_count do + if transformation_cache_entries == 
total_records do DpulCollections.Solr.commit(active_collection()) - if DpulCollections.Solr.document_count() == transformation_cache_entries do + if DpulCollections.Solr.document_count() == + transformation_cache_entries - deletion_marker_count - additional_record_count do true end end - continue || (:timer.sleep(100) && wait_for_index_completion()) + continue || (:timer.sleep(100) && wait_for_index_completion(additional_record_count)) end def wait_for_solr_version_change(doc = %{"_version_" => version, "id" => id}) do @@ -39,11 +42,31 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do end end + def index_synthetic_records_for_deletion_markers() do + FiggyTestSupport.deletion_markers() + |> Enum.map(fn marker -> + %{"resource_id" => [%{"id" => id}]} = marker.metadata + id + end) + |> Enum.map(&index_synthetic_record/1) + end + + def index_synthetic_record(id) do + FiggyTestFixtures.ephemera_folder_resource(id) + |> FiggyTestSupport.index_record() + end + test "a full pipeline run of all 3 stages, then re-run of each stage" do # Start the figgy pipeline in a way that mimics how it is started in # dev and prod (slightly simplified) cache_version = 1 + # Pre-index records for testing deletes. DeletionMarkers in the test Figgy + # database do not have related resources. We need to add the resources so we + # can test that they get deleted. 
+ synthetic_records = index_synthetic_records_for_deletion_markers() + synthetic_record_count = synthetic_records |> length + children = [ {Figgy.IndexingConsumer, cache_version: cache_version, batch_size: 50, write_collection: active_collection()}, @@ -66,20 +89,25 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do Supervisor.start_link(children, strategy: :one_for_one, name: DpulCollections.TestSupervisor) task = - Task.async(fn -> wait_for_index_completion() end) + Task.async(fn -> wait_for_index_completion(synthetic_record_count) end) Task.await(task, 15000) - # the hydrator pulled all ephemera folders and terms - entry_count = Repo.aggregate(Figgy.HydrationCacheEntry, :count) + # the hydrator pulled all ephemera folders, terms, deletion markers + entry_count = Repo.aggregate(Figgy.HydrationCacheEntry, :count) - synthetic_record_count assert FiggyTestSupport.total_resource_count() == entry_count - # the transformer only processes ephemera folders - transformation_cache_entry_count = Repo.aggregate(Figgy.TransformationCacheEntry, :count) - assert FiggyTestSupport.ephemera_folder_count() == transformation_cache_entry_count + # the transformer processes ephemera folders and deletion markers + transformation_cache_entry_count = + Repo.aggregate(Figgy.TransformationCacheEntry, :count) - synthetic_record_count + + deletion_marker_count = FiggyTestSupport.deletion_marker_count() + total_transformed_count = FiggyTestSupport.ephemera_folder_count() + deletion_marker_count + + assert total_transformed_count == transformation_cache_entry_count - # indexed all the documents - assert Solr.document_count() == transformation_cache_entry_count + # indexed all the documents and deleted the extra record solr doc + assert Solr.document_count() == transformation_cache_entry_count - deletion_marker_count # Ensure that the processor markers have the correct cache version hydration_processor_marker = IndexingPipeline.get_processor_marker!("figgy_hydrator", 1) 
@@ -108,7 +136,7 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do # Make sure it got reindexed assert latest_document["_version_"] != latest_document_again["_version_"] # Make sure we didn't add another one - assert Solr.document_count() == transformation_cache_entry_count + assert Solr.document_count() == transformation_cache_entry_count - deletion_marker_count # transformation entries weren't updated transformation_entry_again = Repo.get_by(Figgy.TransformationCacheEntry, record_id: latest_document["id"]) diff --git a/test/support/figgy_test_support.ex b/test/support/figgy_test_support.ex index 3f992a6a..3f1de525 100644 --- a/test/support/figgy_test_support.ex +++ b/test/support/figgy_test_support.ex @@ -9,7 +9,9 @@ defmodule FiggyTestSupport do def total_resource_count do query = from r in Figgy.Resource, - where: r.internal_resource == "EphemeraFolder" or r.internal_resource == "EphemeraTerm" + where: + r.internal_resource == "EphemeraFolder" or r.internal_resource == "EphemeraTerm" or + r.internal_resource == "DeletionMarker" FiggyRepo.aggregate(query, :count) end @@ -31,6 +33,51 @@ defmodule FiggyTestSupport do FiggyRepo.aggregate(query, :count) end + def deletion_marker_count do + query = + from r in Figgy.Resource, + where: r.internal_resource == "DeletionMarker" + + FiggyRepo.aggregate(query, :count) + end + + def deletion_markers do + query = + from r in Figgy.Resource, + where: r.internal_resource == "DeletionMarker" + + FiggyRepo.all(query) + end + + def index_record(record, cache_version \\ 1) do + # Write a current hydration marker right before that marker. 
+ marker = IndexingPipeline.DatabaseProducer.CacheEntryMarker.from(record) + cache_attrs = Figgy.Resource.to_hydration_cache_attrs(record) + + {:ok, cache_entry} = + IndexingPipeline.write_hydration_cache_entry(%{ + cache_version: cache_version, + record_id: marker.id, + source_cache_order: marker.timestamp, + data: cache_attrs.handled_data + }) + + hydration_cache_entry = IndexingPipeline.get_hydration_cache_entry!(cache_entry.id) + solr_doc = Figgy.HydrationCacheEntry.to_solr_document(hydration_cache_entry) + + IndexingPipeline.write_transformation_cache_entry(%{ + cache_version: cache_version, + record_id: hydration_cache_entry |> Map.get(:record_id), + source_cache_order: hydration_cache_entry |> Map.get(:cache_order), + data: solr_doc + }) + + Solr.add(solr_doc) + Solr.commit() + + solr_doc + end + def index_record_id(id) do cache_version = 1 # Get record with a description. diff --git a/test/support/fixtures/figgy_test_fixtures.ex b/test/support/fixtures/figgy_test_fixtures.ex index 71443e00..b28f9dc9 100644 --- a/test/support/fixtures/figgy_test_fixtures.ex +++ b/test/support/fixtures/figgy_test_fixtures.ex @@ -1,6 +1,7 @@ defmodule FiggyTestFixtures do alias DpulCollections.IndexingPipeline.DatabaseProducer.CacheEntryMarker alias DpulCollections.IndexingPipeline + alias DpulCollections.IndexingPipeline.Figgy # These are the first three known resource markers in the test database. 
# They're here so that if they change, we don't have to change them in the @@ -30,6 +31,23 @@ defmodule FiggyTestFixtures do |> CacheEntryMarker.from() end + def ephemera_folder_resource(id) do + %Figgy.Resource{ + id: id, + internal_resource: "EphemeraFolder", + metadata: %{ + "title" => ["Deleted Folder"], + "visibility" => ["open"], + "downloadable" => ["none"], + "read_groups" => ["public"], + "member_ids" => [], + "state" => ["complete"] + }, + created_at: ~U[2017-03-09 20:19:33.414040Z], + updated_at: ~U[2017-03-09 20:19:33.414040Z] + } + end + def hydration_cache_entries(cache_version \\ 0) do # This id actually corresponds to an EphemeraTerm # description, date_created and date range taken from diff --git a/test/support/mock_figgy_hydration_producer.ex b/test/support/mock_figgy_hydration_producer.ex index 20bd5494..67268a12 100644 --- a/test/support/mock_figgy_hydration_producer.ex +++ b/test/support/mock_figgy_hydration_producer.ex @@ -65,6 +65,7 @@ defmodule MockFiggyHydrationProducer do def process(demand, cache_version \\ 0) do # Get the PID for TestFiggyProducer GenServer, # then cast fulfill message to itself + Broadway.producer_names( String.to_existing_atom("#{Figgy.HydrationConsumer}_#{cache_version}") ) From 1618c6829784bebf7cd6e1f158c6361cb138fbc9 Mon Sep 17 00:00:00 2001 From: Eliot Jordan Date: Fri, 14 Feb 2025 14:53:33 -0600 Subject: [PATCH 02/12] Remove hydration and transformation cache entries for deleted items --- lib/dpul_collections/indexing_pipeline.ex | 11 +++++++ .../figgy/hydration_consumer.ex | 15 ++++++++++ .../figgy/transformation_consumer.ex | 15 ++++++++++ .../figgy/hydration_consumer_test.exs | 2 +- .../integration/full_integration_test.exs | 29 +++++++++++-------- 5 files changed, 59 insertions(+), 13 deletions(-) diff --git a/lib/dpul_collections/indexing_pipeline.ex b/lib/dpul_collections/indexing_pipeline.ex index e70c5338..722fb236 100644 --- a/lib/dpul_collections/indexing_pipeline.ex +++ 
b/lib/dpul_collections/indexing_pipeline.ex @@ -58,6 +58,8 @@ defmodule DpulCollections.IndexingPipeline do Repo.delete(hydration_cache_entry) end + def delete_hydration_cache_entry(_), do: nil + @doc """ Writes or updates hydration cache entries. """ @@ -317,6 +319,13 @@ defmodule DpulCollections.IndexingPipeline do """ def get_transformation_cache_entry!(id), do: Repo.get!(Figgy.TransformationCacheEntry, id) + def get_transformation_cache_entry!(record_id, cache_version) do + Repo.get_by(Figgy.TransformationCacheEntry, + record_id: record_id, + cache_version: cache_version + ) + end + @spec get_transformation_cache_entries_since!( marker :: CacheEntryMarker.t(), count :: integer, @@ -372,6 +381,8 @@ defmodule DpulCollections.IndexingPipeline do Repo.delete(transformation_cache_entry) end + def delete_transformation_cache_entry(_), do: nil + @doc """ Writes or updates transformation cache entries. """ diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex index 62bf70d9..beeacd03 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex @@ -38,6 +38,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do ], batchers: [ default: [batch_size: options[:batch_size]], + delete: [batch_size: options[:batch_size]], noop: [batch_size: options[:batch_size]] ], context: %{cache_version: options[:cache_version]} @@ -106,6 +107,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do message |> Broadway.Message.put_data(message_map) + |> Broadway.Message.put_batcher(:delete) end # If it's not selected above, ack the message but don't do anything with it. 
@@ -144,6 +146,19 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do messages end + def handle_batch(:delete, messages, _batch_info, %{cache_version: cache_version}) do + # Delete the hydration cache entries for each of the resources to be deleted + Enum.each(messages, fn message -> + message.data.handled_data.id + |> IndexingPipeline.get_hydration_cache_entry!(cache_version) + |> IndexingPipeline.delete_hydration_cache_entry() + end) + + # Write the deletion marker to the hydration cache + Enum.each(messages, &write_to_hydration_cache(&1, cache_version)) + messages + end + def handle_batch(:noop, messages, _batch_info, _state) do messages end diff --git a/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex b/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex index 9d16bfb9..fbebe95e 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex @@ -40,6 +40,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationConsumer do ], batchers: [ default: [batch_size: options[:batch_size]], + delete: [batch_size: options[:batch_size]], noop: [concurrency: 5, batch_size: options[:batch_size]] ], context: %{cache_version: options[:cache_version]} @@ -85,6 +86,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationConsumer do incoming_message_data: hydration_cache_entry, handled_data: solr_doc }) + |> Broadway.Message.put_batcher(:delete) end # If it's not matched above, put it in the no-op batcher - we want to ack it @@ -101,6 +103,19 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationConsumer do messages end + def handle_batch(:delete, messages, _batch_info, %{cache_version: cache_version}) do + # Delete the hydration cache entries for each of the resources to be deleted + Enum.each(messages, fn message -> + message.data.handled_data.id + |> 
IndexingPipeline.get_transformation_cache_entry!(cache_version) + |> IndexingPipeline.delete_transformation_cache_entry() + end) + + # Write the deletion marker to the transformation cache + Enum.each(messages, &write_to_transformation_cache(&1, cache_version)) + messages + end + def handle_batch(:default, messages, _batch_info, %{cache_version: cache_version}) do Enum.each(messages, &write_to_transformation_cache(&1, cache_version)) messages diff --git a/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs b/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs index 0074876c..c2ef6722 100644 --- a/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs +++ b/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs @@ -87,7 +87,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumerTest do |> Enum.map(&Figgy.HydrationConsumer.handle_message(nil, &1, %{cache_version: 1})) |> Enum.map(&Map.get(&1, :batcher)) - assert transformed_messages == [:default, :noop, :noop, :default, :noop, :default] + assert transformed_messages == [:default, :noop, :noop, :default, :noop, :delete] end end end diff --git a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs index ff119676..8fae05ff 100644 --- a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs @@ -12,23 +12,23 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do on_exit(fn -> Solr.delete_all(active_collection()) end) end - def wait_for_index_completion(additional_record_count \\ 0) do + def wait_for_index_completion() do transformation_cache_entries = IndexingPipeline.list_transformation_cache_entries() |> length ephemera_folder_count = FiggyTestSupport.ephemera_folder_count() deletion_marker_count 
= FiggyTestSupport.deletion_marker_count() - total_records = ephemera_folder_count + deletion_marker_count + additional_record_count + total_records = ephemera_folder_count + deletion_marker_count continue = if transformation_cache_entries == total_records do DpulCollections.Solr.commit(active_collection()) if DpulCollections.Solr.document_count() == - transformation_cache_entries - deletion_marker_count - additional_record_count do + transformation_cache_entries - deletion_marker_count do true end end - continue || (:timer.sleep(100) && wait_for_index_completion(additional_record_count)) + continue || (:timer.sleep(100) && wait_for_index_completion()) end def wait_for_solr_version_change(doc = %{"_version_" => version, "id" => id}) do @@ -64,8 +64,7 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do # Pre-index records for testing deletes. DeletionMarkers in the test Figgy # database do not have related resources. We need to add the resources so we # can test that they get deleted. - synthetic_records = index_synthetic_records_for_deletion_markers() - synthetic_record_count = synthetic_records |> length + records_to_be_deleted = index_synthetic_records_for_deletion_markers() children = [ {Figgy.IndexingConsumer, @@ -89,17 +88,18 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do Supervisor.start_link(children, strategy: :one_for_one, name: DpulCollections.TestSupervisor) task = - Task.async(fn -> wait_for_index_completion(synthetic_record_count) end) + Task.async(fn -> wait_for_index_completion() end) Task.await(task, 15000) - # the hydrator pulled all ephemera folders, terms, deletion markers - entry_count = Repo.aggregate(Figgy.HydrationCacheEntry, :count) - synthetic_record_count + # The hydrator pulled all ephemera folders, terms, deletion markers and + # removed the hydration cache markers for the deletion marker deleted resource. 
+ entry_count = Repo.aggregate(Figgy.HydrationCacheEntry, :count) assert FiggyTestSupport.total_resource_count() == entry_count - # the transformer processes ephemera folders and deletion markers - transformation_cache_entry_count = - Repo.aggregate(Figgy.TransformationCacheEntry, :count) - synthetic_record_count + # The transformer processed ephemera folders and deletion markers + # removed the transformation cache markers for the deletion marker deleted resource. + transformation_cache_entry_count = Repo.aggregate(Figgy.TransformationCacheEntry, :count) deletion_marker_count = FiggyTestSupport.deletion_marker_count() total_transformed_count = FiggyTestSupport.ephemera_folder_count() + deletion_marker_count @@ -109,6 +109,11 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do # indexed all the documents and deleted the extra record solr doc assert Solr.document_count() == transformation_cache_entry_count - deletion_marker_count + # Ensure that deleted records from deletion markers are removed from Solr + Enum.each(records_to_be_deleted, fn record -> + assert Solr.find_by_id(record[:id]) == nil + end) + # Ensure that the processor markers have the correct cache version hydration_processor_marker = IndexingPipeline.get_processor_marker!("figgy_hydrator", 1) From 7aa6abd439b85857df5527fd9c6ce8a086b8d286 Mon Sep 17 00:00:00 2001 From: Eliot Jordan Date: Mon, 17 Feb 2025 13:25:37 -0600 Subject: [PATCH 03/12] Refactor record fixtures --- lib/dpul_collections/solr.ex | 2 +- .../integration/full_integration_test.exs | 19 ++------- test/support/fixtures/figgy_test_fixtures.ex | 41 ++++++++++++------- 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/lib/dpul_collections/solr.ex b/lib/dpul_collections/solr.ex index f5cdbad0..dd8b34ba 100644 --- a/lib/dpul_collections/solr.ex +++ b/lib/dpul_collections/solr.ex @@ -149,7 +149,7 @@ defmodule DpulCollections.Solr do commit(collection) end - @spec delete_batch(list()) :: + @spec 
delete_batch(list(), String.t()) :: {:ok, Req.Response.t()} | {:error, Exception.t()} | Exception.t() def delete_batch(ids, collection \\ read_collection()) do ids diff --git a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs index 8fae05ff..2487a7b0 100644 --- a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs @@ -42,20 +42,6 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do end end - def index_synthetic_records_for_deletion_markers() do - FiggyTestSupport.deletion_markers() - |> Enum.map(fn marker -> - %{"resource_id" => [%{"id" => id}]} = marker.metadata - id - end) - |> Enum.map(&index_synthetic_record/1) - end - - def index_synthetic_record(id) do - FiggyTestFixtures.ephemera_folder_resource(id) - |> FiggyTestSupport.index_record() - end - test "a full pipeline run of all 3 stages, then re-run of each stage" do # Start the figgy pipeline in a way that mimics how it is started in # dev and prod (slightly simplified) @@ -64,7 +50,10 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do # Pre-index records for testing deletes. DeletionMarkers in the test Figgy # database do not have related resources. We need to add the resources so we # can test that they get deleted. 
- records_to_be_deleted = index_synthetic_records_for_deletion_markers() + records_to_be_deleted = + FiggyTestSupport.deletion_markers() + |> FiggyTestFixtures.resources_from_deletion_markers() + |> Enum.map(&FiggyTestSupport.index_record/1) children = [ {Figgy.IndexingConsumer, diff --git a/test/support/fixtures/figgy_test_fixtures.ex b/test/support/fixtures/figgy_test_fixtures.ex index b28f9dc9..6a3032d6 100644 --- a/test/support/fixtures/figgy_test_fixtures.ex +++ b/test/support/fixtures/figgy_test_fixtures.ex @@ -31,21 +31,32 @@ defmodule FiggyTestFixtures do |> CacheEntryMarker.from() end - def ephemera_folder_resource(id) do - %Figgy.Resource{ - id: id, - internal_resource: "EphemeraFolder", - metadata: %{ - "title" => ["Deleted Folder"], - "visibility" => ["open"], - "downloadable" => ["none"], - "read_groups" => ["public"], - "member_ids" => [], - "state" => ["complete"] - }, - created_at: ~U[2017-03-09 20:19:33.414040Z], - updated_at: ~U[2017-03-09 20:19:33.414040Z] - } + def resources_from_deletion_markers(deletion_markers) do + deletion_markers + |> Enum.map(fn marker -> + %{ + "resource_id" => [%{"id" => id}], + "resource_type" => [resource_type], + "deleted_object" => [deleted_object] + } = marker.metadata + + %{"title" => title, "created_at" => created_at, "updated_at" => updated_at} = deleted_object + {:ok, utc_created_at, _} = DateTime.from_iso8601(created_at) + {:ok, utc_updated_at, _} = DateTime.from_iso8601(updated_at) + + %Figgy.Resource{ + id: id, + internal_resource: resource_type, + metadata: %{ + "title" => title, + "visibility" => ["open"], + "state" => ["complete"], + "member_ids" => [] + }, + created_at: utc_created_at |> DateTime.add(-365, :day), + updated_at: utc_updated_at |> DateTime.add(-365, :day) + } + end) end def hydration_cache_entries(cache_version \\ 0) do From baf0cc7f786355657ab00d6ea96dfcd29a2def23 Mon Sep 17 00:00:00 2001 From: Eliot Jordan Date: Mon, 17 Feb 2025 15:01:31 -0600 Subject: [PATCH 04/12] Filter DeletionMarkers 
by resource type --- .../figgy/hydration_consumer.ex | 8 ++++++-- .../figgy/hydration_consumer_test.exs | 20 ++++++++++++++++--- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex index beeacd03..4f504af1 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex @@ -93,12 +93,16 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do _processor, message = %Broadway.Message{ data: %{ - internal_resource: internal_resource + internal_resource: internal_resource, + metadata: %{ + "resource_type" => [resource_type] + } } }, %{cache_version: _cache_version} ) - when internal_resource in ["DeletionMarker"] do + when internal_resource in ["DeletionMarker"] and + resource_type in ["EphemeraFolder", "EphemeraTerm"] do marker = CacheEntryMarker.from(message) message_map = diff --git a/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs b/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs index c2ef6722..7bb26afd 100644 --- a/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs +++ b/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs @@ -62,7 +62,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumerTest do } } - deletion_marker_message = %Broadway.Message{ + ephemera_folder_deletion_marker_message = %Broadway.Message{ acknowledger: nil, data: %Figgy.Resource{ id: "f8f62bdf-9d7b-438f-9870-1793358e5fe1", @@ -75,6 +75,19 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumerTest do } } + file_set_deletion_marker_message = %Broadway.Message{ + acknowledger: nil, + data: %Figgy.Resource{ + id: "9773417d-1c36-4692-bf81-f387be688460", + updated_at: ~U[2025-01-02 19:47:21.726083Z], + internal_resource: 
"DeletionMarker", + metadata: %{ + "resource_id" => [%{"id" => "a521113e-e77a-4000-b00a-17c09b3aa757"}], + "resource_type" => ["FileSet"] + } + } + } + transformed_messages = [ ephemera_folder_message, @@ -82,12 +95,13 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumerTest do restricted_ephemera_folder_message, ephemera_term_message, scanned_resource_message, - deletion_marker_message + ephemera_folder_deletion_marker_message, + file_set_deletion_marker_message ] |> Enum.map(&Figgy.HydrationConsumer.handle_message(nil, &1, %{cache_version: 1})) |> Enum.map(&Map.get(&1, :batcher)) - assert transformed_messages == [:default, :noop, :noop, :default, :noop, :delete] + assert transformed_messages == [:default, :noop, :noop, :default, :noop, :delete, :noop] end end end From d91ef420458e96bfbc140c193672f095ba2c49f8 Mon Sep 17 00:00:00 2001 From: Eliot Jordan Date: Mon, 17 Feb 2025 20:06:07 -0600 Subject: [PATCH 05/12] Do not cache DeletionMarkers directly --- .../database_producer/cache_entry_marker.ex | 8 +++++++ .../figgy/hydration_cache_entry.ex | 7 +++--- .../figgy/hydration_consumer.ex | 15 ------------- .../indexing_pipeline/figgy/resource.ex | 6 ++++- .../figgy/transformation_consumer.ex | 22 +++---------------- .../figgy/hydration_consumer_test.exs | 2 +- .../integration/full_integration_test.exs | 11 +++++++--- 7 files changed, 28 insertions(+), 43 deletions(-) diff --git a/lib/dpul_collections/indexing_pipeline/database_producer/cache_entry_marker.ex b/lib/dpul_collections/indexing_pipeline/database_producer/cache_entry_marker.ex index 8222521f..d09fa281 100644 --- a/lib/dpul_collections/indexing_pipeline/database_producer/cache_entry_marker.ex +++ b/lib/dpul_collections/indexing_pipeline/database_producer/cache_entry_marker.ex @@ -29,6 +29,14 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer.CacheEntryMarker do end @spec from(%Resource{}) :: t() + def from(%Resource{ + updated_at: updated_at, + internal_resource: 
"DeletionMarker", + metadata: %{"resource_id" => [%{"id" => id}]} + }) do + %__MODULE__{timestamp: updated_at, id: id} + end + def from(%Resource{updated_at: updated_at, id: id}) do %__MODULE__{timestamp: updated_at, id: id} end diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry.ex index c302a96f..5262d282 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry.ex @@ -22,12 +22,11 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationCacheEntry do @spec to_solr_document(%__MODULE__{}) :: %{} def to_solr_document(%{ + record_id: id, data: %{ - "internal_resource" => internal_resource, - "metadata" => %{"resource_id" => [%{"id" => id}]} + "metadata" => %{"deleted" => true} } - }) - when internal_resource in ["DeletionMarker"] do + }) do %{ id: id, deleted: true diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex index 4f504af1..83f0b364 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex @@ -38,7 +38,6 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do ], batchers: [ default: [batch_size: options[:batch_size]], - delete: [batch_size: options[:batch_size]], noop: [batch_size: options[:batch_size]] ], context: %{cache_version: options[:cache_version]} @@ -111,7 +110,6 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do message |> Broadway.Message.put_data(message_map) - |> Broadway.Message.put_batcher(:delete) end # If it's not selected above, ack the message but don't do anything with it. 
@@ -150,19 +148,6 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do messages end - def handle_batch(:delete, messages, _batch_info, %{cache_version: cache_version}) do - # Delete the hydration cache entries for each of the resources to be deleted - Enum.each(messages, fn message -> - message.data.handled_data.id - |> IndexingPipeline.get_hydration_cache_entry!(cache_version) - |> IndexingPipeline.delete_hydration_cache_entry() - end) - - # Write the deletion marker to the hydration cache - Enum.each(messages, &write_to_hydration_cache(&1, cache_version)) - messages - end - def handle_batch(:noop, messages, _batch_info, _state) do messages end diff --git a/lib/dpul_collections/indexing_pipeline/figgy/resource.ex b/lib/dpul_collections/indexing_pipeline/figgy/resource.ex index 47279ec5..c4619afc 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/resource.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/resource.ex @@ -73,12 +73,16 @@ defmodule DpulCollections.IndexingPipeline.Figgy.Resource do @spec to_map(resource :: %__MODULE__{}) :: map() defp to_map(resource = %__MODULE__{internal_resource: "DeletionMarker"}) do - %{"resource_id" => [%{"id" => deleted_resource_id}]} = resource.metadata + %{"resource_id" => [%{"id" => deleted_resource_id}], "resource_type" => [resource_type]} = + resource.metadata resource |> Map.from_struct() |> Map.delete(:__meta__) + |> Map.delete(:metadata) + |> Map.put(:metadata, %{"deleted" => true}) |> Map.put(:id, deleted_resource_id) + |> Map.put(:internal_resource, resource_type) end defp to_map(resource = %__MODULE__{}) do diff --git a/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex b/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex index fbebe95e..871864c0 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex @@ -40,7 +40,6 @@ defmodule 
DpulCollections.IndexingPipeline.Figgy.TransformationConsumer do ], batchers: [ default: [batch_size: options[:batch_size]], - delete: [batch_size: options[:batch_size]], noop: [concurrency: 5, batch_size: options[:batch_size]] ], context: %{cache_version: options[:cache_version]} @@ -53,11 +52,10 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationConsumer do def handle_message( _processor, message = %Broadway.Message{ - data: hydration_cache_entry = %{data: %{"internal_resource" => internal_resource}} + data: hydration_cache_entry = %{data: %{"metadata" => %{"deleted" => true}}} }, %{cache_version: _cache_version} - ) - when internal_resource in ["EphemeraFolder"] do + ) do solr_doc = Figgy.HydrationCacheEntry.to_solr_document(hydration_cache_entry) marker = CacheEntryMarker.from(message) @@ -76,7 +74,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationConsumer do }, %{cache_version: _cache_version} ) - when internal_resource in ["DeletionMarker"] do + when internal_resource in ["EphemeraFolder"] do solr_doc = Figgy.HydrationCacheEntry.to_solr_document(hydration_cache_entry) marker = CacheEntryMarker.from(message) @@ -86,7 +84,6 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationConsumer do incoming_message_data: hydration_cache_entry, handled_data: solr_doc }) - |> Broadway.Message.put_batcher(:delete) end # If it's not matched above, put it in the no-op batcher - we want to ack it @@ -103,19 +100,6 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationConsumer do messages end - def handle_batch(:delete, messages, _batch_info, %{cache_version: cache_version}) do - # Delete the hydration cache entries for each of the resources to be deleted - Enum.each(messages, fn message -> - message.data.handled_data.id - |> IndexingPipeline.get_transformation_cache_entry!(cache_version) - |> IndexingPipeline.delete_transformation_cache_entry() - end) - - # Write the deletion marker to the transformation cache - 
Enum.each(messages, &write_to_transformation_cache(&1, cache_version)) - messages - end - def handle_batch(:default, messages, _batch_info, %{cache_version: cache_version}) do Enum.each(messages, &write_to_transformation_cache(&1, cache_version)) messages diff --git a/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs b/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs index 7bb26afd..54b0291e 100644 --- a/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs +++ b/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs @@ -101,7 +101,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumerTest do |> Enum.map(&Figgy.HydrationConsumer.handle_message(nil, &1, %{cache_version: 1})) |> Enum.map(&Map.get(&1, :batcher)) - assert transformed_messages == [:default, :noop, :noop, :default, :noop, :delete, :noop] + assert transformed_messages == [:default, :noop, :noop, :default, :noop, :default, :noop] end end end diff --git a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs index 2487a7b0..0032edf9 100644 --- a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs @@ -15,15 +15,20 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do def wait_for_index_completion() do transformation_cache_entries = IndexingPipeline.list_transformation_cache_entries() |> length ephemera_folder_count = FiggyTestSupport.ephemera_folder_count() - deletion_marker_count = FiggyTestSupport.deletion_marker_count() - total_records = ephemera_folder_count + deletion_marker_count + + # Count of resources to be deleted. In this test there is one added per + # DeletionMarker. 
A transformation cache entry with the key `deleted: true` + # stays in the transformation cache so the total number of records must take + # this into account. + deleted_resource_count = FiggyTestSupport.deletion_marker_count() + total_records = ephemera_folder_count + deleted_resource_count continue = if transformation_cache_entries == total_records do DpulCollections.Solr.commit(active_collection()) if DpulCollections.Solr.document_count() == - transformation_cache_entries - deletion_marker_count do + transformation_cache_entries - deleted_resource_count do true end end From a5a71d285cf8899b5e2b989a56ffbadbaa56680f Mon Sep 17 00:00:00 2001 From: Eliot Jordan Date: Mon, 17 Feb 2025 21:14:06 -0600 Subject: [PATCH 06/12] Only process deletion markers linked to records with cache entries --- lib/dpul_collections/indexing_pipeline.ex | 11 -- .../figgy/hydration_consumer.ex | 24 ++-- .../figgy/hydration_consumer_test.exs | 110 +++++++++++++++--- 3 files changed, 114 insertions(+), 31 deletions(-) diff --git a/lib/dpul_collections/indexing_pipeline.ex b/lib/dpul_collections/indexing_pipeline.ex index 722fb236..e70c5338 100644 --- a/lib/dpul_collections/indexing_pipeline.ex +++ b/lib/dpul_collections/indexing_pipeline.ex @@ -58,8 +58,6 @@ defmodule DpulCollections.IndexingPipeline do Repo.delete(hydration_cache_entry) end - def delete_hydration_cache_entry(_), do: nil - @doc """ Writes or updates hydration cache entries. 
""" @@ -319,13 +317,6 @@ defmodule DpulCollections.IndexingPipeline do """ def get_transformation_cache_entry!(id), do: Repo.get!(Figgy.TransformationCacheEntry, id) - def get_transformation_cache_entry!(record_id, cache_version) do - Repo.get_by(Figgy.TransformationCacheEntry, - record_id: record_id, - cache_version: cache_version - ) - end - @spec get_transformation_cache_entries_since!( marker :: CacheEntryMarker.t(), count :: integer, @@ -381,8 +372,6 @@ defmodule DpulCollections.IndexingPipeline do Repo.delete(transformation_cache_entry) end - def delete_transformation_cache_entry(_), do: nil - @doc """ Writes or updates transformation cache entries. """ diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex index 83f0b364..6c955828 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex @@ -94,22 +94,32 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do data: %{ internal_resource: internal_resource, metadata: %{ + "resource_id" => [%{"id" => resource_id}], "resource_type" => [resource_type] } } }, - %{cache_version: _cache_version} + %{cache_version: cache_version} ) when internal_resource in ["DeletionMarker"] and resource_type in ["EphemeraFolder", "EphemeraTerm"] do - marker = CacheEntryMarker.from(message) + resource = IndexingPipeline.get_hydration_cache_entry!(resource_id, cache_version) - message_map = - %{marker: marker, incoming_message_data: message.data} - |> Map.merge(Figgy.Resource.to_hydration_cache_attrs(message.data)) + cond do + resource -> + marker = CacheEntryMarker.from(message) - message - |> Broadway.Message.put_data(message_map) + message_map = + %{marker: marker, incoming_message_data: message.data} + |> Map.merge(Figgy.Resource.to_hydration_cache_attrs(message.data)) + + message + |> Broadway.Message.put_data(message_map) + + 
true -> + message + |> Broadway.Message.put_batcher(:noop) + end end # If it's not selected above, ack the message but don't do anything with it. diff --git a/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs b/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs index 54b0291e..4d8210d5 100644 --- a/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs +++ b/test/dpul_collections/indexing_pipeline/figgy/hydration_consumer_test.exs @@ -1,6 +1,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumerTest do use DpulCollections.DataCase + alias DpulCollections.IndexingPipeline alias DpulCollections.IndexingPipeline.Figgy describe "Figgy.HydrationConsumer" do @@ -62,7 +63,64 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumerTest do } } + file_set_deletion_marker_message = %Broadway.Message{ + acknowledger: nil, + data: %Figgy.Resource{ + id: "9773417d-1c36-4692-bf81-f387be688460", + updated_at: ~U[2025-01-02 19:47:21.726083Z], + internal_resource: "DeletionMarker", + metadata: %{ + "resource_id" => [%{"id" => "a521113e-e77a-4000-b00a-17c09b3aa757"}], + "resource_type" => ["FileSet"] + } + } + } + + transformed_messages = + [ + ephemera_folder_message, + pending_ephemera_folder_message, + restricted_ephemera_folder_message, + ephemera_term_message, + scanned_resource_message, + file_set_deletion_marker_message + ] + |> Enum.map(&Figgy.HydrationConsumer.handle_message(nil, &1, %{cache_version: 1})) + |> Enum.map(&Map.get(&1, :batcher)) + + assert transformed_messages == [:default, :noop, :noop, :default, :noop, :noop] + end + + test "handle_batch/3 only processes deletion markers with related resources in the HydrationCache" do + ephemera_folder_message = %Broadway.Message{ + acknowledger: nil, + data: %Figgy.Resource{ + id: "47276197-e223-471c-99d7-405c5f6c5285", + updated_at: ~U[2018-03-09 20:19:34.486004Z], + internal_resource: "EphemeraFolder", + metadata: %{ + "state" 
=> ["complete"], + "visibility" => ["open"] + } + } + } + + # DeletionMarker that corresponds to a resource with a hydration cache entry ephemera_folder_deletion_marker_message = %Broadway.Message{ + acknowledger: nil, + data: %Figgy.Resource{ + id: "dced9109-6980-4035-9764-84c08ed5d7db", + updated_at: ~U[2025-01-02 19:47:21.726083Z], + internal_resource: "DeletionMarker", + metadata: %{ + "resource_id" => [%{"id" => "47276197-e223-471c-99d7-405c5f6c5285"}], + "resource_type" => ["EphemeraFolder"] + } + } + } + + # DeletionMarker that does not correspond to a resource with a hydration cache entry + orphaned_deletion_marker_message1 = %Broadway.Message{ acknowledger: nil, data: %Figgy.Resource{ id: "f8f62bdf-9d7b-438f-9870-1793358e5fe1", @@ -75,33 +133,59 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumerTest do } } - file_set_deletion_marker_message = %Broadway.Message{ + # DeletionMarker that does not correspond to a resource with a hydration cache entry + orphaned_deletion_marker_message2 = %Broadway.Message{ acknowledger: nil, data: %Figgy.Resource{ - id: "9773417d-1c36-4692-bf81-f387be688460", + id: "1a2e9bef-e50d-4a9c-81f1-8d8e82f3a8e4", updated_at: ~U[2025-01-02 19:47:21.726083Z], internal_resource: "DeletionMarker", metadata: %{ - "resource_id" => [%{"id" => "a521113e-e77a-4000-b00a-17c09b3aa757"}], - "resource_type" => ["FileSet"] + "resource_id" => [%{"id" => "e0d4e6f6-29f2-4fd7-9c8a-7293ae0d7689"}], + "resource_type" => ["EphemeraFolder"] } } } - transformed_messages = + # Create a hydration cache entry from an ephemera folder message + create_messages = + [ephemera_folder_message] + |> Enum.map(&Figgy.HydrationConsumer.handle_message(nil, &1, %{cache_version: 1})) + + Figgy.HydrationConsumer.handle_batch(:default, create_messages, nil, %{cache_version: 1}) + + # Process deletion marker messages + delete_messages = [ - ephemera_folder_message, - pending_ephemera_folder_message, - restricted_ephemera_folder_message, - 
ephemera_term_message, - scanned_resource_message, ephemera_folder_deletion_marker_message, - file_set_deletion_marker_message + orphaned_deletion_marker_message1, + orphaned_deletion_marker_message2 ] |> Enum.map(&Figgy.HydrationConsumer.handle_message(nil, &1, %{cache_version: 1})) - |> Enum.map(&Map.get(&1, :batcher)) - assert transformed_messages == [:default, :noop, :noop, :default, :noop, :default, :noop] + # Only the deletion marker message that has a corresponding resource with + # an existing hydration cache entry is handled by the default batcher. + batchers = delete_messages |> Enum.map(&Map.get(&1, :batcher)) + assert batchers == [:default, :noop, :noop] + + # Send the one message with corresponding resource to the default batch handler. + Figgy.HydrationConsumer.handle_batch(:default, [delete_messages |> hd], nil, %{ + cache_version: 1 + }) + + # A transformed hydration cache entry is created which replaces the + # existing ephemera folder hydration cache entry. Its metadata field + # only has the value `deleted` set to true. This signals the + # transformation consumer to create a transformation cache entry with a + # solr record that indicates it should be deleted from the index.
+ hydration_cache_entries = IndexingPipeline.list_hydration_cache_entries() + assert hydration_cache_entries |> length == 1 + + hydration_cache_entry = hydration_cache_entries |> hd + assert hydration_cache_entry.data["internal_resource"] == "EphemeraFolder" + assert hydration_cache_entry.record_id == ephemera_folder_message.data.id + assert hydration_cache_entry.data["id"] == ephemera_folder_message.data.id + assert hydration_cache_entry.data["metadata"]["deleted"] == true end end end From 1c79dce5cb0a527203aa9557d09469cb13ee6497 Mon Sep 17 00:00:00 2001 From: Eliot Jordan Date: Tue, 18 Feb 2025 13:16:40 -0600 Subject: [PATCH 07/12] Add documentation --- .../0004-deleting-records.md | 60 +++++++++++++++++++ .../database_producer/cache_entry_marker.ex | 2 + .../figgy/hydration_cache_entry.ex | 2 + .../figgy/hydration_consumer.ex | 3 + .../indexing_pipeline/figgy/resource.ex | 4 ++ 5 files changed, 71 insertions(+) create mode 100644 architecture-decisions/0004-deleting-records.md diff --git a/architecture-decisions/0004-deleting-records.md b/architecture-decisions/0004-deleting-records.md new file mode 100644 index 00000000..46562fb7 --- /dev/null +++ b/architecture-decisions/0004-deleting-records.md @@ -0,0 +1,60 @@ +# 4. Deleting Records + +Date: 2025-02-18 + +## Status + +Accepted + +## Context + +When resources are deleted in Figgy, a DeletionMarker resource is created at the +same time. The DeletionMarker stores the deleted resource's identifier, +resource type, and a serialized copy of the metadata (in the `deleted_object` +field). We need a method for processing DeletionMarkers in DPUL-C to remove the +corresponding record from the Solr index. + +## Decision + +#### Hydration Consumer + +1. We will process DeletionMarkers that reference a deleted resource with a +resource type that we currently index into DPUL-C. In addition, we will check +if a hydration cache entry exists for the deleted resource and discard the +DeletionMarker if not. +1. 
A special CacheMarker is created from the DeletionMarker that uses the + deleted resource's id as the id and the updated_at value from the + DeletionMarker as the timestamp. +1. Special hydration cache entry attributes are generated. The hydration cache + entry created from these attributes will replace the hydration cache entry of + the deleted resource. + - Existing metadata is replaced with a simple deleted => true kv pair + - The entry id is set to the deleted resource's id + - The entry internal_resource type is set to that of the deleted resource + +#### Transformation Consumer + +1. Messages with the deleted => kv pair are handled separately. +1.A special solr document is generated from the deleted object hydration cache +entry with the following structure. + ``` + %{ id: "id", deleted: true } + ``` + +#### Indexing Consumer + +1. Messages with the `deleted: true` field are handled separately and assigned to + the `delete` batcher. +1. The delete batcher sends the deleted record ids to a the Solr.delete_batch + function which iterates over them, deletes each record, and then commits the + batch of deletes. + +## Consequences + +- DeletionMarkers will stay in the Figgy database unless the resource is +restored. This means that DPUL-C will have to triage an ever increasing number +over time. + +- Deleted resource hydration and transformation cache entries will stay in the +cache after the resource is remove from Solr until the next full reindex, or in +the case of the transformation cache, partial reindex.
diff --git a/lib/dpul_collections/indexing_pipeline/database_producer/cache_entry_marker.ex b/lib/dpul_collections/indexing_pipeline/database_producer/cache_entry_marker.ex index d09fa281..a3306e3f 100644 --- a/lib/dpul_collections/indexing_pipeline/database_producer/cache_entry_marker.ex +++ b/lib/dpul_collections/indexing_pipeline/database_producer/cache_entry_marker.ex @@ -34,6 +34,8 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer.CacheEntryMarker do internal_resource: "DeletionMarker", metadata: %{"resource_id" => [%{"id" => id}]} }) do + # A CacheMarker for a DeletionMarker resource has a standard timestamp, but + # the id is set from the deleted resource id. %__MODULE__{timestamp: updated_at, id: id} end diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry.ex index 5262d282..46e956e1 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_cache_entry.ex @@ -27,6 +27,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationCacheEntry do "metadata" => %{"deleted" => true} } }) do + # Generate a small json document for deleted resources that indicates that + # the Solr record with that id should be deleted from the index. %{ id: id, deleted: true diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex index 6c955828..c486c1f1 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex @@ -103,6 +103,9 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do ) when internal_resource in ["DeletionMarker"] and resource_type in ["EphemeraFolder", "EphemeraTerm"] do + # Only process messages where the deleted resource has an exisiting + # hydration cache entry. 
If one does not exist, it means that the resource + # has not been indexed into DPUL-C. resource = IndexingPipeline.get_hydration_cache_entry!(resource_id, cache_version) cond do diff --git a/lib/dpul_collections/indexing_pipeline/figgy/resource.ex b/lib/dpul_collections/indexing_pipeline/figgy/resource.ex index c4619afc..9782c786 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/resource.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/resource.ex @@ -76,6 +76,10 @@ defmodule DpulCollections.IndexingPipeline.Figgy.Resource do %{"resource_id" => [%{"id" => deleted_resource_id}], "resource_type" => [resource_type]} = resource.metadata + # Create attributes specifically for deletion markers. + # 1. Replace existing metadata with a simple deleted => true kv pair + # 2. Set the entry id to the deleted resource's id + # 3. Set the entry internal_resource type to that of the deleted resource resource |> Map.from_struct() |> Map.delete(:__meta__) From 8654633e927aa97546566e7c8d92aaf05b99d05e Mon Sep 17 00:00:00 2001 From: Eliot Jordan Date: Wed, 19 Feb 2025 11:35:38 -0600 Subject: [PATCH 08/12] Generate map directly --- .../figgy/hydration_consumer.ex | 2 +- .../indexing_pipeline/figgy/resource.ex | 25 +++++++++---------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex index c486c1f1..776ef464 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex @@ -103,7 +103,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do ) when internal_resource in ["DeletionMarker"] and resource_type in ["EphemeraFolder", "EphemeraTerm"] do - # Only process messages where the deleted resource has an exisiting + # Only process messages where the deleted resource has an existing # hydration cache entry. 
If one does not exist, it means that the resource # has not been indexed into DPUL-C. resource = IndexingPipeline.get_hydration_cache_entry!(resource_id, cache_version) diff --git a/lib/dpul_collections/indexing_pipeline/figgy/resource.ex b/lib/dpul_collections/indexing_pipeline/figgy/resource.ex index 9782c786..d19c0495 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/resource.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/resource.ex @@ -73,20 +73,19 @@ defmodule DpulCollections.IndexingPipeline.Figgy.Resource do @spec to_map(resource :: %__MODULE__{}) :: map() defp to_map(resource = %__MODULE__{internal_resource: "DeletionMarker"}) do - %{"resource_id" => [%{"id" => deleted_resource_id}], "resource_type" => [resource_type]} = - resource.metadata + %{ + "resource_id" => [%{"id" => deleted_resource_id}], + "resource_type" => [deleted_resource_type] + } = resource.metadata - # Create attributes specifically for deletion markers. - # 1. Replace existing metadata with a simple deleted => true kv pair - # 2. Set the entry id to the deleted resource's id - # 3. 
Set the entry internal_resource type to that of the deleted resource - resource - |> Map.from_struct() - |> Map.delete(:__meta__) - |> Map.delete(:metadata) - |> Map.put(:metadata, %{"deleted" => true}) - |> Map.put(:id, deleted_resource_id) - |> Map.put(:internal_resource, resource_type) + %{ + id: deleted_resource_id, + internal_resource: deleted_resource_type, + lock_version: resource.lock_version, + created_at: resource.created_at, + updated_at: resource.updated_at, + metadata: %{"deleted" => true} + } end defp to_map(resource = %__MODULE__{}) do From 1252d94a9cde3dbeca2b96b3c54944398b6f9cfc Mon Sep 17 00:00:00 2001 From: Eliot Jordan Date: Wed, 19 Feb 2025 11:42:34 -0600 Subject: [PATCH 09/12] Use soft commit when deleting solr records --- lib/dpul_collections/solr.ex | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/lib/dpul_collections/solr.ex b/lib/dpul_collections/solr.ex index dd8b34ba..6a450428 100644 --- a/lib/dpul_collections/solr.ex +++ b/lib/dpul_collections/solr.ex @@ -138,6 +138,14 @@ defmodule DpulCollections.Solr do ) end + @spec soft_commit(String.t()) :: {:ok, Req.Response.t()} | {:error, Exception.t()} + def soft_commit(collection \\ read_collection()) do + Req.get( + update_url(collection), + params: [commit: true, softCommit: true] + ) + end + @spec delete_all(String.t()) :: {:ok, Req.Response.t()} | {:error, Exception.t()} | Exception.t() def delete_all(collection \\ read_collection()) do @@ -160,7 +168,7 @@ defmodule DpulCollections.Solr do ) end) - commit(collection) + soft_commit(collection) end defp select_url(collection) do From b43bfba081495f4fdf9b94103d387672810e744c Mon Sep 17 00:00:00 2001 From: Eliot Jordan Date: Wed, 19 Feb 2025 11:52:09 -0600 Subject: [PATCH 10/12] Remove redundant function --- .../figgy/transformation_consumer.ex | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex 
b/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex index 871864c0..c05d3aed 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/transformation_consumer.ex @@ -49,24 +49,6 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationConsumer do @impl Broadway @spec handle_message(any(), any(), %{required(:cache_version) => integer()}) :: Broadway.Message.t() - def handle_message( - _processor, - message = %Broadway.Message{ - data: hydration_cache_entry = %{data: %{"metadata" => %{"deleted" => true}}} - }, - %{cache_version: _cache_version} - ) do - solr_doc = Figgy.HydrationCacheEntry.to_solr_document(hydration_cache_entry) - marker = CacheEntryMarker.from(message) - - message - |> Message.put_data(%{ - marker: marker, - incoming_message_data: hydration_cache_entry, - handled_data: solr_doc - }) - end - def handle_message( _processor, message = %Broadway.Message{ From 9d1e2bba5ede1deac4e130bef059e3a73a8421e8 Mon Sep 17 00:00:00 2001 From: Eliot Jordan Date: Wed, 19 Feb 2025 11:52:31 -0600 Subject: [PATCH 11/12] Rename resource variable --- .../indexing_pipeline/figgy/hydration_consumer.ex | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex index 776ef464..78ed803e 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex @@ -106,10 +106,11 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do # Only process messages where the deleted resource has an existing # hydration cache entry. If one does not exist, it means that the resource # has not been indexed into DPUL-C. 
- resource = IndexingPipeline.get_hydration_cache_entry!(resource_id, cache_version) + hydration_cache_entry = + IndexingPipeline.get_hydration_cache_entry!(resource_id, cache_version) cond do - resource -> + hydration_cache_entry -> marker = CacheEntryMarker.from(message) message_map = From b57ea0b8a48fec1c3d5316229d71071eebc61c16 Mon Sep 17 00:00:00 2001 From: Eliot Jordan Date: Wed, 19 Feb 2025 11:57:00 -0600 Subject: [PATCH 12/12] Update ADR documentation --- architecture-decisions/0004-deleting-records.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/architecture-decisions/0004-deleting-records.md b/architecture-decisions/0004-deleting-records.md index 46562fb7..f3efe59a 100644 --- a/architecture-decisions/0004-deleting-records.md +++ b/architecture-decisions/0004-deleting-records.md @@ -34,8 +34,7 @@ DeletionMarker if not. #### Transformation Consumer -1. Messages with the deleted => kv pair are handled separately. -1.A special solr document is generated from the deleted object hydration cache +1. A special solr document is generated from the deleted object hydration cache entry with the following structure. ``` %{ id: "id", deleted: true } @@ -47,7 +46,9 @@ entry with the following structure. the `delete` batcher. 1. The delete batcher sends the deleted record ids to a the Solr.delete_batch function which iterates over them, deletes each record, and then commits the - batch of deletes. + batch of deletes. The additional batcher doesn't create a potential race + condition because there is only one entry for the resource earlier in the + pipeline. ## Consequences @@ -56,5 +57,4 @@ restored. This means that DPUL-C will have to triage an ever increasing number over time. - Deleted resource hydration and transformation cache entries will stay in the -cache after the resource is remove from Solr until the next full reindex, or in -the case of the transformation cache, partial reindex. 
+cache after the resource is removed from Solr until the next full reindex.