From ab4c3c6e41f34a83f75a61e163a52fb0dcb3e57a Mon Sep 17 00:00:00 2001 From: Ringo Yip Date: Wed, 1 Nov 2023 17:41:20 -0700 Subject: [PATCH] Encode with message indexes for ksql support --- src/wireDecoder.ts | 19 ++++++++++++++----- src/wireEncoder.ts | 16 +++++++++++++--- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/src/wireDecoder.ts b/src/wireDecoder.ts index 861d463..331ae79 100644 --- a/src/wireDecoder.ts +++ b/src/wireDecoder.ts @@ -1,5 +1,14 @@ -export default (buffer: Buffer) => ({ - magicByte: buffer.slice(0, 1), - registryId: buffer.slice(1, 5).readInt32BE(0), - payload: buffer.slice(5, buffer.length), -}) +export default (buffer: Buffer) => { + const magicByte = buffer.slice(0, 1) + const registryId = buffer.slice(1, 5).readInt32BE(0) + const messageIndexesLength = buffer.slice(5, 9).readInt32BE(0) + const messageIndexes = buffer.slice(9, 9 + messageIndexesLength) + const payload = buffer.slice(9 + messageIndexesLength) + + return { + magicByte, + registryId, + messageIndexes, + payload, + } +} \ No newline at end of file diff --git a/src/wireEncoder.ts b/src/wireEncoder.ts index e22002e..61b7d01 100644 --- a/src/wireEncoder.ts +++ b/src/wireEncoder.ts @@ -2,9 +2,19 @@ const DEFAULT_OFFSET = 0 export const MAGIC_BYTE = Buffer.alloc(1) -export const encode = (registryId: number, payload: Buffer) => { +export const encode = (registryId: number, payload: Buffer, messageIndexes: number[] = [0]) => { const registryIdBuffer = Buffer.alloc(4) registryIdBuffer.writeInt32BE(registryId, DEFAULT_OFFSET) - return Buffer.concat([MAGIC_BYTE, registryIdBuffer, payload]) -} + const messageIndexesBuffer = Buffer.from(messageIndexes) + const messageIndexesLength = Buffer.alloc(4) + messageIndexesLength.writeInt32BE(messageIndexesBuffer.length, DEFAULT_OFFSET) + + return Buffer.concat([ + MAGIC_BYTE, + registryIdBuffer, + messageIndexesLength, + messageIndexesBuffer, + payload, + ]) +} \ No newline at end of file