Merge pull request #87 from pinax-network/feature/update-delete-entity-changes

Update and delete entity changes
DenisCarriere authored Nov 27, 2023
2 parents 9a0045d + d250525 commit 20f921d
Showing 7 changed files with 104 additions and 81 deletions.
53 changes: 41 additions & 12 deletions README.md
@@ -8,6 +8,19 @@

## Features

<details>
<summary><b><a href="https://crates.io/crates/substreams-entity-change/">Entity changes</a> support</b></summary>

Support for these entity change operations:

- `OPERATION_CREATE`: The received entity changes are inserted directly into ClickHouse according to the provided [schema](#schema-initialization).

- `OPERATION_UPDATE`: By default, updates are treated as new rows, which preserves a history of every change. If previous records must be replaced instead, specify the `ReplacingMergeTree` engine in your schema (see the sketch after this list) and this [article](https://clickhouse.com/docs/en/guides/developer/deduplication#using-replacingmergetree-for-upserts).

- `OPERATION_DELETE`: Entity changes are never physically removed from the database; again, this preserves the full history. Deleted records are instead logged in `deleted_entity_changes`, which can be used to filter out deleted data when required.

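A minimal upsert sketch (the `accounts` table and its columns are hypothetical, not part of this sink):

```sql
-- With ReplacingMergeTree, the latest row per sorting key wins once parts merge;
-- `timestamp` acts as the version column.
CREATE TABLE IF NOT EXISTS accounts (
    id String,
    balance UInt64,
    timestamp DateTime64(3, 'UTC')
)
ENGINE = ReplacingMergeTree(timestamp)
ORDER BY id;

-- FINAL deduplicates at read time, before background merges have run.
SELECT * FROM accounts FINAL WHERE id = 'abc';
```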
</details>

<details>
<summary><b>Serverless data sinking</b></summary>

@@ -133,7 +146,7 @@ Options:
--allow-unparsed <boolean> Enable storage in 'unparsed_json' table (default: false, env: ALLOW_UNPARSED)
--transaction-size <number> Number of insert statements in a SQLite transaction (default: 50, env: TRANSACTION_SIZE)
--resume <boolean> Save the cached data from the previous process into ClickHouse (default: true, env: RESUME)
-  --buffer <string>            SQLite database to use as an insertion buffer. Use ':memory:' to make it volatile. (default: buffer.sqlite, env: BUFFER)
+  --buffer <string>            SQLite database to use as an insertion buffer. Use ':memory:' to make it volatile. (default: buffer.db, env: BUFFER)
-h, --help display help for command
```

@@ -171,14 +184,18 @@ The `USER_DIMENSION` is generated by the user provided schema and is augmented b
```mermaid
erDiagram
  USER_DIMENSION }|--|{ blocks : " "
-  USER_DIMENSION }|--|{ module_hashes : " "
+  module_hashes }|--|{ USER_DIMENSION : " "
  USER_DIMENSION }|--|{ cursors : " "
-  blocks }|--|{ final_blocks : " "
+  deleted_entity_changes }|--|{ blocks : " "
+  module_hashes }|--|{ deleted_entity_changes : " "
+  deleted_entity_changes }|--|{ cursors : " "
-  blocks }|--|{ unparsed_json : " "
+  unparsed_json }|--|{ blocks : " "
  module_hashes }|--|{ unparsed_json : " "
-  cursors }|--|{ unparsed_json : " "
+  unparsed_json }|--|{ cursors : " "
+  blocks }|--|{ final_blocks : " "
  USER_DIMENSION {
    user_data unknown
@@ -191,6 +208,17 @@ erDiagram
    cursor String
  }
+  deleted_entity_changes {
+    source LowCardinality(String)
+    id String
+    chain LowCardinality(String)
+    block_id FixedString(64)
+    block_number UInt32
+    module_hash FixedString(40)
+    timestamp DateTime64(3_UTC)
+    cursor String
+  }
  unparsed_json {
    raw_data String
    source LowCardinality(String)
@@ -232,13 +260,14 @@ erDiagram

**Indexes**

-| Table         | Fields                                       |
-| ------------- | -------------------------------------------- |
-| blocks        | `(block_id, block_number, chain, timestamp)` |
-| module_hashes | `module_hash`                                |
-| cursors       | `(cursor, module_hash, block_id)`            |
-| unparsed_json | `(source, chain, module_hash, block_id)`     |
-| final_blocks  | `block_id`                                   |
+| Table                  | Fields                                                |
+| ---------------------- | ----------------------------------------------------- |
+| blocks                 | `(block_id, block_number, chain, timestamp)`          |
+| deleted_entity_changes | `(source, block_id, block_number, chain, timestamp)`  |
+| module_hashes          | `module_hash`                                         |
+| cursors                | `(cursor, module_hash, block_id)`                     |
+| unparsed_json          | `(source, chain, module_hash, block_id)`              |
+| final_blocks           | `block_id`                                            |
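
For a quick sanity check against these meta tables, a minimal sketch (column and table names taken from the diagram above):

```sql
-- Latest indexed block per chain.
SELECT chain, max(block_number) AS head_block
FROM blocks
GROUP BY chain;
```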

### Database initialization

71 changes: 16 additions & 55 deletions src/clickhouse/handleSinkRequest.ts
@@ -96,9 +96,7 @@ export function saveKnownEntityChanges() {
if (await store.existsTable(table)) {
await client.insert({ table, values, format: "JSONEachRow" });
} else {
-      logger.info(
-        `Skipped (${values.length}) records assigned to table '${table}' because it does not exist.`
-      );
+      logger.info(`Skipped (${values.length}) records assigned to table '${table}' because it does not exist.`);
}
}
}
@@ -111,22 +109,7 @@ function batchSizeLimitReached() {

function handleNoEntityChange(metadata: { clock: Clock; manifest: Manifest; cursor: string }) {
const { clock, manifest, cursor } = metadata;

-  sqliteQueue.add(() =>
-    sqlite.insert(
-      "",
-      "",
-      manifest.chain,
-      clock.id,
-      clock.number,
-      manifest.finalBlockOnly,
-      manifest.moduleHash,
-      manifest.moduleName,
-      manifest.type,
-      Number(new Date(clock.timestamp)),
-      cursor
-    )
-  );
+  sqliteQueue.add(() => sqlite.insert("", "", clock, manifest, cursor));
}

async function handleEntityChange(
@@ -140,6 +123,7 @@ async function handleEntityChange(
const jsonData = JSON.stringify(values);
const clock = JSON.stringify(metadata.clock);
const manifest = JSON.stringify(metadata.manifest);
+  const environment = { chain: metadata.manifest.chain, module_hash: metadata.manifest.moduleHash };

if (!tableExists) {
if (!config.allowUnparsed) {
@@ -155,13 +139,23 @@

  switch (change.operation) {
    case "OPERATION_CREATE":
+      prometheus.entity_changes_inserted.inc(environment);
      return insertEntityChange(table, values, { ...metadata, id: change.id });

+    // Updates are inserted as new rows in ClickHouse, which preserves the full history.
+    // Users who want to overwrite old data can declare a ReplacingMergeTree engine
+    // in their schema.
    case "OPERATION_UPDATE":
-      return updateEntityChange();
+      prometheus.entity_changes_updated.inc(environment);
+      return insertEntityChange(table, values, { ...metadata, id: change.id });

+    // Deleted entity changes are not actually removed from the database.
+    // They are stored in the 'deleted_entity_changes' table with their timestamp.
+    // Again, this keeps the full history while providing the information required
+    // to filter out deleted data when necessary.
    case "OPERATION_DELETE":
-      return deleteEntityChange();
+      prometheus.entity_changes_deleted.inc(environment);
+      return insertEntityChange("deleted_entity_changes", { source: table }, { ...metadata, id: change.id });

    default:
      prometheus.entity_changes_unsupported.inc();
@@ -184,39 +178,6 @@ function insertEntityChange(
values["cursor"] = metadata.cursor; // Block cursor for current substreams

sqliteQueue.add(() =>
-    sqlite.insert(
-      JSON.stringify(values),
-      table,
-      metadata.manifest.chain,
-      metadata.clock.id,
-      metadata.clock.number,
-      metadata.manifest.finalBlockOnly,
-      metadata.manifest.moduleHash,
-      metadata.manifest.moduleName,
-      metadata.manifest.type,
-      Number(new Date(metadata.clock.timestamp)),
-      metadata.cursor
-    )
+    sqlite.insert(JSON.stringify(values), table, metadata.clock, metadata.manifest, metadata.cursor)
);

-  prometheus.entity_changes_inserted.inc({
-    chain: metadata.manifest.chain,
-    module_hash: metadata.manifest.moduleHash,
-  });
}

-// TODO: implement function
-function updateEntityChange(): Promise<void> {
-  prometheus.entity_changes_updated.inc();
-  return Promise.resolve();
-
-  // return client.update();
-}
-
-// TODO: implement function
-function deleteEntityChange(): Promise<void> {
-  prometheus.entity_changes_deleted.inc();
-  return Promise.resolve();
-
-  // return client.delete({ values, table: change.entity });
-}
3 changes: 2 additions & 1 deletion src/clickhouse/stores.ts
@@ -1,6 +1,8 @@
import { logger } from "../logger.js";
import { readOnlyClient } from "./createClient.js";

+const hiddenTables = ["blocks", "module_hashes", "cursors", "final_blocks", "unparsed_json", "deleted_entity_changes"];

class ClickhouseStore {
public paused = false;

@@ -23,7 +25,6 @@ class ClickhouseStore {

public get publicTables() {
if (!this.publicTablesPromise) {
-      const hiddenTables = ["blocks", "module_hashes", "cursors", "final_blocks"];
this.publicTablesPromise = readOnlyClient
.query({ query: "SHOW TABLES", format: "JSONEachRow" })
.then((response) => response.json<Array<{ name: string }>>())
13 changes: 13 additions & 0 deletions src/clickhouse/tables/deleted_entity_changes.sql
@@ -0,0 +1,13 @@
CREATE TABLE IF NOT EXISTS deleted_entity_changes (
  id           String,
  chain        LowCardinality(String),
  source       LowCardinality(String),
  block_id     FixedString(64),
  block_number UInt32,
  module_hash  FixedString(40),
  timestamp    DateTime64(3, 'UTC'),
  cursor       String
)
ENGINE = ReplacingMergeTree
PRIMARY KEY (source, block_id)
ORDER BY (source, block_id, block_number, chain, timestamp);
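
Downstream queries can then exclude deleted rows with an anti-join against this table; a minimal sketch (`accounts` is a hypothetical user table that keeps the entity `id` column the sink writes):

```sql
-- Keep only rows whose id has not been logged as deleted for this source table.
SELECT *
FROM accounts
WHERE id NOT IN (
    SELECT id FROM deleted_entity_changes WHERE source = 'accounts'
);
```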
3 changes: 3 additions & 0 deletions src/clickhouse/tables/index.ts
@@ -1,5 +1,6 @@
import blocks_sql from "./blocks.sql";
import cursors_sql from "./cursors.sql";
+import deleted_entity_changes_sql from "./deleted_entity_changes.sql";
import final_blocks_sql from "./final_blocks.sql";
import module_hashes_sql from "./module_hashes.sql";
import unparsed_json_sql from "./unparsed_json.sql";
@@ -9,11 +10,13 @@ export const final_blocks = await Bun.file(final_blocks_sql).text();
export const module_hashes = await Bun.file(module_hashes_sql).text();
export const unparsed_json = await Bun.file(unparsed_json_sql).text();
export const cursors = await Bun.file(cursors_sql).text();
+export const deleted_entity_changes = await Bun.file(deleted_entity_changes_sql).text();

export default [
["blocks", blocks],
["final_blocks", final_blocks],
["module_hashes", module_hashes],
["unparsed_json", unparsed_json],
["cursors", cursors],
["deleted_entity_changes", deleted_entity_changes],
];
6 changes: 3 additions & 3 deletions src/prometheus.ts
@@ -27,6 +27,6 @@ export const request_errors = registerCounter("request_errors", "Total failed re
export const sink_requests = registerCounter("sink_requests", "Total sink requests", ["chain", "module_hash"])!;

export const entity_changes_inserted = registerCounter("entity_changes_inserted", "Total inserted entity changes", ["chain", "module_hash"])!;
-export const entity_changes_updated = registerCounter("entity_changes_updated", "Total updated entity changes")!;
-export const entity_changes_deleted = registerCounter("entity_changes_deleted", "Total deleted entity changes")!;
-export const entity_changes_unsupported = registerCounter("entity_changes_unsupported", "Total unsupported entity changes")!;
+export const entity_changes_updated = registerCounter("entity_changes_updated", "Total updated entity changes", ["chain", "module_hash"])!;
+export const entity_changes_deleted = registerCounter("entity_changes_deleted", "Total deleted entity changes", ["chain", "module_hash"])!;
+export const entity_changes_unsupported = registerCounter("entity_changes_unsupported", "Total unsupported entity changes", ["chain", "module_hash"])!;
36 changes: 26 additions & 10 deletions src/sqlite/sqlite.ts
@@ -2,6 +2,7 @@ import { file } from "bun";
import Database, { Statement } from "bun:sqlite";
import { config } from "../config.js";
import { Ok, Result, UnknownErr } from "../result.js";
+import { Clock, Manifest } from "../schemas.js";
import tableSQL from "./table.sql";

const selectSQL = {
@@ -65,11 +66,26 @@ class SQLite {
this.db.run("BEGIN TRANSACTION;");
}

-  public insert(entityChanges: string, source: string, chain: string, blockId: string, blockNumber: number, isFinal: boolean, moduleHash: string, moduleName: string, type: string, timestamp: number, cursor: string) {
-    this.insertStatement.run(this.batchNumber, entityChanges, source, chain, blockId, blockNumber, isFinal ? 1 : 0, moduleHash, moduleName, type, timestamp, cursor);
+  public insert(entityChanges: string, source: string, clock: Clock, manifest: Manifest, cursor: string) {
+    const { chain, finalBlockOnly, moduleHash, moduleName, type } = manifest;
+    const { id: blockId, number: blockNumber, timestamp: timestampStr } = clock;
+
+    const isFinal = finalBlockOnly ? 1 : 0;
+    const timestamp = Number(new Date(timestampStr));
+
+    const args = [source, chain, blockId, blockNumber, isFinal, moduleHash, moduleName, type, timestamp, cursor];
+    this.insertStatement.run(this.batchNumber, entityChanges, ...args);
}

-  public async commitBuffer(onData: (blocks: unknown[], cursors: unknown[], finalBlocks: unknown[], moduleHashes: unknown[], entityChanges: Record<string, unknown[]>) => Promise<void>): Promise<Result> {
+  public async commitBuffer(
+    onData: (
+      blocks: unknown[],
+      cursors: unknown[],
+      finalBlocks: unknown[],
+      moduleHashes: unknown[],
+      entityChanges: Record<string, unknown[]>
+    ) => Promise<void>
+  ): Promise<Result> {
try {
this.batchNumber++;

@@ -82,7 +98,9 @@
const sources = this.selectSourcesStatement.all(this.batchNumber);
for (const { source } of sources) {
if (source.length > 0) {
-          entityChanges[source] = this.selecEntityChangesStatement.all(this.batchNumber, source).map((response) => JSON.parse(response.entity_changes));
+          entityChanges[source] = this.selecEntityChangesStatement
+            .all(this.batchNumber, source)
+            .map((response) => JSON.parse(response.entity_changes));
}
}

@@ -97,16 +115,14 @@

private get initialBatchNumber() {
try {
-      const response = this.db
-        .query<{ batch_number: number }, any>(
-          `SELECT MAX(batch_number) AS batch_number
+      const sql = `SELECT MAX(batch_number) AS batch_number
        FROM (
          SELECT batch_number FROM data_buffer
          UNION ALL
          SELECT 0 AS batch_number
-        )`
-        )
-        .get();
+      )`;
+
+      const response = this.db.query<{ batch_number: number }, any>(sql).get();
return response!.batch_number + 1;
} catch {
return 0;
