Merge pull request #317 from pulibrary/102-deletes
Delete records linked to deletion markers
hackartisan authored Feb 19, 2025
2 parents fd12146 + b57ea0b commit 94bf233
Showing 12 changed files with 419 additions and 15 deletions.
60 changes: 60 additions & 0 deletions architecture-decisions/0004-deleting-records.md
@@ -0,0 +1,60 @@
# 4. Deleting Records

Date: 2025-02-18

## Status

Accepted

## Context

When resources are deleted in Figgy, a DeletionMarker resource is created at the
same time. The DeletionMarker stores the deleted resource's identifier,
resource type, and a serialized copy of the metadata (in the `deleted_object`
field). We need a method for processing DeletionMarkers in DPUL-C to remove the
corresponding record from the Solr index.
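
For reference, a DeletionMarker's metadata has roughly the shape below, as pattern-matched by the code in this PR; the id and `deleted_object` contents are illustrative values, not real data:

```elixir
# Illustrative DeletionMarker metadata. Note that "resource_id" and
# "resource_type" are single-element lists, which is why the code in this
# PR pattern-matches them as [%{"id" => id}] and [resource_type].
deletion_marker_metadata = %{
  "resource_id" => [%{"id" => "0ecd4a45-f33b-4f56-9a56-2a7a0e1f1e3d"}],
  "resource_type" => ["EphemeraFolder"],
  "deleted_object" => %{"title" => ["An example folder title"]}
}
```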

## Decision

#### Hydration Consumer

1. We will process DeletionMarkers that reference a deleted resource with a
resource type that we currently index into DPUL-C. In addition, we will check
if a hydration cache entry exists for the deleted resource and discard the
DeletionMarker if not.
1. A special CacheMarker is created from the DeletionMarker that uses the
deleted resource's id as the id and the updated_at value from the
DeletionMarker as the timestamp.
1. Special hydration cache entry attributes are generated. The hydration cache
entry created from these attributes will replace the hydration cache entry of
the deleted resource.
- Existing metadata is replaced with a single `deleted => true` key/value pair
- The entry id is set to the deleted resource's id
- The entry `internal_resource` type is set to that of the deleted resource
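
The replacement attributes described above can be sketched as a small pure function; this is a sketch only, with illustrative module and function names rather than the pipeline's actual code:

```elixir
# Sketch: build the tombstone hydration cache entry attributes for a
# deleted resource. All original metadata is discarded and replaced with
# a single "deleted" => true pair; the id and internal_resource come from
# the DeletionMarker's metadata about the deleted resource.
defmodule HydrationTombstoneSketch do
  def attrs(deleted_resource_id, deleted_resource_type, updated_at) do
    %{
      id: deleted_resource_id,
      internal_resource: deleted_resource_type,
      updated_at: updated_at,
      metadata: %{"deleted" => true}
    }
  end
end
```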

#### Transformation Consumer

1. A special Solr document is generated from the deleted object's hydration
   cache entry with the following structure:
```elixir
%{id: "id", deleted: true}
```

#### Indexing Consumer

1. Messages with the `deleted: true` field are handled separately and assigned
   to the `delete` batcher.
1. The delete batcher sends the deleted record ids to the `Solr.delete_batch`
   function, which iterates over them, deletes each record, and then commits the
   batch of deletes. The additional batcher doesn't create a potential race
   condition because there is only one entry for the resource earlier in the
   pipeline.
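
The batching logic above amounts to routing each message on its `deleted` flag and then mapping a delete batch down to ids; a minimal sketch, with the message shape simplified and names illustrative:

```elixir
# Sketch: route a message's data map to the :delete batcher when it
# carries deleted: true, and collect the ids a delete batch would send
# to Solr. Real messages are Broadway.Message structs; plain maps are
# used here for illustration.
defmodule DeleteRoutingSketch do
  def batcher(%{"deleted" => true}), do: :delete
  def batcher(_data), do: :default

  def ids_to_delete(batch) do
    Enum.map(batch, & &1["id"])
  end
end
```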

## Consequences

- DeletionMarkers will stay in the Figgy database unless the resource is
  restored. This means that DPUL-C will have to triage an ever-increasing
  number of them over time.

- Deleted resource hydration and transformation cache entries will stay in the
  cache after the resource is removed from Solr, until the next full reindex.
@@ -29,6 +29,16 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer.CacheEntryMarker do
end

@spec from(%Resource{}) :: t()
def from(%Resource{
updated_at: updated_at,
internal_resource: "DeletionMarker",
metadata: %{"resource_id" => [%{"id" => id}]}
}) do
# A CacheMarker for a DeletionMarker resource has a standard timestamp, but
# the id is set from the deleted resource id.
%__MODULE__{timestamp: updated_at, id: id}
end

def from(%Resource{updated_at: updated_at, id: id}) do
%__MODULE__{timestamp: updated_at, id: id}
end
@@ -21,10 +21,25 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationCacheEntry do
end

@spec to_solr_document(%__MODULE__{}) :: %{}
def to_solr_document(hydration_cache_entry) do
%{record_id: id} = hydration_cache_entry
%{data: data = %{"metadata" => metadata}, related_data: related_data} = hydration_cache_entry
def to_solr_document(%{
record_id: id,
data: %{
"metadata" => %{"deleted" => true}
}
}) do
# Generate a small json document for deleted resources that indicates that
# the Solr record with that id should be deleted from the index.
%{
id: id,
deleted: true
}
end

def to_solr_document(%{
record_id: id,
data: data = %{"metadata" => metadata},
related_data: related_data
}) do
%{
id: id,
title_txtm: extract_title(metadata),
38 changes: 38 additions & 0 deletions lib/dpul_collections/indexing_pipeline/figgy/hydration_consumer.ex
@@ -88,6 +88,44 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationConsumer do
|> Broadway.Message.put_data(message_map)
end

def handle_message(
_processor,
message = %Broadway.Message{
data: %{
internal_resource: internal_resource,
metadata: %{
"resource_id" => [%{"id" => resource_id}],
"resource_type" => [resource_type]
}
}
},
%{cache_version: cache_version}
)
when internal_resource in ["DeletionMarker"] and
resource_type in ["EphemeraFolder", "EphemeraTerm"] do
# Only process messages where the deleted resource has an existing
# hydration cache entry. If one does not exist, it means that the resource
# has not been indexed into DPUL-C.
hydration_cache_entry =
IndexingPipeline.get_hydration_cache_entry!(resource_id, cache_version)

cond do
hydration_cache_entry ->
marker = CacheEntryMarker.from(message)

message_map =
%{marker: marker, incoming_message_data: message.data}
|> Map.merge(Figgy.Resource.to_hydration_cache_attrs(message.data))

message
|> Broadway.Message.put_data(message_map)

true ->
message
|> Broadway.Message.put_batcher(:noop)
end
end

# If it's not selected above, ack the message but don't do anything with it.
def handle_message(_processor, message, _state) do
message
@@ -42,7 +42,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingConsumer do
default: []
],
batchers: [
default: [batch_size: options[:batch_size]]
default: [batch_size: options[:batch_size]],
delete: [batch_size: options[:batch_size]]
],
context: %{
cache_version: options[:cache_version],
@@ -55,10 +56,29 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingConsumer do
# return the message so we can handle it in the batch
@spec handle_message(any(), Broadway.Message.t(), %{required(:cache_version) => integer()}) ::
Broadway.Message.t()
def handle_message(_processor, %{data: %{data: %{"deleted" => true}}} = message, %{
cache_version: _cache_version
}) do
message
|> Broadway.Message.put_batcher(:delete)
end

def handle_message(_processor, message, %{cache_version: _cache_version}) do
message
end

@impl Broadway
@spec handle_batch(any(), list(Broadway.Message.t()), any(), any()) ::
list(Broadway.Message.t())
def handle_batch(:delete, messages, _batch_info, context) do
messages
|> Enum.map(&unwrap/1)
|> Enum.map(& &1["id"])
|> Solr.delete_batch(context[:write_collection])

messages
end

@impl Broadway
@spec handle_batch(any(), list(Broadway.Message.t()), any(), any()) ::
list(Broadway.Message.t())
23 changes: 23 additions & 0 deletions lib/dpul_collections/indexing_pipeline/figgy/resource.ex
@@ -22,6 +22,13 @@ defmodule DpulCollections.IndexingPipeline.Figgy.Resource do
handled_data: map(),
related_data: related_data()
}
def to_hydration_cache_attrs(resource = %__MODULE__{internal_resource: "DeletionMarker"}) do
%{
handled_data: resource |> to_map,
related_data: %{}
}
end

def to_hydration_cache_attrs(resource = %__MODULE__{internal_resource: "EphemeraTerm"}) do
%{
handled_data: resource |> to_map,
@@ -59,6 +66,22 @@ defmodule DpulCollections.IndexingPipeline.Figgy.Resource do
end

@spec to_map(resource :: %__MODULE__{}) :: map()
defp to_map(resource = %__MODULE__{internal_resource: "DeletionMarker"}) do
%{
"resource_id" => [%{"id" => deleted_resource_id}],
"resource_type" => [deleted_resource_type]
} = resource.metadata

%{
id: deleted_resource_id,
internal_resource: deleted_resource_type,
lock_version: resource.lock_version,
created_at: resource.created_at,
updated_at: resource.updated_at,
metadata: %{"deleted" => true}
}
end

defp to_map(resource = %__MODULE__{}) do
resource
|> Map.from_struct()
22 changes: 22 additions & 0 deletions lib/dpul_collections/solr.ex
@@ -156,6 +156,14 @@ defmodule DpulCollections.Solr do
)
end

@spec soft_commit(String.t()) :: {:ok, Req.Response.t()} | {:error, Exception.t()}
def soft_commit(collection \\ read_collection()) do
Req.get(
update_url(collection),
params: [commit: true, softCommit: true]
)
end

@spec delete_all(String.t()) ::
{:ok, Req.Response.t()} | {:error, Exception.t()} | Exception.t()
def delete_all(collection \\ read_collection()) do
@@ -167,6 +175,20 @@ defmodule DpulCollections.Solr do
commit(collection)
end

@spec delete_batch(list(), String.t()) ::
{:ok, Req.Response.t()} | {:error, Exception.t()} | Exception.t()
def delete_batch(ids, collection \\ read_collection()) do
ids
|> Enum.each(fn id ->
Req.post!(
update_url(collection),
json: %{delete: %{query: "id:#{id}"}}
)
end)

soft_commit(collection)
end

defp select_url(collection) do
client()
|> Req.merge(url: "/solr/#{collection}/select")