Skip to content

Commit

Permalink
Merge pull request #92 from pinax-network/feature/module-hashes-exten…
Browse files Browse the repository at this point in the history
…sion

Move cursors table in module_hashes
  • Loading branch information
DenisCarriere authored Nov 29, 2023
2 parents 20f921d + 26a4860 commit e82a788
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 184 deletions.
22 changes: 3 additions & 19 deletions src/clickhouse/handleSinkRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,35 +60,19 @@ export async function handleSinkRequest({ data, ...metadata }: PayloadBody) {
}

export function saveKnownEntityChanges() {
return sqlite.commitBuffer(async (blocks, cursors, finalBlocks, moduleHashes, entityChanges) => {
return sqlite.commitBuffer(async (blocks, finalBlocks, moduleHashes, entityChanges) => {
if (moduleHashes.length > 0) {
await client.insert({
values: moduleHashes,
table: "module_hashes",
format: "JSONEachRow",
});
await client.insert({ values: moduleHashes, table: "module_hashes", format: "JSONEachRow" });
}

if (finalBlocks.length > 0) {
await client.insert({
values: finalBlocks,
table: "final_blocks",
format: "JSONEachRow",
});
await client.insert({ values: finalBlocks, table: "final_blocks", format: "JSONEachRow" });
}

if (blocks.length > 0) {
await client.insert({ values: blocks, table: "blocks", format: "JSONEachRow" });
}

if (cursors.length > 0) {
await client.insert({
values: cursors,
table: "cursors",
format: "JSONEachRow",
});
}

for (const [table, values] of Object.entries(entityChanges)) {
if (values.length > 0) {
// This check ensures that old stale data coming from SQLite
Expand Down
21 changes: 9 additions & 12 deletions src/clickhouse/stores.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { logger } from "../logger.js";
import { readOnlyClient } from "./createClient.js";

const hiddenTables = ["blocks", "module_hashes", "cursors", "final_blocks", "unparsed_json", "deleted_entity_changes"];

class ClickhouseStore {
public paused = false;

private chainsPromise: Promise<string[]> | null = null;
private publicTablesPromise: Promise<string[]> | null = null;
private moduleHashesPromises: Promise<string[]> | null = null;

private knownTables = new Map<string, boolean>();

Expand All @@ -23,17 +21,16 @@ class ClickhouseStore {
return this.chainsPromise;
}

public get publicTables() {
if (!this.publicTablesPromise) {
this.publicTablesPromise = readOnlyClient
.query({ query: "SHOW TABLES", format: "JSONEachRow" })
.then((response) => response.json<Array<{ name: string }>>())
.then((names) => names.map(({ name }) => name))
.then((names) => names.filter((table) => !hiddenTables.includes(table)))
public get moduleHashes() {
if (!this.moduleHashesPromises) {
this.moduleHashesPromises = readOnlyClient
.query({ query: "SELECT DISTINCT module_hash from module_hashes", format: "JSONEachRow" })
.then((response) => response.json<Array<{ module_hash: string }>>())
.then((moduleHashes) => moduleHashes.map(({ module_hash }) => module_hash))
.catch(() => []);
}

return this.publicTablesPromise;
return this.moduleHashesPromises;
}

// in memory TABLE name cache
Expand Down Expand Up @@ -63,7 +60,7 @@ class ClickhouseStore {

public reset() {
this.chainsPromise = null;
this.publicTablesPromise = null;
this.moduleHashesPromises = null;
this.knownTables.clear();
logger.info("Cache has been cleared");
}
Expand Down
10 changes: 0 additions & 10 deletions src/clickhouse/tables/cursors.sql

This file was deleted.

3 changes: 0 additions & 3 deletions src/clickhouse/tables/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import blocks_sql from "./blocks.sql";
import cursors_sql from "./cursors.sql";
import deleted_entity_changes_sql from "./deleted_entity_changes.sql";
import final_blocks_sql from "./final_blocks.sql";
import module_hashes_sql from "./module_hashes.sql";
Expand All @@ -9,14 +8,12 @@ export const blocks = await Bun.file(blocks_sql).text();
export const final_blocks = await Bun.file(final_blocks_sql).text();
export const module_hashes = await Bun.file(module_hashes_sql).text();
export const unparsed_json = await Bun.file(unparsed_json_sql).text();
export const cursors = await Bun.file(cursors_sql).text();
export const deleted_entity_changes = await Bun.file(deleted_entity_changes_sql).text();

export default [
["blocks", blocks],
["final_blocks", final_blocks],
["module_hashes", module_hashes],
["unparsed_json", unparsed_json],
["cursors", cursors],
["deleted_entity_changes", deleted_entity_changes],
];
11 changes: 7 additions & 4 deletions src/clickhouse/tables/module_hashes.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
CREATE TABLE IF NOT EXISTS module_hashes (
module_hash FixedString(40),
module_name String(),
chain LowCardinality(String),
type String(),
module_hash FixedString(40),
module_name String,
chain LowCardinality(String),
type String,
latest_cursor String,
latest_block_number UInt32,
latest_block_id FixedString(64),
)
ENGINE = ReplacingMergeTree
ORDER BY (module_hash, chain);
3 changes: 1 addition & 2 deletions src/fetch/GET.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import swaggerHtml from "../../swagger/index.html";
import { metrics } from "../prometheus.js";
import { blocks } from "./blocks.js";
import { NotFound, toFile, toJSON } from "./cors.js";
import { findCursorsForMissingBlocks, findLatestCursor } from "./cursors.js";
import { findLatestCursor } from "./cursors.js";
import health from "./health.js";
import { openapi } from "./openapi.js";

Expand All @@ -18,7 +18,6 @@ export default async function (req: Request) {
if (pathname === "/openapi") return toJSON(await openapi());
if (pathname === "/blocks") return blocks();
if (pathname === "/cursors/latest") return findLatestCursor(req);
if (pathname === "/cursors/missing") return findCursorsForMissingBlocks(req);

return NotFound;
}
98 changes: 14 additions & 84 deletions src/fetch/cursors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ export async function findLatestCursor(req: Request): Promise<Response> {
}

try {
const { table, chain } = parametersResult.payload;
const { moduleHash, chain } = parametersResult.payload;

const query = `
SELECT cursor, timestamp
FROM ${table}
WHERE chain = '${chain}'
ORDER BY timestamp DESC
SELECT latest_cursor, latest_block_number
FROM module_hashes
WHERE chain = '${chain}' AND module_hash = '${moduleHash}'
LIMIT 1`;

const response = await readOnlyClient.query({ query, format: "JSONEachRow" });
Expand All @@ -27,104 +26,35 @@ export async function findLatestCursor(req: Request): Promise<Response> {
return toJSON(data[0]);
}

return toText(`Bad request: no cursor found for '${table}' on '${chain}'.`, 400);
return toText(`Bad request: no cursor found for '${moduleHash}' on '${chain}'.`, 400);
} catch (err) {
logger.error(err);
}
return BadRequest;
}

export async function findCursorsForMissingBlocks(req: Request): Promise<Response> {
const parametersResult = await verifyParameters(req);
if (!parametersResult.success) {
return parametersResult.error;
}

try {
const { table, chain } = parametersResult.payload;

const moduleHash = await getModuleHash(table, chain);
if (!moduleHash) {
return toText("Could not find module hash associated with table and chain", 500);
}

// This query finds every block that does not have a next block (beginning of a missing range).
// It then pairs it with the first existing block after it (end of missing range).
// If the start block is the last one in the database (max value), it is ignored.
// When every range is found, they are joined with the 'cursors' table
// to find which cursor is associated with the min and max boundaries.
// The module_hash and the chain are used to determine the correct values to use.
const query = `
SELECT block_ranges.from AS from_block_number, c1.cursor AS from_cursor, block_ranges.to AS to_block_number, c2.cursor AS to_cursor
FROM (
SELECT c1.block_number AS from, MIN(c2.block_number) AS to
FROM cursors c1, cursors c2, (
SELECT MAX(block_number) AS block_number
FROM cursors
WHERE chain = '${chain}' AND module_hash = '${moduleHash}'
) AS maximum
WHERE c1.block_number + 1 NOT IN (SELECT block_number FROM cursors WHERE chain = '${chain}' AND module_hash = '${moduleHash}')
AND c1.chain = '${chain}'
AND c2.chain = '${chain}'
AND c1.block_number <> maximum.block_number
AND c2.block_number > c1.block_number
AND c1.module_hash = '${moduleHash}'
AND c2.module_hash = '${moduleHash}'
GROUP BY c1.block_number
) AS block_ranges
JOIN cursors c1 ON block_ranges.from = c1.block_number AND c1.chain = '${chain}' AND c1.module_hash = '${moduleHash}'
JOIN cursors c2 ON block_ranges.to = c2.block_number AND c2.chain = '${chain}' AND c2.module_hash = '${moduleHash}'`;

const response = await readOnlyClient.query({ query, format: "JSONEachRow" });
const data = await response.json<
Array<{
from_block_number: number;
from_cursor: string;
to_block_number: number;
to_cursor: string;
}>
>();

const dto = data.map((record) => ({
from: { block: record.from_block_number, cursor: record.from_cursor },
to: { block: record.to_block_number, cursor: record.to_cursor },
}));

return toJSON(dto);
} catch (err) {
logger.error(err);
}

return BadRequest;
}

async function verifyParameters(req: Request): Promise<Result<{ chain: string; table: string }, Response>> {
async function verifyParameters(req: Request): Promise<Result<{ chain: string; moduleHash: string }, Response>> {
const url = new URL(req.url);
const chain = url.searchParams.get("chain");
const table = url.searchParams.get("table");
const moduleHash = url.searchParams.get("module_hash");

if (!chain) {
return Err(toText("Missing parameter: chain", 400));
}

if (!table) {
return Err(toText("Missing parameter: table", 400));
if (!moduleHash) {
return Err(toText("Missing parameter: module_hash", 400));
}

if (!(await store.chains).includes(chain)) {
store.reset();
return Err(toText("Invalid parameter: chain=" + chain, 400));
}

if (!(await store.publicTables).includes(table)) {
return Err(toText("Invalid parameter: table=" + table, 400));
if (!(await store.moduleHashes).includes(moduleHash)) {
store.reset();
return Err(toText("Invalid parameter: moduleHash=" + moduleHash, 400));
}

return Ok({ chain, table });
}

async function getModuleHash(table: string, chain: string): Promise<string | null> {
const query = `SELECT module_hash FROM ${table} WHERE chain = '${chain}'`;
const response = await readOnlyClient.query({ query, format: "JSONEachRow" });
const data = await response.json<Array<{ module_hash: string }>>();
return data[0]?.module_hash ?? null;
return Ok({ chain, moduleHash });
}
48 changes: 5 additions & 43 deletions src/fetch/openapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,9 @@ export async function openapi() {
put: {
tags: [TAGS.USAGE],
summary: "Initialize the sink according to a SQL schema",
description:
"Supports `CREATE TABLE` statements<br/>If an url is passed in, the body will not be executed.",
description: "Supports `CREATE TABLE` statements<br/>If an url is passed in, the body will not be executed.",
security: [{ "auth-key": [] }],
parameters: [
{ required: false, in: "query", name: "schema-url", schema: { type: "string" } },
],
parameters: [{ required: false, in: "query", name: "schema-url", schema: { type: "string" } }],
requestBody: {
content: {
"text/plain": {
Expand Down Expand Up @@ -99,9 +96,7 @@ export async function openapi() {
url: "https://thegraph.com/docs/en/developing/creating-a-subgraph/#built-in-scalar-types",
},
security: [{ "auth-key": [] }],
parameters: [
{ required: false, in: "query", name: "schema-url", schema: { type: "string" } },
],
parameters: [{ required: false, in: "query", name: "schema-url", schema: { type: "string" } }],
requestBody: {
content: {
"text/plain": {
Expand Down Expand Up @@ -198,10 +193,10 @@ export async function openapi() {
parameters: [
{ name: "chain", in: "query", required: true, schema: { enum: await store.chains } },
{
name: "table",
name: "module_hash",
in: "query",
required: true,
schema: { enum: await store.publicTables },
schema: { enum: await store.moduleHashes },
},
],
responses: {
Expand All @@ -217,39 +212,6 @@ export async function openapi() {
},
},
})
.addPath("/cursors/missing", {
get: {
tags: [TAGS.QUERIES],
summary: "Finds the missing blocks and returns the start and end cursors",
parameters: [
{ name: "chain", in: "query", required: true, schema: { enum: await store.chains } },
{
name: "table",
in: "query",
required: true,
schema: { enum: await store.publicTables },
},
],
responses: {
200: {
description: "Success",
content: {
"application/json": {
schema: zodToJsonSchema(
z.array(
z.object({
from: z.object({ block: z.number(), cursor: z.string() }),
to: z.object({ block: z.number(), cursor: z.string() }),
})
)
),
},
},
},
400: PUT_RESPONSES[400],
},
},
})
.addPath("/blocks", {
get: {
tags: [TAGS.QUERIES],
Expand Down
Loading

0 comments on commit e82a788

Please sign in to comment.