From 1112ffa1c07197595736ec2b59240610f7da905d Mon Sep 17 00:00:00 2001
From: Eliot Jordan
Date: Wed, 24 Jul 2024 11:02:59 -0500
Subject: [PATCH 1/2] Update ADR language for caches instead of logs

---
 architecture-decisions/0002-indexing.md | 106 ++++++++++++------------
 1 file changed, 53 insertions(+), 53 deletions(-)

diff --git a/architecture-decisions/0002-indexing.md b/architecture-decisions/0002-indexing.md
index 235548b0..a3802f37 100644
--- a/architecture-decisions/0002-indexing.md
+++ b/architecture-decisions/0002-indexing.md
@@ -12,7 +12,7 @@ DPUL-Collections must have a resilient indexing pipeline that can quickly harves
 
 We will initially pull data from Figgy, so the performance requirements in this document are based on the size of Figgy's database.
 
-Often times systems like this use event streaming platforms such as Kafka, but we'd like to prevent adding new technology to our stack. We think we can use Postgres tables as a compact event log.
+Systems like this often use event streaming platforms such as Kafka, but we'd like to avoid adding new technology to our stack. Instead we'll use Postgres to cache our data.
 
 Many of the ideas and concepts that led to this architecture were introduced to us in [Designing Data Intensive Applications](https://catalog.princeton.edu/catalog/99127097737806421).
 
@@ -20,30 +20,30 @@ Many of the ideas and concepts that led to this architecture were introduced to
 
 Our indexing pipeline will consist of three steps - Hydration, Transformation, and Indexing. Collectively we'll call these the Processors.
 
-Each step has a performance requirement - the lower bound is the point at which we stop optimizing in the case of running that full process, the upper bound is the maximum we'll allow it to take before re-architecting
+Each step has a performance requirement - the lower bound is the point at which we stop optimizing in the case of running that full process; the upper bound is the maximum we'll allow it to take before re-architecting.
 
 For newly added records (not a full pipeline run of all records), we expect to see changes within five minutes of persistence in Figgy, as our stakeholders often do patron requests by "Completing" a record in Figgy and then sending a resource to a patron. They shouldn't have to wait more than 5 minutes to do that.
 
 ```mermaid
 flowchart LR
-    A[Figgy Postgres] -->|Hydrate| B[Hydration Log]
-    B -->|Transform| C[Transformation Log]
+    A[Figgy Postgres] -->|Hydrate| B[Hydration Cache]
+    B -->|Transform| C[Transformation Cache]
     C -->|Index| D[Solr Index]
 ```
 
 ### Hydration
 
-The Hydrator will query Figgy's `orm_resources` table for newly updated records and copy them into a local postgres cache (the Hydration Log). This pattern will allow us to do the transformation and indexing steps no matter the uptime or performance characteristics of our source repository.
+The Hydrator will query Figgy's `orm_resources` table for newly updated records and copy them into a local postgres cache (the Hydration Cache). This pattern will allow us to do the transformation and indexing steps no matter the uptime or performance characteristics of our source repository.
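+
+For illustration, a single Hydrator poll might look something like the sketch below. The `updated_at` column and the batch size (e.g. 500) come from the sequence diagram later in this document; the rest of the column list is an assumption about Figgy's schema:
+
+```sql
+-- Sketch only: fetch the next batch of newly updated Figgy resources.
+-- :cache_location is the updated_at value saved after the previous batch
+-- (see the ProcessorMarkers table described below).
+SELECT id, metadata, updated_at
+FROM orm_resources
+WHERE updated_at > :cache_location
+ORDER BY updated_at ASC
+LIMIT 500;
+```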
 
-The Hydration Log has the following structure:
+The Hydration Cache has the following structure:
 
-| id | data | log_order | log_version | record_id | source_log_order |
-|------|-------|-----------|-------------|-----------|------------------|
-| INT | BLOB | DATETIME | INT | VARCHAR | DATETIME |
+| id | data | cache_order | cache_version | record_id | source_cache_order |
+|------|-------|-------------|---------------|-----------|--------------------|
+| INT | BLOB | DATETIME | INT | VARCHAR | DATETIME |
 
 We'll pull records as well as DeletionMarkers so we'll know and record when records have been deleted from Figgy.
 
-If retries have been enqueued, the Hydrator will pull from the retry queue instad of from Figgy. For each resource ID in the retry queue, the Hydrator will duplicate the last row found for that resource in the Hydration Log, updating its log order to be the next number in the sequence.
+If retries have been enqueued, the Hydrator will pull from the retry queue instead of from Figgy. For each resource ID in the retry queue, the Hydrator will duplicate the last row found for that resource in the Hydration Cache, updating its cache order to be the next number in the sequence.
 
 #### Performance Requirements for Full Hydration
 
@@ -55,11 +55,11 @@ The faster we can do a full re-harvest, the faster we can pull in broad metadata
 
 ### Transformation
 
-The Transformer will query the Hydration Log to fetch the records cached by the Hydration step, convert them to a Solr document, and store that solr document in a local postgres cache (the Transformation Log) with the following structure:
+The Transformer will query the Hydration Cache to fetch the records cached by the Hydration step, convert them to a Solr document, and store that Solr document in a local postgres cache (the Transformation Cache) with the following structure:
 
-| id | data | log_order | log_version | record_id | error | source_log_order |
-|------|-------|-----------|-------------|-----------|---------|------------------|
-| INT | BLOB | DATETIME | INT | VARCHAR | TEXT | DATETIME |
+| id | data | cache_order | cache_version | record_id | error | source_cache_order |
+|------|-------|-------------|---------------|-----------|---------|--------------------|
+| INT | BLOB | DATETIME | INT | VARCHAR | TEXT | DATETIME |
 
 #### Performance Requirements for Full Transformation
 
@@ -71,7 +71,7 @@ We will need to do a re-transformation when we add new fields to the index, whic
 
 ### Indexing
 
-The Indexer will query the Transformation Log to fetch the records cached by the Transformation step and index them into Solr as a batch.
+The Indexer will query the Transformation Cache to fetch the records cached by the Transformation step and index them into Solr as a batch.
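+
+For illustration, each Indexer batch read might look like the sketch below. The column names follow the cache structure above; the physical table name `transformation_cache` mirrors the `hydration_cache` name used in the upsert example later and is an assumption:
+
+```sql
+-- Sketch only: fetch the next batch of transformed documents to send to Solr.
+-- :cache_location is the highest cache_order already indexed.
+SELECT record_id, data
+FROM transformation_cache
+WHERE cache_version = 1
+  AND cache_order > :cache_location
+ORDER BY cache_order ASC
+LIMIT 500;
+```
+
+Re-running the same batch after a crash is safe here, since re-indexing a Solr document with the same id simply overwrites it.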
 
 #### Performance Requirements for Full Indexing
 
@@ -85,69 +85,69 @@ We expect reindexing to need to happen often - either because of changing weight
 
 ```mermaid
 sequenceDiagram
 title A full Indexing Pipeline workflow
-Participant LogLocationTable
+Participant ProcessorMarkers
 Participant FiggyDB
 Participant RetryQueue
-Participant HydratorV1 as Hydrator(log_version: 1)
-Participant HydrationLog
-Participant TransformerV1 as Transformer(log_version: 1)
-Participant TransformationLog
-Participant IndexerV1 as Indexer(log_version: 1, solr_collection: dpul)
+Participant HydratorV1 as Hydrator(cache_version: 1)
+Participant HydrationCache
+Participant TransformerV1 as Transformer(cache_version: 1)
+Participant TransformationCache
+Participant IndexerV1 as Indexer(cache_version: 1, solr_collection: dpul)
 Participant SolrIndex
-HydratorV1->>LogLocationTable: Set(type: hydrator, log_location: pre_figgy_timestamp, log_version: 1)
-loop Populate the Log in Batches
+HydratorV1->>ProcessorMarkers: Set(type: hydrator, cache_location: pre_figgy_timestamp, cache_version: 1)
+loop Populate the Cache in Batches
 HydratorV1->>RetryQueue: Get resource IDs from queue
-HydratorV1->>HydratorV1: Duplicate retry resource rows into log
-HydratorV1->>LogLocationTable: Get last log_location
-HydratorV1->>FiggyDB: Get X (e.g. 500) records with update_at later than log_location
-HydratorV1->>HydrationLog: Store the records with log version 1
-HydratorV1->>LogLocationTable: Set(log_location: latest_updated_at_from_batch)
+HydratorV1->>HydratorV1: Duplicate retry resource rows into cache
+HydratorV1->>ProcessorMarkers: Get last cache_location
+HydratorV1->>FiggyDB: Get X (e.g. 500) records with updated_at later than cache_location
+HydratorV1->>HydrationCache: Store the records with cache version 1
+HydratorV1->>ProcessorMarkers: Set(cache_location: latest_updated_at_from_batch)
 HydratorV1->>HydratorV1: Sleep for poll interval if recordset is empty
 end
-TransformerV1->>LogLocationTable: Set(type: transformer, log_location: 0, log_version: 1)
-loop Populate the TransformationLog in Batches
-TransformerV1->>LogLocationTable: Get last log_location
-TransformerV1->>HydrationLog: Get X (e.g. 500) records with log_order higher than log_location
-TransformerV1->>TransformationLog: Store the transformed records with log version 1
-TransformerV1->>LogLocationTable: Set(log_location: highest_log_order from that batch)
+TransformerV1->>ProcessorMarkers: Set(type: transformer, cache_location: 0, cache_version: 1)
+loop Populate the TransformationCache in Batches
+TransformerV1->>ProcessorMarkers: Get last cache_location
+TransformerV1->>HydrationCache: Get X (e.g. 500) records with cache_order higher than cache_location
+TransformerV1->>TransformationCache: Store the transformed records with cache version 1
+TransformerV1->>ProcessorMarkers: Set(cache_location: highest_cache_order from that batch)
 TransformerV1->>TransformerV1: Sleep for poll interval if recordset is empty
 end
-IndexerV1->>LogLocationTable: Set(type: indexer, log_location: 0, log_version: 1)
+IndexerV1->>ProcessorMarkers: Set(type: indexer, cache_location: 0, cache_version: 1)
 loop Populate the SolrIndex in Batches
-IndexerV1->>LogLocationTable: Get last log_location
-IndexerV1->>TransformationLog: Get X (e.g. 500) records with log_order higher than log_location
+IndexerV1->>ProcessorMarkers: Get last cache_location
+IndexerV1->>TransformationCache: Get X (e.g. 500) records with cache_order higher than cache_location
 IndexerV1->>SolrIndex: Store the documents
-IndexerV1->>LogLocationTable: Set(log_location: highest_log_order from that batch)
+IndexerV1->>ProcessorMarkers: Set(cache_location: highest_cache_order from that batch)
 IndexerV1->>IndexerV1: Sleep for poll interval if recordset is empty
 end
 ```
 
 ## Commonalities between Processors
 
-Each Processor will keep track of the last object they acted on in a LogLocationTable with the following structure:
+Each Processor will keep track of the last object it acted on in a ProcessorMarkers table with the following structure:
 
-| id | log_location | log_version | type |
-|------|--------------|-------------|---------|
-| INT | varchar | INT | VARCHAR |
+| id | cache_location | cache_version | type |
+|------|----------------|---------------|---------|
+| INT | VARCHAR | INT | VARCHAR |
 
-- For Hydrator, `log_location` is an `updated_at` value from the Figgy database.
-- For Transformer, `log_location` is a `log_order` value from the HydrationLog
-- For Indexer, `log_location` is a `log_order` value from the TransformationLog
+- For Hydrator, `cache_location` is an `updated_at` value from the Figgy database.
+- For Transformer, `cache_location` is a `cache_order` value from the HydrationCache.
+- For Indexer, `cache_location` is a `cache_order` value from the TransformationCache.
 
-The value of `log_version` will be the same for each Processor within a given pipeline. It will be configured manually before a full pipeline run. It will be used to read the correct rows out of each log.
+The value of `cache_version` will be the same for each Processor within a given pipeline. It will be configured manually before a full pipeline run. It will be used to read the correct rows out of each cache.
 
 ## Concurrent Logic
 
-To support concurrency in these processes:
+To support concurrency in these processes we will use the source_cache_order field as an optimistic lock.
 
-- When writing to a log, the processor will do an upsert with a query like `INSERT INTO hydration_log(data, log_order, log_version, record_id, source_log_order) VALUES ('{}', NOW(), 1, '1da0340e-df0d-47cb-9567-4049e26141d9', ) ON CONFLICT (record_id, log_version) DO UPDATE SET log_order = NOW(), data = '{}', source_log_order = WHERE source_log_order <= `
- * If there's already a row for the above record_id and log_version, it will update it, meaning no compaction necessary. There's only ever one record per record_id in the log.
- * If we're running an update for something from the log that's older than the current state, it won't update the row - a more recent event already ran. This means we can process the log in any order and at any level of parallelization.
- * If we're re-running a process because we've reset the log_order in the LogLocationTable, then it will write new records because it allows updating rows where source_log_order is equal.
+- When writing to a cache, the processor will do an upsert with a query like `INSERT INTO hydration_cache(data, cache_order, cache_version, record_id, source_cache_order) VALUES ('{}', NOW(), 1, '1da0340e-df0d-47cb-9567-4049e26141d9', ) ON CONFLICT (record_id, cache_version) DO UPDATE SET cache_order = NOW(), data = '{}', source_cache_order = WHERE source_cache_order <= `
+ * If there's already a row for the above record_id and cache_version, it will update it, meaning that there's only ever one row per record_id in the cache.
+ * If we're running an update for something from the cache that's older than the current state, it won't update the row - a more recent event already ran. This means we can process the cache in any order and at any level of parallelization.
+ * If we're re-running a process because we've reset the cache_order in the ProcessorMarkers table, then it will write new records because it allows updating rows where source_cache_order is equal.
 
 ## Resilience and Error Handling
 If postgres or Solr fails, we should let the Processors crash and restart indefinitely. When the service comes back up, they will resume their expected operation.
 
@@ -162,12 +162,12 @@ When a Transformation error occurs:
 
 ## Consequences
 
-The event logs will contain every deleted figgy resource.
+The caches will contain every deleted Figgy resource.
 
 Keeping track of three different tables may be complicated. However, we expect to be able to scale this architecture out to allow for multiple harvest sources and transformation steps in the future.
 
 Handling Transformer errors at first will require a lot of DLS intervention. We might change that in the future, but we want to get a handle on the kinds of errors that are happening and record the kinds of automatic interventions that might be useful to implement.
 
-Two of the new tables (the Logs) could be very large, requiring more disk space - each containing every resource we're indexing into Solr. However, we think they're necessary to meet our performance and reliability goals.
+Two of the new tables (the Caches) could be very large, requiring more disk space - each containing every resource we're indexing into Solr. However, we think they're necessary to meet our performance and reliability goals.
 
 We're relying on Figgy having a single database we can harvest from. If Figgy's database architecture or schema change, we'll have to change our code.

From a628fdbffab1d99862de9bda04a7645218ea2a0a Mon Sep 17 00:00:00 2001
From: Trey Pendragon
Date: Wed, 24 Jul 2024 09:19:39 -0700
Subject: [PATCH 2/2] Delay the specifics of how we handle errors.

Co-authored-by: Anna Headley
Co-authored-by: Amin Zare
Co-authored-by: Eliot Jordan
Co-authored-by: Shaun Ellis
---
 architecture-decisions/0002-indexing.md | 20 ++++----------------
 1 file changed, 4 insertions(+), 16 deletions(-)

diff --git a/architecture-decisions/0002-indexing.md b/architecture-decisions/0002-indexing.md
index a3802f37..f5a3cda4 100644
--- a/architecture-decisions/0002-indexing.md
+++ b/architecture-decisions/0002-indexing.md
@@ -43,8 +43,6 @@ The Hydration Cache has the following structure:
 
 We'll pull records as well as DeletionMarkers so we'll know and record when records have been deleted from Figgy.
 
-If retries have been enqueued, the Hydrator will pull from the retry queue instead of from Figgy. For each resource ID in the retry queue, the Hydrator will duplicate the last row found for that resource in the Hydration Cache, updating its cache order to be the next number in the sequence.
-
 #### Performance Requirements for Full Hydration
 
 1 Hour - 2 Days
 
@@ -57,9 +55,9 @@ The faster we can do a full re-harvest, the faster we can pull in broad metadata
 
 ### Transformation
 
 The Transformer will query the Hydration Cache to fetch the records cached by the Hydration step, convert them to a Solr document, and store that Solr document in a local postgres cache (the Transformation Cache) with the following structure:
 
-| id | data | cache_order | cache_version | record_id | error | source_cache_order |
-|------|-------|-------------|---------------|-----------|---------|--------------------|
-| INT | BLOB | DATETIME | INT | VARCHAR | TEXT | DATETIME |
+| id | data | cache_order | cache_version | record_id | source_cache_order |
+|------|-------|-------------|---------------|-----------|--------------------|
+| INT | BLOB | DATETIME | INT | VARCHAR | DATETIME |
 
 #### Performance Requirements for Full Transformation
 
@@ -87,7 +85,6 @@ sequenceDiagram
 title A full Indexing Pipeline workflow
 Participant ProcessorMarkers
 Participant FiggyDB
-Participant RetryQueue
 Participant HydratorV1 as Hydrator(cache_version: 1)
 Participant HydrationCache
 Participant TransformerV1 as Transformer(cache_version: 1)
@@ -97,8 +94,6 @@ Participant SolrIndex
 HydratorV1->>ProcessorMarkers: Set(type: hydrator, cache_location: pre_figgy_timestamp, cache_version: 1)
 loop Populate the Cache in Batches
-HydratorV1->>RetryQueue: Get resource IDs from queue
-HydratorV1->>HydratorV1: Duplicate retry resource rows into cache
 HydratorV1->>ProcessorMarkers: Get last cache_location
 HydratorV1->>FiggyDB: Get X (e.g. 500) records with updated_at later than cache_location
 HydratorV1->>HydrationCache: Store the records with cache version 1
 HydratorV1->>ProcessorMarkers: Set(cache_location: latest_updated_at_from_batch)
@@ -150,15 +145,8 @@ To support concurrency in these processes we will use the source_cache_order fie
  * If we're re-running a process because we've reset the cache_order in the ProcessorMarkers table, then it will write new records because it allows updating rows where source_cache_order is equal.
 
 ## Resilience and Error Handling
-If postgres or Solr fails, we should let the Processors crash and restart indefinitely. When the service comes back up, they will resume their expected operation.
-
-When a Transformation error occurs:
-1. The Transformer does its best to create a Solr record, with incomplete data.
-1. It gets logged by writing the error message in the `error` field and sending the notification to Honeybadger.
-1. DLS can review errors via scripts and Honeybadger weekly review.
-1. DLS fixes error(s).
-1. DLS adds the record ID to the retry queue.
+The system will have a way to automatically retry errors caused by system downtime and to log errors caused by bugs in our code.
 
 ## Consequences