Skip to content

Commit

Permalink
close file after read
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisCarriere committed Feb 28, 2024
1 parent f71d0d9 commit 863d397
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "substreams-sink-clickhouse",
"version": "0.3.5",
"version": "0.3.6",
"description": "Substreams Clickhouse Sink",
"type": "module",
"homepage": "https://github.com/pinax-network/substreams-sink-clickhouse",
Expand Down
26 changes: 21 additions & 5 deletions src/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ export type Buffer = Map<string, Values[]>;

const path = "buffer.txt";
const encoding = "utf-16le";
const writer = fs.createWriteStream(path, {flags: "a", encoding});

// create a write stream in "append" mode
let writer = fs.createWriteStream(path, {flags: "a", encoding});
export let inserts = 0;

export function bulkInsert(rows: {table: string, values: Values}[]) {
Expand Down Expand Up @@ -38,6 +39,8 @@ export async function read(): Promise<Buffer> {
}
});
rl.on("close", () => {
input.close();
rl.close();
return resolve(buffer);
});
rl.on("error", (err) => {
Expand All @@ -46,17 +49,30 @@ export async function read(): Promise<Buffer> {
});
}

export async function flush(verbose = false) {
export async function flush(verbose = false): Promise<void> {
if ( !fs.existsSync(path) ) return;
await close();
const buffer = await read();
for ( const [table, values] of buffer.entries() ) {
await client.insert({table, values, format: "JSONEachRow"})
if ( verbose ) logger.info('[buffer::flush]', `\tinserted ${values.length} rows into ${table}`);
buffer.delete(table);
inserts++;
}
// clear the buffer
fs.createWriteStream(path, {encoding});
// clear the buffer and overwrite existing writer in "write" mode
writer = fs.createWriteStream(path, {flags: "w", encoding});
}

export function close(): Promise<void> {
return new Promise((resolve, reject) => {
if ( !writer ) return resolve();
if ( writer.destroyed ) return resolve();
if ( writer.closed ) return resolve();

writer.close((err) => {
if (err) return reject(err);
return resolve();
});
});
}

export function count(buffer: Buffer) {
Expand Down

0 comments on commit 863d397

Please sign in to comment.