From 349269094981f572aedc4982a967d76733483e97 Mon Sep 17 00:00:00 2001 From: Eliot Jordan Date: Tue, 18 Feb 2025 14:57:52 -0600 Subject: [PATCH] Delete records when visiblity or state change --- .../figgy/hydration_consumer.ex | 32 +++++++++++++++++++ .../indexing_pipeline/figgy/resource.ex | 32 +++++++++++++++---- 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex index c486c1f1..d536fd0a 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex @@ -68,6 +68,38 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do |> Broadway.Message.put_data(message_map) end + @impl Broadway + # pass through messages and write to cache in batcher to avoid race condition + def handle_message( + _processor, + message = %Broadway.Message{ + data: %{ + id: id, + internal_resource: internal_resource + } + }, + %{cache_version: cache_version} + ) + when internal_resource in ["EphemeraFolder"] do + resource = IndexingPipeline.get_hydration_cache_entry!(id, cache_version) + + cond do + resource -> + marker = CacheEntryMarker.from(message) + + message_map = + %{marker: marker, incoming_message_data: message.data} + |> Map.merge(Figgy.Resource.to_hydration_cache_attrs(message.data)) + + message + |> Broadway.Message.put_data(message_map) + + true -> + message + |> Broadway.Message.put_batcher(:noop) + end + end + def handle_message( _processor, message = %Broadway.Message{ diff --git a/lib/dpul_collections/indexing_pipeline/figgy/resource.ex b/lib/dpul_collections/indexing_pipeline/figgy/resource.ex index 9782c786..f2af56aa 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/resource.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/resource.ex @@ -36,13 +36,25 @@ defmodule DpulCollections.IndexingPipeline.Figgy.Resource do } end - def to_hydration_cache_attrs(resource = %__MODULE__{internal_resource: "EphemeraFolder"}) do + def to_hydration_cache_attrs( + resource = %__MODULE__{ + internal_resource: "EphemeraFolder", + metadata: %{"state" => ["complete"], "visibility" => ["open"]} + } + ) do %{ handled_data: resource |> to_map, related_data: extract_related_data(resource) } end + def to_hydration_cache_attrs(resource = %__MODULE__{internal_resource: "EphemeraFolder"}) do + %{ + handled_data: resource |> to_deleted_map, + related_data: %{} + } + end + @spec extract_related_data(resource :: %__MODULE__{}) :: related_data() def extract_related_data(resource) do %{ @@ -76,15 +88,12 @@ defmodule DpulCollections.IndexingPipeline.Figgy.Resource do %{"resource_id" => [%{"id" => deleted_resource_id}], "resource_type" => [resource_type]} = resource.metadata - # Create attributes specifically for deletion markers. + # Create attributes specifically for DeletionMarkers # 1. Replace existing metadata with a simple deleted => true kv pair # 2. Set the entry id to the deleted resource's id # 3. Set the entry internal_resource type to that of the deleted resource resource - |> Map.from_struct() - |> Map.delete(:__meta__) - |> Map.delete(:metadata) - |> Map.put(:metadata, %{"deleted" => true}) + |> to_deleted_map |> Map.put(:id, deleted_resource_id) |> Map.put(:internal_resource, resource_type) end @@ -94,4 +103,15 @@ defmodule DpulCollections.IndexingPipeline.Figgy.Resource do |> Map.from_struct() |> Map.delete(:__meta__) end + + @spec to_deleted_map(resource :: %__MODULE__{}) :: map() + defp to_deleted_map(resource = %__MODULE__{}) do + # Create attributes specifically for resources that need to be deleted + # 1. Replace existing metadata with a simple deleted => true kv pair + resource + |> Map.from_struct() + |> Map.delete(:__meta__) + |> Map.delete(:metadata) + |> Map.put(:metadata, %{"deleted" => true}) + end end