diff --git a/.circleci/config.yml b/.circleci/config.yml
index 15a31a43..a4ecfcc4 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -5,10 +5,15 @@ description: Common jobs for testing and building phoenix applications
 executors:
   builder:
     docker:
-      - image: cimg/elixir:1.16.2-erlang-26.2.1-browsers
+      - image: cimg/elixir:1.17.2-erlang-26.2.1-browsers
         environment:
           TEST_POSTGRES_PORT: 5432
+          TEST_POSTGRES_FIGGY_HOST: figgy_database
       - image: cimg/postgres:15.2
+      - image: ghcr.io/pulibrary/dpul-collections:figgy-fixtures
+        name: figgy_database
+        environment:
+          POSTGRES_PASSWORD: "postgres"
     working_directory: ~/project
 
 commands:
diff --git a/architecture-decisions/0002-indexing.md b/architecture-decisions/0002-indexing.md
index f5a3cda4..2a65e676 100644
--- a/architecture-decisions/0002-indexing.md
+++ b/architecture-decisions/0002-indexing.md
@@ -127,7 +127,7 @@ Each Processor will keep track of the last object they acted on in a ProcessorMa
 
 | id   | cache_location | cache_version | type    |
 |------|----------------|---------------|---------|
-| INT  | varchar        | INT           | VARCHAR |
+| INT  | DATETIME       | INT           | VARCHAR |
 
 - For Hydrator, `cache_location` is an `updated_at` value from the Figgy database.
 - For Transformer, `cache_location` is a `cache_order` value from the HydrationCache
diff --git a/config/config.exs b/config/config.exs
index 69185047..45b9e3d2 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -8,7 +8,7 @@ import Config
 
 config :dpul_collections,
-  ecto_repos: [DpulCollections.Repo],
+  ecto_repos: [DpulCollections.Repo, DpulCollections.FiggyRepo],
   generators: [timestamp_type: :utc_datetime]
 
 # Configures the endpoint
diff --git a/config/dev.exs b/config/dev.exs
index 951d714a..1f57b263 100644
--- a/config/dev.exs
+++ b/config/dev.exs
@@ -11,6 +11,23 @@ config :dpul_collections, DpulCollections.Repo,
   show_sensitive_data_on_connection_error: true,
   pool_size: 10
 
+# Configure the read-only Figgy database
+config :dpul_collections, DpulCollections.FiggyRepo,
+  username: "postgres",
+  password: "postgres",
+  hostname: "localhost",
+  port: "5435",
+  database: "postgres",
+  stacktrace: true,
+  show_sensitive_data_on_connection_error: true,
+  pool_size: 10
+
+# Set up the indexing pipeline producer; tests swap in a stand-in producer
+config :dpul_collections,
+  producer_module: DpulCollections.IndexingPipeline.FiggyProducer,
+  # change if required for your dev/prod producer
+  producer_options: []
+
 # For development, we disable any cache and enable
 # debugging and code reloading.
 #
diff --git a/config/test.exs b/config/test.exs
index 6471913d..aafaa8c4 100644
--- a/config/test.exs
+++ b/config/test.exs
@@ -5,6 +5,7 @@ import Config
 # The MIX_TEST_PARTITION environment variable can be used
 # to provide built-in test partitioning in CI environment.
 # Run `mix help test` for more information.
+
 config :dpul_collections, DpulCollections.Repo,
   username: "postgres",
   password: "postgres",
@@ -14,6 +15,17 @@ config :dpul_collections, DpulCollections.Repo,
   pool: Ecto.Adapters.SQL.Sandbox,
   pool_size: System.schedulers_online() * 2
 
+# Configure the read-only Figgy database
+config :dpul_collections, DpulCollections.FiggyRepo,
+  username: "postgres",
+  password: "postgres",
+  hostname: System.get_env("TEST_POSTGRES_FIGGY_HOST") || "localhost",
+  port: System.get_env("TEST_POSTGRES_PORT") || 5435,
+  database: "postgres",
+  stacktrace: true,
+  show_sensitive_data_on_connection_error: true,
+  pool_size: 10
+
 # We don't run a server during test. If one is required,
 # you can enable the server option below.
 config :dpul_collections, DpulCollectionsWeb.Endpoint,
diff --git a/docs/indexing_pipeline/producer.md b/docs/indexing_pipeline/producer.md
new file mode 100644
index 00000000..c5a1e5b7
--- /dev/null
+++ b/docs/indexing_pipeline/producer.md
@@ -0,0 +1,67 @@
+When the Producer spins up, initialize last_queried_marker from the ProcessorMarkers table, or, if no marker exists, set it to a sentinel timestamp far in the past (e.g. 1900-01-01).
+
+```mermaid
+sequenceDiagram
+title Figgy Producer
+Participant FiggyDatabase
+Participant Producer
+Participant Acknowledger
+Participant Consumer
+Participant Batcher as Batcher (batch size of 2)
+Participant HydrationCache
+
+Consumer->>Producer: Demand 3 records
+Producer->>FiggyDatabase: Query records since last_queried_marker (1900)
+FiggyDatabase->>Producer: Return 3 records
+Producer->>Producer: Set last_queried_marker to last updated_at
+Producer->>Producer: Add all 3 {record_id, updated_at} to pulled_records
+Producer->>Consumer: Deliver record[1,2,3]
+Consumer->>Consumer: Process record[1]
+Consumer->>Batcher: Deliver record[1]
+Consumer->>Consumer: Process record[3]
+Consumer->>Batcher: Deliver record[3]
+Batcher->>HydrationCache: Writes record[1,3]
+Batcher->>Acknowledger: Acknowledge record[1,3]
+Acknowledger->>Acknowledger: Do error handling
+Acknowledger->>Producer: Acknowledge record[1,3]
+Producer->>Producer: Update state: Append record[1,3] to acked_records
+Producer->>Producer: Update state: Sort acked_records by updated_at
+Producer->>Producer: Run Acknowledging Records algorithm
+Consumer->>Consumer: Process record[2]
+Consumer->>Batcher: Deliver record[2]
+Batcher->>Batcher: 1 minute passes
+Batcher->>HydrationCache: Writes record[2]
+Batcher->>Acknowledger: Acknowledge record[2]
+Acknowledger->>Acknowledger: Do error handling
+Acknowledger->>Producer: Acknowledge record[2]
+Producer->>Producer: Update state: Append record[2] to acked_records
+Producer->>Producer: Update state: Sort acked_records by updated_at
+Producer->>Producer: Run Acknowledging Records algorithm
+Consumer->>Producer: Demand 3 records
+Producer->>FiggyDatabase: Query records since last_queried_marker
+```
+
+## Managing Producer State
+
+### Acknowledging Records
+
+When receiving acknowledgement for [1,3]:
+
+Start state: `{last_queried_marker: record[3].updated_at, pulled_records: [1,2,3], acked_records: [1,3]}`
+
+If the first element is the same in pulled_records and acked_records, then remove that element from both. Repeat until there's no match. Then write the timestamp from the last element that got removed from pulled_records.
+
+If the first marker in acked_records is less than the first marker in pulled_records, discard it. This means we must have acked that record already, and we don't need to ack it again. This could happen in certain producer crash scenarios.
+
+The producer handles acknowledgement messages one at a time, so you don't have to worry about race conditions here.
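+
+The trimming pass can be sketched in Elixir roughly as follows (an illustrative sketch with markers simplified to plain comparable values; the real producer compares `{updated_at, id}` marker structs, and this is not the pipeline's actual implementation). The worked example then continues below.
+
+```elixir
+defmodule AckSketch do
+  # Returns {new_pulled, new_acked, last_removed}; last_removed is the marker
+  # whose timestamp should be written to ProcessorMarkers (nil if none).
+  def trim(pulled, acked, last_removed \\ nil)
+
+  # Heads match: remove the marker from both lists and remember it.
+  def trim([first | pulled], [first | acked], _last), do: trim(pulled, acked, first)
+
+  # Nothing pulled but acks remain (e.g. after a producer crash): discard them.
+  def trim([], [_acked | rest], last), do: trim([], rest, last)
+
+  # First acked marker sorts before the first pulled marker: it was already
+  # acked in a previous pass, so discard it.
+  def trim([p | _] = pulled, [a | acked], last) when a < p, do: trim(pulled, acked, last)
+
+  def trim(pulled, acked, last), do: {pulled, acked, last}
+end
+
+AckSketch.trim([1, 2, 3], [1, 3])
+# => {[2, 3], [3], 1}; marker 1's timestamp gets written to ProcessorMarkers
+```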
+
+End State: `{last_queried_marker: record[3].updated_at, pulled_records: [2,3], acked_records: [3]}`
+
+Write `record[1].updated_at` to `ProcessorMarkers`
+
+When receiving acknowledgement for [2]:
+
+Start State: `{last_queried_marker: record[3].updated_at, pulled_records: [2,3], acked_records: [2,3]}`
+
+End State: `{last_queried_marker: record[3].updated_at, pulled_records: [], acked_records: []}`
+
+Write `record[3].updated_at` to `ProcessorMarkers`
diff --git a/lib/dpul_collections/application.ex b/lib/dpul_collections/application.ex
index c495a2b0..b6e17a06 100644
--- a/lib/dpul_collections/application.ex
+++ b/lib/dpul_collections/application.ex
@@ -10,6 +10,11 @@ defmodule DpulCollections.Application do
     children = [
       DpulCollectionsWeb.Telemetry,
       DpulCollections.Repo,
+      DpulCollections.FiggyRepo,
+      # Controllable Hydrator for testing in dev.
+      # {DpulCollections.IndexingPipeline.FiggyHydrator, producer_module: FiggyTestProducer, producer_options: {self()}},
+      # Production Hydrator
+      # DpulCollections.IndexingPipeline.FiggyHydrator,
       {DNSCluster, query: Application.get_env(:dpul_collections, :dns_cluster_query) || :ignore},
       {Phoenix.PubSub, name: DpulCollections.PubSub},
       # Start the Finch HTTP client for sending emails
diff --git a/lib/dpul_collections/figgy_repo.ex b/lib/dpul_collections/figgy_repo.ex
new file mode 100644
index 00000000..ea6ea5d7
--- /dev/null
+++ b/lib/dpul_collections/figgy_repo.ex
@@ -0,0 +1,6 @@
+defmodule DpulCollections.FiggyRepo do
+  use Ecto.Repo,
+    otp_app: :dpul_collections,
+    adapter: Ecto.Adapters.Postgres,
+    read_only: true
+end
diff --git a/lib/dpul_collections/indexing_pipeline.ex b/lib/dpul_collections/indexing_pipeline.ex
new file mode 100644
index 00000000..1b82ba99
--- /dev/null
+++ b/lib/dpul_collections/indexing_pipeline.ex
@@ -0,0 +1,236 @@
+defmodule DpulCollections.IndexingPipeline do
+  @moduledoc """
+  The IndexingPipeline context.
+  """
+
+  import Ecto.Query, warn: false
+  alias DpulCollections.{Repo, FiggyRepo}
+
+  alias DpulCollections.IndexingPipeline.{HydrationCacheEntry, FiggyResource, ResourceMarker}
+
+  @doc """
+  Returns the list of hydration_cache_entries.
+
+  ## Examples
+
+      iex> list_hydration_cache_entries()
+      [%HydrationCacheEntry{}, ...]
+
+  """
+  def list_hydration_cache_entries do
+    Repo.all(HydrationCacheEntry)
+  end
+
+  @doc """
+  Gets a single hydration_cache_entry.
+
+  Raises `Ecto.NoResultsError` if the Hydration cache entry does not exist.
+
+  ## Examples
+
+      iex> get_hydration_cache_entry!(123)
+      %HydrationCacheEntry{}
+
+      iex> get_hydration_cache_entry!(456)
+      ** (Ecto.NoResultsError)
+
+  """
+  def get_hydration_cache_entry!(id), do: Repo.get!(HydrationCacheEntry, id)
+
+  @doc """
+  Deletes a hydration_cache_entry.
+
+  ## Examples
+
+      iex> delete_hydration_cache_entry(hydration_cache_entry)
+      {:ok, %HydrationCacheEntry{}}
+
+      iex> delete_hydration_cache_entry(hydration_cache_entry)
+      {:error, %Ecto.Changeset{}}
+
+  """
+  def delete_hydration_cache_entry(%HydrationCacheEntry{} = hydration_cache_entry) do
+    Repo.delete(hydration_cache_entry)
+  end
+
+  @doc """
+  Writes or updates hydration cache entries.
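+
+  Entries are upserted on `{record_id, cache_version}`; an existing entry is
+  only overwritten when its `source_cache_order` is not newer than the
+  incoming one. An illustrative call (values mirror the test fixtures):
+
+      iex> write_hydration_cache_entry(%{
+      ...>   cache_version: 0,
+      ...>   record_id: "3cb7627b-defc-401b-9959-42ebc4488f74",
+      ...>   source_cache_order: ~U[2018-03-09 20:19:33.414040Z],
+      ...>   data: %{}
+      ...> })
+      {:ok, %HydrationCacheEntry{}}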
+ """ + def write_hydration_cache_entry(attrs \\ %{}) do + conflict_query = + HydrationCacheEntry + |> update(set: [data: ^attrs.data, source_cache_order: ^attrs.source_cache_order]) + |> where([c], c.source_cache_order <= ^attrs.source_cache_order) + + try do + %HydrationCacheEntry{} + |> HydrationCacheEntry.changeset(attrs) + |> Repo.insert( + on_conflict: conflict_query, + conflict_target: [:cache_version, :record_id] + ) + rescue + Ecto.StaleEntryError -> {:ok, nil} + end + end + + alias DpulCollections.IndexingPipeline.ProcessorMarker + + @doc """ + Returns the list of processor_markers. + + ## Examples + + iex> list_processor_markers() + [%ProcessorMarker{}, ...] + + """ + def list_processor_markers do + Repo.all(ProcessorMarker) + end + + @doc """ + Gets a single processor_marker. + + Raises `Ecto.NoResultsError` if the Processor marker does not exist. + + ## Examples + + iex> get_processor_marker!(123) + %ProcessorMarker{} + + iex> get_processor_marker!(456) + ** (Ecto.NoResultsError) + + """ + def get_processor_marker!(id), do: Repo.get!(ProcessorMarker, id) + + @doc """ + Gets the processor marker for a specific cache version + """ + def get_processor_marker!(type, cache_version) do + Repo.get_by(ProcessorMarker, type: type, cache_version: cache_version) + end + + @doc """ + Deletes a processor_marker. + + ## Examples + + iex> delete_processor_marker(processor_marker) + {:ok, %ProcessorMarker{}} + + iex> delete_processor_marker(processor_marker) + {:error, %Ecto.Changeset{}} + + """ + def delete_processor_marker(%ProcessorMarker{} = processor_marker) do + Repo.delete(processor_marker) + end + + @doc """ + Writes or updates processor markers + """ + def write_processor_marker(attrs \\ %{}) do + %ProcessorMarker{} + |> ProcessorMarker.changeset(attrs) + |> Repo.insert( + on_conflict: [ + set: [cache_location: attrs.cache_location, cache_record_id: attrs.cache_record_id] + ], + conflict_target: [:type, :cache_version] + ) + end + + @doc """ + Gets a single Resource by id from Figgy Database. + + Raises `Ecto.NoResultsError` if the Resource does not exist. + + ## Examples + + iex> get_figgy_resource!(123) + %HydrationCacheEntry{} + + iex> get_figgy_resource!(456) + ** (Ecto.NoResultsError) + + """ + def get_figgy_resource!(id), do: FiggyRepo.get!(FiggyResource, id) + + @doc """ + ## Description + Query to return a limited number of figgy resources using the value of a marker tuple. + + 1. Orders figgy records by updated_at and then id in ascending order + 2. Selects records where + - record.updated_at equals to marker.updated_at AND + - record.id is greater than marker.id + - OR + - record.updated_at is greater than marker.updated_at + 3. 
+
+  ## Examples
+
+  Markers are written below as `{updated_at, id}` shorthand for `%ResourceMarker{}` structs.
+
+  Records in Figgy:
+    { id: "a", updated_at: 1 }
+    { id: "b", updated_at: 2 }
+    { id: "d", updated_at: 3 } # Duplicate timestamp
+    { id: "c", updated_at: 3 } # Duplicate timestamp
+    { id: "e", updated_at: 3 } # Duplicate timestamp
+    { id: "a", updated_at: 4 } # Repeated id (edited and saved)
+
+  Function calls:
+
+  We get the records back ordered by timestamp, then id:
+
+    get_figgy_resources_since!({1, "a"}, 2) ->
+      { id: "b", updated_at: 2 }
+      { id: "c", updated_at: 3 }
+
+  We get the records for the same timestamp for ids after the one given:
+
+    get_figgy_resources_since!({3, "c"}, 3) ->
+      { id: "d", updated_at: 3 }
+      { id: "e", updated_at: 3 }
+      { id: "a", updated_at: 4 }
+
+  We get a record again if it's been updated since it was last fetched:
+
+    get_figgy_resources_since!({1, "a"}, 5) ->
+      { id: "b", updated_at: 2 }
+      { id: "c", updated_at: 3 }
+      { id: "d", updated_at: 3 }
+      { id: "e", updated_at: 3 }
+      { id: "a", updated_at: 4 }
+  """
+
+  @spec get_figgy_resources_since!(
+          marker :: ResourceMarker.t(),
+          count :: integer
+        ) :: list(%FiggyResource{})
+  def get_figgy_resources_since!(%ResourceMarker{timestamp: updated_at, id: id}, count) do
+    query =
+      from r in FiggyResource,
+        where: (r.updated_at == ^updated_at and r.id > ^id) or r.updated_at > ^updated_at,
+        limit: ^count,
+        order_by: [asc: r.updated_at, asc: r.id]
+
+    FiggyRepo.all(query)
+  end
+
+  @spec get_figgy_resources_since!(
+          nil,
+          count :: integer
+        ) :: list(%FiggyResource{})
+  def get_figgy_resources_since!(nil, count) do
+    query =
+      from r in FiggyResource,
+        limit: ^count,
+        order_by: [asc: r.updated_at, asc: r.id]
+
+    FiggyRepo.all(query)
+  end
+end
diff --git a/lib/dpul_collections/indexing_pipeline/figgy_hydrator.ex b/lib/dpul_collections/indexing_pipeline/figgy_hydrator.ex
new file mode 100644
index 00000000..6e7db274
--- /dev/null
+++ b/lib/dpul_collections/indexing_pipeline/figgy_hydrator.ex
@@ -0,0 +1,91 @@
+defmodule DpulCollections.IndexingPipeline.FiggyHydrator do
+  @moduledoc """
+  Broadway consumer that demands Figgy records and caches them in the database.
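+
+  A hydrator can be started under a supervision tree with the options
+  accepted by `start_link/1` (values below are the defaults):
+
+      {DpulCollections.IndexingPipeline.FiggyHydrator,
+       cache_version: 0, producer_module: FiggyProducer, batch_size: 10}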
+ """ + alias DpulCollections.IndexingPipeline + alias DpulCollections.IndexingPipeline.FiggyProducer + use Broadway + + @type start_opts :: + {:cache_version, Integer} + | {:producer_module, Module} + | {:producer_options, any()} + | {:batch_size, Integer} + @spec start_link([start_opts()]) :: Broadway.on_start() + def start_link(options \\ []) do + default = [ + cache_version: 0, + producer_module: FiggyProducer, + producer_options: 0, + batch_size: 10 + ] + + options = Keyword.merge(default, options) + + Broadway.start_link(__MODULE__, + name: __MODULE__, + producer: [ + module: {options[:producer_module], options[:producer_options]} + ], + processors: [ + default: [] + ], + batchers: [ + default: [batch_size: options[:batch_size]] + ], + context: %{cache_version: options[:cache_version]} + ) + end + + @impl Broadway + # (note that the start_link param will populate _context) + # Only write to the cache if it's an ephemera folder + def handle_message( + _processor, + message = %Broadway.Message{data: %{internal_resource: "EphemeraFolder"}}, + %{cache_version: cache_version} + ) do + write_to_hydration_cache(message, cache_version) + + message + end + + @impl Broadway + # Only write to the cache if it's an ephemera term + def handle_message( + _processor, + message = %Broadway.Message{data: %{internal_resource: "EphemeraTerm"}}, + %{cache_version: cache_version} + ) do + write_to_hydration_cache(message, cache_version) + + message + end + + @impl Broadway + # fallback so we acknowledge messages we intentionally don't write + def handle_message(_processor, message, %{cache_version: _cache_version}) do + message + end + + defp write_to_hydration_cache(message, cache_version) do + # store in HydrationCache: + # - data (blob) - this is the record + # - cache_order (datetime) - this is our own new timestamp for this table + # - cache_version (this only changes manually, we have to hold onto it as state) + # - record_id (varchar) - the figgy UUID + # - source_cache_order (datetime) - the figgy updated_at + {:ok, _} = + IndexingPipeline.write_hydration_cache_entry(%{ + cache_version: cache_version, + record_id: message.data.id, + source_cache_order: message.data.updated_at, + data: message.data |> Map.from_struct() |> Map.delete(:__meta__) + }) + end + + @impl Broadway + def handle_batch(_batcher, messages, _batch_info, _context) do + messages + end +end diff --git a/lib/dpul_collections/indexing_pipeline/figgy_producer.ex b/lib/dpul_collections/indexing_pipeline/figgy_producer.ex new file mode 100644 index 00000000..5fe4b196 --- /dev/null +++ b/lib/dpul_collections/indexing_pipeline/figgy_producer.ex @@ -0,0 +1,166 @@ +defmodule DpulCollections.IndexingPipeline.FiggyProducer do + @moduledoc """ + GenStage Producer that pulls records from the Figgy database + """ + + alias DpulCollections.IndexingPipeline + alias DpulCollections.IndexingPipeline.{FiggyResource, ResourceMarker} + use GenStage + @behaviour Broadway.Acknowledger + + def start_link(cache_version \\ 0) do + GenStage.start_link(__MODULE__, cache_version) + end + + @impl GenStage + @type state :: %{ + last_queried_marker: ResourceMarker.t(), + pulled_records: [ResourceMarker.t()], + acked_records: [ResourceMarker.t()], + cache_version: Integer + } + def init(cache_version) do + last_queried_marker = IndexingPipeline.get_processor_marker!("hydrator", cache_version) + + initial_state = %{ + last_queried_marker: last_queried_marker |> ResourceMarker.from(), + pulled_records: [], + acked_records: [], + cache_version: cache_version + } + + 
+    {:producer, initial_state}
+  end
+
+  @impl GenStage
+  def handle_demand(
+        demand,
+        state = %{
+          last_queried_marker: last_queried_marker,
+          pulled_records: pulled_records,
+          acked_records: acked_records
+        }
+      )
+      when demand > 0 do
+    records = IndexingPipeline.get_figgy_resources_since!(last_queried_marker, demand)
+
+    new_state =
+      state
+      |> Map.put(
+        :last_queried_marker,
+        Enum.at(records, -1) |> ResourceMarker.from() || last_queried_marker
+      )
+      |> Map.put(
+        :pulled_records,
+        Enum.concat(pulled_records, Enum.map(records, &ResourceMarker.from/1))
+      )
+      |> Map.put(:acked_records, acked_records)
+
+    {:noreply, Enum.map(records, &wrap_record/1), new_state}
+  end
+
+  @impl GenStage
+  def handle_info({:ack, :figgy_producer_ack, pending_markers}, state) do
+    messages = []
+
+    sorted_markers =
+      (state.acked_records ++ pending_markers)
+      |> Enum.uniq()
+      |> Enum.sort(ResourceMarker)
+
+    state =
+      state
+      |> Map.put(:acked_records, sorted_markers)
+
+    {new_state, last_removed_marker} = process_markers(state, nil)
+
+    if last_removed_marker != nil do
+      %ResourceMarker{timestamp: cache_location, id: cache_record_id} = last_removed_marker
+
+      IndexingPipeline.write_processor_marker(%{
+        type: "hydrator",
+        cache_version: state.cache_version,
+        cache_location: cache_location,
+        cache_record_id: cache_record_id
+      })
+    end
+
+    notify_ack(pending_markers |> length())
+    {:noreply, messages, new_state}
+  end
+
+  # Updates state, removing any acked_records from pulled_records and returns the
+  # last removed marker so it can be saved to the database.
+  # If the first element of pulled_records is the first element of
+  # acked_records, remove it from both and process again.
+  @spec process_markers(state(), ResourceMarker.t()) :: {state(), ResourceMarker.t()}
+  defp process_markers(
+         state = %{
+           pulled_records: [first_record | pulled_records],
+           acked_records: [first_record | acked_records]
+         },
+         _last_removed_marker
+       ) do
+    state
+    |> Map.put(:pulled_records, pulled_records)
+    |> Map.put(:acked_records, acked_records)
+    |> process_markers(first_record)
+  end
+
+  # Handles the case where the producer crashes, resets pulled_records to an
+  # empty list, and then gets a message acknowledgement.
+  defp process_markers(
+         state = %{pulled_records: [], acked_records: acked_records},
+         last_removed_marker
+       )
+       when length(acked_records) > 0 do
+    state
+    |> Map.put(:acked_records, [])
+    |> process_markers(last_removed_marker)
+  end
+
+  defp process_markers(
+         state = %{
+           pulled_records: [first_pulled_record | _],
+           acked_records: [first_acked_record | tail_acked_records]
+         },
+         last_removed_marker
+       ) do
+    if ResourceMarker.compare(first_acked_record, first_pulled_record) == :lt do
+      state
+      |> Map.put(:acked_records, tail_acked_records)
+      |> process_markers(last_removed_marker)
+    else
+      {state, last_removed_marker}
+    end
+  end
+
+  defp process_markers(state, last_removed_marker), do: {state, last_removed_marker}
+
+  @impl Broadway.Acknowledger
+  def ack({figgy_producer_pid, :figgy_producer_ack}, successful, failed) do
+    # TODO: Do some error handling
+    acked_markers = (successful ++ failed) |> Enum.map(&ResourceMarker.from/1)
+    send(figgy_producer_pid, {:ack, :figgy_producer_ack, acked_markers})
+  end
+
+  # This runs when ack is finished; we listen to this telemetry event in
+  # tests so we know when the Hydrator is done processing a message.
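+  # Tests can synchronize on it, e.g. (a sketch mirroring the integration tests):
+  #
+  #     test_pid = self()
+  #
+  #     :telemetry.attach("ack-handler", [:figgy_producer, :ack, :done],
+  #       fn _event, _measurements, _metadata, _config ->
+  #         send(test_pid, {:ack_done})
+  #       end, nil)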
+  defp notify_ack(acked_message_count) do
+    :telemetry.execute(
+      [:figgy_producer, :ack, :done],
+      %{},
+      %{acked_count: acked_message_count}
+    )
+  end
+
+  # TODO: Function to reset current index version's saved marker, i.e. full reindex
+
+  @spec wrap_record(record :: %FiggyResource{}) :: Broadway.Message.t()
+  defp wrap_record(record) do
+    %Broadway.Message{
+      data: record,
+      acknowledger: {__MODULE__, {self(), :figgy_producer_ack}, nil}
+    }
+  end
+end
diff --git a/lib/dpul_collections/indexing_pipeline/figgy_resource.ex b/lib/dpul_collections/indexing_pipeline/figgy_resource.ex
new file mode 100644
index 00000000..dbc819ae
--- /dev/null
+++ b/lib/dpul_collections/indexing_pipeline/figgy_resource.ex
@@ -0,0 +1,15 @@
+defmodule DpulCollections.IndexingPipeline.FiggyResource do
+  @moduledoc """
+  Schema for a resource in the Figgy database
+  """
+  use Ecto.Schema
+
+  @primary_key {:id, :binary_id, autogenerate: true}
+  schema "orm_resources" do
+    field :internal_resource, :string
+    field :lock_version, :integer
+    field :metadata, :map
+    field :created_at, :utc_datetime_usec
+    field :updated_at, :utc_datetime_usec
+  end
+end
diff --git a/lib/dpul_collections/indexing_pipeline/hydration_cache_entry.ex b/lib/dpul_collections/indexing_pipeline/hydration_cache_entry.ex
new file mode 100644
index 00000000..afb6cc7d
--- /dev/null
+++ b/lib/dpul_collections/indexing_pipeline/hydration_cache_entry.ex
@@ -0,0 +1,20 @@
+defmodule DpulCollections.IndexingPipeline.HydrationCacheEntry do
+  use Ecto.Schema
+  import Ecto.Changeset
+
+  schema "hydration_cache_entries" do
+    field :data, :map
+    field :cache_version, :integer
+    field :record_id, :string
+    field :source_cache_order, :utc_datetime_usec
+
+    timestamps(updated_at: :cache_order, inserted_at: false, type: :utc_datetime_usec)
+  end
+
+  @doc false
+  def changeset(hydration_cache_entry, attrs) do
+    hydration_cache_entry
+    |> cast(attrs, [:data, :cache_version, :record_id, :source_cache_order])
+    |> validate_required([:data, :cache_version, :record_id, :source_cache_order])
+  end
+end
diff --git a/lib/dpul_collections/indexing_pipeline/processor_marker.ex b/lib/dpul_collections/indexing_pipeline/processor_marker.ex
new file mode 100644
index 00000000..5de7a27c
--- /dev/null
+++ b/lib/dpul_collections/indexing_pipeline/processor_marker.ex
@@ -0,0 +1,20 @@
+defmodule DpulCollections.IndexingPipeline.ProcessorMarker do
+  use Ecto.Schema
+  import Ecto.Changeset
+
+  schema "processor_markers" do
+    field :type, :string
+    field :cache_location, :utc_datetime_usec
+    field :cache_record_id, :string
+    field :cache_version, :integer
+
+    timestamps(type: :utc_datetime_usec)
+  end
+
+  @doc false
+  def changeset(processor_marker, attrs) do
+    processor_marker
+    |> cast(attrs, [:cache_location, :cache_version, :type, :cache_record_id])
+    |> validate_required([:cache_location, :cache_version, :type, :cache_record_id])
+  end
+end
diff --git a/lib/dpul_collections/indexing_pipeline/resource_marker.ex b/lib/dpul_collections/indexing_pipeline/resource_marker.ex
new file mode 100644
index 00000000..8a84871c
--- /dev/null
+++ b/lib/dpul_collections/indexing_pipeline/resource_marker.ex
@@ -0,0 +1,46 @@
+defmodule DpulCollections.IndexingPipeline.ResourceMarker do
+  @type t :: %__MODULE__{id: String.t(), timestamp: DateTime.t()}
+  defstruct [:id, :timestamp]
+
+  alias DpulCollections.IndexingPipeline.{ProcessorMarker, FiggyResource}
+  @spec from(%ProcessorMarker{}) :: t()
+  @doc """
+  Converts a ProcessorMarker struct to a ResourceMarker struct.
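+
+  For example (illustrative values):
+
+      iex> from(%ProcessorMarker{
+      ...>   cache_location: ~U[2018-03-09 20:19:33.414040Z],
+      ...>   cache_record_id: "3cb7627b-defc-401b-9959-42ebc4488f74"
+      ...> })
+      %ResourceMarker{
+        timestamp: ~U[2018-03-09 20:19:33.414040Z],
+        id: "3cb7627b-defc-401b-9959-42ebc4488f74"
+      }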
+ """ + def from(%ProcessorMarker{ + cache_location: cache_location, + cache_record_id: cache_record_id + }) do + %__MODULE__{timestamp: cache_location, id: cache_record_id} + end + + def from(nil), do: nil + + @spec from(%FiggyResource{}) :: t() + def from(%FiggyResource{updated_at: updated_at, id: id}) do + %__MODULE__{timestamp: updated_at, id: id} + end + + @spec from(%Broadway.Message{}) :: t() + def from(%Broadway.Message{data: data}) do + from(data) + end + + @spec compare(marker1 :: t(), marker2 :: t()) :: :gt | :lt | :eq + def compare(marker1, marker1), do: :eq + + def compare(%__MODULE__{timestamp: marker_date1}, %__MODULE__{timestamp: marker_date2}) + when marker_date1 != marker_date2 do + DateTime.compare(marker_date1, marker_date2) + end + + def compare(%__MODULE__{timestamp: marker_date1, id: marker_id1}, %__MODULE__{ + timestamp: marker_date1, + id: marker_id2 + }) do + cond do + marker_id1 < marker_id2 -> :lt + marker_id1 > marker_id2 -> :gt + end + end +end diff --git a/mix.exs b/mix.exs index 47440b64..3ba25ec4 100644 --- a/mix.exs +++ b/mix.exs @@ -34,6 +34,8 @@ defmodule DpulCollections.MixProject do # Specifies which paths to compile per environment. defp elixirc_paths(:test), do: ["lib", "test/support"] + # Provide access to FiggyTestProducer in dev. + defp elixirc_paths(:dev), do: ["lib", "test/support"] defp elixirc_paths(_), do: ["lib"] # Specifies your project dependencies. @@ -67,7 +69,9 @@ defmodule DpulCollections.MixProject do {:jason, "~> 1.2"}, {:dns_cluster, "~> 0.1.1"}, {:bandit, "~> 1.2"}, - {:excoveralls, "~> 0.18", only: :test} + {:excoveralls, "~> 0.18", only: :test}, + {:broadway, "~> 1.0"}, + {:ex_doc, "~> 0.21", only: :dev, runtime: false} ] end diff --git a/mix.lock b/mix.lock index e0fbf8d1..00180f8e 100644 --- a/mix.lock +++ b/mix.lock @@ -1,24 +1,32 @@ %{ "bandit": {:hex, :bandit, "1.5.2", "ed0a41c43a9e529c670d0fd48371db4027e7b80d43b1942893e17deb8bed0540", [:mix], [{:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "35ddbdce7e8a2a3c6b5093f7299d70832a43ed2f4a1852885a61d334cab1b4ad"}, + "broadway": {:hex, :broadway, "1.1.0", "8ed3aea01fd6f5640b3e1515b90eca51c4fc1fac15fb954cdcf75dc054ae719c", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.7 or ~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "25e315ef1afe823129485d981dcc6d9b221cea30e625fd5439e9b05f44fb60e4"}, "castore": {:hex, :castore, "1.0.7", "b651241514e5f6956028147fe6637f7ac13802537e895a724f90bf3e36ddd1dd", [:mix], [], "hexpm", "da7785a4b0d2a021cd1292a60875a784b6caef71e76bf4917bdee1f390455cf5"}, "db_connection": {:hex, :db_connection, "2.6.0", "77d835c472b5b67fc4f29556dee74bf511bbafecdcaf98c27d27fa5918152086", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c2f992d15725e721ec7fbc1189d4ecdb8afef76648c746a8e1cad35e3b8a35f3"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, 
"dns_cluster": {:hex, :dns_cluster, "0.1.3", "0bc20a2c88ed6cc494f2964075c359f8c2d00e1bf25518a6a6c7fd277c9b0c66", [:mix], [], "hexpm", "46cb7c4a1b3e52c7ad4cbe33ca5079fbde4840dedeafca2baf77996c2da1bc33"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"}, "ecto": {:hex, :ecto, "3.11.2", "e1d26be989db350a633667c5cda9c3d115ae779b66da567c68c80cfb26a8c9ee", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3c38bca2c6f8d8023f2145326cc8a80100c3ffe4dcbd9842ff867f7fc6156c65"}, "ecto_sql": {:hex, :ecto_sql, "3.11.2", "c7cc7f812af571e50b80294dc2e535821b3b795ce8008d07aa5f336591a185a8", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.11.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "73c07f995ac17dbf89d3cfaaf688fcefabcd18b7b004ac63b0dc4ef39499ed6b"}, "esbuild": {:hex, :esbuild, "0.8.1", "0cbf919f0eccb136d2eeef0df49c4acf55336de864e63594adcea3814f3edf41", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "25fc876a67c13cb0a776e7b5d7974851556baeda2085296c14ab48555ea7560f"}, + "ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"}, "excoveralls": {:hex, :excoveralls, "0.18.1", "a6f547570c6b24ec13f122a5634833a063aec49218f6fff27de9df693a15588c", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "d65f79db146bb20399f23046015974de0079668b9abb2f5aac074d078da60b8d"}, "expo": {:hex, :expo, "0.5.2", "beba786aab8e3c5431813d7a44b828e7b922bfa431d6bfbada0904535342efe2", [:mix], [], "hexpm", "8c9bfa06ca017c9cb4020fabe980bc7fdb1aaec059fd004c2ab3bff03b1c599c"}, "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", 
[hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"}, "floki": {:hex, :floki, "0.36.2", "a7da0193538c93f937714a6704369711998a51a6164a222d710ebd54020aa7a3", [:mix], [], "hexpm", "a8766c0bc92f074e5cb36c4f9961982eda84c5d2b8e979ca67f5c268ec8ed580"}, + "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, "gettext": {:hex, :gettext, "0.24.0", "6f4d90ac5f3111673cbefc4ebee96fe5f37a114861ab8c7b7d5b30a1108ce6d8", [:mix], [{:expo, "~> 0.5.1", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "bdf75cdfcbe9e4622dd18e034b227d77dd17f0f133853a1c73b97b3d6c770e8b"}, "heroicons": {:git, "https://github.com/tailwindlabs/heroicons.git", "88ab3a0d790e6a47404cba02800a6b25d2afae50", [tag: "v2.1.1", sparse: "optimized"]}, "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, + "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, + "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, "mint": {:hex, :mint, "1.6.0", "88a4f91cd690508a04ff1c3e28952f322528934be541844d54e0ceb765f01d5e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "3c5ae85d90a5aca0a49c0d8b67360bbe407f3b54f1030a111047ff988e8fefaa"}, "nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", 
"bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "phoenix": {:hex, :phoenix, "1.7.12", "1cc589e0eab99f593a8aa38ec45f15d25297dd6187ee801c8de8947090b5a9d3", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "d646192fbade9f485b01bc9920c139bfdd19d0f8df3d73fd8eaf2dfbe0d2837c"}, "phoenix_ecto": {:hex, :phoenix_ecto, "4.6.1", "96798325fab2fed5a824ca204e877b81f9afd2e480f581e81f7b4b64a5a477f2", [:mix], [{:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.1", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.17", [hex: :postgrex, repo: "hexpm", optional: true]}], "hexpm", "0ae544ff99f3c482b0807c5cec2c8289e810ecacabc04959d82c3337f4703391"}, diff --git a/priv/figgy_repo/migrations/.keep b/priv/figgy_repo/migrations/.keep new file mode 100644 index 00000000..e69de29b diff --git a/priv/repo/migrations/20240724200512_create_hydration_cache_entries.exs b/priv/repo/migrations/20240724200512_create_hydration_cache_entries.exs new file mode 100644 index 00000000..ba177b5c --- /dev/null +++ b/priv/repo/migrations/20240724200512_create_hydration_cache_entries.exs @@ -0,0 +1,22 @@ +defmodule DpulCollections.Repo.Migrations.CreateHydrationCacheEntries do + use Ecto.Migration + + def change do + create table(:hydration_cache_entries) do + add :data, :map + add :cache_version, :integer + add :record_id, :string + add :source_cache_order, :utc_datetime_usec + + timestamps(updated_at: :cache_order, inserted_at: false, type: :utc_datetime_usec) + end + + create( + unique_index( + :hydration_cache_entries, + [:record_id, :cache_version], + name: :record_id_cache_version_idx + ) + ) + end +end diff --git a/priv/repo/migrations/20240724204108_create_processor_markers.exs b/priv/repo/migrations/20240724204108_create_processor_markers.exs new file mode 100644 index 00000000..60c46dd9 --- /dev/null +++ b/priv/repo/migrations/20240724204108_create_processor_markers.exs @@ -0,0 +1,16 @@ +defmodule DpulCollections.Repo.Migrations.CreateProcessorMarkers do + use Ecto.Migration + + def change do + create table(:processor_markers) do + add :cache_location, :utc_datetime_usec + add :cache_record_id, :string + add :cache_version, :integer + add :type, :string + + timestamps(type: :utc_datetime_usec) + end + + create index(:processor_markers, [:type, :cache_version], unique: true) + end +end diff --git a/test/dpul_collections/indexing_pipeline/figgy_hydrator_test.exs b/test/dpul_collections/indexing_pipeline/figgy_hydrator_test.exs new file mode 100644 index 00000000..853f4270 --- /dev/null +++ 
b/test/dpul_collections/indexing_pipeline/figgy_hydrator_test.exs @@ -0,0 +1,43 @@ +defmodule DpulCollections.IndexingPipeline.FiggyHydratorTest do + use DpulCollections.DataCase + + alias DpulCollections.IndexingPipeline.{FiggyHydrator, FiggyResource, HydrationCacheEntry} + + describe "FiggyHydrator" do + test "handle_message/3 only writes EphemeraFolders and EphemeraTerms to the HydrationCache" do + ephemera_folder_message = %Broadway.Message{ + acknowledger: nil, + data: %FiggyResource{ + id: "47276197-e223-471c-99d7-405c5f6c5285", + updated_at: ~U[2018-03-09 20:19:34.486004Z], + internal_resource: "EphemeraFolder" + } + } + + ephemera_term_message = %Broadway.Message{ + acknowledger: nil, + data: %FiggyResource{ + id: "3cb7627b-defc-401b-9959-42ebc4488f74", + updated_at: ~U[2018-03-09 20:19:33.414040Z], + internal_resource: "EphemeraTerm" + } + } + + scanned_resource_message = %Broadway.Message{ + acknowledger: nil, + data: %FiggyResource{ + id: "69990556-434c-476a-9043-bbf9a1bda5a4", + updated_at: ~U[2018-03-09 20:19:34.465203Z], + internal_resource: "ScannedResource" + } + } + + FiggyHydrator.handle_message(nil, ephemera_folder_message, %{cache_version: 0}) + FiggyHydrator.handle_message(nil, ephemera_term_message, %{cache_version: 0}) + FiggyHydrator.handle_message(nil, scanned_resource_message, %{cache_version: 0}) + + entry_count = Repo.aggregate(HydrationCacheEntry, :count) + assert entry_count == 2 + end + end +end diff --git a/test/dpul_collections/indexing_pipeline/figgy_producer_test.exs b/test/dpul_collections/indexing_pipeline/figgy_producer_test.exs new file mode 100644 index 00000000..0e2ce0fb --- /dev/null +++ b/test/dpul_collections/indexing_pipeline/figgy_producer_test.exs @@ -0,0 +1,375 @@ +defmodule DpulCollections.IndexingPipeline.FiggyProducerTest do + use DpulCollections.DataCase + + alias DpulCollections.IndexingPipeline.{FiggyProducer, FiggyResource, ResourceMarker} + alias DpulCollections.IndexingPipeline + + describe "FiggyProducer" do + test "handle_demand/2 with initial state and demand > 1 returns figgy resources" do + {marker1, marker2, _marker3} = FiggyTestFixtures.markers() + {:producer, initial_state} = FiggyProducer.init(0) + {:noreply, messages, new_state} = FiggyProducer.handle_demand(2, initial_state) + + ids = Enum.map(messages, fn %Broadway.Message{data: %FiggyResource{id: id}} -> id end) + + assert ids == [marker1.id, marker2.id] + + expected_state = + %{ + last_queried_marker: marker2, + pulled_records: [ + marker1, + marker2 + ], + acked_records: [], + cache_version: 0 + } + + assert new_state == expected_state + end + + test "handle_demand/2 with consecutive state returns a new record" do + {marker1, marker2, marker3} = FiggyTestFixtures.markers() + + initial_state = + %{ + last_queried_marker: marker2, + pulled_records: [ + marker1, + marker2 + ], + acked_records: [], + cache_version: 0 + } + + {:noreply, messages, new_state} = FiggyProducer.handle_demand(1, initial_state) + + ids = Enum.map(messages, fn %Broadway.Message{data: %FiggyResource{id: id}} -> id end) + assert ids == [marker3.id] + + expected_state = + %{ + last_queried_marker: marker3, + pulled_records: [ + marker1, + marker2, + marker3 + ], + acked_records: [], + cache_version: 0 + } + + assert new_state == expected_state + end + + test "handle_demand/2 when the marker record has been updated" do + {_marker1, marker2, marker3} = FiggyTestFixtures.markers() + + fabricated_marker = %ResourceMarker{ + timestamp: DateTime.add(marker2.timestamp, 1, :microsecond), + id: marker3.id + } 
+
+      initial_state =
+        %{
+          # This is a manufactured marker.
+          # This timestamp is set to be right before the actual record updated_at.
+          last_queried_marker: fabricated_marker,
+          pulled_records: [
+            fabricated_marker
+          ],
+          acked_records: [],
+          cache_version: 0
+        }
+
+      {:noreply, messages, new_state} = FiggyProducer.handle_demand(1, initial_state)
+
+      ids = Enum.map(messages, fn %Broadway.Message{data: %FiggyResource{id: id}} -> id end)
+      assert ids == [marker3.id]
+
+      expected_state =
+        %{
+          last_queried_marker: marker3,
+          pulled_records: [
+            fabricated_marker,
+            marker3
+          ],
+          acked_records: [],
+          cache_version: 0
+        }
+
+      assert new_state == expected_state
+    end
+
+    test "handle_demand/2 when the query returns no records" do
+      {_marker1, _marker2, marker3} = FiggyTestFixtures.markers()
+      # Move last_queried_marker to a marker 200 years in the future.
+      fabricated_marker = %ResourceMarker{
+        timestamp: DateTime.add(marker3.timestamp, 365 * 200, :day),
+        id: marker3.id
+      }
+
+      initial_state =
+        %{
+          last_queried_marker: fabricated_marker,
+          pulled_records: [],
+          acked_records: [],
+          cache_version: 0
+        }
+
+      {:noreply, messages, new_state} = FiggyProducer.handle_demand(1, initial_state)
+
+      assert messages == []
+
+      expected_state =
+        %{
+          last_queried_marker: fabricated_marker,
+          pulled_records: [],
+          acked_records: [],
+          cache_version: 0
+        }
+
+      assert new_state == expected_state
+    end
+
+    test "handle_info/2 with figgy producer ack, acknowledging first and third record" do
+      {marker1, marker2, marker3} = FiggyTestFixtures.markers()
+      cache_version = 1
+
+      initial_state = %{
+        last_queried_marker: marker3,
+        pulled_records: [
+          marker1,
+          marker2,
+          marker3
+        ],
+        acked_records: [],
+        cache_version: cache_version
+      }
+
+      acked_markers =
+        [
+          marker1,
+          marker3
+        ]
+        |> Enum.sort()
+
+      expected_state = %{
+        last_queried_marker: marker3,
+        pulled_records: [
+          marker2,
+          marker3
+        ],
+        acked_records: [
+          marker3
+        ],
+        cache_version: cache_version
+      }
+
+      {:noreply, [], new_state} =
+        FiggyProducer.handle_info({:ack, :figgy_producer_ack, acked_markers}, initial_state)
+
+      assert new_state == expected_state
+      processor_marker = IndexingPipeline.get_processor_marker!("hydrator", cache_version)
+
+      assert marker1 == %ResourceMarker{
+               timestamp: processor_marker.cache_location,
+               id: processor_marker.cache_record_id
+             }
+
+      initial_state = new_state
+      acked_markers = [marker2]
+
+      expected_state = %{
+        last_queried_marker: marker3,
+        pulled_records: [],
+        acked_records: [],
+        cache_version: cache_version
+      }
+
+      {:noreply, [], new_state} =
+        FiggyProducer.handle_info({:ack, :figgy_producer_ack, acked_markers}, initial_state)
+
+      assert new_state == expected_state
+
+      processor_marker = IndexingPipeline.get_processor_marker!("hydrator", cache_version)
+
+      assert marker3 == %ResourceMarker{
+               timestamp: processor_marker.cache_location,
+               id: processor_marker.cache_record_id
+             }
+    end
+
+    test "handle_info/2 with figgy producer ack, nothing to acknowledge" do
+      {marker1, marker2, marker3} = FiggyTestFixtures.markers()
+
+      initial_state = %{
+        last_queried_marker: marker3,
+        pulled_records: [
+          marker1,
+          marker2,
+          marker3
+        ],
+        acked_records: [],
+        cache_version: 1
+      }
+
+      acked_markers =
+        [
+          marker2
+        ]
+        |> Enum.sort()
+
+      expected_state = %{
+        last_queried_marker: marker3,
+        pulled_records: [
+          marker1,
+          marker2,
+          marker3
+        ],
+        acked_records: [
+          marker2
+        ],
+        cache_version: 1
+      }
+
+      {:noreply, [], new_state} =
+        FiggyProducer.handle_info({:ack, :figgy_producer_ack, acked_markers},
initial_state) + + assert new_state == expected_state + processor_marker = IndexingPipeline.get_processor_marker!("hydrator", 1) + assert processor_marker == nil + end + + test "handle_info/2 with figgy producer ack, empty pulled_records" do + {marker1, _marker2, marker3} = FiggyTestFixtures.markers() + + initial_state = %{ + last_queried_marker: marker3, + pulled_records: [], + acked_records: [], + cache_version: 1 + } + + acked_markers = + [ + marker1 + ] + |> Enum.sort() + + expected_state = %{ + last_queried_marker: marker3, + pulled_records: [], + acked_records: [], + cache_version: 1 + } + + {:noreply, [], new_state} = + FiggyProducer.handle_info({:ack, :figgy_producer_ack, acked_markers}, initial_state) + + assert new_state == expected_state + processor_marker = IndexingPipeline.get_processor_marker!("hydrator", 1) + assert processor_marker == nil + end + + test "handle_info/2 with figgy producer ack, duplicate ack records" do + {marker1, marker2, marker3} = FiggyTestFixtures.markers() + + initial_state = %{ + last_queried_marker: marker3, + pulled_records: [ + marker1, + marker2 + ], + acked_records: [ + marker2 + ], + cache_version: 1 + } + + acked_markers = + [ + marker2 + ] + |> Enum.sort() + + expected_state = %{ + last_queried_marker: marker3, + pulled_records: [ + marker1, + marker2 + ], + acked_records: [ + marker2 + ], + cache_version: 1 + } + + {:noreply, [], new_state} = + FiggyProducer.handle_info({:ack, :figgy_producer_ack, acked_markers}, initial_state) + + assert new_state == expected_state + processor_marker = IndexingPipeline.get_processor_marker!("hydrator", 1) + assert processor_marker == nil + end + + test "handle_info/2 with figgy producer ack, acking after crash and respawn" do + {marker1, marker2, _marker3} = FiggyTestFixtures.markers() + + # Producer sent out marker1 then crashed, started again, then sent out + # marker1 and marker2. + # The consumer has marker1, marker1, and marker2 to process. 
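+      # The duplicate marker1 ack is harmless: handle_info/2 runs acked
+      # markers through Enum.uniq/1, and process_markers/2 discards markers
+      # that sort before the first pulled record.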
+ initial_state = %{ + last_queried_marker: marker2, + pulled_records: [ + marker1, + marker2 + ], + acked_records: [], + cache_version: 1 + } + + first_ack = + [ + marker1 + ] + + expected_state = %{ + last_queried_marker: marker2, + pulled_records: [ + marker2 + ], + acked_records: [], + cache_version: 1 + } + + {:noreply, [], new_state} = + FiggyProducer.handle_info({:ack, :figgy_producer_ack, first_ack}, initial_state) + + assert new_state == expected_state + + second_ack = + [ + marker1, + marker2 + ] + + expected_state = %{ + last_queried_marker: marker2, + pulled_records: [], + acked_records: [], + cache_version: 1 + } + + {:noreply, [], new_state} = + FiggyProducer.handle_info({:ack, :figgy_producer_ack, second_ack}, new_state) + + assert new_state == expected_state + + processor_marker = + IndexingPipeline.get_processor_marker!("hydrator", 1) |> ResourceMarker.from() + + assert processor_marker == marker2 + end + end +end diff --git a/test/dpul_collections/indexing_pipeline/integration/figgy_hydrator_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/figgy_hydrator_integration_test.exs new file mode 100644 index 00000000..16f95c48 --- /dev/null +++ b/test/dpul_collections/indexing_pipeline/integration/figgy_hydrator_integration_test.exs @@ -0,0 +1,144 @@ +defmodule DpulCollections.IndexingPipeline.FiggyHydratorIntegrationTest do + use DpulCollections.DataCase + + alias DpulCollections.{FiggyRepo, Repo} + alias DpulCollections.IndexingPipeline.{FiggyHydrator, FiggyResource, HydrationCacheEntry} + alias DpulCollections.IndexingPipeline + + def start_producer(batch_size \\ 1) do + pid = self() + + :telemetry.attach( + "ack-handler-#{pid |> :erlang.pid_to_list()}", + [:figgy_producer, :ack, :done], + fn _event, _, _, _ -> send(pid, {:ack_done}) end, + nil + ) + + {:ok, hydrator} = + FiggyHydrator.start_link( + cache_version: 0, + producer_module: FiggyTestProducer, + producer_options: {self()}, + batch_size: batch_size + ) + + hydrator + end + + test "hydration cache entry creation" do + {marker1, _marker2, _marker3} = FiggyTestFixtures.markers() + hydrator = start_producer() + + FiggyTestProducer.process(1) + assert_receive {:ack_done} + + cache_entry = IndexingPipeline.list_hydration_cache_entries() |> hd + assert cache_entry.record_id == marker1.id + assert cache_entry.cache_version == 0 + assert cache_entry.source_cache_order == marker1.timestamp + marker_1_id = marker1.id + + assert %{ + "id" => ^marker_1_id, + "internal_resource" => "EphemeraTerm" + } = cache_entry.data + + hydrator |> Broadway.stop(:normal) + end + + test "doesn't override newer hydration cache entries" do + # Create a hydration cache entry for a record that has a source_cache_order + # in the future. + IndexingPipeline.write_hydration_cache_entry(%{ + cache_version: 0, + record_id: "3cb7627b-defc-401b-9959-42ebc4488f74", + source_cache_order: ~U[2200-03-09 20:19:33.414040Z], + data: %{} + }) + + # Process that past record. + hydrator = start_producer() + FiggyTestProducer.process(1) + assert_receive {:ack_done} + hydrator |> Broadway.stop(:normal) + # Ensure there's only one hydration cache entry. + entries = IndexingPipeline.list_hydration_cache_entries() + assert length(entries) == 1 + # Ensure that entry has the source_cache_order we set at the beginning. 
+    entry = entries |> hd
+    assert entry.source_cache_order == ~U[2200-03-09 20:19:33.414040Z]
+  end
+
+  test "updates existing hydration cache entries" do
+    {marker1, _marker2, _marker3} = FiggyTestFixtures.markers()
+    # Create a hydration cache entry for a record that has a source_cache_order
+    # in the past.
+    IndexingPipeline.write_hydration_cache_entry(%{
+      cache_version: 0,
+      record_id: "3cb7627b-defc-401b-9959-42ebc4488f74",
+      source_cache_order: ~U[1900-03-09 20:19:33.414040Z],
+      data: %{}
+    })
+
+    # Process the record, which is newer than the existing entry.
+    hydrator = start_producer()
+    FiggyTestProducer.process(1)
+    assert_receive {:ack_done}
+    hydrator |> Broadway.stop(:normal)
+    # Ensure there's only one hydration cache entry.
+    entries = IndexingPipeline.list_hydration_cache_entries()
+    assert length(entries) == 1
+    # Ensure that entry was updated to the record's source_cache_order.
+    entry = entries |> hd
+    assert entry.source_cache_order == marker1.timestamp
+  end
+
+  test "loads a marker from the database on startup" do
+    {marker1, marker2, _marker3} = FiggyTestFixtures.markers()
+    # Create a marker
+    IndexingPipeline.write_processor_marker(%{
+      type: "hydrator",
+      cache_version: 0,
+      cache_location: marker1.timestamp,
+      cache_record_id: marker1.id
+    })
+
+    # Start the producer
+    hydrator = start_producer()
+    # Make sure the first record that comes back is what we expect
+    FiggyTestProducer.process(1)
+    assert_receive {:ack_done}
+    cache_entry = IndexingPipeline.list_hydration_cache_entries() |> hd
+    assert cache_entry.record_id == marker2.id
+    assert cache_entry.cache_version == 0
+    assert cache_entry.source_cache_order == marker2.timestamp
+    hydrator |> Broadway.stop(:normal)
+  end
+
+  def wait_for_hydrated_id(id, cache_version \\ 0) do
+    case IndexingPipeline.get_processor_marker!("hydrator", cache_version) do
+      %{cache_record_id: ^id} ->
+        true
+
+      _ ->
+        :timer.sleep(50)
+        wait_for_hydrated_id(id, cache_version)
+    end
+  end
+
+  test "a full hydration run" do
+    # Start the producer
+    hydrator = start_producer(50)
+    # Demand all of them.
+    count = FiggyRepo.aggregate(FiggyResource, :count)
+    FiggyTestProducer.process(count)
+    # Wait for the last ID to show up.
+ task = Task.async(fn -> wait_for_hydrated_id(FiggyTestSupport.last_marker().id) end) + Task.await(task, 15000) + entry_count = Repo.aggregate(HydrationCacheEntry, :count) + assert FiggyTestSupport.included_resource_count() == entry_count + :timer.sleep(2000) + hydrator |> Broadway.stop(:normal) + end +end diff --git a/test/dpul_collections/indexing_pipeline/resource_marker_test.exs b/test/dpul_collections/indexing_pipeline/resource_marker_test.exs new file mode 100644 index 00000000..8d7d4227 --- /dev/null +++ b/test/dpul_collections/indexing_pipeline/resource_marker_test.exs @@ -0,0 +1,29 @@ +defmodule DpulCollections.IndexingPipeline.ResourceMarkerTest do + use DpulCollections.DataCase + + alias DpulCollections.IndexingPipeline.ResourceMarker + + describe "marker comparison" do + test "sorts markers appropriately" do + {marker1, marker2, marker3} = FiggyTestFixtures.markers() + + assert Enum.sort([marker1, marker3, marker2], ResourceMarker) == [ + marker1, + marker2, + marker3 + ] + + assert ResourceMarker.compare(marker1, marker1) == :eq + assert ResourceMarker.compare(marker2, marker1) == :gt + assert ResourceMarker.compare(marker1, marker2) == :lt + + fabricated_marker = %ResourceMarker{ + timestamp: marker1.timestamp, + id: "00000000-0000-0000-0000-000000000000" + } + + assert ResourceMarker.compare(fabricated_marker, marker1) == :lt + assert ResourceMarker.compare(marker1, fabricated_marker) == :gt + end + end +end diff --git a/test/dpul_collections/indexing_pipeline_test.exs b/test/dpul_collections/indexing_pipeline_test.exs new file mode 100644 index 00000000..c670132a --- /dev/null +++ b/test/dpul_collections/indexing_pipeline_test.exs @@ -0,0 +1,68 @@ +defmodule DpulCollections.IndexingPipelineTest do + use DpulCollections.DataCase + + alias DpulCollections.IndexingPipeline + + describe "hydration_cache_entries" do + alias DpulCollections.IndexingPipeline.HydrationCacheEntry + + import DpulCollections.IndexingPipelineFixtures + + test "list_hydration_cache_entries/0 returns all hydration_cache_entries" do + hydration_cache_entry = hydration_cache_entry_fixture() + assert IndexingPipeline.list_hydration_cache_entries() == [hydration_cache_entry] + end + + test "get_hydration_cache_entry!/1 returns the hydration_cache_entry with given id" do + hydration_cache_entry = hydration_cache_entry_fixture() + + assert IndexingPipeline.get_hydration_cache_entry!(hydration_cache_entry.id) == + hydration_cache_entry + end + + test "delete_hydration_cache_entry/1 deletes the hydration_cache_entry" do + hydration_cache_entry = hydration_cache_entry_fixture() + + assert {:ok, %HydrationCacheEntry{}} = + IndexingPipeline.delete_hydration_cache_entry(hydration_cache_entry) + + assert_raise Ecto.NoResultsError, fn -> + IndexingPipeline.get_hydration_cache_entry!(hydration_cache_entry.id) + end + end + end + + describe "processor_markers" do + alias DpulCollections.IndexingPipeline.ProcessorMarker + + import DpulCollections.IndexingPipelineFixtures + + test "list_processor_markers/0 returns all processor_markers" do + processor_marker = processor_marker_fixture() + assert IndexingPipeline.list_processor_markers() == [processor_marker] + end + + test "get_processor_marker!/1 returns the processor_marker with given id" do + processor_marker = processor_marker_fixture() + assert IndexingPipeline.get_processor_marker!(processor_marker.id) == processor_marker + end + + test "delete_processor_marker/1 deletes the processor_marker" do + processor_marker = processor_marker_fixture() + + assert {:ok, 
%ProcessorMarker{}} =
+ IndexingPipeline.delete_processor_marker(processor_marker)
+
+ assert_raise Ecto.NoResultsError, fn ->
+ IndexingPipeline.get_processor_marker!(processor_marker.id)
+ end
+ end
+ end
+
+ describe "figgy database" do
+ test "get_figgy_resource!/1 returns a resource from the figgy db" do
+ ephemera_folder_id = "8b0631b7-e1e4-49c2-904f-cd3141167a80"
+ assert IndexingPipeline.get_figgy_resource!(ephemera_folder_id).id == ephemera_folder_id
+ end
+ end
+end
diff --git a/test/support/figgy_test_producer.ex b/test/support/figgy_test_producer.ex
new file mode 100644
index 00000000..6fa605b1
--- /dev/null
+++ b/test/support/figgy_test_producer.ex
@@ -0,0 +1,69 @@
+defmodule FiggyTestProducer do
+ @moduledoc """
+ A producer used for tests that allows you to control how many Figgy records
+ are provided to the FiggyHydrator via process/1.
+
+ FiggyHydrator demands from FiggyTestProducer, which never returns records
+ until asked by process/1. When process/1 is called, TestConsumer requests
+ records from FiggyProducer, and when it gets them it sends a message to
+ FiggyTestProducer, which then sends those records to FiggyHydrator.
+ """
+ alias DpulCollections.IndexingPipeline.{FiggyHydrator, FiggyProducer}
+ use GenStage
+
+ @type state :: %{consumer_pid: pid(), test_runner_pid: pid(), figgy_producer_pid: pid()}
+ @impl GenStage
+ @spec init({pid()}) :: {:producer, state()}
+ def init({test_runner_pid}) do
+ {:ok, figgy_producer_pid} = FiggyProducer.start_link()
+ {:ok, consumer_pid} = TestConsumer.start_link(figgy_producer_pid)
+
+ {:producer,
+ %{
+ consumer_pid: consumer_pid,
+ test_runner_pid: test_runner_pid,
+ figgy_producer_pid: figgy_producer_pid
+ }}
+ end
+
+ @impl GenStage
+ @doc """
+ Never return demand when requested - only when process/1 is run.
+ """
+ def handle_demand(_demand, state) do
+ {:noreply, [], state}
+ end
+
+ @impl GenStage
+ @doc """
+ TestConsumer sends this message - when received, tell the test runner and return the messages to the Hydrator.
+ """
+ @spec handle_info(TestConsumer.test_consumer_message(), state()) ::
+ {:noreply, [%Broadway.Message{}], state()}
+ def handle_info({:received, messages}, state) do
+ send(state.test_runner_pid, {:received, messages})
+ {:noreply, messages, state}
+ end
+
+ @impl GenStage
+ @doc """
+ process/1 casts this message to the TestProducer process so that we can use
+ the consumer pid stored in state to forward the demand to TestConsumer.
+ """
+ def handle_cast({:fulfill_messages, demand}, state) do
+ TestConsumer.request(state.consumer_pid, demand)
+ {:noreply, [], state}
+ end
+
+ @doc """
+ Request FiggyHydrator to process records.
+ """
+ @spec process(integer()) :: :ok
+ def process(demand) do
+ # Look up the registered name of the producer Broadway started for
+ # FiggyHydrator (which wraps this module), then cast the fulfill message to it.
+ Broadway.producer_names(FiggyHydrator)
+ |> hd
+ |> GenServer.cast({:fulfill_messages, demand})
+ end
+end
diff --git a/test/support/figgy_test_support.ex b/test/support/figgy_test_support.ex
new file mode 100644
index 00000000..d01ea43c
--- /dev/null
+++ b/test/support/figgy_test_support.ex
@@ -0,0 +1,25 @@
+defmodule FiggyTestSupport do
+ import Ecto.Query, warn: false
+ alias DpulCollections.IndexingPipeline.{ResourceMarker, FiggyResource}
+ alias DpulCollections.FiggyRepo
+
+ # Get the last marker from the figgy repo.
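+ # Ordering by updated_at and then id mirrors ResourceMarker.compare/2, so
+ # the single row returned here is the marker a full run should end on.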
+ def last_marker do
+ query =
+ from r in FiggyResource,
+ limit: 1,
+ order_by: [desc: r.updated_at, desc: r.id]
+
+ FiggyRepo.all(query) |> hd |> ResourceMarker.from()
+ end
+
+ def included_resource_count do
+ query =
+ from r in FiggyResource,
+ where: r.internal_resource == "EphemeraFolder" or r.internal_resource == "EphemeraTerm"
+
+ FiggyRepo.aggregate(query, :count)
+ end
+end
diff --git a/test/support/fixtures/figgy_test_fixtures.ex b/test/support/fixtures/figgy_test_fixtures.ex
new file mode 100644
index 00000000..8fab617b
--- /dev/null
+++ b/test/support/fixtures/figgy_test_fixtures.ex
@@ -0,0 +1,26 @@
+defmodule FiggyTestFixtures do
+ alias DpulCollections.IndexingPipeline.ResourceMarker
+
+ # @spec markers :: {%ResourceMarker{}, %ResourceMarker{}, %ResourceMarker{}}
+ # These are the first three known resource markers in the test database.
+ # They're here so that if they change, we don't have to change them in the
+ # whole test suite.
+ def markers do
+ marker1 = %ResourceMarker{
+ timestamp: ~U[2018-03-09 20:19:33.414040Z],
+ id: "3cb7627b-defc-401b-9959-42ebc4488f74"
+ }
+
+ marker2 = %ResourceMarker{
+ timestamp: ~U[2018-03-09 20:19:34.465203Z],
+ id: "69990556-434c-476a-9043-bbf9a1bda5a4"
+ }
+
+ marker3 = %ResourceMarker{
+ timestamp: ~U[2018-03-09 20:19:34.486004Z],
+ id: "47276197-e223-471c-99d7-405c5f6c5285"
+ }
+
+ {marker1, marker2, marker3}
+ end
+end
diff --git a/test/support/fixtures/indexing_pipeline_fixtures.ex b/test/support/fixtures/indexing_pipeline_fixtures.ex
new file mode 100644
index 00000000..22325f3c
--- /dev/null
+++ b/test/support/fixtures/indexing_pipeline_fixtures.ex
@@ -0,0 +1,40 @@
+defmodule DpulCollections.IndexingPipelineFixtures do
+ @moduledoc """
+ This module defines test helpers for creating
+ entities via the `DpulCollections.IndexingPipeline` context.
+ """
+
+ @doc """
+ Generate a hydration_cache_entry.
+ """
+ def hydration_cache_entry_fixture(attrs \\ %{}) do
+ {:ok, hydration_cache_entry} =
+ attrs
+ |> Enum.into(%{
+ cache_version: 42,
+ data: %{},
+ record_id: "some record_id",
+ source_cache_order: ~U[2024-07-23 20:05:00Z]
+ })
+ |> DpulCollections.IndexingPipeline.write_hydration_cache_entry()
+
+ hydration_cache_entry
+ end
+
+ @doc """
+ Generate a processor_marker.
+ """
+ def processor_marker_fixture(attrs \\ %{}) do
+ {:ok, processor_marker} =
+ attrs
+ |> Enum.into(%{
+ cache_location: ~U[2024-07-23 20:40:00Z],
+ cache_version: 42,
+ type: "some type",
+ cache_record_id: "3cb7627b-defc-401b-9959-42ebc4488f74"
+ })
+ |> DpulCollections.IndexingPipeline.write_processor_marker()
+
+ processor_marker
+ end
+end
diff --git a/test/support/test_consumer.ex b/test/support/test_consumer.ex
new file mode 100644
index 00000000..2e273c70
--- /dev/null
+++ b/test/support/test_consumer.ex
@@ -0,0 +1,47 @@
+defmodule TestConsumer do
+ @moduledoc """
+ TestConsumer allows for manual consumption of messages from a Producer and
+ then notifies a receive_target of any messages it gets. We largely use this
+ for integration tests.
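+
+ A typical flow, mirroring what FiggyTestProducer.init/1 does:
+
+ {:ok, consumer_pid} = TestConsumer.start_link(producer_pid)
+ TestConsumer.request(consumer_pid, 3)
+ # The process that called start_link/1 then receives {:received, messages}.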
+ """ + use GenStage + + def start_link(producer) do + GenStage.start_link(__MODULE__, {producer, self()}) + end + + @impl GenStage + def init({producer_pid, receive_target_pid}) do + {:consumer, %{receive_target_pid: receive_target_pid, subscription: nil}, + subscribe_to: [producer_pid]} + end + + @impl GenStage + def handle_subscribe(:producer, _options, from, state) do + new_state = %{state | subscription: from} + {:manual, new_state} + end + + @type test_consumer_message :: {:received, [%Broadway.Message{}]} + @impl GenStage + def handle_events(events, _from, state) do + send(state.receive_target_pid, {:received, events}) + {:noreply, [], state} + end + + @impl GenStage + def handle_cast({:request, demand}, state) do + GenStage.ask(state.subscription, demand) + {:noreply, [], state} + end + + def request(consumer_pid, demand) do + GenServer.cast(consumer_pid, {:request, demand}) + end +end