diff --git a/architecture-decisions/0002-indexing.md b/architecture-decisions/0002-indexing.md index f5a3cda4..2a65e676 100644 --- a/architecture-decisions/0002-indexing.md +++ b/architecture-decisions/0002-indexing.md @@ -127,7 +127,7 @@ Each Processor will keep track of the last object they acted on in a ProcessorMa | id | cache_location | cache_version | type | |------|----------------|---------------|---------| -| INT | varchar | INT | VARCHAR | +| INT | DATETIME | INT | VARCHAR | - For Hydrator, `cache_location` is an `updated_at` value from the Figgy database. - For Transformer, `cache_location` is a `cache_order` value from the HydrationCache diff --git a/lib/dpul_collections/indexing_pipeline.ex b/lib/dpul_collections/indexing_pipeline.ex index 753b3140..f8e2a372 100644 --- a/lib/dpul_collections/indexing_pipeline.ex +++ b/lib/dpul_collections/indexing_pipeline.ex @@ -101,4 +101,100 @@ defmodule DpulCollections.IndexingPipeline do def change_hydration_cache_entry(%HydrationCacheEntry{} = hydration_cache_entry, attrs \\ %{}) do HydrationCacheEntry.changeset(hydration_cache_entry, attrs) end + + alias DpulCollections.IndexingPipeline.ProcessorMarker + + @doc """ + Returns the list of processor_markers. + + ## Examples + + iex> list_processor_markers() + [%ProcessorMarker{}, ...] + + """ + def list_processor_markers do + Repo.all(ProcessorMarker) + end + + @doc """ + Gets a single processor_marker. + + Raises `Ecto.NoResultsError` if the Processor marker does not exist. + + ## Examples + + iex> get_processor_marker!(123) + %ProcessorMarker{} + + iex> get_processor_marker!(456) + ** (Ecto.NoResultsError) + + """ + def get_processor_marker!(id), do: Repo.get!(ProcessorMarker, id) + + @doc """ + Creates a processor_marker. + + ## Examples + + iex> create_processor_marker(%{field: value}) + {:ok, %ProcessorMarker{}} + + iex> create_processor_marker(%{field: bad_value}) + {:error, %Ecto.Changeset{}} + + """ + def create_processor_marker(attrs \\ %{}) do + %ProcessorMarker{} + |> ProcessorMarker.changeset(attrs) + |> Repo.insert() + end + + @doc """ + Updates a processor_marker. + + ## Examples + + iex> update_processor_marker(processor_marker, %{field: new_value}) + {:ok, %ProcessorMarker{}} + + iex> update_processor_marker(processor_marker, %{field: bad_value}) + {:error, %Ecto.Changeset{}} + + """ + def update_processor_marker(%ProcessorMarker{} = processor_marker, attrs) do + processor_marker + |> ProcessorMarker.changeset(attrs) + |> Repo.update() + end + + @doc """ + Deletes a processor_marker. + + ## Examples + + iex> delete_processor_marker(processor_marker) + {:ok, %ProcessorMarker{}} + + iex> delete_processor_marker(processor_marker) + {:error, %Ecto.Changeset{}} + + """ + def delete_processor_marker(%ProcessorMarker{} = processor_marker) do + Repo.delete(processor_marker) + end + + @doc """ + Returns an `%Ecto.Changeset{}` for tracking processor_marker changes. + + ## Examples + + iex> change_processor_marker(processor_marker) + %Ecto.Changeset{data: %ProcessorMarker{}} + + """ + def change_processor_marker(%ProcessorMarker{} = processor_marker, attrs \\ %{}) do + ProcessorMarker.changeset(processor_marker, attrs) + end end diff --git a/lib/dpul_collections/indexing_pipeline/processor_marker.ex b/lib/dpul_collections/indexing_pipeline/processor_marker.ex new file mode 100644 index 00000000..9bff670a --- /dev/null +++ b/lib/dpul_collections/indexing_pipeline/processor_marker.ex @@ -0,0 +1,19 @@ +defmodule DpulCollections.IndexingPipeline.ProcessorMarker do + use Ecto.Schema + import Ecto.Changeset + + schema "processor_markers" do + field :type, :string + field :cache_location, :utc_datetime + field :cache_version, :integer + + timestamps(type: :utc_datetime) + end + + @doc false + def changeset(processor_marker, attrs) do + processor_marker + |> cast(attrs, [:cache_location, :cache_version, :type]) + |> validate_required([:cache_location, :cache_version, :type]) + end +end diff --git a/priv/repo/migrations/20240724204108_create_processor_markers.exs b/priv/repo/migrations/20240724204108_create_processor_markers.exs new file mode 100644 index 00000000..2b88e548 --- /dev/null +++ b/priv/repo/migrations/20240724204108_create_processor_markers.exs @@ -0,0 +1,13 @@ +defmodule DpulCollections.Repo.Migrations.CreateProcessorMarkers do + use Ecto.Migration + + def change do + create table(:processor_markers) do + add :cache_location, :utc_datetime + add :cache_version, :integer + add :type, :string + + timestamps(type: :utc_datetime) + end + end +end diff --git a/test/dpul_collections/indexing_pipeline_test.exs b/test/dpul_collections/indexing_pipeline_test.exs index 82cd402e..78a7da60 100644 --- a/test/dpul_collections/indexing_pipeline_test.exs +++ b/test/dpul_collections/indexing_pipeline_test.exs @@ -94,4 +94,84 @@ defmodule DpulCollections.IndexingPipelineTest do IndexingPipeline.change_hydration_cache_entry(hydration_cache_entry) end end + + describe "processor_markers" do + alias DpulCollections.IndexingPipeline.ProcessorMarker + + import DpulCollections.IndexingPipelineFixtures + + @invalid_attrs %{type: nil, cache_location: nil, cache_version: nil} + + test "list_processor_markers/0 returns all processor_markers" do + processor_marker = processor_marker_fixture() + assert IndexingPipeline.list_processor_markers() == [processor_marker] + end + + test "get_processor_marker!/1 returns the processor_marker with given id" do + processor_marker = processor_marker_fixture() + assert IndexingPipeline.get_processor_marker!(processor_marker.id) == processor_marker + end + + test "create_processor_marker/1 with valid data creates a processor_marker" do + valid_attrs = %{ + type: "some type", + cache_location: ~U[2024-07-23 20:40:00Z], + cache_version: 42 + } + + assert {:ok, %ProcessorMarker{} = processor_marker} = + IndexingPipeline.create_processor_marker(valid_attrs) + + assert processor_marker.type == "some type" + assert processor_marker.cache_location == ~U[2024-07-23 20:40:00Z] + assert processor_marker.cache_version == 42 + end + + test "create_processor_marker/1 with invalid data returns error changeset" do + assert {:error, %Ecto.Changeset{}} = + IndexingPipeline.create_processor_marker(@invalid_attrs) + end + + test "update_processor_marker/2 with valid data updates the processor_marker" do + processor_marker = processor_marker_fixture() + + update_attrs = %{ + type: "some updated type", + cache_location: ~U[2024-07-24 20:40:00Z], + cache_version: 43 + } + + assert {:ok, %ProcessorMarker{} = processor_marker} = + IndexingPipeline.update_processor_marker(processor_marker, update_attrs) + + assert processor_marker.type == "some updated type" + assert processor_marker.cache_location == ~U[2024-07-24 20:40:00Z] + assert processor_marker.cache_version == 43 + end + + test "update_processor_marker/2 with invalid data returns error changeset" do + processor_marker = processor_marker_fixture() + + assert {:error, %Ecto.Changeset{}} = + IndexingPipeline.update_processor_marker(processor_marker, @invalid_attrs) + + assert processor_marker == IndexingPipeline.get_processor_marker!(processor_marker.id) + end + + test "delete_processor_marker/1 deletes the processor_marker" do + processor_marker = processor_marker_fixture() + + assert {:ok, %ProcessorMarker{}} = + IndexingPipeline.delete_processor_marker(processor_marker) + + assert_raise Ecto.NoResultsError, fn -> + IndexingPipeline.get_processor_marker!(processor_marker.id) + end + end + + test "change_processor_marker/1 returns a processor_marker changeset" do + processor_marker = processor_marker_fixture() + assert %Ecto.Changeset{} = IndexingPipeline.change_processor_marker(processor_marker) + end + end end diff --git a/test/support/fixtures/indexing_pipeline_fixtures.ex b/test/support/fixtures/indexing_pipeline_fixtures.ex index 4a793cf7..c19d434c 100644 --- a/test/support/fixtures/indexing_pipeline_fixtures.ex +++ b/test/support/fixtures/indexing_pipeline_fixtures.ex @@ -20,4 +20,20 @@ defmodule DpulCollections.IndexingPipelineFixtures do hydration_cache_entry end + + @doc """ + Generate a processor_marker. + """ + def processor_marker_fixture(attrs \\ %{}) do + {:ok, processor_marker} = + attrs + |> Enum.into(%{ + cache_location: ~U[2024-07-23 20:40:00Z], + cache_version: 42, + type: "some type" + }) + |> DpulCollections.IndexingPipeline.create_processor_marker() + + processor_marker + end end