diff --git a/package.json b/package.json index 8d7d899..84ee74c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "substreams-sink-clickhouse", - "version": "0.3.5", + "version": "0.3.6", "description": "Substreams Clickhouse Sink", "type": "module", "homepage": "https://github.com/pinax-network/substreams-sink-clickhouse", diff --git a/src/buffer.ts b/src/buffer.ts index ddf52de..f8b64ee 100644 --- a/src/buffer.ts +++ b/src/buffer.ts @@ -9,8 +9,9 @@ export type Buffer = Map; const path = "buffer.txt"; const encoding = "utf-16le"; -const writer = fs.createWriteStream(path, {flags: "a", encoding}); +// create a write stream in "append" mode +let writer = fs.createWriteStream(path, {flags: "a", encoding}); export let inserts = 0; export function bulkInsert(rows: {table: string, values: Values}[]) { @@ -38,6 +39,8 @@ export async function read(): Promise { } }); rl.on("close", () => { + input.close(); + rl.close(); return resolve(buffer); }); rl.on("error", (err) => { @@ -46,17 +49,30 @@ export async function read(): Promise { }); } -export async function flush(verbose = false) { +export async function flush(verbose = false): Promise { if ( !fs.existsSync(path) ) return; + await close(); const buffer = await read(); for ( const [table, values] of buffer.entries() ) { await client.insert({table, values, format: "JSONEachRow"}) if ( verbose ) logger.info('[buffer::flush]', `\tinserted ${values.length} rows into ${table}`); - buffer.delete(table); inserts++; } - // clear the buffer - fs.createWriteStream(path, {encoding}); + // clear the buffer and overwrite existing writer in "write" mode + writer = fs.createWriteStream(path, {flags: "w", encoding}); +} + +export function close(): Promise { + return new Promise((resolve, reject) => { + if ( !writer ) return resolve(); + if ( writer.destroyed ) return resolve(); + if ( writer.closed ) return resolve(); + + writer.close((err) => { + if (err) return reject(err); + return resolve(); + }); + }); } export function count(buffer: Buffer) {