diff --git a/src/clickhouse/handleSinkRequest.ts b/src/clickhouse/handleSinkRequest.ts index d0795ce..42dfe51 100644 --- a/src/clickhouse/handleSinkRequest.ts +++ b/src/clickhouse/handleSinkRequest.ts @@ -60,35 +60,19 @@ export async function handleSinkRequest({ data, ...metadata }: PayloadBody) { } export function saveKnownEntityChanges() { - return sqlite.commitBuffer(async (blocks, cursors, finalBlocks, moduleHashes, entityChanges) => { + return sqlite.commitBuffer(async (blocks, finalBlocks, moduleHashes, entityChanges) => { if (moduleHashes.length > 0) { - await client.insert({ - values: moduleHashes, - table: "module_hashes", - format: "JSONEachRow", - }); + await client.insert({ values: moduleHashes, table: "module_hashes", format: "JSONEachRow" }); } if (finalBlocks.length > 0) { - await client.insert({ - values: finalBlocks, - table: "final_blocks", - format: "JSONEachRow", - }); + await client.insert({ values: finalBlocks, table: "final_blocks", format: "JSONEachRow" }); } if (blocks.length > 0) { await client.insert({ values: blocks, table: "blocks", format: "JSONEachRow" }); } - if (cursors.length > 0) { - await client.insert({ - values: cursors, - table: "cursors", - format: "JSONEachRow", - }); - } - for (const [table, values] of Object.entries(entityChanges)) { if (values.length > 0) { // This check ensures that old stale data coming from SQLite diff --git a/src/clickhouse/stores.ts b/src/clickhouse/stores.ts index 501b2b6..343d6b0 100644 --- a/src/clickhouse/stores.ts +++ b/src/clickhouse/stores.ts @@ -1,13 +1,11 @@ import { logger } from "../logger.js"; import { readOnlyClient } from "./createClient.js"; -const hiddenTables = ["blocks", "module_hashes", "cursors", "final_blocks", "unparsed_json", "deleted_entity_changes"]; - class ClickhouseStore { public paused = false; private chainsPromise: Promise | null = null; - private publicTablesPromise: Promise | null = null; + private moduleHashesPromises: Promise | null = null; private knownTables = new Map(); @@ -23,17 +21,16 @@ class ClickhouseStore { return this.chainsPromise; } - public get publicTables() { - if (!this.publicTablesPromise) { - this.publicTablesPromise = readOnlyClient - .query({ query: "SHOW TABLES", format: "JSONEachRow" }) - .then((response) => response.json>()) - .then((names) => names.map(({ name }) => name)) - .then((names) => names.filter((table) => !hiddenTables.includes(table))) + public get moduleHashes() { + if (!this.moduleHashesPromises) { + this.moduleHashesPromises = readOnlyClient + .query({ query: "SELECT DISTINCT module_hash from module_hashes", format: "JSONEachRow" }) + .then((response) => response.json>()) + .then((moduleHashes) => moduleHashes.map(({ module_hash }) => module_hash)) .catch(() => []); } - return this.publicTablesPromise; + return this.moduleHashesPromises; } // in memory TABLE name cache @@ -63,7 +60,7 @@ class ClickhouseStore { public reset() { this.chainsPromise = null; - this.publicTablesPromise = null; + this.moduleHashesPromises = null; this.knownTables.clear(); logger.info("Cache has been cleared"); } diff --git a/src/clickhouse/tables/cursors.sql b/src/clickhouse/tables/cursors.sql deleted file mode 100644 index 5f12ea1..0000000 --- a/src/clickhouse/tables/cursors.sql +++ /dev/null @@ -1,10 +0,0 @@ -CREATE TABLE IF NOT EXISTS cursors ( - cursor String, - module_hash FixedString(40), - block_id FixedString(64), - block_number UInt32, - chain LowCardinality(String), -) -ENGINE = ReplacingMergeTree -PRIMARY KEY (cursor) -ORDER BY (cursor, module_hash, block_id); \ No newline at end of file diff --git a/src/clickhouse/tables/index.ts b/src/clickhouse/tables/index.ts index ab3c67b..b9b50b1 100644 --- a/src/clickhouse/tables/index.ts +++ b/src/clickhouse/tables/index.ts @@ -1,5 +1,4 @@ import blocks_sql from "./blocks.sql"; -import cursors_sql from "./cursors.sql"; import deleted_entity_changes_sql from "./deleted_entity_changes.sql"; import final_blocks_sql from "./final_blocks.sql"; import module_hashes_sql from "./module_hashes.sql"; @@ -9,7 +8,6 @@ export const blocks = await Bun.file(blocks_sql).text(); export const final_blocks = await Bun.file(final_blocks_sql).text(); export const module_hashes = await Bun.file(module_hashes_sql).text(); export const unparsed_json = await Bun.file(unparsed_json_sql).text(); -export const cursors = await Bun.file(cursors_sql).text(); export const deleted_entity_changes = await Bun.file(deleted_entity_changes_sql).text(); export default [ @@ -17,6 +15,5 @@ export default [ ["final_blocks", final_blocks], ["module_hashes", module_hashes], ["unparsed_json", unparsed_json], - ["cursors", cursors], ["deleted_entity_changes", deleted_entity_changes], ]; diff --git a/src/clickhouse/tables/module_hashes.sql b/src/clickhouse/tables/module_hashes.sql index a78e22f..b2d40e5 100644 --- a/src/clickhouse/tables/module_hashes.sql +++ b/src/clickhouse/tables/module_hashes.sql @@ -1,8 +1,11 @@ CREATE TABLE IF NOT EXISTS module_hashes ( - module_hash FixedString(40), - module_name String(), - chain LowCardinality(String), - type String(), + module_hash FixedString(40), + module_name String, + chain LowCardinality(String), + type String, + latest_cursor String, + latest_block_number UInt32, + latest_block_id FixedString(64), ) ENGINE = ReplacingMergeTree ORDER BY (module_hash, chain); \ No newline at end of file diff --git a/src/fetch/GET.ts b/src/fetch/GET.ts index 380ea91..da00966 100644 --- a/src/fetch/GET.ts +++ b/src/fetch/GET.ts @@ -4,7 +4,7 @@ import swaggerHtml from "../../swagger/index.html"; import { metrics } from "../prometheus.js"; import { blocks } from "./blocks.js"; import { NotFound, toFile, toJSON } from "./cors.js"; -import { findCursorsForMissingBlocks, findLatestCursor } from "./cursors.js"; +import { findLatestCursor } from "./cursors.js"; import health from "./health.js"; import { openapi } from "./openapi.js"; @@ -18,7 +18,6 @@ export default async function (req: Request) { if (pathname === "/openapi") return toJSON(await openapi()); if (pathname === "/blocks") return blocks(); if (pathname === "/cursors/latest") return findLatestCursor(req); - if (pathname === "/cursors/missing") return findCursorsForMissingBlocks(req); return NotFound; } diff --git a/src/fetch/cursors.ts b/src/fetch/cursors.ts index db7ce0b..08e4c05 100644 --- a/src/fetch/cursors.ts +++ b/src/fetch/cursors.ts @@ -11,13 +11,12 @@ export async function findLatestCursor(req: Request): Promise { } try { - const { table, chain } = parametersResult.payload; + const { moduleHash, chain } = parametersResult.payload; const query = ` - SELECT cursor, timestamp - FROM ${table} - WHERE chain = '${chain}' - ORDER BY timestamp DESC + SELECT latest_cursor, latest_block_number + FROM module_hashes + WHERE chain = '${chain}' AND module_hash = '${moduleHash}' LIMIT 1`; const response = await readOnlyClient.query({ query, format: "JSONEachRow" }); @@ -27,104 +26,35 @@ export async function findLatestCursor(req: Request): Promise { return toJSON(data[0]); } - return toText(`Bad request: no cursor found for '${table}' on '${chain}'.`, 400); + return toText(`Bad request: no cursor found for '${moduleHash}' on '${chain}'.`, 400); } catch (err) { logger.error(err); } return BadRequest; } -export async function findCursorsForMissingBlocks(req: Request): Promise { - const parametersResult = await verifyParameters(req); - if (!parametersResult.success) { - return parametersResult.error; - } - - try { - const { table, chain } = parametersResult.payload; - - const moduleHash = await getModuleHash(table, chain); - if (!moduleHash) { - return toText("Could not find module hash associated with table and chain", 500); - } - - // This query finds every block that does not have a next block (beginning of a missing range). - // It then pairs it with the first existing block after it (end of missing range). - // If the start block is the last one in the database (max value), it is ignored. - // When every range is found, they are joined with the 'cursors' table - // to find which cursor is associated with the min and max boundaries. - // The module_hash and the chain are used to determine the correct values to use. - const query = ` -SELECT block_ranges.from AS from_block_number, c1.cursor AS from_cursor, block_ranges.to AS to_block_number, c2.cursor AS to_cursor -FROM ( - SELECT c1.block_number AS from, MIN(c2.block_number) AS to - FROM cursors c1, cursors c2, ( - SELECT MAX(block_number) AS block_number - FROM cursors - WHERE chain = '${chain}' AND module_hash = '${moduleHash}' - ) AS maximum - WHERE c1.block_number + 1 NOT IN (SELECT block_number FROM cursors WHERE chain = '${chain}' AND module_hash = '${moduleHash}') - AND c1.chain = '${chain}' - AND c2.chain = '${chain}' - AND c1.block_number <> maximum.block_number - AND c2.block_number > c1.block_number - AND c1.module_hash = '${moduleHash}' - AND c2.module_hash = '${moduleHash}' - GROUP BY c1.block_number -) AS block_ranges -JOIN cursors c1 ON block_ranges.from = c1.block_number AND c1.chain = '${chain}' AND c1.module_hash = '${moduleHash}' -JOIN cursors c2 ON block_ranges.to = c2.block_number AND c2.chain = '${chain}' AND c2.module_hash = '${moduleHash}'`; - - const response = await readOnlyClient.query({ query, format: "JSONEachRow" }); - const data = await response.json< - Array<{ - from_block_number: number; - from_cursor: string; - to_block_number: number; - to_cursor: string; - }> - >(); - - const dto = data.map((record) => ({ - from: { block: record.from_block_number, cursor: record.from_cursor }, - to: { block: record.to_block_number, cursor: record.to_cursor }, - })); - - return toJSON(dto); - } catch (err) { - logger.error(err); - } - - return BadRequest; -} - -async function verifyParameters(req: Request): Promise> { +async function verifyParameters(req: Request): Promise> { const url = new URL(req.url); const chain = url.searchParams.get("chain"); - const table = url.searchParams.get("table"); + const moduleHash = url.searchParams.get("module_hash"); if (!chain) { return Err(toText("Missing parameter: chain", 400)); } - if (!table) { - return Err(toText("Missing parameter: table", 400)); + if (!moduleHash) { + return Err(toText("Missing parameter: module_hash", 400)); } if (!(await store.chains).includes(chain)) { + store.reset(); return Err(toText("Invalid parameter: chain=" + chain, 400)); } - if (!(await store.publicTables).includes(table)) { - return Err(toText("Invalid parameter: table=" + table, 400)); + if (!(await store.moduleHashes).includes(moduleHash)) { + store.reset(); + return Err(toText("Invalid parameter: moduleHash=" + moduleHash, 400)); } - return Ok({ chain, table }); -} - -async function getModuleHash(table: string, chain: string): Promise { - const query = `SELECT module_hash FROM ${table} WHERE chain = '${chain}'`; - const response = await readOnlyClient.query({ query, format: "JSONEachRow" }); - const data = await response.json>(); - return data[0]?.module_hash ?? null; + return Ok({ chain, moduleHash }); } diff --git a/src/fetch/openapi.ts b/src/fetch/openapi.ts index 51cf2c2..3b66e36 100644 --- a/src/fetch/openapi.ts +++ b/src/fetch/openapi.ts @@ -62,12 +62,9 @@ export async function openapi() { put: { tags: [TAGS.USAGE], summary: "Initialize the sink according to a SQL schema", - description: - "Supports `CREATE TABLE` statements
If an url is passed in, the body will not be executed.", + description: "Supports `CREATE TABLE` statements
If an url is passed in, the body will not be executed.", security: [{ "auth-key": [] }], - parameters: [ - { required: false, in: "query", name: "schema-url", schema: { type: "string" } }, - ], + parameters: [{ required: false, in: "query", name: "schema-url", schema: { type: "string" } }], requestBody: { content: { "text/plain": { @@ -99,9 +96,7 @@ export async function openapi() { url: "https://thegraph.com/docs/en/developing/creating-a-subgraph/#built-in-scalar-types", }, security: [{ "auth-key": [] }], - parameters: [ - { required: false, in: "query", name: "schema-url", schema: { type: "string" } }, - ], + parameters: [{ required: false, in: "query", name: "schema-url", schema: { type: "string" } }], requestBody: { content: { "text/plain": { @@ -198,10 +193,10 @@ export async function openapi() { parameters: [ { name: "chain", in: "query", required: true, schema: { enum: await store.chains } }, { - name: "table", + name: "module_hash", in: "query", required: true, - schema: { enum: await store.publicTables }, + schema: { enum: await store.moduleHashes }, }, ], responses: { @@ -217,39 +212,6 @@ export async function openapi() { }, }, }) - .addPath("/cursors/missing", { - get: { - tags: [TAGS.QUERIES], - summary: "Finds the missing blocks and returns the start and end cursors", - parameters: [ - { name: "chain", in: "query", required: true, schema: { enum: await store.chains } }, - { - name: "table", - in: "query", - required: true, - schema: { enum: await store.publicTables }, - }, - ], - responses: { - 200: { - description: "Success", - content: { - "application/json": { - schema: zodToJsonSchema( - z.array( - z.object({ - from: z.object({ block: z.number(), cursor: z.string() }), - to: z.object({ block: z.number(), cursor: z.string() }), - }) - ) - ), - }, - }, - }, - 400: PUT_RESPONSES[400], - }, - }, - }) .addPath("/blocks", { get: { tags: [TAGS.QUERIES], diff --git a/src/sqlite/sqlite.ts b/src/sqlite/sqlite.ts index 8d0bea4..bf7aa39 100644 --- a/src/sqlite/sqlite.ts +++ b/src/sqlite/sqlite.ts @@ -7,9 +7,9 @@ import tableSQL from "./table.sql"; const selectSQL = { blocks: "SELECT block_id, block_number, chain, timestamp FROM data_buffer WHERE batch_number <= ?;", - cursors: "SELECT cursor, module_hash, block_id, block_number, chain FROM data_buffer WHERE batch_number <= ?;", finalBlocks: "SELECT block_id FROM data_buffer WHERE batch_number <= ? AND is_final = 1;", - moduleHashes: "SELECT module_hash, module_name, chain, type FROM data_buffer WHERE batch_number <= ?;", + moduleHashes: `SELECT module_hash, module_name, chain, type, cursor AS latest_cursor, block_number AS latest_block_number, block_id AS latest_block_id + FROM data_buffer WHERE batch_number <= ?;`, sources: "SELECT DISTINCT source FROM data_buffer WHERE batch_number <= ?;", entityChanges: "SELECT entity_changes FROM data_buffer WHERE batch_number <= ? AND source = ?", }; @@ -31,7 +31,6 @@ class SQLite { private batchNumber; private selectBlocksStatement: Statement; - private selectCursorsStatement: Statement; private selectFinalBlocksStatement: Statement; private selectModuleHashesStatement: Statement; private selectSourcesStatement: Statement<{ source: string }, [number]>; @@ -49,7 +48,6 @@ class SQLite { this.batchNumber = this.initialBatchNumber; this.selectBlocksStatement = this.db.prepare(selectSQL.blocks); - this.selectCursorsStatement = this.db.prepare(selectSQL.cursors); this.selectFinalBlocksStatement = this.db.prepare(selectSQL.finalBlocks); this.selectModuleHashesStatement = this.db.prepare(selectSQL.moduleHashes); this.selectSourcesStatement = this.db.prepare(selectSQL.sources); @@ -80,7 +78,6 @@ class SQLite { public async commitBuffer( onData: ( blocks: unknown[], - cursors: unknown[], finalBlocks: unknown[], moduleHashes: unknown[], entityChanges: Record @@ -90,7 +87,6 @@ class SQLite { this.batchNumber++; const blocks = this.selectBlocksStatement.all(this.batchNumber); - const cursors = this.selectCursorsStatement.all(this.batchNumber); const finalBlocks = this.selectFinalBlocksStatement.all(this.batchNumber); const moduleHashes = this.selectModuleHashesStatement.all(this.batchNumber); const entityChanges: Record> = {}; @@ -104,7 +100,7 @@ class SQLite { } } - await onData(blocks, cursors, finalBlocks, moduleHashes, entityChanges); + await onData(blocks, finalBlocks, moduleHashes, entityChanges); this.deleteStatement.run(this.batchNumber); } catch (err) { return UnknownErr(err);