From 7bc0865819a2e5513e044aef2f473859b47a9002 Mon Sep 17 00:00:00 2001 From: Anna Headley <845363+hackartisan@users.noreply.github.com> Date: Mon, 18 Nov 2024 12:29:28 -0500 Subject: [PATCH] Only get the next set of entries for the relevant cache version refs #104 --- lib/dpul_collections/indexing_pipeline.ex | 34 ++++++++++++------- .../indexing_pipeline/database_producer.ex | 3 +- .../database_producer/source.ex | 3 +- .../figgy/hydration_producer_source.ex | 2 +- .../figgy/indexing_producer_source.ex | 8 +++-- .../figgy/transformation_producer_source.ex | 8 +++-- .../figgy/indexing_integration_test.exs | 2 +- .../figgy/transformation_integration_test.exs | 2 +- 8 files changed, 41 insertions(+), 21 deletions(-) diff --git a/lib/dpul_collections/indexing_pipeline.ex b/lib/dpul_collections/indexing_pipeline.ex index 96666906..6bdc0f82 100644 --- a/lib/dpul_collections/indexing_pipeline.ex +++ b/lib/dpul_collections/indexing_pipeline.ex @@ -84,17 +84,20 @@ defmodule DpulCollections.IndexingPipeline do @spec get_hydration_cache_entries_since!( marker :: CacheEntryMarker.t(), - count :: integer + count :: integer, + cache_version :: integer ) :: list(Figgy.HydrationCacheEntry) def get_hydration_cache_entries_since!( %CacheEntryMarker{timestamp: cache_order, id: id}, - count + count, + cache_version ) do query = from r in Figgy.HydrationCacheEntry, where: - (r.cache_order == ^cache_order and r.record_id > ^id) or - r.cache_order > ^cache_order, + r.cache_version == ^cache_version and + ((r.cache_order == ^cache_order and r.record_id > ^id) or + r.cache_order > ^cache_order), limit: ^count, order_by: [asc: r.cache_order, asc: r.record_id] @@ -103,11 +106,13 @@ defmodule DpulCollections.IndexingPipeline do @spec get_hydration_cache_entries_since!( nil, - count :: integer + count :: integer, + cache_version :: integer ) :: list(Figgy.HydrationCacheEntry) - def get_hydration_cache_entries_since!(nil, count) do + def get_hydration_cache_entries_since!(nil, count, cache_version) do query = from r in Figgy.HydrationCacheEntry, + where: r.cache_version == ^cache_version, limit: ^count, order_by: [asc: r.source_cache_order, asc: r.record_id] @@ -310,17 +315,20 @@ defmodule DpulCollections.IndexingPipeline do @spec get_transformation_cache_entries_since!( marker :: CacheEntryMarker.t(), - count :: integer + count :: integer, + cache_version :: integer ) :: list(Figgy.TransformationCacheEntry) def get_transformation_cache_entries_since!( %CacheEntryMarker{timestamp: cache_order, id: id}, - count + count, + cache_version ) do query = from r in Figgy.TransformationCacheEntry, where: - (r.cache_order == ^cache_order and r.record_id > ^id) or - r.cache_order > ^cache_order, + r.cache_version == ^cache_version and + ((r.cache_order == ^cache_order and r.record_id > ^id) or + r.cache_order > ^cache_order), limit: ^count, order_by: [asc: r.cache_order, asc: r.record_id] @@ -329,11 +337,13 @@ defmodule DpulCollections.IndexingPipeline do @spec get_transformation_cache_entries_since!( nil, - count :: integer + count :: integer, + cache_version :: integer ) :: list(Figgy.TransformationCacheEntry) - def get_transformation_cache_entries_since!(nil, count) do + def get_transformation_cache_entries_since!(nil, count, cache_version) do query = from r in Figgy.TransformationCacheEntry, + where: r.cache_version == ^cache_version, limit: ^count, order_by: [asc: r.cache_order, asc: r.record_id] diff --git a/lib/dpul_collections/indexing_pipeline/database_producer.ex b/lib/dpul_collections/indexing_pipeline/database_producer.ex index cf5011d3..ba9d13f2 100644 --- a/lib/dpul_collections/indexing_pipeline/database_producer.ex +++ b/lib/dpul_collections/indexing_pipeline/database_producer.ex @@ -54,6 +54,7 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do last_queried_marker: last_queried_marker, pulled_records: pulled_records, acked_records: acked_records, + cache_version: cache_version, stored_demand: stored_demand, source_module: source_module } @@ -61,7 +62,7 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do total_demand = stored_demand + demand records = - source_module.get_cache_entries_since!(last_queried_marker, total_demand) + source_module.get_cache_entries_since!(last_queried_marker, total_demand, cache_version) new_state = state diff --git a/lib/dpul_collections/indexing_pipeline/database_producer/source.ex b/lib/dpul_collections/indexing_pipeline/database_producer/source.ex index a7f21dce..523ed9cb 100644 --- a/lib/dpul_collections/indexing_pipeline/database_producer/source.ex +++ b/lib/dpul_collections/indexing_pipeline/database_producer/source.ex @@ -14,6 +14,7 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer.Source do """ @callback get_cache_entries_since!( last_queried_marker :: CacheEntryMarker.t(), - total_demand :: integer + total_demand :: integer, + cache_version :: integer ) :: list(struct) end diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_producer_source.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_producer_source.ex index 8aa3a1de..902c5ac1 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_producer_source.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_producer_source.ex @@ -5,7 +5,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerSource do "figgy_hydrator" end - def get_cache_entries_since!(last_queried_marker, total_demand) do + def get_cache_entries_since!(last_queried_marker, total_demand, _cache_version) do IndexingPipeline.get_figgy_resources_since!(last_queried_marker, total_demand) end end diff --git a/lib/dpul_collections/indexing_pipeline/figgy/indexing_producer_source.ex b/lib/dpul_collections/indexing_pipeline/figgy/indexing_producer_source.ex index eca513bf..bb97a65a 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/indexing_producer_source.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/indexing_producer_source.ex @@ -6,7 +6,11 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerSource do "figgy_indexer" end - def get_cache_entries_since!(last_queried_marker, total_demand) do - IndexingPipeline.get_transformation_cache_entries_since!(last_queried_marker, total_demand) + def get_cache_entries_since!(last_queried_marker, total_demand, cache_version) do + IndexingPipeline.get_transformation_cache_entries_since!( + last_queried_marker, + total_demand, + cache_version + ) end end diff --git a/lib/dpul_collections/indexing_pipeline/figgy/transformation_producer_source.ex b/lib/dpul_collections/indexing_pipeline/figgy/transformation_producer_source.ex index d971db7d..cc9d1539 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/transformation_producer_source.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/transformation_producer_source.ex @@ -6,7 +6,11 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationProducerSource do "figgy_transformer" end - def get_cache_entries_since!(last_queried_marker, total_demand) do - IndexingPipeline.get_hydration_cache_entries_since!(last_queried_marker, total_demand) + def get_cache_entries_since!(last_queried_marker, total_demand, cache_version) do + IndexingPipeline.get_hydration_cache_entries_since!( + last_queried_marker, + total_demand, + cache_version + ) end end diff --git a/test/dpul_collections/indexing_pipeline/integration/figgy/indexing_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/figgy/indexing_integration_test.exs index 20337801..1875cd3b 100644 --- a/test/dpul_collections/indexing_pipeline/integration/figgy/indexing_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/figgy/indexing_integration_test.exs @@ -49,7 +49,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingIntegrationTest do end test "when cache version > 0, processor marker cache version is correct" do - FiggyTestFixtures.transformation_cache_markers() + FiggyTestFixtures.transformation_cache_markers(1) cache_version = 1 indexer = start_indexing_producer(cache_version) diff --git a/test/dpul_collections/indexing_pipeline/integration/figgy/transformation_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/figgy/transformation_integration_test.exs index 3f0d3050..ba133b9c 100644 --- a/test/dpul_collections/indexing_pipeline/integration/figgy/transformation_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/figgy/transformation_integration_test.exs @@ -48,7 +48,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationIntegrationTest d end test "transformation cache entry creation with cache version > 0" do - {marker1, _marker2, _marker3} = FiggyTestFixtures.hydration_cache_markers() + {marker1, _marker2, _marker3} = FiggyTestFixtures.hydration_cache_markers(1) cache_version = 1 transformer = start_transformation_producer(cache_version)