Merge pull request #59 from pulibrary/43-hydration

43 hydration

eliotjordan authored Aug 29, 2024
2 parents ae9690b + 856b320 commit aef00bc
Showing 30 changed files with 1,618 additions and 4 deletions.
7 changes: 6 additions & 1 deletion .circleci/config.yml
@@ -5,10 +5,15 @@ description: Common jobs for testing and building phoenix applications
executors:
builder:
docker:
- - image: cimg/elixir:1.16.2-erlang-26.2.1-browsers
+ - image: cimg/elixir:1.17.2-erlang-26.2.1-browsers
environment:
TEST_POSTGRES_PORT: 5432
TEST_POSTGRES_FIGGY_HOST: figgy_database
- image: cimg/postgres:15.2
- image: ghcr.io/pulibrary/dpul-collections:figgy-fixtures
name: figgy_database
environment:
POSTGRES_PASSWORD: "postgres"
working_directory: ~/project

commands:
2 changes: 1 addition & 1 deletion architecture-decisions/0002-indexing.md
@@ -127,7 +127,7 @@ Each Processor will keep track of the last object they acted on in a ProcessorMa

| id | cache_location | cache_version | type |
|------|----------------|---------------|---------|
- | INT | varchar | INT | VARCHAR |
+ | INT | DATETIME | INT | VARCHAR |

- For Hydrator, `cache_location` is an `updated_at` value from the Figgy database.
- For Transformer, `cache_location` is a `cache_order` value from the HydrationCache
2 changes: 1 addition & 1 deletion config/config.exs
@@ -8,7 +8,7 @@
import Config

config :dpul_collections,
- ecto_repos: [DpulCollections.Repo],
+ ecto_repos: [DpulCollections.Repo, DpulCollections.FiggyRepo],
generators: [timestamp_type: :utc_datetime]

# Configures the endpoint
17 changes: 17 additions & 0 deletions config/dev.exs
@@ -11,6 +11,23 @@ config :dpul_collections, DpulCollections.Repo,
show_sensitive_data_on_connection_error: true,
pool_size: 10

# Configure your other database
config :dpul_collections, DpulCollections.FiggyRepo,
username: "postgres",
password: "postgres",
hostname: "localhost",
port: "5435",
database: "postgres",
stacktrace: true,
show_sensitive_data_on_connection_error: true,
pool_size: 10

# Setup test to use a Stand-In Producer
config :dpul_collections,
producer_module: DpulCollections.IndexingPipeline.FiggyProducer,
# change if required for your dev/prod producer
producer_options: []

# For development, we disable any cache and enable
# debugging and code reloading.
#
12 changes: 12 additions & 0 deletions config/test.exs
@@ -5,6 +5,7 @@ import Config
# The MIX_TEST_PARTITION environment variable can be used
# to provide built-in test partitioning in CI environment.
# Run `mix help test` for more information.

config :dpul_collections, DpulCollections.Repo,
username: "postgres",
password: "postgres",
@@ -14,6 +15,17 @@ config :dpul_collections, DpulCollections.Repo,
pool: Ecto.Adapters.SQL.Sandbox,
pool_size: System.schedulers_online() * 2

# Configure your other database
config :dpul_collections, DpulCollections.FiggyRepo,
username: "postgres",
password: "postgres",
hostname: System.get_env("TEST_POSTGRES_FIGGY_HOST") || "localhost",
port: System.get_env("TEST_POSTGRES_PORT") || 5435,
database: "postgres",
stacktrace: true,
show_sensitive_data_on_connection_error: true,
pool_size: 10

# We don't run a server during test. If one is required,
# you can enable the server option below.
config :dpul_collections, DpulCollectionsWeb.Endpoint,
67 changes: 67 additions & 0 deletions docs/indexing_pipeline/producer.md
@@ -0,0 +1,67 @@
When the Producer spins up, it initializes `last_queried_marker` from the ProcessorMarkers table, or sets it to 1900 if no marker exists.

```mermaid
sequenceDiagram title Figgy Producer
Participant FiggyDatabase
Participant Producer
Participant Acknowledger
Participant Consumer
Participant Batcher as Batcher (batch size of 2)
Participant HydrationCache
Consumer->>Producer: Demand 3 records
Producer->>FiggyDatabase: Query records since last_queried_marker (1900)
FiggyDatabase->>Producer: Return 3 records
Producer->>Producer: Set last_queried_marker to last updated_at
Producer->>Producer: Add all 3 {record_id, updated_at} to pulled_records
Producer->>Consumer: Deliver record[1,2,3]
Consumer->>Consumer: Process record[1]
Consumer->>Batcher: Deliver record[1]
Consumer->>Consumer: Process record[3]
Consumer->>Batcher: Deliver record[3]
Batcher->>HydrationCache: Writes record[1,3]
Batcher->>Acknowledger: Acknowledge record[1,3]
Acknowledger->>Acknowledger: Do error handling
Acknowledger->>Producer: Acknowledge record[1,3]
Producer->>Producer: Update state: Append record[1,3] to acked_records
Producer->>Producer: Update state: Sort acked_records by updated_at
Producer->>Producer: Run Acknowledging Records algorithm
Consumer->>Consumer: Process record[2]
Consumer->>Batcher: Deliver record[2]
Batcher->>Batcher: 1 minute passes
Batcher->>HydrationCache: Writes record[2]
Batcher->>Acknowledger: Acknowledge record[2]
Acknowledger->>Acknowledger: Do error handling
Acknowledger->>Producer: Acknowledge record[2]
Producer->>Producer: Update state: Append record[2] to acked_records
Producer->>Producer: Update state: Sort acked_records by updated_at
Producer->>Producer: Run Acknowledging Records algorithm
Consumer->>Producer: Demand 3 records
Producer->>FiggyDatabase: Query records since last_queried_marker
```
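The demand-handling step in the diagram can be sketched as a pure state transition: answering demand advances the marker and tracks in-flight records. This is a hypothetical illustration in Python (the actual pipeline is Elixir); the `handle_demand` name and the state shape are assumptions, with records modeled as `(id, updated_at)` tuples:

```python
def handle_demand(state, records):
    """Apply the state changes from the diagram after a Figgy query.

    records: (id, updated_at) tuples returned by the query,
    ordered by updated_at ascending.
    """
    if records:
        # Set last_queried_marker to the last record's updated_at.
        state["last_queried_marker"] = records[-1][1]
    # Add all pulled {record_id, updated_at} pairs to pulled_records.
    state["pulled_records"].extend(records)
    return state

# Mirror the diagram: demand for 3 records returns records 1, 2, 3.
state = {"last_queried_marker": 1900, "pulled_records": [], "acked_records": []}
state = handle_demand(state, [(1, 100), (2, 200), (3, 300)])
# state["last_queried_marker"] -> 300
# state["pulled_records"] -> [(1, 100), (2, 200), (3, 300)]
```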

## Managing Producer State

### Acknowledging Records

When receiving acknowledgement for [1,3]:

Start state: `{last_queried_marker: record[3].updated_at, pulled_records: [1,2,3], acked_records: [1,3]}`

If the first element of pulled_records matches the first element of acked_records, remove that element from both lists. Repeat until the first elements no longer match, then write the `updated_at` timestamp of the last element removed from pulled_records.

If the first marker in acked_records is less than the first marker in pulled_records, discard it: that record must have been acked already, so it does not need to be acked again. This can happen in certain producer crash scenarios.

The processor blocks during this acknowledgement, so race conditions are not a concern here.

End State: `{last_queried_marker: record[3].updated_at, pulled_records: [2,3], acked_records: [3]}`

Write `record[1].updated_at` to `ProcessorMarkers`

When receiving acknowledgement for [2]:

Start State: `{last_queried_marker: record[3].updated_at, pulled_records: [2,3], acked_records: [2,3]}`

End State: `{last_queried_marker: record[3].updated_at, pulled_records: [], acked_records: []}`

Write `record[3].updated_at` to `ProcessorMarkers`
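The two walkthroughs above can be sketched as one pure function. This is a hypothetical Python illustration (the real pipeline is Elixir); the `acknowledge` name and the `(id, updated_at)` tuple representation are assumptions made for the sketch:

```python
def acknowledge(pulled_records, acked_records):
    """Remove the matching prefix from both lists; return the new lists
    plus the updated_at marker to persist to ProcessorMarkers, if any."""
    pulled = list(pulled_records)
    acked = sorted(acked_records, key=lambda r: r[1])  # sort by updated_at
    marker = None

    while acked:
        if pulled and acked[0] == pulled[0]:
            # Front of both lists match: this record is fully processed,
            # so remember its updated_at as the marker to write.
            marker = pulled.pop(0)[1]
            acked.pop(0)
        elif pulled and acked[0][1] < pulled[0][1]:
            # Already acked previously (producer crash scenario); discard.
            acked.pop(0)
        else:
            break

    return pulled, acked, marker

# Worked example matching the walkthrough: records 1 and 3 are acked
# while 2 is still in flight, so only record 1's marker is written.
pulled = [(1, 100), (2, 200), (3, 300)]
pulled, acked, marker = acknowledge(pulled, [(1, 100), (3, 300)])
# pulled -> [(2, 200), (3, 300)]; acked -> [(3, 300)]; marker -> 100
```

When record 2's acknowledgement later arrives, calling `acknowledge([(2, 200), (3, 300)], [(2, 200), (3, 300)])` drains both lists and returns `300` as the marker, matching the second walkthrough.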
5 changes: 5 additions & 0 deletions lib/dpul_collections/application.ex
@@ -10,6 +10,11 @@ defmodule DpulCollections.Application do
children = [
DpulCollectionsWeb.Telemetry,
DpulCollections.Repo,
DpulCollections.FiggyRepo,
# Controllable Hydrator for testing in dev.
# {DpulCollections.IndexingPipeline.FiggyHydrator, producer_module: FiggyTestProducer, producer_options: {self()}},
# Production Hydrator
# DpulCollections.IndexingPipeline.FiggyHydrator,
{DNSCluster, query: Application.get_env(:dpul_collections, :dns_cluster_query) || :ignore},
{Phoenix.PubSub, name: DpulCollections.PubSub},
# Start the Finch HTTP client for sending emails
6 changes: 6 additions & 0 deletions lib/dpul_collections/figgy_repo.ex
@@ -0,0 +1,6 @@
defmodule DpulCollections.FiggyRepo do
use Ecto.Repo,
otp_app: :dpul_collections,
adapter: Ecto.Adapters.Postgres,
read_only: true
end