diff --git a/package.json b/package.json index b145850..3e14579 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "substreams-sink-clickhouse", - "version": "0.3.8", + "version": "0.3.9", "description": "Substreams Clickhouse Sink", "type": "module", "homepage": "https://github.com/pinax-network/substreams-sink-clickhouse", diff --git a/src/buffer.ts b/src/buffer.ts index 90d8b72..9cc7079 100644 --- a/src/buffer.ts +++ b/src/buffer.ts @@ -1,4 +1,3 @@ -import readline from "readline"; import fs from "fs"; import { client } from "./clickhouse/createClient.js"; import { logger } from "./logger.js"; @@ -9,23 +8,21 @@ export type Buffer = Map; const path = "buffer.txt"; const encoding = "utf-8"; +const highWaterMark = 1024 * 1024; // 1MB -// create a write stream in "append" mode -let writer = fs.createWriteStream(path, {flags: "a", encoding}); +// create a Bun writer to incementally write to the buffer file +// https://bun.sh/guides/write-file/filesink +let writer = Bun.file(path).writer({ highWaterMark }); // 1MB export let inserts = 0; export function bulkInsert(rows: {table: string, values: Values}[]) { return Promise.all(rows.map(({table, values}) => insert(table, values))); } -export function insert(table: string, values: Values): Promise { +export async function insert(table: string, values: Values): Promise { store.check_table(table); - return new Promise((resolve, reject) => { - writer.write(JSON.stringify({table, values}) + "\n", (err) => { - if (err) return reject(err); - return resolve(); - }); - }); + writer.write(JSON.stringify({table, values}) + "\n"); + await writer.flush(); } export async function read(): Promise { @@ -46,18 +43,17 @@ export async function read(): Promise { } export async function flush(verbose = false): Promise { - if ( !fs.existsSync(path) ) { - writer = fs.createWriteStream(path, {flags: "w", encoding}); - return; - } + if ( !fs.existsSync(path) ) return; // nothing to flush const buffer = await read(); for ( const [table, values] of buffer.entries() ) { await client.insert({table, values, format: "JSONEachRow"}) if ( verbose ) logger.info('[buffer::flush]', `\tinserted ${values.length} rows into ${table}`); inserts++; } - // clear the buffer and overwrite existing writer in "write" mode - writer = fs.createWriteStream(path, {flags: "w", encoding}); + // erase the buffer and overwrite existing writer in "write" mode + await writer.end(); + fs.rmSync(path); + writer = Bun.file(path).writer({ highWaterMark }); } export function count(buffer: Buffer) {