Skip to content

Commit

Permalink
Delete records when visiblity or state change
Browse files Browse the repository at this point in the history
  • Loading branch information
eliotjordan committed Feb 18, 2025
1 parent 1c79dce commit 3492690
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 6 deletions.
32 changes: 32 additions & 0 deletions lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,38 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do
|> Broadway.Message.put_data(message_map)
end

@impl Broadway
# pass through messages and write to cache in batcher to avoid race condition
def handle_message(
_processor,
message = %Broadway.Message{
data: %{
id: id,
internal_resource: internal_resource
}
},
%{cache_version: cache_version}
)
when internal_resource in ["EphemeraFolder"] do
resource = IndexingPipeline.get_hydration_cache_entry!(id, cache_version)

cond do
resource ->
marker = CacheEntryMarker.from(message)

message_map =
%{marker: marker, incoming_message_data: message.data}
|> Map.merge(Figgy.Resource.to_hydration_cache_attrs(message.data))

message
|> Broadway.Message.put_data(message_map)

true ->
message
|> Broadway.Message.put_batcher(:noop)
end
end

def handle_message(
_processor,
message = %Broadway.Message{
Expand Down
32 changes: 26 additions & 6 deletions lib/dpul_collections/indexing_pipeline/figgy/resource.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,25 @@ defmodule DpulCollections.IndexingPipeline.Figgy.Resource do
}
end

def to_hydration_cache_attrs(resource = %__MODULE__{internal_resource: "EphemeraFolder"}) do
def to_hydration_cache_attrs(
resource = %__MODULE__{
internal_resource: "EphemeraFolder",
metadata: %{"state" => ["complete"], "visibility" => ["open"]}
}
) do
%{
handled_data: resource |> to_map,
related_data: extract_related_data(resource)
}
end

def to_hydration_cache_attrs(resource = %__MODULE__{internal_resource: "EphemeraFolder"}) do
%{
handled_data: resource |> to_deleted_map,
related_data: %{}
}
end

@spec extract_related_data(resource :: %__MODULE__{}) :: related_data()
def extract_related_data(resource) do
%{
Expand Down Expand Up @@ -76,15 +88,12 @@ defmodule DpulCollections.IndexingPipeline.Figgy.Resource do
%{"resource_id" => [%{"id" => deleted_resource_id}], "resource_type" => [resource_type]} =
resource.metadata

# Create attributes specifically for deletion markers.
# Create attributes specifically for DeletionMarkers
# 1. Replace existing metadata with a simple deleted => true kv pair
# 2. Set the entry id to the deleted resource's id
# 3. Set the entry internal_resource type to that of the deleted resource
resource
|> Map.from_struct()
|> Map.delete(:__meta__)
|> Map.delete(:metadata)
|> Map.put(:metadata, %{"deleted" => true})
|> to_deleted_map
|> Map.put(:id, deleted_resource_id)
|> Map.put(:internal_resource, resource_type)
end
Expand All @@ -94,4 +103,15 @@ defmodule DpulCollections.IndexingPipeline.Figgy.Resource do
|> Map.from_struct()
|> Map.delete(:__meta__)
end

@spec to_deleted_map(resource :: %__MODULE__{}) :: map()
defp to_deleted_map(resource = %__MODULE__{}) do
# Create attributes specifically for resources that need to be deleted
# 1. Replace existing metadata with a simple deleted => true kv pair
resource
|> Map.from_struct()
|> Map.delete(:__meta__)
|> Map.delete(:metadata)
|> Map.put(:metadata, %{"deleted" => true})
end
end

0 comments on commit 3492690

Please sign in to comment.