Skip to content

Commit

Permalink
Merge pull request #54 from pulibrary/update-adr
Browse files Browse the repository at this point in the history
Update ADR language for caches instead of logs
  • Loading branch information
tpendragon authored Jul 24, 2024
2 parents 31cff91 + a628fdb commit 46894f5
Showing 1 changed file with 52 additions and 64 deletions.
116 changes: 52 additions & 64 deletions architecture-decisions/0002-indexing.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,37 @@ DPUL-Collections must have a resilient indexing pipeline that can quickly harves

We will initially pull data from Figgy, so the performance requirements in this document are based on the size of Figgy's database.

Often times systems like this use event streaming platforms such as Kafka, but we'd like to prevent adding new technology to our stack. We think we can use Postgres tables as a compact event log.
Often times systems like this use event streaming platforms such as Kafka, but we'd like to prevent adding new technology to our stack. Instead we'll use Postgres to cache our data.

Many of the ideas and concepts that led to this architecture were introduced to us in [Designing Data Intensive Applications](https://catalog.princeton.edu/catalog/99127097737806421).

## Decision

Our indexing pipeline will consist of three steps - Hydration, Transformation, and Indexing. Collectively we'll call these the Processors.

Each step has a performance requirement - the lower bound is the point at which we stop optimizing in the case of running that full process, the upper bound is the maximum we'll allow it to take before re-architecting.
Each step has a performance requirement - the lower bound is the point at which we stop optimizing in the case of running that full process, the upper bound is the maximum we'll allow it to take before re-architecting

For newly added records (not a full pipeline run of all records), we expect to see changes within five minutes of persistence in Figgy, as our stakeholders often do patron requests by "Completing" a record in Figgy and then sending a resource to a patron. They shouldn't have to wait more than 5 minutes to do that.

```mermaid
flowchart LR
A[Figgy Postgres] -->|Hydrate| B[Hydration Log]
B -->|Transform| C[Transformation Log]
A[Figgy Postgres] -->|Hydrate| B[Hydration Cache]
B -->|Transform| C[Transformation Cache]
C -->|Index| D[Solr Index]
```

### Hydration

The Hydrator will query Figgy's `orm_resources` table for newly updated records and copy them into a local postgres cache (the Hydration Log). This pattern will allow us to do the transformation and indexing steps no matter the uptime or performance characteristics of our source repository.
The Hydrator will query Figgy's `orm_resources` table for newly updated records and copy them into a local postgres cache (the Hydration Cache). This pattern will allow us to do the transformation and indexing steps no matter the uptime or performance characteristics of our source repository.

The Hydration Log has the following structure:
The Hydration Cache has the following structure:

| id | data | log_order | log_version | record_id | source_log_order |
|------|-------|-----------|-------------|-----------|------------------|
| INT | BLOB | DATETIME | INT | VARCHAR | DATETIME |
| id | data | cache_order | cache_version | record_id | source_cache_order |
|------|-------|-------------|---------------|-----------|--------------------|
| INT | BLOB | DATETIME | INT | VARCHAR | DATETIME |

We'll pull records as well as DeletionMarkers so we'll know and record when records have been deleted from Figgy.

If retries have been enqueued, the Hydrator will pull from the retry queue instad of from Figgy. For each resource ID in the retry queue, the Hydrator will duplicate the last row found for that resource in the Hydration Log, updating its log order to be the next number in the sequence.

#### Performance Requirements for Full Hydration

1 Hour - 2 Days
Expand All @@ -55,11 +53,11 @@ The faster we can do a full re-harvest, the faster we can pull in broad metadata

### Transformation

The Transformer will query the Hydration Log to fetch the records cached by the Hydration step, convert them to a Solr document, and store that solr document in a local postgres cache (the Transformation Log) with the following structure:
The Transformer will query the Hydration Cache to fetch the records cached by the Hydration step, convert them to a Solr document, and store that solr document in a local postgres cache (the Transformation Cache) with the following structure:

| id | data | log_order | log_version | record_id | error | source_log_order |
|------|-------|-----------|-------------|-----------|---------|------------------|
| INT | BLOB | DATETIME | INT | VARCHAR | TEXT | DATETIME |
| id | data | cache_order | cache_version | record_id | source_cache_order |
|------|-------|-------------|---------------|-----------|--------------------|
| INT | BLOB | DATETIME | INT | VARCHAR | DATETIME |

#### Performance Requirements for Full Transformation

Expand All @@ -71,7 +69,7 @@ We will need to do a re-transformation when we add new fields to the index, whic

### Indexing

The Indexer will query the Transformation Log to fetch the records cached by the Transformation step and index them into Solr as a batch.
The Indexer will query the Transformation Cache to fetch the records cached by the Transformation step and index them into Solr as a batch.

#### Performance Requirements for Full Indexing

Expand All @@ -85,89 +83,79 @@ We expect reindexing to need to happen often - either because of changing weight

```mermaid
sequenceDiagram title A full Indexing Pipeline workflow
Participant LogLocationTable
Participant ProcessorMarkers
Participant FiggyDB
Participant RetryQueue
Participant HydratorV1 as Hydrator(log_version: 1)
Participant HydrationLog
Participant TransformerV1 as Transformer(log_version: 1)
Participant TransformationLog
Participant IndexerV1 as Indexer(log_version: 1, solr_collection: dpul)
Participant HydratorV1 as Hydrator(cache_version: 1)
Participant HydrationCache
Participant TransformerV1 as Transformer(cache_version: 1)
Participant TransformationCache
Participant IndexerV1 as Indexer(cache_version: 1, solr_collection: dpul)
Participant SolrIndex
HydratorV1->>LogLocationTable: Set(type: hydrator, log_location: pre_figgy_timestamp, log_version: 1)
loop Populate the Log in Batches
HydratorV1->>RetryQueue: Get resource IDs from queue
HydratorV1->>HydratorV1: Duplicate retry resource rows into log
HydratorV1->>LogLocationTable: Get last log_location
HydratorV1->>FiggyDB: Get X (e.g. 500) records with update_at later than log_location
HydratorV1->>HydrationLog: Store the records with log version 1
HydratorV1->>LogLocationTable: Set(log_location: latest_updated_at_from_batch)
HydratorV1->>ProcessorMarkers: Set(type: hydrator, cache_location: pre_figgy_timestamp, cache_version: 1)
loop Populate the Cache in Batches
HydratorV1->>ProcessorMarkers: Get last cache_location
HydratorV1->>FiggyDB: Get X (e.g. 500) records with update_at later than cache_location
HydratorV1->>HydrationCache: Store the records with cache version 1
HydratorV1->>ProcessorMarkers: Set(cache_location: latest_updated_at_from_batch)
HydratorV1->>HydratorV1: Sleep for poll interval if recordset is empty
end
TransformerV1->>LogLocationTable: Set(type: transformer, log_location: 0, log_version: 1)
loop Populate the TransformationLog in Batches
TransformerV1->>LogLocationTable: Get last log_location
TransformerV1->>HydrationLog: Get X (e.g. 500) records with log_order higher than log_location
TransformerV1->>TransformationLog: Store the transformed records with log version 1
TransformerV1->>LogLocationTable: Set(log_location: highest_log_order from that batch)
TransformerV1->>ProcessorMarkers: Set(type: transformer, cache_location: 0, cache_version: 1)
loop Populate the TransformationCache in Batches
TransformerV1->>ProcessorMarkers: Get last cache_location
TransformerV1->>HydrationCache: Get X (e.g. 500) records with cache_order higher than cache_location
TransformerV1->>TransformationCache: Store the transformed records with cache version 1
TransformerV1->>ProcessorMarkers: Set(cache_location: highest_cache_order from that batch)
TransformerV1->>TransformerV1: Sleep for poll interval if recordset is empty
end
IndexerV1->>LogLocationTable: Set(type: indexer, log_location: 0, log_version: 1)
IndexerV1->>ProcessorMarkers: Set(type: indexer, cache_location: 0, cache_version: 1)
loop Populate the SolrIndex in Batches
IndexerV1->>LogLocationTable: Get last log_location
IndexerV1->>TransformationLog: Get X (e.g. 500) records with log_order higher than log_location
IndexerV1->>ProcessorMarkers: Get last cache_location
IndexerV1->>TransformationCache: Get X (e.g. 500) records with cache_order higher than cache_location
IndexerV1->>SolrIndex: Store the documents
IndexerV1->>LogLocationTable: Set(log_location: highest_log_order from that batch)
IndexerV1->>ProcessorMarkers: Set(cache_location: highest_cache_order from that batch)
IndexerV1->>IndexerV1: Sleep for poll interval if recordset is empty
end
```

## Commonalities between Processors

Each Processor will keep track of the last object they acted on in a LogLocationTable with the following structure:
Each Processor will keep track of the last object they acted on in a ProcessorMarkers table with the following structure:


| id | log_location | log_version | type |
|------|--------------|-------------|---------|
| INT | varchar | INT | VARCHAR |
| id | cache_location | cache_version | type |
|------|----------------|---------------|---------|
| INT | varchar | INT | VARCHAR |

- For Hydrator, `log_location` is an `updated_at` value from the Figgy database.
- For Transformer, `log_location` is a `log_order` value from the HydrationLog
- For Indexer, `log_location` is a `log_order` value from the TransformationLog
- For Hydrator, `cache_location` is an `updated_at` value from the Figgy database.
- For Transformer, `cache_location` is a `cache_order` value from the HydrationCache
- For Indexer, `cache_location` is a `cache_order` value from the TransformationCache

The value of `log_version` will be the same for each Processor within a given pipeline. It will be configured manually before a full pipeline run. It will be used to read the correct rows out of each log.
The value of `cache_version` will be the same for each Processor within a given pipeline. It will be configured manually before a full pipeline run. It will be used to read the correct rows out of each cache.

## Concurrent Logic

To support concurrency in these processes:
To support concurrency in these processes we will use the source_cache_order field as an optimistic lock.

- When writing to a log, the processor will do an upsert with a query like `INSERT INTO hydration_log(data, log_order, log_version, record_id, source_log_order) VALUES ('{}', NOW(), 1, '1da0340e-df0d-47cb-9567-4049e26141d9', <updated_at_from_figgy>) ON CONFLICT (record_id, log_version) DO UPDATE SET log_order = NOW(), data = '{}', source_log_order = <updated_at_from_figgy> WHERE source_log_order <= <updated_at_from_figgy>`
* If there's already a row for the above record_id and log_version, it will update it, meaning no compaction necessary. There's only ever one record per record_id in the log.
* If we're running an update for something from the log that's older than the current state, it won't update the row - a more recent event already ran. This means we can process the log in any order and at any level of parallelization.
* If we're re-running a process because we've reset the log_order in the LogLocationTable, then it will write new records because it allows updating rows where source_log_order is equal.
- When writing to a cache, the processor will do an upsert with a query like `INSERT INTO hydration_cache(data, cache_order, cache_version, record_id, source_cache_order) VALUES ('{}', NOW(), 1, '1da0340e-df0d-47cb-9567-4049e26141d9', <updated_at_from_figgy>) ON CONFLICT (record_id, cache_version) DO UPDATE SET cache_order = NOW(), data = '{}', source_cache_order = <updated_at_from_figgy> WHERE source_cache_order <= <updated_at_from_figgy>`
* If there's already a row for the above record_id and cache_version, it will update it, meaning that there's only ever one row per record_id in the cache.
* If we're running an update for something from the cache that's older than the current state, it won't update the row - a more recent event already ran. This means we can process the cache in any order and at any level of parallelization.
* If we're re-running a process because we've reset the cache_order in the ProcessorMarkers table, then it will write new records because it allows updating rows where source_cache_order is equal.

## Resilience and Error Handling
If postgres or Solr fails, we should let the Processors crash and restart indefinitely. When the service comes back up, they will resume their expected operation.

When a Transformation error occurs:

1. The Transformer does its best to create a Solr record, with incomplete data.
1. It gets logged by writing the error message in the `error` field and sending the notification to Honeybadger.
1. DLS can review errors via scripts and Honeybadger weekly review.
1. DLS fixes error(s).
1. DLS adds the record ID to the retry queue.
The system will have a way to automatically retry errors caused by system downtime, and log errors caused by bugs in our code.

## Consequences

The event logs will contain every deleted figgy resource.
The caches will contain every deleted figgy resource.

Keeping track of three different tables may be complicated. However, we expect to be able to scale this architecture out to allow for multiple harvest sources and transformation steps in the future.

Handling Transformer errors at first will require a lot of DLS intervention. We might change that in the future, but we want to get a handle on the kinds of errors that are happening and record the kinds of automatic interventions that might be useful to implement.

Two of the new tables (the Logs) could be very large, requiring more disk space - each containing every resource we're indexing into Solr. However, we think they're necessary to meet our performance and reliability goals.
Two of the new tables (the Caches) could be very large, requiring more disk space - each containing every resource we're indexing into Solr. However, we think they're necessary to meet our performance and reliability goals.

We're relying on Figgy having a single database we can harvest from. If Figgy's database architecture or schema change, we'll have to change our code.

0 comments on commit 46894f5

Please sign in to comment.