From 5aed420b2f64abb027c350f485c3970888b69b36 Mon Sep 17 00:00:00 2001
From: Julien Rousseau
Date: Wed, 22 Nov 2023 12:56:41 -0500
Subject: [PATCH 1/7] handle entity change updates

---
 src/clickhouse/handleSinkRequest.ts | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)

diff --git a/src/clickhouse/handleSinkRequest.ts b/src/clickhouse/handleSinkRequest.ts
index 8746e2f..974e455 100644
--- a/src/clickhouse/handleSinkRequest.ts
+++ b/src/clickhouse/handleSinkRequest.ts
@@ -152,12 +152,18 @@ async function handleEntityChange(

   switch (change.operation) {
     case "OPERATION_CREATE":
+      prometheus.entity_changes_inserted.inc();
       return insertEntityChange(table, values, { ...metadata, id: change.id });

+    // Updates are inserted as new rows in ClickHouse. This allows for the full history.
+    // If the user wants to override old data, they can specify it in their schema
+    // by setting the timestamp in the sorting key and by using a ReplacingMergeTree.
     case "OPERATION_UPDATE":
-      return updateEntityChange();
+      prometheus.entity_changes_updated.inc();
+      return insertEntityChange(table, values, { ...metadata, id: change.id });

     case "OPERATION_DELETE":
+      prometheus.entity_changes_deleted.inc();
       return deleteEntityChange();

     default:
@@ -195,21 +201,10 @@ function insertEntityChange(
       metadata.cursor
     )
   );
-
-  prometheus.entity_changes_inserted.inc();
-}
-
-// TODO: implement function
-function updateEntityChange(): Promise<void> {
-  prometheus.entity_changes_updated.inc();
-  return Promise.resolve();
-
-  // return client.update();
 }

 // TODO: implement function
 function deleteEntityChange(): Promise<void> {
-  prometheus.entity_changes_deleted.inc();
   return Promise.resolve();

   // return client.delete({ values, table: change.entity });
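Note (illustration, not part of the patch): the comment added above leaves overwrite semantics to the user-provided schema. A minimal sketch of what such a user-defined sink table could look like is shown below; `balances`, `wallet` and `amount` are invented names, `id` stands for whatever key identifies an entity in the user's data, and `timestamp` for the block timestamp the sink attaches to each row.

```sql
-- Hypothetical user table: rows sharing the same sorting key (id) are
-- collapsed at merge time, keeping the row with the highest `timestamp`.
CREATE TABLE IF NOT EXISTS balances (
    id        String,
    wallet    String,
    amount    UInt64,
    timestamp DateTime64(3, 'UTC')
)
ENGINE = ReplacingMergeTree(timestamp)
ORDER BY (id);
```

Until background merges run, several versions of the same `id` may still be returned, so exact reads should deduplicate at query time, for example with `FINAL` (a read-side sketch follows PATCH 5/7 below).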
From ac1e176f3b8e72e0929a9d1d01761496b4c880fc Mon Sep 17 00:00:00 2001
From: Julien Rousseau
Date: Wed, 22 Nov 2023 12:56:57 -0500
Subject: [PATCH 2/7] prepare for entity change deletes

---
 src/clickhouse/tables/deleted_entity_changes.sql | 11 +++++++++++
 src/clickhouse/tables/index.ts                   |  3 +++
 2 files changed, 14 insertions(+)
 create mode 100644 src/clickhouse/tables/deleted_entity_changes.sql

diff --git a/src/clickhouse/tables/deleted_entity_changes.sql b/src/clickhouse/tables/deleted_entity_changes.sql
new file mode 100644
index 0000000..94b920a
--- /dev/null
+++ b/src/clickhouse/tables/deleted_entity_changes.sql
@@ -0,0 +1,11 @@
+CREATE TABLE IF NOT EXISTS deleted_entity_changes (
+    chain LowCardinality(String),
+    source LowCardinality(String),
+    block_id FixedString(64),
+    block_number UInt32,
+    module_hash FixedString(40),
+    timestamp DateTime64(3, 'UTC'),
+)
+ENGINE = ReplacingMergeTree
+PRIMARY KEY (source, block_id)
+ORDER BY (source, block_id, block_number, chain, timestamp);
\ No newline at end of file
diff --git a/src/clickhouse/tables/index.ts b/src/clickhouse/tables/index.ts
index 36c3ea1..ab3c67b 100644
--- a/src/clickhouse/tables/index.ts
+++ b/src/clickhouse/tables/index.ts
@@ -1,5 +1,6 @@
 import blocks_sql from "./blocks.sql";
 import cursors_sql from "./cursors.sql";
+import deleted_entity_changes_sql from "./deleted_entity_changes.sql";
 import final_blocks_sql from "./final_blocks.sql";
 import module_hashes_sql from "./module_hashes.sql";
 import unparsed_json_sql from "./unparsed_json.sql";
@@ -9,6 +10,7 @@
 export const final_blocks = await Bun.file(final_blocks_sql).text();
 export const module_hashes = await Bun.file(module_hashes_sql).text();
 export const unparsed_json = await Bun.file(unparsed_json_sql).text();
 export const cursors = await Bun.file(cursors_sql).text();
+export const deleted_entity_changes = await Bun.file(deleted_entity_changes_sql).text();

 export default [
   ["blocks", blocks],
   ["final_blocks", final_blocks],
   ["module_hashes", module_hashes],
   ["unparsed_json", unparsed_json],
   ["cursors", cursors],
+  ["deleted_entity_changes", deleted_entity_changes],
 ];

From eb6ec3ea3f9bb724468adcf74df1c300d1da64b5 Mon Sep 17 00:00:00 2001
From: Julien Rousseau
Date: Wed, 22 Nov 2023 13:01:52 -0500
Subject: [PATCH 3/7] initial delete processing

---
 src/clickhouse/handleSinkRequest.ts | 33 ++++++++++++++++++++++-------
 src/sqlite/sqlite.ts                |  2 +-
 2 files changed, 26 insertions(+), 9 deletions(-)

diff --git a/src/clickhouse/handleSinkRequest.ts b/src/clickhouse/handleSinkRequest.ts
index 974e455..55b71a7 100644
--- a/src/clickhouse/handleSinkRequest.ts
+++ b/src/clickhouse/handleSinkRequest.ts
@@ -121,7 +121,8 @@ function handleNoEntityChange(metadata: { clock: Clock; manifest: Manifest; curs
       manifest.moduleName,
       manifest.type,
       Number(new Date(clock.timestamp)),
-      cursor
+      cursor,
+      false
     )
   );
 }
@@ -164,7 +165,7 @@ async function handleEntityChange(

     case "OPERATION_DELETE":
       prometheus.entity_changes_deleted.inc();
-      return deleteEntityChange();
+      return deleteEntityChange(table, { ...metadata, id: change.id });

     default:
       prometheus.entity_changes_unsupported.inc();
@@ -198,14 +199,30 @@ function insertEntityChange(
       metadata.manifest.moduleName,
       metadata.manifest.type,
       Number(new Date(metadata.clock.timestamp)),
-      metadata.cursor
+      metadata.cursor,
+      false
     )
   );
 }

-// TODO: implement function
-function deleteEntityChange(): Promise<void> {
-  return Promise.resolve();
-
-  // return client.delete({ values, table: change.entity });
+function deleteEntityChange(
+  source: string,
+  metadata: { id: string; clock: Clock; manifest: Manifest; cursor: string }
+) {
+  sqliteQueue.add(() => {
+    sqlite.insert(
+      "",
+      source,
+      metadata.manifest.chain,
+      metadata.clock.id,
+      metadata.clock.number,
+      metadata.manifest.finalBlockOnly,
+      metadata.manifest.moduleHash,
+      metadata.manifest.moduleName,
+      metadata.manifest.type,
+      Number(new Date(metadata.clock.timestamp)),
+      metadata.cursor,
+      true
+    );
+  });
 }
diff --git a/src/sqlite/sqlite.ts b/src/sqlite/sqlite.ts
index 345f418..a930226 100644
--- a/src/sqlite/sqlite.ts
+++ b/src/sqlite/sqlite.ts
@@ -65,7 +65,7 @@ class SQLite {
     this.db.run("BEGIN TRANSACTION;");
   }

-  public insert(entityChanges: string, source: string, chain: string, blockId: string, blockNumber: number, isFinal: boolean, moduleHash: string, moduleName: string, type: string, timestamp: number, cursor: string) {
+  public insert(entityChanges: string, source: string, chain: string, blockId: string, blockNumber: number, isFinal: boolean, moduleHash: string, moduleName: string, type: string, timestamp: number, cursor: string, isDelete: boolean) {
     this.insertStatement.run(this.batchNumber, entityChanges, source, chain, blockId, blockNumber, isFinal ? 1 : 0, moduleHash, moduleName, type, timestamp, cursor);
   }

From 41bf9a43b79adcd65eaadfca49907a7020b19083 Mon Sep 17 00:00:00 2001
From: Julien Rousseau
Date: Wed, 22 Nov 2023 16:05:10 -0500
Subject: [PATCH 4/7] implemented entity change delete

---
 src/clickhouse/handleSinkRequest.ts           | 38 ++++++------------
 .../tables/deleted_entity_changes.sql         |  2 +
 src/sqlite/sqlite.ts                          |  2 +-
 3 files changed, 14 insertions(+), 28 deletions(-)

diff --git a/src/clickhouse/handleSinkRequest.ts b/src/clickhouse/handleSinkRequest.ts
index 55b71a7..95e371e 100644
--- a/src/clickhouse/handleSinkRequest.ts
+++ b/src/clickhouse/handleSinkRequest.ts
@@ -121,8 +121,7 @@ function handleNoEntityChange(metadata: { clock: Clock; manifest: Manifest; curs
       manifest.moduleName,
       manifest.type,
       Number(new Date(clock.timestamp)),
-      cursor,
-      false
+      cursor
     )
   );
 }
@@ -163,9 +162,17 @@ async function handleEntityChange(
       prometheus.entity_changes_updated.inc();
       return insertEntityChange(table, values, { ...metadata, id: change.id });

+    // Deleted entity changes are not actually removed from the database.
+    // They are stored in the 'deleted_entity_changes' table with their timestamp.
+    // Again, this allows to keep the full history while also providing the required information
+    // to correctly filter out unwanted data if necessary.
     case "OPERATION_DELETE":
       prometheus.entity_changes_deleted.inc();
-      return deleteEntityChange(table, { ...metadata, id: change.id });
+      return insertEntityChange(
+        "deleted_entity_changes",
+        { source: table },
+        { ...metadata, id: change.id }
+      );

     default:
       prometheus.entity_changes_unsupported.inc();
@@ -199,30 +206,7 @@ function insertEntityChange(
       metadata.manifest.moduleName,
       metadata.manifest.type,
       Number(new Date(metadata.clock.timestamp)),
-      metadata.cursor,
-      false
+      metadata.cursor
     )
   );
 }
-
-function deleteEntityChange(
-  source: string,
-  metadata: { id: string; clock: Clock; manifest: Manifest; cursor: string }
-) {
-  sqliteQueue.add(() => {
-    sqlite.insert(
-      "",
-      source,
-      metadata.manifest.chain,
-      metadata.clock.id,
-      metadata.clock.number,
-      metadata.manifest.finalBlockOnly,
-      metadata.manifest.moduleHash,
-      metadata.manifest.moduleName,
-      metadata.manifest.type,
-      Number(new Date(metadata.clock.timestamp)),
-      metadata.cursor,
-      true
-    );
-  });
-}
diff --git a/src/clickhouse/tables/deleted_entity_changes.sql b/src/clickhouse/tables/deleted_entity_changes.sql
index 94b920a..3e45439 100644
--- a/src/clickhouse/tables/deleted_entity_changes.sql
+++ b/src/clickhouse/tables/deleted_entity_changes.sql
@@ -1,10 +1,12 @@
 CREATE TABLE IF NOT EXISTS deleted_entity_changes (
+    id String,
     chain LowCardinality(String),
     source LowCardinality(String),
     block_id FixedString(64),
     block_number UInt32,
     module_hash FixedString(40),
     timestamp DateTime64(3, 'UTC'),
+    cursor String,
 )
 ENGINE = ReplacingMergeTree
 PRIMARY KEY (source, block_id)
diff --git a/src/sqlite/sqlite.ts b/src/sqlite/sqlite.ts
index a930226..345f418 100644
--- a/src/sqlite/sqlite.ts
+++ b/src/sqlite/sqlite.ts
@@ -65,7 +65,7 @@ class SQLite {
     this.db.run("BEGIN TRANSACTION;");
   }

-  public insert(entityChanges: string, source: string, chain: string, blockId: string, blockNumber: number, isFinal: boolean, moduleHash: string, moduleName: string, type: string, timestamp: number, cursor: string, isDelete: boolean) {
+  public insert(entityChanges: string, source: string, chain: string, blockId: string, blockNumber: number, isFinal: boolean, moduleHash: string, moduleName: string, type: string, timestamp: number, cursor: string) {
     this.insertStatement.run(this.batchNumber, entityChanges, source, chain, blockId, blockNumber, isFinal ? 1 : 0, moduleHash, moduleName, type, timestamp, cursor);
   }
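Note (illustration, not part of the patch): after this change a delete only records a row in `deleted_entity_changes`, carrying the originating table name in `source` and the entity change `id`, so readers that want the "live" view have to subtract those rows themselves. One possible shape for such a read, assuming the hypothetical `balances` table from the earlier aside, keyed by `id`:

```sql
-- Keep only rows whose id has never been recorded as deleted for this source.
SELECT b.*
FROM balances AS b
LEFT ANTI JOIN
(
    SELECT id
    FROM deleted_entity_changes
    WHERE source = 'balances'
) AS deleted ON b.id = deleted.id;
```

A stricter variant could also compare `block_number`, so that an entity deleted and later re-created is not filtered out; that refinement is omitted here.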
From 3b61515654b166c3f03e633265915aa35658ef22 Mon Sep 17 00:00:00 2001
From: Julien Rousseau
Date: Wed, 22 Nov 2023 17:06:27 -0500
Subject: [PATCH 5/7] updated docs

---
 README.md                           | 53 ++++++++++++++++++++++-------
 src/clickhouse/handleSinkRequest.ts |  2 +-
 2 files changed, 42 insertions(+), 13 deletions(-)

diff --git a/README.md b/README.md
index 8487795..d61d2cc 100644
--- a/README.md
+++ b/README.md
@@ -8,6 +8,19 @@

 ## Features

+<details>
+<summary><b>Entity changes support</b></summary>
+
+Support for these entity change operations:
+
+- `OPERATION_CREATE`: The received entity changes are directly inserted into ClickHouse according to a provided [schema](#schema-initialization).
+
+- `OPERATION_UPDATE`: By default, updates are treated as new items. This allows building a history of every transaction. If required, previous records can be replaced by specifying the engine as `ReplacingMergeTree`. See this [article](https://clickhouse.com/docs/en/guides/developer/deduplication#using-replacingmergetree-for-upserts).
+
+- `OPERATION_DELETE`: Entity changes are not actually deleted from the database. Again, this allows building a history of every transaction. The deleted fields are inserted into `deleted_entity_changes`. This table can then be used to filter out deleted data if required.
+
+</details>
+
 <details>
 <summary><b>Serverless data sinking</b></summary>

@@ -133,7 +146,7 @@ Options:
   --allow-unparsed         Enable storage in 'unparsed_json' table (default: false, env: ALLOW_UNPARSED)
   --transaction-size       Number of insert statements in a SQLite transaction (default: 50, env: TRANSACTION_SIZE)
   --resume                 Save the cached data from the previous process into ClickHouse (default: true, env: RESUME)
-  --buffer                 SQLite database to use as an insertion buffer. Use ':memory:' to make it volatile. (default: buffer.sqlite, env: BUFFER)
+  --buffer                 SQLite database to use as an insertion buffer. Use ':memory:' to make it volatile. (default: buffer.db, env: BUFFER)
   -h, --help               display help for command
 ```

@@ -171,14 +184,18 @@ The `USER_DIMENSION` is generated by the user provided schema and is augmented b
 ```mermaid
 erDiagram
   USER_DIMENSION }|--|{ blocks : " "
-  USER_DIMENSION }|--|{ module_hashes : " "
+  module_hashes }|--|{ USER_DIMENSION : " "
   USER_DIMENSION }|--|{ cursors : " "

-  blocks }|--|{ final_blocks : " "
+  deleted_entity_changes }|--|{ blocks : " "
+  module_hashes }|--|{ deleted_entity_changes : " "
+  deleted_entity_changes }|--|{ cursors : " "

-  blocks }|--|{ unparsed_json : " "
+  unparsed_json }|--|{ blocks : " "
   module_hashes }|--|{ unparsed_json : " "
-  cursors }|--|{ unparsed_json : " "
+  unparsed_json }|--|{ cursors : " "
+
+  blocks }|--|{ final_blocks : " "

   USER_DIMENSION {
     user_data unknown
@@ -191,6 +208,17 @@ erDiagram
     cursor String
   }

+  deleted_entity_changes {
+    source LowCardinality(String)
+    id String
+    chain LowCardinality(String)
+    block_id FixedString(64)
+    block_number UInt32
+    module_hash FixedString(40)
+    timestamp DateTime(3_UTC)
+    cursor String
+  }
+
   unparsed_json {
     raw_data String
     source LowCardinality(String)
@@ -232,13 +260,14 @@ erDiagram

 **Indexes**

-| Table         | Fields                                       |
-| ------------- | -------------------------------------------- |
-| blocks        | `(block_id, block_number, chain, timestamp)` |
-| module_hashes | `module_hash`                                |
-| cursors       | `(cursor, module_hash, block_id)`            |
-| unparsed_json | `(source, chain, module_hash, block_id)`     |
-| final_blocks  | `block_id`                                   |
+| Table                  | Fields                                                |
+| ---------------------- | ----------------------------------------------------- |
+| blocks                 | `(block_id, block_number, chain, timestamp)`          |
+| deleted_entity_changes | `(source, block_id, block_number, chain, timestamp)`  |
+| module_hashes          | `module_hash`                                         |
+| cursors                | `(cursor, module_hash, block_id)`                     |
+| unparsed_json          | `(source, chain, module_hash, block_id)`              |
+| final_blocks           | `block_id`                                            |

 ### Database initialization

diff --git a/src/clickhouse/handleSinkRequest.ts b/src/clickhouse/handleSinkRequest.ts
index 95e371e..d7b043e 100644
--- a/src/clickhouse/handleSinkRequest.ts
+++ b/src/clickhouse/handleSinkRequest.ts
@@ -157,7 +157,7 @@ async function handleEntityChange(

     // Updates are inserted as new rows in ClickHouse. This allows for the full history.
     // If the user wants to override old data, they can specify it in their schema
-    // by setting the timestamp in the sorting key and by using a ReplacingMergeTree.
+    // by using a ReplacingMergeTree.
     case "OPERATION_UPDATE":
       prometheus.entity_changes_updated.inc();
       return insertEntityChange(table, values, { ...metadata, id: change.id });
From b953f91430d07a3a0a89dd0cb20a49e6e6988853 Mon Sep 17 00:00:00 2001
From: Julien Rousseau
Date: Thu, 23 Nov 2023 10:05:09 -0500
Subject: [PATCH 6/7] edited the sqlite object interface

---
 src/clickhouse/handleSinkRequest.ts | 41 +++--------------------------
 src/sqlite/sqlite.ts                | 36 ++++++++++++++++++-------
 2 files changed, 30 insertions(+), 47 deletions(-)

diff --git a/src/clickhouse/handleSinkRequest.ts b/src/clickhouse/handleSinkRequest.ts
index d7b043e..2cbbf70 100644
--- a/src/clickhouse/handleSinkRequest.ts
+++ b/src/clickhouse/handleSinkRequest.ts
@@ -93,9 +93,7 @@ export function saveKnownEntityChanges() {
       if (await store.existsTable(table)) {
         await client.insert({ table, values, format: "JSONEachRow" });
       } else {
-        logger.info(
-          `Skipped (${values.length}) records assigned to table '${table}' because it does not exist.`
-        );
+        logger.info(`Skipped (${values.length}) records assigned to table '${table}' because it does not exist.`);
       }
     }
   }
@@ -108,22 +106,7 @@ function batchSizeLimitReached() {

 function handleNoEntityChange(metadata: { clock: Clock; manifest: Manifest; cursor: string }) {
   const { clock, manifest, cursor } = metadata;
-
-  sqliteQueue.add(() =>
-    sqlite.insert(
-      "",
-      "",
-      manifest.chain,
-      clock.id,
-      clock.number,
-      manifest.finalBlockOnly,
-      manifest.moduleHash,
-      manifest.moduleName,
-      manifest.type,
-      Number(new Date(clock.timestamp)),
-      cursor
-    )
-  );
+  sqliteQueue.add(() => sqlite.insert("", "", clock, manifest, cursor));
 }

 async function handleEntityChange(
@@ -146,11 +129,7 @@ async function handleEntityChange(
     // to correctly filter out unwanted data if necessary.
     case "OPERATION_DELETE":
       prometheus.entity_changes_deleted.inc();
-      return insertEntityChange(
-        "deleted_entity_changes",
-        { source: table },
-        { ...metadata, id: change.id }
-      );
+      return insertEntityChange("deleted_entity_changes", { source: table }, { ...metadata, id: change.id });

     default:
       prometheus.entity_changes_unsupported.inc();
@@ -173,18 +152,6 @@ function insertEntityChange(
   values["cursor"] = metadata.cursor; // Block cursor for current substreams

   sqliteQueue.add(() =>
-    sqlite.insert(
-      JSON.stringify(values),
-      table,
-      metadata.manifest.chain,
-      metadata.clock.id,
-      metadata.clock.number,
-      metadata.manifest.finalBlockOnly,
-      metadata.manifest.moduleHash,
-      metadata.manifest.moduleName,
-      metadata.manifest.type,
-      Number(new Date(metadata.clock.timestamp)),
-      metadata.cursor
-    )
+    sqlite.insert(JSON.stringify(values), table, metadata.clock, metadata.manifest, metadata.cursor)
   );
 }
diff --git a/src/sqlite/sqlite.ts b/src/sqlite/sqlite.ts
index 345f418..381880a 100644
--- a/src/sqlite/sqlite.ts
+++ b/src/sqlite/sqlite.ts
@@ -2,6 +2,7 @@ import { file } from "bun";
 import Database, { Statement } from "bun:sqlite";
 import { config } from "../config.js";
 import { Err, Ok, Result } from "../result.js";
+import { Clock, Manifest } from "../schemas.js";
 import tableSQL from "./table.sql";

 const selectSQL = {
@@ -65,11 +66,26 @@ class SQLite {
     this.db.run("BEGIN TRANSACTION;");
   }

-  public insert(entityChanges: string, source: string, chain: string, blockId: string, blockNumber: number, isFinal: boolean, moduleHash: string, moduleName: string, type: string, timestamp: number, cursor: string) {
-    this.insertStatement.run(this.batchNumber, entityChanges, source, chain, blockId, blockNumber, isFinal ? 1 : 0, moduleHash, moduleName, type, timestamp, cursor);
+  public insert(entityChanges: string, source: string, clock: Clock, manifest: Manifest, cursor: string) {
+    const { chain, finalBlockOnly, moduleHash, moduleName, type } = manifest;
+    const { id: blockId, number: blockNumber, timestamp: timestampStr } = clock;
+
+    const isFinal = finalBlockOnly ? 1 : 0;
+    const timestamp = Number(new Date(timestampStr));
+
+    const args = [source, chain, blockId, blockNumber, isFinal, moduleHash, moduleName, type, timestamp, cursor];
+    this.insertStatement.run(this.batchNumber, entityChanges, ...args);
   }

-  public async commitBuffer(onData: (blocks: unknown[], cursors: unknown[], finalBlocks: unknown[], moduleHashes: unknown[], entityChanges: Record<string, unknown[]>) => Promise<void>): Promise<Result> {
+  public async commitBuffer(
+    onData: (
+      blocks: unknown[],
+      cursors: unknown[],
+      finalBlocks: unknown[],
+      moduleHashes: unknown[],
+      entityChanges: Record<string, unknown[]>
+    ) => Promise<void>
+  ): Promise<Result> {
     try {
       this.batchNumber++;

@@ -82,7 +98,9 @@ class SQLite {
       const sources = this.selectSourcesStatement.all(this.batchNumber);
       for (const { source } of sources) {
         if (source.length > 0) {
-          entityChanges[source] = this.selecEntityChangesStatement.all(this.batchNumber, source).map((response) => JSON.parse(response.entity_changes));
+          entityChanges[source] = this.selecEntityChangesStatement
+            .all(this.batchNumber, source)
+            .map((response) => JSON.parse(response.entity_changes));
         }
       }

@@ -103,16 +121,14 @@ class SQLite {

   private get initialBatchNumber() {
     try {
-      const response = this.db
-        .query<{ batch_number: number }, any>(
-          `SELECT MAX(batch_number) AS batch_number
          FROM (
            SELECT batch_number FROM data_buffer
            UNION ALL
            SELECT 0 AS batch_number
-          )`
-        )
-        .get();
+      const sql = `SELECT MAX(batch_number) AS batch_number
+          )`;
+
+      const response = this.db.query<{ batch_number: number }, any>(sql).get();
       return response!.batch_number + 1;
     } catch {
       return 0;

From 551e709269608a70e4eb022ad8ae3ab36fd62eb3 Mon Sep 17 00:00:00 2001
From: Julien Rousseau
Date: Thu, 23 Nov 2023 10:56:11 -0500
Subject: [PATCH 7/7] removed automatic tables from openapi doc

---
 src/clickhouse/stores.ts | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/clickhouse/stores.ts b/src/clickhouse/stores.ts
index 2d024d2..501b2b6 100644
--- a/src/clickhouse/stores.ts
+++ b/src/clickhouse/stores.ts
@@ -1,6 +1,8 @@
 import { logger } from "../logger.js";
 import { readOnlyClient } from "./createClient.js";

+const hiddenTables = ["blocks", "module_hashes", "cursors", "final_blocks", "unparsed_json", "deleted_entity_changes"];
+
 class ClickhouseStore {
   public paused = false;

@@ -23,7 +25,6 @@ class ClickhouseStore {
   public get publicTables() {
     if (!this.publicTablesPromise) {
-      const hiddenTables = ["blocks", "module_hashes", "cursors", "final_blocks"];
       this.publicTablesPromise = readOnlyClient
         .query({ query: "SHOW TABLES", format: "JSONEachRow" })
         .then((response) => response.json<Array<{ name: string }>>())
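Note (illustration, not part of the patch): PATCH 7/7 moves the `hiddenTables` list to module scope and widens it, so the table listing used for the OpenAPI documentation keeps excluding the sink's own bookkeeping tables from the result of `SHOW TABLES`. The filtering itself happens in TypeScript after the query returns; the SQL below is only an equivalent way to express the same idea, shown for reference.

```sql
-- List user-defined tables only, skipping the sink's internal ones.
SELECT name
FROM system.tables
WHERE database = currentDatabase()
  AND name NOT IN ('blocks', 'module_hashes', 'cursors', 'final_blocks', 'unparsed_json', 'deleted_entity_changes');
```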