Skip to content

Commit

Permalink
Merge pull request #245 from pulibrary/104-bugs
Browse files Browse the repository at this point in the history
Only get the next set of entries for the relevant cache version
  • Loading branch information
tpendragon authored Nov 18, 2024
2 parents 117cb2b + 7bc0865 commit eb0e913
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 21 deletions.
34 changes: 22 additions & 12 deletions lib/dpul_collections/indexing_pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,20 @@ defmodule DpulCollections.IndexingPipeline do

@spec get_hydration_cache_entries_since!(
marker :: CacheEntryMarker.t(),
count :: integer
count :: integer,
cache_version :: integer
) :: list(Figgy.HydrationCacheEntry)
def get_hydration_cache_entries_since!(
%CacheEntryMarker{timestamp: cache_order, id: id},
count
count,
cache_version
) do
query =
from r in Figgy.HydrationCacheEntry,
where:
(r.cache_order == ^cache_order and r.record_id > ^id) or
r.cache_order > ^cache_order,
r.cache_version == ^cache_version and
((r.cache_order == ^cache_order and r.record_id > ^id) or
r.cache_order > ^cache_order),
limit: ^count,
order_by: [asc: r.cache_order, asc: r.record_id]

Expand All @@ -103,11 +106,13 @@ defmodule DpulCollections.IndexingPipeline do

@spec get_hydration_cache_entries_since!(
nil,
count :: integer
count :: integer,
cache_version :: integer
) :: list(Figgy.HydrationCacheEntry)
def get_hydration_cache_entries_since!(nil, count) do
def get_hydration_cache_entries_since!(nil, count, cache_version) do
query =
from r in Figgy.HydrationCacheEntry,
where: r.cache_version == ^cache_version,
limit: ^count,
order_by: [asc: r.source_cache_order, asc: r.record_id]

Expand Down Expand Up @@ -310,17 +315,20 @@ defmodule DpulCollections.IndexingPipeline do

@spec get_transformation_cache_entries_since!(
marker :: CacheEntryMarker.t(),
count :: integer
count :: integer,
cache_version :: integer
) :: list(Figgy.TransformationCacheEntry)
def get_transformation_cache_entries_since!(
%CacheEntryMarker{timestamp: cache_order, id: id},
count
count,
cache_version
) do
query =
from r in Figgy.TransformationCacheEntry,
where:
(r.cache_order == ^cache_order and r.record_id > ^id) or
r.cache_order > ^cache_order,
r.cache_version == ^cache_version and
((r.cache_order == ^cache_order and r.record_id > ^id) or
r.cache_order > ^cache_order),
limit: ^count,
order_by: [asc: r.cache_order, asc: r.record_id]

Expand All @@ -329,11 +337,13 @@ defmodule DpulCollections.IndexingPipeline do

@spec get_transformation_cache_entries_since!(
nil,
count :: integer
count :: integer,
cache_version :: integer
) :: list(Figgy.TransformationCacheEntry)
def get_transformation_cache_entries_since!(nil, count) do
def get_transformation_cache_entries_since!(nil, count, cache_version) do
query =
from r in Figgy.TransformationCacheEntry,
where: r.cache_version == ^cache_version,
limit: ^count,
order_by: [asc: r.cache_order, asc: r.record_id]

Expand Down
3 changes: 2 additions & 1 deletion lib/dpul_collections/indexing_pipeline/database_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,15 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do
last_queried_marker: last_queried_marker,
pulled_records: pulled_records,
acked_records: acked_records,
cache_version: cache_version,
stored_demand: stored_demand,
source_module: source_module
}
) do
total_demand = stored_demand + demand

records =
source_module.get_cache_entries_since!(last_queried_marker, total_demand)
source_module.get_cache_entries_since!(last_queried_marker, total_demand, cache_version)

new_state =
state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer.Source do
"""
@callback get_cache_entries_since!(
last_queried_marker :: CacheEntryMarker.t(),
total_demand :: integer
total_demand :: integer,
cache_version :: integer
) :: list(struct)
end
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerSource do
"figgy_hydrator"
end

def get_cache_entries_since!(last_queried_marker, total_demand) do
def get_cache_entries_since!(last_queried_marker, total_demand, _cache_version) do
IndexingPipeline.get_figgy_resources_since!(last_queried_marker, total_demand)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingProducerSource do
"figgy_indexer"
end

def get_cache_entries_since!(last_queried_marker, total_demand) do
IndexingPipeline.get_transformation_cache_entries_since!(last_queried_marker, total_demand)
def get_cache_entries_since!(last_queried_marker, total_demand, cache_version) do
IndexingPipeline.get_transformation_cache_entries_since!(
last_queried_marker,
total_demand,
cache_version
)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationProducerSource do
"figgy_transformer"
end

def get_cache_entries_since!(last_queried_marker, total_demand) do
IndexingPipeline.get_hydration_cache_entries_since!(last_queried_marker, total_demand)
def get_cache_entries_since!(last_queried_marker, total_demand, cache_version) do
IndexingPipeline.get_hydration_cache_entries_since!(
last_queried_marker,
total_demand,
cache_version
)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.IndexingIntegrationTest do
end

test "when cache version > 0, processor marker cache version is correct" do
FiggyTestFixtures.transformation_cache_markers()
FiggyTestFixtures.transformation_cache_markers(1)

cache_version = 1
indexer = start_indexing_producer(cache_version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.TransformationIntegrationTest d
end

test "transformation cache entry creation with cache version > 0" do
{marker1, _marker2, _marker3} = FiggyTestFixtures.hydration_cache_markers()
{marker1, _marker2, _marker3} = FiggyTestFixtures.hydration_cache_markers(1)

cache_version = 1
transformer = start_transformation_producer(cache_version)
Expand Down

0 comments on commit eb0e913

Please sign in to comment.