From 5f172d6dd446d1f0d656a61e5908b52ceeca87f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Ledwo=C5=84?= Date: Thu, 23 Jan 2025 11:47:08 +0100 Subject: [PATCH] refactor: record events in memory in mrbv2 (#27734) --- plugin-server/src/config/config.ts | 4 + .../batch-consumer-factory.ts | 6 +- .../session-recording-v2/consumer.ts | 175 ++++--- .../kafka/{parser.ts => message-parser.ts} | 9 +- .../kafka/offset-manager.ts | 73 +++ .../blackhole-session-batch-writer.ts | 12 + .../sessions/promise-queue.ts | 40 ++ .../session-recording-v2/sessions/recorder.ts | 32 ++ .../sessions/session-batch-manager.ts | 65 +++ .../sessions/session-batch-recorder.ts | 85 +++ .../session-recording-v2/teams/team-filter.ts | 49 +- .../versions/lib-version-monitor.ts | 17 +- plugin-server/src/types.ts | 3 + .../kafka/message-parser.test.ts | 219 ++++++++ .../kafka/offset-manager.test.ts | 198 +++++++ .../session-recording-v2/kafka/parser.test.ts | 165 ------ .../sessions/recorder.test.ts | 228 ++++++++ .../sessions/session-batch-manager.test.ts | 330 ++++++++++++ .../sessions/session-batch-recorder.test.ts | 490 ++++++++++++++++++ .../teams/team-filter.test.ts | 172 ++---- .../versions/lib-version-monitor.test.ts | 226 +++----- 21 files changed, 2027 insertions(+), 571 deletions(-) rename plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/{parser.ts => message-parser.ts} (88%) create mode 100644 plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/offset-manager.ts create mode 100644 plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/blackhole-session-batch-writer.ts create mode 100644 plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/promise-queue.ts create mode 100644 plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/recorder.ts create mode 100644 plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.ts create mode 100644 plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.ts create mode 100644 plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/message-parser.test.ts create mode 100644 plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/offset-manager.test.ts delete mode 100644 plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/parser.test.ts create mode 100644 plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/recorder.test.ts create mode 100644 plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.test.ts create mode 100644 plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.test.ts diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 73ec570062725..c610c12ef23ea 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -213,6 +213,10 @@ export function getDefaultConfig(): PluginsServerConfig { INGESTION_CONSUMER_CONSUME_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION, INGESTION_CONSUMER_OVERFLOW_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, INGESTION_CONSUMER_DLQ_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION_DLQ, + + // Session recording V2 + SESSION_RECORDING_MAX_BATCH_SIZE_KB: 100 * 1024, // 100MB + SESSION_RECORDING_MAX_BATCH_AGE_MS: 10 * 1000, // 10 seconds } } diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/batch-consumer-factory.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/batch-consumer-factory.ts index b9afcee0613fc..9df26b6a213fd 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording-v2/batch-consumer-factory.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/batch-consumer-factory.ts @@ -36,9 +36,9 @@ export class DefaultBatchConsumerFactory implements BatchConsumerFactory { groupId, topic, eachBatch, - callEachBatchWhenEmpty: true, // Useful as we will still want to account for flushing sessions - autoCommit: true, - autoOffsetStore: true, // TODO: remove this once we implement our own offset store logic + callEachBatchWhenEmpty: true, // Required, as we want to flush session batches periodically + autoCommit: false, + autoOffsetStore: false, sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS, maxPollIntervalMs: this.serverConfig.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS, // the largest size of a message that can be fetched by the consumer. diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts index 183060b449372..0f1253e9e4780 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/consumer.ts @@ -17,14 +17,17 @@ import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW, } from './constants' +import { KafkaMessageParser } from './kafka/message-parser' import { KafkaMetrics } from './kafka/metrics' -import { KafkaParser } from './kafka/parser' +import { KafkaOffsetManager } from './kafka/offset-manager' import { SessionRecordingMetrics } from './metrics' import { PromiseScheduler } from './promise-scheduler' +import { BlackholeSessionBatchWriter } from './sessions/blackhole-session-batch-writer' +import { SessionBatchManager } from './sessions/session-batch-manager' +import { SessionBatchRecorder, SessionBatchRecorderInterface } from './sessions/session-batch-recorder' import { TeamFilter } from './teams/team-filter' import { TeamService } from './teams/team-service' import { MessageWithTeam } from './teams/types' -import { BatchMessageProcessor } from './types' import { CaptureIngestionWarningFn } from './types' import { getPartitionsForTopic } from './utils' import { LibVersionMonitor } from './versions/lib-version-monitor' @@ -41,10 +44,13 @@ export class SessionRecordingIngester { isStopping = false private isDebugLoggingEnabled: ValueMatcher - private readonly messageProcessor: BatchMessageProcessor private readonly metrics: SessionRecordingMetrics private readonly promiseScheduler: PromiseScheduler private readonly batchConsumerFactory: BatchConsumerFactory + private readonly sessionBatchManager: SessionBatchManager + private readonly kafkaParser: KafkaMessageParser + private readonly teamFilter: TeamFilter + private readonly libVersionMonitor?: LibVersionMonitor constructor( private config: PluginsServerConfig, @@ -52,32 +58,43 @@ export class SessionRecordingIngester { batchConsumerFactory: BatchConsumerFactory, ingestionWarningProducer?: KafkaProducerWrapper ) { - this.isDebugLoggingEnabled = buildIntegerMatcher(config.SESSION_RECORDING_DEBUG_PARTITION, true) - const kafkaMetrics = KafkaMetrics.getInstance() - const kafkaParser = new KafkaParser(kafkaMetrics) - const teamService = new TeamService() - this.metrics = SessionRecordingMetrics.getInstance() - this.promiseScheduler = new PromiseScheduler() + this.topic = consumeOverflow + ? KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW + : KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS this.batchConsumerFactory = batchConsumerFactory - const teamFilter = new TeamFilter(teamService, kafkaParser) - this.messageProcessor = teamFilter + this.isDebugLoggingEnabled = buildIntegerMatcher(config.SESSION_RECORDING_DEBUG_PARTITION, true) + + this.promiseScheduler = new PromiseScheduler() + this.kafkaParser = new KafkaMessageParser(KafkaMetrics.getInstance()) + this.teamFilter = new TeamFilter(new TeamService()) if (ingestionWarningProducer) { const captureWarning: CaptureIngestionWarningFn = async (teamId, type, details, debounce) => { await captureIngestionWarning(ingestionWarningProducer, teamId, type, details, debounce) } - - this.messageProcessor = new LibVersionMonitor( - teamFilter, - captureWarning, - VersionMetrics.getInstance() - ) + this.libVersionMonitor = new LibVersionMonitor(captureWarning, VersionMetrics.getInstance()) } - this.topic = consumeOverflow - ? KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW - : KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS + this.metrics = SessionRecordingMetrics.getInstance() + + const offsetManager = new KafkaOffsetManager(async (offsets) => { + await new Promise((resolve, reject) => { + try { + this.batchConsumer!.consumer.commitSync(offsets) + resolve() + } catch (error) { + reject(error) + } + }) + }, this.topic) + this.sessionBatchManager = new SessionBatchManager({ + maxBatchSizeBytes: (config.SESSION_RECORDING_MAX_BATCH_SIZE_KB ?? 0) * 1024, + maxBatchAgeMs: config.SESSION_RECORDING_MAX_BATCH_AGE_MS ?? 1000, + createBatch: () => new SessionBatchRecorder(new BlackholeSessionBatchWriter()), + offsetManager, + }) + this.consumerGroupId = this.consumeOverflow ? KAFKA_CONSUMER_GROUP_ID_OVERFLOW : KAFKA_CONSUMER_GROUP_ID } @@ -90,6 +107,24 @@ export class SessionRecordingIngester { } } + public async handleEachBatch(messages: Message[], context: { heartbeat: () => void }): Promise { + context.heartbeat() + + if (messages.length > 0) { + logger.info('🔁', `blob_ingester_consumer_v2 - handling batch`, { + size: messages.length, + partitionsInBatch: [...new Set(messages.map((x) => x.partition))], + assignedPartitions: this.assignedPartitions, + }) + } + + await runInstrumentedFunction({ + statsKey: `recordingingesterv2.handleEachBatch`, + sendTimeoutGuardToSentry: false, + func: async () => this.processBatchMessages(messages, context), + }) + } + private async processBatchMessages(messages: Message[], context: { heartbeat: () => void }): Promise { // Increment message received counter for each message messages.forEach((message) => { @@ -98,50 +133,71 @@ export class SessionRecordingIngester { const batchSize = messages.length const batchSizeKb = messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024 - this.metrics.observeKafkaBatchSize(batchSize) this.metrics.observeKafkaBatchSizeKb(batchSizeKb) - const parsedMessages = await runInstrumentedFunction({ + const processedMessages = await runInstrumentedFunction({ statsKey: `recordingingesterv2.handleEachBatch.parseBatch`, func: async () => { - return this.messageProcessor.parseBatch(messages) + const parsedMessages = await this.kafkaParser.parseBatch(messages) + const messagesWithTeam = await this.teamFilter.filterBatch(parsedMessages) + const processedMessages = this.libVersionMonitor + ? await this.libVersionMonitor.processBatch(messagesWithTeam) + : messagesWithTeam + return processedMessages }, }) + context.heartbeat() await runInstrumentedFunction({ statsKey: `recordingingesterv2.handleEachBatch.processMessages`, - func: async () => this.processMessages(parsedMessages), + func: async () => this.processMessages(processedMessages, context.heartbeat), }) } - private async processMessages(parsedMessages: MessageWithTeam[]): Promise { - if (this.config.SESSION_RECORDING_PARALLEL_CONSUMPTION) { - await Promise.all(parsedMessages.map((m) => this.consume(m))) - } else { + private async processMessages(parsedMessages: MessageWithTeam[], heartbeat: () => void) { + await this.sessionBatchManager.withBatch(async (batch) => { for (const message of parsedMessages) { - await this.consume(message) + this.consume(message, batch) } - } + return Promise.resolve() + }) + + heartbeat() + + await this.sessionBatchManager.flushIfNeeded() } - public async handleEachBatch(messages: Message[], context: { heartbeat: () => void }): Promise { - context.heartbeat() + private consume(message: MessageWithTeam, batch: SessionBatchRecorderInterface) { + // we have to reset this counter once we're consuming messages since then we know we're not re-balancing + // otherwise the consumer continues to report however many sessions were revoked at the last re-balance forever + this.metrics.resetSessionsRevoked() + const { team, message: parsedMessage } = message + const debugEnabled = this.isDebugLoggingEnabled(parsedMessage.metadata.partition) - if (messages.length > 0) { - logger.info('🔁', `blob_ingester_consumer_v2 - handling batch`, { - size: messages.length, - partitionsInBatch: [...new Set(messages.map((x) => x.partition))], - assignedPartitions: this.assignedPartitions, + if (debugEnabled) { + logger.debug('🔄', 'processing_session_recording', { + partition: parsedMessage.metadata.partition, + offset: parsedMessage.metadata.offset, + distinct_id: parsedMessage.distinct_id, + session_id: parsedMessage.session_id, + raw_size: parsedMessage.metadata.rawSize, }) } - await runInstrumentedFunction({ - statsKey: `recordingingesterv2.handleEachBatch`, - sendTimeoutGuardToSentry: false, - func: async () => this.processBatchMessages(messages, context), - }) + const { partition } = parsedMessage.metadata + const isDebug = this.isDebugLoggingEnabled(partition) + if (isDebug) { + logger.info('🔁', '[blob_ingester_consumer_v2] - [PARTITION DEBUG] - consuming event', { + ...parsedMessage.metadata, + team_id: team.teamId, + session_id: parsedMessage.session_id, + }) + } + + this.metrics.observeSessionInfo(parsedMessage.metadata.rawSize) + batch.record(message) } public async start(): Promise { @@ -233,38 +289,6 @@ export class SessionRecordingIngester { return this.assignedTopicPartitions.map((x) => x.partition) } - private async consume(messageWithTeam: MessageWithTeam): Promise { - // we have to reset this counter once we're consuming messages since then we know we're not re-balancing - // otherwise the consumer continues to report however many sessions were revoked at the last re-balance forever - this.metrics.resetSessionsRevoked() - const { team, message } = messageWithTeam - const debugEnabled = this.isDebugLoggingEnabled(message.metadata.partition) - - if (debugEnabled) { - logger.debug('🔄', 'processing_session_recording', { - partition: message.metadata.partition, - offset: message.metadata.offset, - distinct_id: message.distinct_id, - session_id: message.session_id, - raw_size: message.metadata.rawSize, - }) - } - - const { partition } = message.metadata - const isDebug = this.isDebugLoggingEnabled(partition) - if (isDebug) { - logger.info('🔁', '[blob_ingester_consumer_v2] - [PARTITION DEBUG] - consuming event', { - ...message.metadata, - team_id: team.teamId, - session_id: message.session_id, - }) - } - - this.metrics.observeSessionInfo(message.metadata.rawSize) - - return Promise.resolve() - } - private async onRevokePartitions(topicPartitions: TopicPartition[]): Promise { /** * The revoke_partitions indicates that the consumer group has had partitions revoked. @@ -277,7 +301,6 @@ export class SessionRecordingIngester { } this.metrics.resetSessionsHandled() - - return Promise.resolve() + await this.sessionBatchManager.discardPartitions(revokedPartitions) } } diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/parser.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.ts similarity index 88% rename from plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/parser.ts rename to plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.ts index 6fc0d4f5fc514..25a29bb89c316 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/parser.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/message-parser.ts @@ -10,10 +10,15 @@ import { ParsedMessageData } from './types' const GZIP_HEADER = Uint8Array.from([0x1f, 0x8b, 0x08, 0x00]) const decompressWithGzip = promisify(gunzip) -export class KafkaParser { +export class KafkaMessageParser { constructor(private readonly metrics: KafkaMetrics) {} - public async parseMessage(message: Message): Promise { + public async parseBatch(messages: Message[]): Promise { + const parsedMessages = await Promise.all(messages.map((message) => this.parseMessage(message))) + return parsedMessages.filter((msg) => msg !== null) as ParsedMessageData[] + } + + private async parseMessage(message: Message): Promise { const dropMessage = (reason: string, extra?: Record) => { this.metrics.incrementMessageDropped('session_recordings_blob_ingestion', reason) diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/offset-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/offset-manager.ts new file mode 100644 index 0000000000000..a43c09e90c3fe --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/kafka/offset-manager.ts @@ -0,0 +1,73 @@ +import { TopicPartitionOffset } from 'node-rdkafka' + +import { SessionBatchRecorderInterface } from '../sessions/session-batch-recorder' +import { MessageWithTeam } from '../teams/types' + +interface PartitionOffset { + partition: number + offset: number +} + +type CommitOffsetsCallback = (offsets: TopicPartitionOffset[]) => Promise + +class OffsetTrackingSessionBatchRecorderWrapper implements SessionBatchRecorderInterface { + constructor( + private readonly recorder: SessionBatchRecorderInterface, + private readonly offsetManager: KafkaOffsetManager + ) {} + + public record(message: MessageWithTeam): number { + const bytesWritten = this.recorder.record(message) + this.offsetManager.trackOffset(message.message.metadata) + return bytesWritten + } + + public async flush(): Promise { + await this.recorder.flush() + } + + public discardPartition(partition: number): void { + this.recorder.discardPartition(partition) + this.offsetManager.discardPartition(partition) + } + + public get size(): number { + return this.recorder.size + } +} + +export class KafkaOffsetManager { + private partitionOffsets: Map = new Map() + + constructor(private readonly commitOffsets: CommitOffsetsCallback, private readonly topic: string) {} + + public wrapBatch(recorder: SessionBatchRecorderInterface): SessionBatchRecorderInterface { + return new OffsetTrackingSessionBatchRecorderWrapper(recorder, this) + } + + public trackOffset({ partition, offset }: PartitionOffset): void { + // We track the next offset to process + this.partitionOffsets.set(partition, offset + 1) + } + + public discardPartition(partition: number): void { + this.partitionOffsets.delete(partition) + } + + public async commit(): Promise { + const topicPartitionOffsets: TopicPartitionOffset[] = [] + + for (const [partition, offset] of this.partitionOffsets.entries()) { + topicPartitionOffsets.push({ + topic: this.topic, + partition, + offset, + }) + } + + if (topicPartitionOffsets.length > 0) { + await this.commitOffsets(topicPartitionOffsets) + this.partitionOffsets.clear() + } + } +} diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/blackhole-session-batch-writer.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/blackhole-session-batch-writer.ts new file mode 100644 index 0000000000000..69ccffad3a66d --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/blackhole-session-batch-writer.ts @@ -0,0 +1,12 @@ +import { PassThrough } from 'stream' + +import { SessionBatchWriter, StreamWithFinish } from './session-batch-recorder' + +export class BlackholeSessionBatchWriter implements SessionBatchWriter { + public async open(): Promise { + return Promise.resolve({ + stream: new PassThrough(), + finish: async () => Promise.resolve(), + }) + } +} diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/promise-queue.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/promise-queue.ts new file mode 100644 index 0000000000000..8093f22905f75 --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/promise-queue.ts @@ -0,0 +1,40 @@ +type QueuedCallback = { + callback: () => Promise + resolve: (value: R) => void + reject: (error: unknown) => void +} + +export class PromiseQueue { + private callbackQueue: QueuedCallback[] = [] + private isExecuting = false + + constructor() {} + + public async add(callback: () => Promise): Promise { + return new Promise((resolve, reject) => { + this.callbackQueue.push({ callback, resolve, reject }) + process.nextTick(() => this.processNextCallback()) + }) + } + + private async processNextCallback(): Promise { + if (this.isExecuting || this.callbackQueue.length === 0) { + return + } + + this.isExecuting = true + const { callback, resolve, reject } = this.callbackQueue.shift()! + + try { + const result = await callback() + resolve(result) + } catch (error) { + reject(error) + } finally { + this.isExecuting = false + if (this.callbackQueue.length > 0) { + process.nextTick(() => this.processNextCallback()) + } + } + } +} diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/recorder.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/recorder.ts new file mode 100644 index 0000000000000..ea611fe745bcc --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/recorder.ts @@ -0,0 +1,32 @@ +import { Writable } from 'stream' + +import { ParsedMessageData } from '../kafka/types' + +export class SessionRecorder { + private chunks: string[] = [] + private size: number = 0 + + public recordMessage(message: ParsedMessageData): number { + let bytesWritten = 0 + + Object.entries(message.eventsByWindowId).forEach(([windowId, events]) => { + events.forEach((event) => { + const serializedLine = JSON.stringify([windowId, event]) + '\n' + this.chunks.push(serializedLine) + bytesWritten += Buffer.byteLength(serializedLine) + }) + }) + + this.size += bytesWritten + return bytesWritten + } + + public async dump(stream: Writable): Promise { + for (const chunk of this.chunks) { + if (!stream.write(chunk)) { + // Handle backpressure + await new Promise((resolve) => stream.once('drain', resolve)) + } + } + } +} diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.ts new file mode 100644 index 0000000000000..110280c66a298 --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.ts @@ -0,0 +1,65 @@ +import { KafkaOffsetManager } from '../kafka/offset-manager' +import { PromiseQueue } from './promise-queue' +import { SessionBatchRecorderInterface } from './session-batch-recorder' + +export interface SessionBatchManagerConfig { + maxBatchSizeBytes: number + maxBatchAgeMs: number + createBatch: () => SessionBatchRecorderInterface + offsetManager: KafkaOffsetManager +} + +export class SessionBatchManager { + private currentBatch: SessionBatchRecorderInterface + private queue: PromiseQueue + private readonly maxBatchSizeBytes: number + private readonly maxBatchAgeMs: number + private readonly createBatch: () => SessionBatchRecorderInterface + private readonly offsetManager: KafkaOffsetManager + private lastFlushTime: number + + constructor(config: SessionBatchManagerConfig) { + this.maxBatchSizeBytes = config.maxBatchSizeBytes + this.maxBatchAgeMs = config.maxBatchAgeMs + this.createBatch = config.createBatch + this.offsetManager = config.offsetManager + this.currentBatch = this.offsetManager.wrapBatch(this.createBatch()) + this.queue = new PromiseQueue() + this.lastFlushTime = Date.now() + } + + public async withBatch(callback: (batch: SessionBatchRecorderInterface) => Promise): Promise { + return this.queue.add(() => callback(this.currentBatch)) + } + + public async flush(): Promise { + return this.queue.add(async () => { + await this.rotateBatch() + }) + } + + public async flushIfNeeded(): Promise { + return this.queue.add(async () => { + const timeSinceLastFlush = Date.now() - this.lastFlushTime + if (this.currentBatch.size >= this.maxBatchSizeBytes || timeSinceLastFlush >= this.maxBatchAgeMs) { + await this.rotateBatch() + } + }) + } + + public async discardPartitions(partitions: number[]): Promise { + return this.queue.add(async () => { + for (const partition of partitions) { + this.currentBatch.discardPartition(partition) + } + return Promise.resolve() + }) + } + + private async rotateBatch(): Promise { + await this.currentBatch.flush() + await this.offsetManager.commit() + this.currentBatch = this.offsetManager.wrapBatch(this.createBatch()) + this.lastFlushTime = Date.now() + } +} diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.ts new file mode 100644 index 0000000000000..1815f89ff606c --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.ts @@ -0,0 +1,85 @@ +import { Writable } from 'stream' + +import { MessageWithTeam } from '../teams/types' +import { SessionRecorder } from './recorder' + +export interface StreamWithFinish { + stream: Writable + finish: () => Promise +} + +export interface SessionBatchWriter { + open(): Promise +} + +export interface SessionBatchRecorderInterface { + record(message: MessageWithTeam): number + flush(): Promise + discardPartition(partition: number): void + readonly size: number +} + +export class SessionBatchRecorder implements SessionBatchRecorderInterface { + private readonly partitionSessions = new Map>() + private readonly partitionSizes = new Map() + private _size: number = 0 + + constructor(private readonly writer: SessionBatchWriter) {} + + public record(message: MessageWithTeam): number { + const { partition } = message.message.metadata + const sessionId = message.message.session_id + + if (!this.partitionSessions.has(partition)) { + this.partitionSessions.set(partition, new Map()) + this.partitionSizes.set(partition, 0) + } + + const sessions = this.partitionSessions.get(partition)! + if (!sessions.has(sessionId)) { + sessions.set(sessionId, new SessionRecorder()) + } + + const recorder = sessions.get(sessionId)! + const bytesWritten = recorder.recordMessage(message.message) + + // Update both partition size and total size + const currentPartitionSize = this.partitionSizes.get(partition)! + this.partitionSizes.set(partition, currentPartitionSize + bytesWritten) + this._size += bytesWritten + + return bytesWritten + } + + public discardPartition(partition: number): void { + const partitionSize = this.partitionSizes.get(partition) + if (partitionSize) { + this._size -= partitionSize + this.partitionSizes.delete(partition) + this.partitionSessions.delete(partition) + } + } + + public async flush(): Promise { + const { stream, finish } = await this.writer.open() + + // Flush sessions grouped by partition + for (const sessions of this.partitionSessions.values()) { + for (const recorder of sessions.values()) { + await recorder.dump(stream) + } + } + + stream.end() + await finish() + + // Clear sessions, partition sizes, and total size after successful flush + this.partitionSessions.clear() + this.partitionSizes.clear() + this._size = 0 + } + + public get size(): number { + return this._size + } +} diff --git a/plugin-server/src/main/ingestion-queues/session-recording-v2/teams/team-filter.ts b/plugin-server/src/main/ingestion-queues/session-recording-v2/teams/team-filter.ts index feb210cfbd765..d678a7f84e4ff 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording-v2/teams/team-filter.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording-v2/teams/team-filter.ts @@ -1,46 +1,31 @@ -import { Message, MessageHeader } from 'node-rdkafka' +import { MessageHeader } from 'node-rdkafka' import { status } from '../../../../utils/status' import { eventDroppedCounter } from '../../metrics' -import { KafkaParser } from '../kafka/parser' -import { BatchMessageProcessor } from '../types' +import { ParsedMessageData } from '../kafka/types' import { TeamService } from './team-service' import { MessageWithTeam, Team } from './types' -export class TeamFilter implements BatchMessageProcessor { - constructor(private readonly teamService: TeamService, private readonly parser: KafkaParser) {} +export class TeamFilter { + constructor(private readonly teamService: TeamService) {} - public async parseMessage(message: Message): Promise { - const team = await this.validateTeamToken(message, message.headers) - if (!team) { - return null - } - - const parsedMessage = await this.parser.parseMessage(message) - if (!parsedMessage) { - return null - } - - return { - team, - message: parsedMessage, - } - } - - public async parseBatch(messages: Message[]): Promise { - const parsedMessages: MessageWithTeam[] = [] + public async filterBatch(messages: ParsedMessageData[]): Promise { + const messagesWithTeam: MessageWithTeam[] = [] for (const message of messages) { - const messageWithTeam = await this.parseMessage(message) - if (messageWithTeam) { - parsedMessages.push(messageWithTeam) + const team = await this.validateTeam(message) + if (team) { + messagesWithTeam.push({ + team, + message, + }) } } - return parsedMessages + return messagesWithTeam } - private async validateTeamToken(message: Message, headers: MessageHeader[] | undefined): Promise { + private async validateTeam(message: ParsedMessageData): Promise { const dropMessage = (reason: string, extra?: Record) => { // TODO refactor eventDroppedCounter @@ -52,13 +37,13 @@ export class TeamFilter implements BatchMessageProcessor implements BatchMessageProcessor { - constructor( - private readonly sourceProcessor: BatchMessageProcessor, - private readonly captureWarning: CaptureIngestionWarningFn, - private readonly metrics: VersionMetrics - ) {} +export class LibVersionMonitor { + constructor(private readonly captureWarning: CaptureIngestionWarningFn, private readonly metrics: VersionMetrics) {} - public async parseBatch(messages: TInput[]): Promise { - const processedMessages = await this.sourceProcessor.parseBatch(messages) - await Promise.all(processedMessages.map((message) => this.checkLibVersion(message))) - return processedMessages + public async processBatch(messages: MessageWithTeam[]): Promise { + await Promise.all(messages.map((message) => this.checkLibVersion(message))) + return messages } private async checkLibVersion(message: MessageWithTeam): Promise { diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 83ad08239b37d..e2ac8fa2eb586 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -320,6 +320,9 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig CYCLOTRON_DATABASE_URL: string CYCLOTRON_SHARD_DEPTH_LIMIT: number + + SESSION_RECORDING_MAX_BATCH_SIZE_KB: number | undefined + SESSION_RECORDING_MAX_BATCH_AGE_MS: number | undefined } export interface Hub extends PluginsServerConfig { diff --git a/plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/message-parser.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/message-parser.test.ts new file mode 100644 index 0000000000000..0c1fc3bed8516 --- /dev/null +++ b/plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/message-parser.test.ts @@ -0,0 +1,219 @@ +import { promisify } from 'node:util' +import { Message } from 'node-rdkafka' +import { gzip } from 'zlib' + +import { KafkaMessageParser } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/message-parser' +import { KafkaMetrics } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/metrics' + +const compressWithGzip = promisify(gzip) + +describe('KafkaMessageParser', () => { + let parser: KafkaMessageParser + let mockKafkaMetrics: jest.Mocked + + beforeEach(() => { + mockKafkaMetrics = { + incrementMessageDropped: jest.fn(), + } as jest.Mocked + parser = new KafkaMessageParser(mockKafkaMetrics) + }) + + const createMessage = (data: any, overrides: Partial = {}): Message => ({ + value: Buffer.from(JSON.stringify(data)), + size: 100, + topic: 'test-topic', + offset: 0, + partition: 0, + timestamp: 1234567890, + ...overrides, + }) + + describe('parseBatch', () => { + it('handles valid snapshot message', async () => { + const snapshotItems = [ + { type: 1, timestamp: 1234567890 }, + { type: 2, timestamp: 1234567891 }, + ] + const messages = [ + createMessage({ + data: JSON.stringify({ + event: '$snapshot_items', + properties: { + $session_id: 'session1', + $window_id: 'window1', + $snapshot_items: snapshotItems, + }, + }), + distinct_id: 'user123', + }), + ] + + const results = await parser.parseBatch(messages) + + expect(results).toHaveLength(1) + expect(results[0]).toMatchObject({ + metadata: { + partition: 0, + topic: 'test-topic', + rawSize: 100, + offset: 0, + timestamp: 1234567890, + }, + headers: undefined, + distinct_id: 'user123', + session_id: 'session1', + eventsByWindowId: { + window1: snapshotItems, + }, + eventsRange: { + start: 1234567890, + end: 1234567891, + }, + snapshot_source: undefined, + }) + expect(mockKafkaMetrics.incrementMessageDropped).not.toHaveBeenCalled() + }) + + it('handles gzipped message', async () => { + const snapshotItems = [ + { type: 1, timestamp: 1234567890 }, + { type: 2, timestamp: 1234567891 }, + ] + const data = { + data: JSON.stringify({ + event: '$snapshot_items', + properties: { + $session_id: 'session1', + $window_id: 'window1', + $snapshot_items: snapshotItems, + }, + }), + distinct_id: 'user123', + } + + const gzippedData = await compressWithGzip(JSON.stringify(data)) + const messages = [createMessage(data, { value: gzippedData })] + + const results = await parser.parseBatch(messages) + + expect(results).toHaveLength(1) + expect(results[0]).toMatchObject({ + session_id: 'session1', + distinct_id: 'user123', + eventsByWindowId: { + window1: snapshotItems, + }, + eventsRange: { + start: 1234567890, + end: 1234567891, + }, + }) + expect(mockKafkaMetrics.incrementMessageDropped).not.toHaveBeenCalled() + }) + + it('filters out message with missing value', async () => { + const messages = [createMessage({}, { value: null })] + + const results = await parser.parseBatch(messages) + + expect(results).toHaveLength(0) + expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith( + 'session_recordings_blob_ingestion', + 'message_value_or_timestamp_is_empty' + ) + }) + + it('filters out message with missing timestamp', async () => { + const messages = [createMessage({}, { timestamp: undefined })] + + const results = await parser.parseBatch(messages) + + expect(results).toHaveLength(0) + expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith( + 'session_recordings_blob_ingestion', + 'message_value_or_timestamp_is_empty' + ) + }) + + it('filters out message with invalid gzip data', async () => { + const messages = [createMessage({}, { value: Buffer.from([0x1f, 0x8b, 0x08, 0x00, 0x00]) })] + + const results = await parser.parseBatch(messages) + + expect(results).toHaveLength(0) + expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith( + 'session_recordings_blob_ingestion', + 'invalid_gzip_data' + ) + }) + + it('filters out message with invalid json', async () => { + const messages = [createMessage({}, { value: Buffer.from('invalid json') })] + + const results = await parser.parseBatch(messages) + + expect(results).toHaveLength(0) + expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith( + 'session_recordings_blob_ingestion', + 'invalid_json' + ) + }) + + it('filters out non-snapshot message', async () => { + const messages = [ + createMessage({ + data: JSON.stringify({ + event: 'not_a_snapshot', + properties: { + $session_id: 'session1', + }, + }), + }), + ] + + const results = await parser.parseBatch(messages) + + expect(results).toHaveLength(0) + expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith( + 'session_recordings_blob_ingestion', + 'received_non_snapshot_message' + ) + }) + + it('handles empty batch', async () => { + const results = await parser.parseBatch([]) + expect(results).toEqual([]) + }) + + it('processes multiple messages in parallel', async () => { + const messages = [ + createMessage({ + data: JSON.stringify({ + event: '$snapshot_items', + properties: { + $session_id: 'session1', + $window_id: 'window1', + $snapshot_items: [{ timestamp: 1, type: 2 }], + }, + }), + }), + createMessage({ + data: JSON.stringify({ + event: '$snapshot_items', + properties: { + $session_id: 'session2', + $window_id: 'window2', + $snapshot_items: [{ timestamp: 2, type: 2 }], + }, + }), + }), + ] + + const results = await parser.parseBatch(messages) + + expect(results).toHaveLength(2) + expect(results[0]?.session_id).toBe('session1') + expect(results[1]?.session_id).toBe('session2') + }) + }) +}) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/offset-manager.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/offset-manager.test.ts new file mode 100644 index 0000000000000..f199990535c61 --- /dev/null +++ b/plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/offset-manager.test.ts @@ -0,0 +1,198 @@ +import { KafkaOffsetManager } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/offset-manager' +import { SessionBatchRecorder } from '../../../../../src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder' +import { MessageWithTeam } from '../../../../../src/main/ingestion-queues/session-recording-v2/teams/types' + +describe('KafkaOffsetManager', () => { + let offsetManager: KafkaOffsetManager + let mockCommitOffsets: jest.Mock> + let mockRecorder: jest.Mocked + const TEST_TOPIC = 'test_topic' + + beforeEach(() => { + mockCommitOffsets = jest.fn().mockResolvedValue(undefined) + mockRecorder = { + record: jest.fn().mockReturnValue(100), + flush: jest.fn().mockResolvedValue(undefined), + size: 0, + discardPartition: jest.fn(), + } as unknown as jest.Mocked + + offsetManager = new KafkaOffsetManager(mockCommitOffsets, TEST_TOPIC) + }) + + const createMessage = (metadata: { partition: number; offset: number }): MessageWithTeam => ({ + team: { + teamId: 1, + consoleLogIngestionEnabled: false, + }, + message: { + distinct_id: 'distinct_id', + session_id: 'session1', + eventsByWindowId: { window1: [] }, + eventsRange: { start: 0, end: 0 }, + metadata: { + partition: metadata.partition, + offset: metadata.offset, + topic: 'test_topic', + timestamp: 0, + rawSize: 0, + }, + }, + }) + + it('should track offsets when recording messages', async () => { + const wrapper = offsetManager.wrapBatch(mockRecorder) + const message: MessageWithTeam = { + team: { teamId: 1, consoleLogIngestionEnabled: false }, + message: { + metadata: { partition: 1, offset: 100 }, + }, + } as MessageWithTeam + + wrapper.record(message) + + await wrapper.flush() + await offsetManager.commit() + + expect(mockCommitOffsets).toHaveBeenCalledWith([{ topic: TEST_TOPIC, partition: 1, offset: 101 }]) + }) + + it('should commit offsets for multiple partitions', async () => { + const wrapper = offsetManager.wrapBatch(mockRecorder) + const messages = [ + { partition: 1, offset: 100 }, + { partition: 1, offset: 101 }, + { partition: 2, offset: 200 }, + ] + + for (const metadata of messages) { + wrapper.record({ + team: { teamId: 1, consoleLogIngestionEnabled: false }, + message: { metadata }, + } as MessageWithTeam) + } + + await wrapper.flush() + await offsetManager.commit() + + expect(mockCommitOffsets).toHaveBeenCalledWith([ + { topic: TEST_TOPIC, partition: 1, offset: 102 }, // Last offset + 1 + { topic: TEST_TOPIC, partition: 2, offset: 201 }, // Last offset + 1 + ]) + }) + + it('should clear offsets after commit', async () => { + const wrapper = offsetManager.wrapBatch(mockRecorder) + const message: MessageWithTeam = { + team: { teamId: 1, consoleLogIngestionEnabled: false }, + message: { + metadata: { partition: 1, offset: 100 }, + }, + } as MessageWithTeam + + wrapper.record(message) + await wrapper.flush() + await offsetManager.commit() + + // Second commit should not commit anything + await offsetManager.commit() + + expect(mockCommitOffsets).toHaveBeenCalledTimes(1) + }) + + it('should handle commit failures', async () => { + const error = new Error('Commit failed') + mockCommitOffsets.mockRejectedValueOnce(error) + + const wrapper = offsetManager.wrapBatch(mockRecorder) + wrapper.record({ + team: { teamId: 1, consoleLogIngestionEnabled: false }, + message: { + metadata: { partition: 1, offset: 100 }, + }, + } as MessageWithTeam) + + await wrapper.flush() + await expect(offsetManager.commit()).rejects.toThrow(error) + }) + + describe('partition handling', () => { + it('should delegate discardPartition to inner recorder', () => { + const wrappedBatch = offsetManager.wrapBatch(mockRecorder) + wrappedBatch.discardPartition(1) + + expect(mockRecorder.discardPartition).toHaveBeenCalledWith(1) + }) + + it('should not commit offsets for discarded partitions', async () => { + const wrappedBatch = offsetManager.wrapBatch(mockRecorder) + + // Record messages for two partitions + wrappedBatch.record(createMessage({ partition: 1, offset: 100 })) + wrappedBatch.record(createMessage({ partition: 2, offset: 200 })) + + // Discard partition 1 + wrappedBatch.discardPartition(1) + + await offsetManager.commit() + + // Should only commit offset for partition 2 + expect(mockCommitOffsets).toHaveBeenCalledWith([ + { + topic: 'test_topic', + partition: 2, + offset: 201, + }, + ]) + }) + + it('should handle discarding already committed partitions', async () => { + const wrappedBatch = offsetManager.wrapBatch(mockRecorder) + + // Record and commit a message + wrappedBatch.record(createMessage({ partition: 1, offset: 100 })) + await offsetManager.commit() + + // Discard the partition after commit + wrappedBatch.discardPartition(1) + + // Record new message for same partition + wrappedBatch.record(createMessage({ partition: 1, offset: 101 })) + await offsetManager.commit() + + expect(mockCommitOffsets).toHaveBeenCalledTimes(2) + expect(mockCommitOffsets).toHaveBeenLastCalledWith([ + { + topic: 'test_topic', + partition: 1, + offset: 102, + }, + ]) + }) + + it('should handle discarding non-existent partitions', () => { + const wrappedBatch = offsetManager.wrapBatch(mockRecorder) + wrappedBatch.discardPartition(999) + expect(mockRecorder.discardPartition).toHaveBeenCalledWith(999) + }) + + it('should maintain highest offset when recording multiple messages', async () => { + const wrappedBatch = offsetManager.wrapBatch(mockRecorder) + + // Record messages in non-sequential order + wrappedBatch.record(createMessage({ partition: 1, offset: 100 })) + wrappedBatch.record(createMessage({ partition: 1, offset: 99 })) + wrappedBatch.record(createMessage({ partition: 1, offset: 101 })) + + await offsetManager.commit() + + expect(mockCommitOffsets).toHaveBeenCalledWith([ + { + topic: 'test_topic', + partition: 1, + offset: 102, + }, + ]) + }) + }) +}) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/parser.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/parser.test.ts deleted file mode 100644 index 7ca6ad52a2569..0000000000000 --- a/plugin-server/tests/main/ingestion-queues/session-recording-v2/kafka/parser.test.ts +++ /dev/null @@ -1,165 +0,0 @@ -import { Message } from 'node-rdkafka' -import { promisify } from 'util' -import { gzip } from 'zlib' - -import { KafkaMetrics } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/metrics' -import { KafkaParser } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/parser' - -const compressWithGzip = promisify(gzip) - -describe('KafkaParser', () => { - let parser: KafkaParser - let mockKafkaMetrics: jest.Mocked - - beforeEach(() => { - mockKafkaMetrics = { - incrementMessageDropped: jest.fn(), - } as jest.Mocked - parser = new KafkaParser(mockKafkaMetrics) - }) - - const createMessage = (data: any, overrides: Partial = {}): Message => ({ - value: Buffer.from(JSON.stringify(data)), - size: 100, - topic: 'test-topic', - offset: 0, - partition: 0, - timestamp: 1234567890, - ...overrides, - }) - - describe('parseMessage', () => { - it('successfully parses a valid message', async () => { - const snapshotItems = [ - { type: 1, timestamp: 1234567890 }, - { type: 2, timestamp: 1234567891 }, - ] - const message = createMessage({ - data: JSON.stringify({ - event: '$snapshot_items', - properties: { - $session_id: 'session1', - $window_id: 'window1', - $snapshot_items: snapshotItems, - }, - }), - distinct_id: 'user123', - }) - - const result = await parser.parseMessage(message) - - expect(result).toEqual({ - metadata: { - partition: 0, - topic: 'test-topic', - rawSize: 100, - offset: 0, - timestamp: 1234567890, - }, - headers: undefined, - distinct_id: 'user123', - session_id: 'session1', - eventsByWindowId: { - window1: snapshotItems, - }, - eventsRange: { - start: 1234567890, - end: 1234567891, - }, - snapshot_source: undefined, - }) - expect(mockKafkaMetrics.incrementMessageDropped).not.toHaveBeenCalled() - }) - - it('successfully parses a gzipped message', async () => { - const snapshotItems = [ - { type: 1, timestamp: 1234567890 }, - { type: 2, timestamp: 1234567891 }, - ] - const data = { - data: JSON.stringify({ - event: '$snapshot_items', - properties: { - $session_id: 'session1', - $window_id: 'window1', - $snapshot_items: snapshotItems, - }, - }), - distinct_id: 'user123', - } - - const gzippedData = await compressWithGzip(JSON.stringify(data)) - const message = createMessage(data, { value: gzippedData }) - - const result = await parser.parseMessage(message) - - expect(result).toBeTruthy() - expect(result?.session_id).toBe('session1') - expect(mockKafkaMetrics.incrementMessageDropped).not.toHaveBeenCalled() - }) - it('drops message with missing value', async () => { - const message = createMessage({}, { value: undefined }) - const result = await parser.parseMessage(message) - - expect(result).toBeNull() - expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith( - 'session_recordings_blob_ingestion', - 'message_value_or_timestamp_is_empty' - ) - }) - - it('drops message with missing timestamp', async () => { - const message = createMessage({}, { timestamp: undefined }) - const result = await parser.parseMessage(message) - - expect(result).toBeNull() - expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith( - 'session_recordings_blob_ingestion', - 'message_value_or_timestamp_is_empty' - ) - }) - - it('drops message with invalid gzip data', async () => { - const invalidGzip = Buffer.from([0x1f, 0x8b, 0x08, 0x00, 0x00]) // Invalid gzip data - const message = createMessage({}, { value: invalidGzip }) - - const result = await parser.parseMessage(message) - - expect(result).toBeNull() - expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith( - 'session_recordings_blob_ingestion', - 'invalid_gzip_data' - ) - }) - - it('drops message with invalid JSON', async () => { - const message = createMessage({}, { value: Buffer.from('invalid json') }) - const result = await parser.parseMessage(message) - - expect(result).toBeNull() - expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith( - 'session_recordings_blob_ingestion', - 'invalid_json' - ) - }) - - it('drops non-snapshot messages', async () => { - const message = createMessage({ - data: JSON.stringify({ - event: 'not_a_snapshot', - properties: { - $session_id: 'session1', - }, - }), - }) - - const result = await parser.parseMessage(message) - - expect(result).toBeNull() - expect(mockKafkaMetrics.incrementMessageDropped).toHaveBeenCalledWith( - 'session_recordings_blob_ingestion', - 'received_non_snapshot_message' - ) - }) - }) -}) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/recorder.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/recorder.test.ts new file mode 100644 index 0000000000000..047539c4865a4 --- /dev/null +++ b/plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/recorder.test.ts @@ -0,0 +1,228 @@ +import { PassThrough } from 'stream' + +import { ParsedMessageData } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/types' +import { SessionRecorder } from '../../../../../src/main/ingestion-queues/session-recording-v2/sessions/recorder' + +// RRWeb event type constants +const enum EventType { + DomContentLoaded = 0, + Load = 1, + FullSnapshot = 2, + IncrementalSnapshot = 3, + Meta = 4, + Custom = 5, +} + +describe('SessionRecorder', () => { + let recorder: SessionRecorder + + beforeEach(() => { + recorder = new SessionRecorder() + }) + + const createMessage = (windowId: string, events: any[]): ParsedMessageData => ({ + distinct_id: 'distinct_id', + session_id: 'session_id', + eventsByWindowId: { + [windowId]: events, + }, + eventsRange: { + start: events[0]?.timestamp || 0, + end: events[events.length - 1]?.timestamp || 0, + }, + metadata: { + partition: 1, + topic: 'test', + offset: 0, + timestamp: 0, + rawSize: 0, + }, + }) + + const parseLines = (data: string): Array<[string, any]> => { + return data + .trim() + .split('\n') + .map((line) => JSON.parse(line)) + } + + describe('recordMessage', () => { + it('should record events in JSONL format', () => { + const events = [ + { + type: EventType.FullSnapshot, + timestamp: 1000, + data: { + source: 1, + adds: [{ parentId: 1, nextId: 2, node: { tag: 'div', attrs: { class: 'test' } } }], + }, + }, + { + type: EventType.IncrementalSnapshot, + timestamp: 2000, + data: { source: 2, texts: [{ id: 1, value: 'Updated text' }] }, + }, + ] + const message = createMessage('window1', events) + + const bytesWritten = recorder.recordMessage(message) + + const stream = new PassThrough() + let streamData = '' + stream.on('data', (chunk) => { + streamData += chunk + }) + + recorder.dump(stream) + const lines = parseLines(streamData) + + expect(lines).toEqual([ + ['window1', events[0]], + ['window1', events[1]], + ]) + expect(bytesWritten).toBeGreaterThan(0) + }) + + it('should handle multiple windows with multiple events', () => { + const events = { + window1: [ + { + type: EventType.Meta, + timestamp: 1000, + data: { href: 'https://example.com', width: 1024, height: 768 }, + }, + { + type: EventType.FullSnapshot, + timestamp: 1500, + data: { + source: 1, + adds: [{ parentId: 1, nextId: null, node: { tag: 'h1', attrs: { id: 'title' } } }], + }, + }, + ], + window2: [ + { + type: EventType.Custom, + timestamp: 2000, + data: { tag: 'user-interaction', payload: { type: 'click', target: '#submit-btn' } }, + }, + { + type: EventType.IncrementalSnapshot, + timestamp: 2500, + data: { source: 3, mousemove: [{ x: 100, y: 200, id: 1 }] }, + }, + ], + } + const message: ParsedMessageData = { + ...createMessage('', []), + eventsByWindowId: events, + } + + recorder.recordMessage(message) + + const stream = new PassThrough() + let streamData = '' + stream.on('data', (chunk) => { + streamData += chunk + }) + + recorder.dump(stream) + const lines = parseLines(streamData) + + expect(lines).toEqual([ + ['window1', events.window1[0]], + ['window1', events.window1[1]], + ['window2', events.window2[0]], + ['window2', events.window2[1]], + ]) + }) + + it('should handle empty events array', () => { + const message = createMessage('window1', []) + const bytesWritten = recorder.recordMessage(message) + + const stream = new PassThrough() + let streamData = '' + stream.on('data', (chunk) => { + streamData += chunk + }) + + recorder.dump(stream) + expect(streamData).toBe('') + expect(bytesWritten).toBe(0) + }) + + it('should correctly count bytes for multi-byte characters', () => { + let bytesWritten = 0 + + const events1 = { + window1: [{ type: EventType.Custom, timestamp: 1000, data: { message: 'Testowanie z jeżem 🦔' } }], + } + const message1: ParsedMessageData = { + ...createMessage('', []), + eventsByWindowId: events1, + } + bytesWritten += recorder.recordMessage(message1) + + const events2 = { + window1: [ + { + type: EventType.Custom, + timestamp: 1500, + data: { message: '🦔 What do you call a hedgehog in the desert? A cactus impersonator!' }, + }, + ], + } + const message2: ParsedMessageData = { + ...createMessage('', []), + eventsByWindowId: events2, + } + bytesWritten += recorder.recordMessage(message2) + + const events3 = { + window2: [ + { + type: EventType.Custom, + timestamp: 2000, + data: { message: "🦔 What's a hedgehog's favorite exercise? Spike jumps!" }, + }, + ], + } + const message3: ParsedMessageData = { + ...createMessage('', []), + eventsByWindowId: events3, + } + bytesWritten += recorder.recordMessage(message3) + + const stream = new PassThrough() + let bytesReceived = 0 + stream.on('data', (chunk) => { + bytesReceived += Buffer.byteLength(chunk) + }) + + recorder.dump(stream) + + expect(bytesReceived).toBe(bytesWritten) + }) + }) + + describe('dump', () => { + it('should ensure last line ends with newline', async () => { + const events = [ + { type: EventType.FullSnapshot, timestamp: 1000, data: {} }, + { type: EventType.IncrementalSnapshot, timestamp: 2000, data: {} }, + ] + const message = createMessage('window1', events) + recorder.recordMessage(message) + + const stream = new PassThrough() + let streamData = '' + stream.on('data', (chunk) => { + streamData += chunk + }) + + await recorder.dump(stream) + expect(streamData.endsWith('\n')).toBe(true) + }) + }) +}) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.test.ts new file mode 100644 index 0000000000000..8362ce60bb0df --- /dev/null +++ b/plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager.test.ts @@ -0,0 +1,330 @@ +import { KafkaOffsetManager } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/offset-manager' +import { SessionBatchManager } from '../../../../../src/main/ingestion-queues/session-recording-v2/sessions/session-batch-manager' +import { SessionBatchRecorder } from '../../../../../src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder' + +jest.setTimeout(1000) + +const createMockBatch = (): jest.Mocked => { + return { + record: jest.fn(), + flush: jest.fn().mockResolvedValue(undefined), + get size() { + return 0 + }, + discardPartition: jest.fn(), + } as unknown as jest.Mocked +} + +describe('SessionBatchManager', () => { + let manager: SessionBatchManager + let executionOrder: number[] + let createBatchMock: jest.Mock + let currentBatch: jest.Mocked + let mockOffsetManager: jest.Mocked + + beforeEach(() => { + currentBatch = createMockBatch() + createBatchMock = jest.fn().mockImplementation(() => { + currentBatch = createMockBatch() + return currentBatch + }) + + mockOffsetManager = { + wrapBatch: jest.fn().mockImplementation((batch) => batch), + commit: jest.fn().mockResolvedValue(undefined), + trackOffset: jest.fn(), + } as unknown as jest.Mocked + + manager = new SessionBatchManager({ + maxBatchSizeBytes: 100, + maxBatchAgeMs: 1000, + createBatch: createBatchMock, + offsetManager: mockOffsetManager, + }) + executionOrder = [] + }) + + const waitForNextTick = () => new Promise((resolve) => process.nextTick(resolve)) + + const waitFor = async (condition: () => boolean) => { + while (!condition()) { + await waitForNextTick() + } + } + + const waitForValue = async (array: number[], value: number) => { + await waitFor(() => array.includes(value)) + } + + it('should execute callbacks sequentially', async () => { + const promise1 = manager.withBatch(async () => { + executionOrder.push(1) + await waitForValue(executionOrder, 1) + executionOrder.push(2) + }) + + const promise2 = manager.withBatch(async () => { + executionOrder.push(3) + await waitForValue(executionOrder, 3) + executionOrder.push(4) + }) + + const promise3 = manager.withBatch(async () => { + executionOrder.push(5) + executionOrder.push(6) + return Promise.resolve() + }) + + await Promise.all([promise1, promise2, promise3]) + + // Should execute in order despite different delays + expect(executionOrder).toEqual([1, 2, 3, 4, 5, 6]) + }) + + it('should handle errors without breaking the queue', async () => { + const errorSpy = jest.fn() + + const promise1 = manager + .withBatch(async () => { + executionOrder.push(1) + throw new Error('test error') + return Promise.resolve() + }) + .catch(errorSpy) + + const promise2 = manager.withBatch(async () => { + executionOrder.push(2) + return Promise.resolve() + }) + + await Promise.all([promise1, promise2]) + + expect(executionOrder).toEqual([1, 2]) + expect(errorSpy).toHaveBeenCalled() + }) + + it('should maintain order even with immediate callbacks', async () => { + const results: number[] = [] + const promises: Promise[] = [] + + // Queue up 10 immediate callbacks + for (let i = 0; i < 10; i++) { + promises.push( + manager.withBatch(async () => { + results.push(i) + return Promise.resolve() + }) + ) + } + + await Promise.all(promises) + + // Should execute in order 0-9 + expect(results).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + }) + + it('should process new callbacks added during execution', async () => { + const results: number[] = [] + let nestedPromise: Promise | null = null + let promise2: Promise | null = null + const promise1 = manager.withBatch(async () => { + results.push(1) + // Add a new callback while this one is executing + nestedPromise = manager.withBatch(async () => { + await waitFor(() => promise2 !== null) + results.push(2) + return Promise.resolve() + }) + return Promise.resolve() + }) + + await waitFor(() => nestedPromise !== null) + promise2 = manager.withBatch(async () => { + results.push(3) + return Promise.resolve() + }) + + await Promise.all([promise1, promise2, nestedPromise!]) + + expect(results).toEqual([1, 2, 3]) + }) + + it('should create new batch on flush', async () => { + let firstBatch: SessionBatchRecorder | null = null + + await manager.withBatch(async (batch) => { + firstBatch = batch + return Promise.resolve() + }) + + await manager.flush() + + await manager.withBatch(async (batch) => { + expect(batch).not.toBe(firstBatch) + return Promise.resolve() + }) + }) + + it('should create new batch and commit offsets on flush', async () => { + const firstBatch = currentBatch + + await manager.flush() + + expect(firstBatch.flush).toHaveBeenCalled() + expect(mockOffsetManager.commit).toHaveBeenCalled() + expect(createBatchMock).toHaveBeenCalledTimes(2) + }) + + describe('size-based flushing', () => { + it('should flush and commit when buffer is full', async () => { + const firstBatch = currentBatch + jest.spyOn(firstBatch, 'size', 'get').mockReturnValue(150) + + await manager.flushIfNeeded() + + expect(firstBatch.flush).toHaveBeenCalled() + expect(mockOffsetManager.commit).toHaveBeenCalled() + expect(createBatchMock).toHaveBeenCalledTimes(2) + }) + }) + + describe('time-based flushing', () => { + beforeEach(() => { + jest.useFakeTimers() + }) + + afterEach(() => { + jest.useRealTimers() + }) + + it('should not flush when buffer is under limit and timeout not reached', async () => { + const firstBatch = currentBatch + jest.spyOn(firstBatch, 'size', 'get').mockReturnValue(50) + + jest.advanceTimersByTime(500) // Advance time by 500ms (less than timeout) + const flushPromise = manager.flushIfNeeded() + jest.runAllTimers() + await flushPromise + + expect(firstBatch.flush).not.toHaveBeenCalled() + expect(mockOffsetManager.commit).not.toHaveBeenCalled() + expect(createBatchMock).toHaveBeenCalledTimes(1) + }) + + it('should flush when timeout is reached', async () => { + const firstBatch = currentBatch + jest.spyOn(firstBatch, 'size', 'get').mockReturnValue(50) + + jest.advanceTimersByTime(1500) // Advance time by 1.5s (more than timeout) + const flushPromise = manager.flushIfNeeded() + jest.runAllTimers() + await flushPromise + + expect(firstBatch.flush).toHaveBeenCalled() + expect(mockOffsetManager.commit).toHaveBeenCalled() + expect(createBatchMock).toHaveBeenCalledTimes(2) + }) + + it('should reset flush timer after flush', async () => { + const firstBatch = currentBatch + jest.spyOn(firstBatch, 'size', 'get').mockReturnValue(50) + + // First flush due to timeout + jest.advanceTimersByTime(1500) + const firstFlushPromise = manager.flushIfNeeded() + jest.runAllTimers() + await firstFlushPromise + expect(firstBatch.flush).toHaveBeenCalled() + + const secondBatch = currentBatch + jest.spyOn(secondBatch, 'size', 'get').mockReturnValue(50) + + // Advance less than timeout from last flush + jest.advanceTimersByTime(500) + const secondFlushPromise = manager.flushIfNeeded() + jest.runAllTimers() + await secondFlushPromise + + expect(secondBatch.flush).not.toHaveBeenCalled() + expect(mockOffsetManager.commit).toHaveBeenCalledTimes(1) + }) + }) + + it('should execute callbacks sequentially including flushes', async () => { + const firstBatch = currentBatch + + const promise1 = manager.withBatch(async () => { + executionOrder.push(1) + return Promise.resolve() + }) + + const flushPromise = manager.flush() + + const promise2 = manager.withBatch(async () => { + executionOrder.push(2) + return Promise.resolve() + }) + + await Promise.all([promise1, flushPromise, promise2]) + + expect(executionOrder).toEqual([1, 2]) + expect(firstBatch.flush).toHaveBeenCalled() + expect(mockOffsetManager.commit).toHaveBeenCalled() + }) + + describe('partition handling', () => { + it('should discard partitions on new batch after flush', async () => { + const firstBatch = currentBatch + + // Flush to create a new batch + await manager.flush() + const secondBatch = currentBatch + + // Verify we have a new batch + expect(secondBatch).not.toBe(firstBatch) + + // Discard partitions + await manager.discardPartitions([1, 2]) + + // Verify discards happened on the new batch only + expect(firstBatch.discardPartition).not.toHaveBeenCalled() + expect(secondBatch.discardPartition).toHaveBeenCalledWith(1) + expect(secondBatch.discardPartition).toHaveBeenCalledWith(2) + }) + + it('should discard multiple partitions on current batch', async () => { + await manager.discardPartitions([1, 2]) + expect(currentBatch.discardPartition).toHaveBeenCalledWith(1) + expect(currentBatch.discardPartition).toHaveBeenCalledWith(2) + expect(currentBatch.discardPartition).toHaveBeenCalledTimes(2) + }) + + it('should maintain operation order when discarding partitions', async () => { + const executionOrder: number[] = [] + + // Start a long-running batch operation + const batchPromise = manager.withBatch(async () => { + await new Promise((resolve) => setTimeout(resolve, 100)) + executionOrder.push(1) + return Promise.resolve() + }) + + // Queue up a partition discard + const discardPromise = manager.discardPartitions([1]).then(() => { + executionOrder.push(2) + }) + + // Wait for both operations to complete + await Promise.all([batchPromise, discardPromise]) + + // Verify operations happened in the correct order + expect(executionOrder).toEqual([1, 2]) + expect(currentBatch.discardPartition).toHaveBeenCalledWith(1) + }) + + it('should handle empty partition array', async () => { + await manager.discardPartitions([]) + expect(currentBatch.discardPartition).not.toHaveBeenCalled() + }) + }) +}) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.test.ts new file mode 100644 index 0000000000000..e81543391a2b6 --- /dev/null +++ b/plugin-server/tests/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder.test.ts @@ -0,0 +1,490 @@ +import { PassThrough } from 'stream' + +import { + SessionBatchRecorder, + SessionBatchRecorderInterface, + SessionBatchWriter, +} from '../../../../../src/main/ingestion-queues/session-recording-v2/sessions/session-batch-recorder' +import { MessageWithTeam } from '../../../../../src/main/ingestion-queues/session-recording-v2/teams/types' + +// RRWeb event type constants +const enum EventType { + DomContentLoaded = 0, + Load = 1, + FullSnapshot = 2, + IncrementalSnapshot = 3, + Meta = 4, + Custom = 5, +} + +interface RRWebEvent { + type: EventType + timestamp: number + data: Record +} + +interface MessageMetadata { + partition?: number + topic?: string + offset?: number + timestamp?: number + rawSize?: number +} + +describe('SessionBatchRecorder', () => { + let recorder: SessionBatchRecorderInterface + let mockWriter: jest.Mocked + let mockStream: PassThrough + let mockFinish: () => Promise + + beforeEach(() => { + mockStream = new PassThrough() + mockFinish = jest.fn().mockResolvedValue(undefined) + mockWriter = { + open: jest.fn().mockImplementation(() => + Promise.resolve({ + stream: mockStream, + finish: mockFinish, + }) + ), + } + recorder = new SessionBatchRecorder(mockWriter) + }) + + const createMessage = ( + sessionId: string, + events: RRWebEvent[], + metadata: MessageMetadata = {} + ): MessageWithTeam => ({ + team: { + teamId: 1, + consoleLogIngestionEnabled: false, + }, + message: { + distinct_id: 'distinct_id', + session_id: sessionId, + eventsByWindowId: { + window1: events, + }, + eventsRange: { + start: events[0]?.timestamp || 0, + end: events[events.length - 1]?.timestamp || 0, + }, + metadata: { + partition: 1, + topic: 'test', + offset: 0, + timestamp: 0, + rawSize: 0, + ...metadata, + }, + }, + }) + + const parseLines = (output: string): [string, RRWebEvent][] => { + return output + .trim() + .split('\n') + .map((line) => { + const [windowId, event] = JSON.parse(line) + return [windowId, event] + }) + } + + const captureOutput = (stream: PassThrough): Promise => { + return new Promise((resolve) => { + let streamData = '' + stream.on('data', (chunk) => { + streamData += chunk + }) + stream.on('end', () => { + resolve(streamData) + }) + }) + } + + describe('recording and flushing', () => { + it('should process and flush a single session', async () => { + const message = createMessage('session1', [ + { + type: EventType.FullSnapshot, + timestamp: 1000, + data: { source: 1, adds: [{ parentId: 1, nextId: 2, node: { tag: 'div' } }] }, + }, + ]) + + recorder.record(message) + const outputPromise = captureOutput(mockStream) + await recorder.flush() + + expect(mockWriter.open).toHaveBeenCalled() + expect(mockFinish).toHaveBeenCalled() + + const output = await outputPromise + const lines = parseLines(output) + expect(lines).toEqual([['window1', message.message.eventsByWindowId.window1[0]]]) + expect(output.endsWith('\n')).toBe(true) + }) + + it('should handle multiple sessions in parallel', async () => { + const messages = [ + createMessage('session1', [ + { + type: EventType.Meta, + timestamp: 1000, + data: { href: 'https://example.com' }, + }, + ]), + createMessage('session2', [ + { + type: EventType.Custom, + timestamp: 2000, + data: { tag: 'user-interaction' }, + }, + ]), + ] + + messages.forEach((message) => recorder.record(message)) + const outputPromise = captureOutput(mockStream) + await recorder.flush() + + expect(mockWriter.open).toHaveBeenCalled() + expect(mockFinish).toHaveBeenCalled() + + const output = await outputPromise + const lines = parseLines(output) + expect(lines).toEqual([ + ['window1', messages[0].message.eventsByWindowId.window1[0]], + ['window1', messages[1].message.eventsByWindowId.window1[0]], + ]) + expect(output.endsWith('\n')).toBe(true) + }) + + it('should accumulate events for the same session', async () => { + const messages = [ + createMessage('session1', [ + { + type: EventType.FullSnapshot, + timestamp: 1000, + data: { source: 1, adds: [{ parentId: 1, nextId: 2, node: { tag: 'div' } }] }, + }, + ]), + createMessage('session1', [ + { + type: EventType.IncrementalSnapshot, + timestamp: 2000, + data: { source: 2, texts: [{ id: 1, value: 'Updated text' }] }, + }, + ]), + ] + + messages.forEach((message) => recorder.record(message)) + const outputPromise = captureOutput(mockStream) + await recorder.flush() + + expect(mockWriter.open).toHaveBeenCalled() + expect(mockFinish).toHaveBeenCalled() + + const output = await outputPromise + const lines = parseLines(output) + expect(lines).toEqual([ + ['window1', messages[0].message.eventsByWindowId.window1[0]], + ['window1', messages[1].message.eventsByWindowId.window1[0]], + ]) + expect(output.endsWith('\n')).toBe(true) + }) + + it('should handle empty events array', async () => { + const message = createMessage('session1', []) + const bytesWritten = recorder.record(message) + + const outputPromise = captureOutput(mockStream) + await recorder.flush() + + expect(mockWriter.open).toHaveBeenCalled() + expect(mockFinish).toHaveBeenCalled() + + const output = await outputPromise + expect(output).toBe('') + expect(bytesWritten).toBe(0) + }) + + it('should group events by session when interleaved', async () => { + const messages = [ + createMessage('session1', [ + { + type: EventType.FullSnapshot, + timestamp: 1000, + data: { source: 1, adds: [{ parentId: 1, nextId: 2, node: { tag: 'div' } }] }, + }, + ]), + createMessage('session2', [ + { + type: EventType.Meta, + timestamp: 1100, + data: { href: 'https://example.com' }, + }, + ]), + createMessage('session1', [ + { + type: EventType.IncrementalSnapshot, + timestamp: 2000, + data: { source: 2, texts: [{ id: 1, value: 'Updated text' }] }, + }, + ]), + createMessage('session2', [ + { + type: EventType.Custom, + timestamp: 2100, + data: { tag: 'user-interaction' }, + }, + ]), + ] + + messages.forEach((message) => recorder.record(message)) + const outputPromise = captureOutput(mockStream) + await recorder.flush() + + expect(mockWriter.open).toHaveBeenCalled() + expect(mockFinish).toHaveBeenCalled() + + const output = await outputPromise + const lines = parseLines(output) + + // Events should be grouped by session, maintaining chronological order within each session + expect(lines).toEqual([ + // All session1 events + ['window1', messages[0].message.eventsByWindowId.window1[0]], + ['window1', messages[2].message.eventsByWindowId.window1[0]], + // All session2 events + ['window1', messages[1].message.eventsByWindowId.window1[0]], + ['window1', messages[3].message.eventsByWindowId.window1[0]], + ]) + expect(output.endsWith('\n')).toBe(true) + }) + }) + + describe('flushing behavior', () => { + it('should clear sessions after flush', async () => { + const stream1 = new PassThrough() + const stream2 = new PassThrough() + const finish1 = jest.fn().mockResolvedValue(undefined) + const finish2 = jest.fn().mockResolvedValue(undefined) + + mockWriter.open + .mockResolvedValueOnce({ stream: stream1, finish: finish1 }) + .mockResolvedValueOnce({ stream: stream2, finish: finish2 }) + + const message1 = createMessage('session1', [ + { + type: EventType.FullSnapshot, + timestamp: 1000, + data: { source: 1, adds: [{ parentId: 1, nextId: 2, node: { tag: 'div' } }] }, + }, + ]) + + const message2 = createMessage('session1', [ + { + type: EventType.IncrementalSnapshot, + timestamp: 2000, + data: { source: 2, texts: [{ id: 1, value: 'Updated text' }] }, + }, + ]) + + recorder.record(message1) + const outputPromise1 = captureOutput(stream1) + await recorder.flush() + + expect(mockWriter.open).toHaveBeenCalledTimes(1) + expect(finish1).toHaveBeenCalledTimes(1) + expect(finish2).not.toHaveBeenCalled() + const output1 = await outputPromise1 + + // Record another message after flush + recorder.record(message2) + const outputPromise2 = captureOutput(stream2) + await recorder.flush() + + expect(mockWriter.open).toHaveBeenCalledTimes(2) + expect(finish1).toHaveBeenCalledTimes(1) + expect(finish2).toHaveBeenCalledTimes(1) + const output2 = await outputPromise2 + + // Each output should only contain the events from its own batch + const lines1 = parseLines(output1) + const lines2 = parseLines(output2) + expect(lines1).toEqual([['window1', message1.message.eventsByWindowId.window1[0]]]) + expect(lines2).toEqual([['window1', message2.message.eventsByWindowId.window1[0]]]) + }) + + it('should not output anything on second flush if no new events', async () => { + const stream1 = new PassThrough() + const stream2 = new PassThrough() + const finish1 = jest.fn().mockResolvedValue(undefined) + const finish2 = jest.fn().mockResolvedValue(undefined) + + mockWriter.open + .mockResolvedValueOnce({ stream: stream1, finish: finish1 }) + .mockResolvedValueOnce({ stream: stream2, finish: finish2 }) + + const message = createMessage('session1', [ + { + type: EventType.FullSnapshot, + timestamp: 1000, + data: { source: 1 }, + }, + ]) + + recorder.record(message) + await recorder.flush() + expect(mockWriter.open).toHaveBeenCalledTimes(1) + expect(finish1).toHaveBeenCalledTimes(1) + expect(finish2).not.toHaveBeenCalled() + + const outputPromise = captureOutput(stream2) + await recorder.flush() + const output = await outputPromise + + expect(output).toBe('') + expect(mockWriter.open).toHaveBeenCalledTimes(2) + expect(finish1).toHaveBeenCalledTimes(1) + expect(finish2).toHaveBeenCalledTimes(1) + }) + }) + + describe('partition handling', () => { + it('should flush all partitions', async () => { + const messages = [ + createMessage( + 'session1', + [ + { + type: EventType.FullSnapshot, + timestamp: 1000, + data: { source: 1 }, + }, + ], + { partition: 1 } + ), + createMessage( + 'session2', + [ + { + type: EventType.IncrementalSnapshot, + timestamp: 2000, + data: { source: 2 }, + }, + ], + { partition: 2 } + ), + ] + + messages.forEach((message) => recorder.record(message)) + const outputPromise = captureOutput(mockStream) + await recorder.flush() + + const output = await outputPromise + const lines = parseLines(output) + + expect(lines).toEqual([ + ['window1', messages[0].message.eventsByWindowId.window1[0]], + ['window1', messages[1].message.eventsByWindowId.window1[0]], + ]) + expect(mockWriter.open).toHaveBeenCalledTimes(1) + expect(mockFinish).toHaveBeenCalledTimes(1) + }) + + it('should not flush discarded partitions', async () => { + const messages = [ + createMessage( + 'session1', + [ + { + type: EventType.FullSnapshot, + timestamp: 1000, + data: { source: 1 }, + }, + ], + { partition: 1 } + ), + createMessage( + 'session2', + [ + { + type: EventType.IncrementalSnapshot, + timestamp: 2000, + data: { source: 2 }, + }, + ], + { partition: 2 } + ), + ] + + messages.forEach((message) => recorder.record(message)) + recorder.discardPartition(1) // Discard partition 1 + + const outputPromise = captureOutput(mockStream) + await recorder.flush() + + const output = await outputPromise + const lines = parseLines(output) + + // Should only contain message from partition 2 + expect(lines).toEqual([['window1', messages[1].message.eventsByWindowId.window1[0]]]) + }) + + it('should correctly update size when discarding partitions', () => { + const message1 = createMessage( + 'session1', + [ + { + type: EventType.FullSnapshot, + timestamp: 1000, + data: { source: 1 }, + }, + ], + { partition: 1 } + ) + const message2 = createMessage( + 'session2', + [ + { + type: EventType.IncrementalSnapshot, + timestamp: 2000, + data: { source: 2 }, + }, + ], + { partition: 2 } + ) + + const size1 = recorder.record(message1) + const size2 = recorder.record(message2) + const totalSize = size1 + size2 + + expect(recorder.size).toBe(totalSize) + + recorder.discardPartition(1) + expect(recorder.size).toBe(size2) + + recorder.discardPartition(2) + expect(recorder.size).toBe(0) + }) + + it('should handle discarding non-existent partitions', () => { + const message = createMessage('session1', [ + { + type: EventType.FullSnapshot, + timestamp: 1000, + data: { source: 1 }, + }, + ]) + + const bytesWritten = recorder.record(message) + expect(recorder.size).toBe(bytesWritten) + + // Should not throw or change size + recorder.discardPartition(999) + expect(recorder.size).toBe(bytesWritten) + }) + }) +}) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording-v2/teams/team-filter.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording-v2/teams/team-filter.test.ts index aa00599e9a570..91e9c7f76eacf 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording-v2/teams/team-filter.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording-v2/teams/team-filter.test.ts @@ -1,193 +1,135 @@ -import { Message } from 'node-rdkafka' - -import { KafkaMetrics } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/metrics' -import { KafkaParser } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/parser' +import { ParsedMessageData } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/types' import { TeamFilter } from '../../../../../src/main/ingestion-queues/session-recording-v2/teams/team-filter' import { TeamService } from '../../../../../src/main/ingestion-queues/session-recording-v2/teams/team-service' import { Team } from '../../../../../src/main/ingestion-queues/session-recording-v2/teams/types' jest.mock('../../../../../src/main/ingestion-queues/session-recording-v2/teams/team-service') -jest.mock('../../../../../src/main/ingestion-queues/session-recording-v2/kafka/parser') const validTeam: Team = { teamId: 1, consoleLogIngestionEnabled: true, } -const createSessionRecordingMessage = (token?: string, timestamp = Date.now()): Message => ({ - value: Buffer.from('test'), - size: 4, - topic: 'test', - offset: 0, - partition: 0, - timestamp, - headers: token ? [{ token }] : undefined, -}) - -const createParsedMessage = (offset = 0, timestamp = Date.now()) => ({ - distinct_id: 'distinct_id', - session_id: 'session_id', +const createMessage = (token?: string, overrides = {}): ParsedMessageData => ({ metadata: { partition: 0, topic: 'test', - offset, - timestamp, + offset: 0, + timestamp: Date.now(), rawSize: 100, }, + headers: token ? [{ token }] : undefined, + distinct_id: 'distinct_id', + session_id: 'session_id', eventsByWindowId: {}, eventsRange: { start: 0, end: 0 }, + ...overrides, }) describe('TeamFilter', () => { let teamFilter: TeamFilter - let teamService: jest.Mocked - let kafkaMetrics: jest.Mocked - let kafkaParser: jest.Mocked + let mockTeamService: jest.Mocked beforeEach(() => { jest.clearAllMocks() - teamService = new TeamService() as jest.Mocked - kafkaMetrics = new KafkaMetrics() as jest.Mocked - kafkaParser = new KafkaParser(kafkaMetrics) as jest.Mocked - teamFilter = new TeamFilter(teamService, kafkaParser) + mockTeamService = { + getTeamByToken: jest.fn(), + } as jest.Mocked + teamFilter = new TeamFilter(mockTeamService) }) describe('team token validation', () => { it('processes messages with valid team token', async () => { - const message = createSessionRecordingMessage('valid-token') - const parsedMessage = createParsedMessage() - - teamService.getTeamByToken.mockResolvedValueOnce(validTeam) - kafkaParser.parseMessage.mockResolvedValueOnce(parsedMessage) + const message = createMessage('valid-token') + mockTeamService.getTeamByToken.mockResolvedValueOnce(validTeam) - const result = await teamFilter.parseBatch([message]) + const result = await teamFilter.filterBatch([message]) - expect(result).toEqual([{ team: validTeam, message: parsedMessage }]) - expect(teamService.getTeamByToken).toHaveBeenCalledWith('valid-token') - expect(teamService.getTeamByToken).toHaveBeenCalledTimes(1) + expect(result).toEqual([{ team: validTeam, message }]) + expect(mockTeamService.getTeamByToken).toHaveBeenCalledWith('valid-token') + expect(mockTeamService.getTeamByToken).toHaveBeenCalledTimes(1) }) it('drops messages with no token in header', async () => { - const message = createSessionRecordingMessage() - const result = await teamFilter.parseBatch([message]) + const message = createMessage() + const result = await teamFilter.filterBatch([message]) expect(result).toEqual([]) - expect(teamService.getTeamByToken).not.toHaveBeenCalled() - expect(kafkaParser.parseMessage).not.toHaveBeenCalled() + expect(mockTeamService.getTeamByToken).not.toHaveBeenCalled() }) it('drops messages with invalid team tokens', async () => { - const message = createSessionRecordingMessage('invalid-token') - teamService.getTeamByToken.mockResolvedValueOnce(null) - - const result = await teamFilter.parseBatch([message]) - - expect(result).toEqual([]) - expect(teamService.getTeamByToken).toHaveBeenCalledWith('invalid-token') - expect(teamService.getTeamByToken).toHaveBeenCalledTimes(1) - expect(kafkaParser.parseMessage).not.toHaveBeenCalled() - }) - }) + const message = createMessage('invalid-token') + mockTeamService.getTeamByToken.mockResolvedValueOnce(null) - describe('message parsing', () => { - beforeEach(() => { - teamService.getTeamByToken.mockResolvedValue(validTeam) - }) - - it('processes valid parsed messages', async () => { - const message = createSessionRecordingMessage('token') - const parsedMessage = createParsedMessage() - kafkaParser.parseMessage.mockResolvedValueOnce(parsedMessage) - - const result = await teamFilter.parseBatch([message]) - - expect(result).toEqual([{ team: validTeam, message: parsedMessage }]) - expect(teamService.getTeamByToken).toHaveBeenCalledWith('token') - expect(teamService.getTeamByToken).toHaveBeenCalledTimes(1) - }) - - it('drops messages that fail parsing', async () => { - const message = createSessionRecordingMessage('token') - kafkaParser.parseMessage.mockResolvedValueOnce(null) - - const result = await teamFilter.parseBatch([message]) + const result = await teamFilter.filterBatch([message]) expect(result).toEqual([]) - expect(teamService.getTeamByToken).toHaveBeenCalledWith('token') - expect(teamService.getTeamByToken).toHaveBeenCalledTimes(1) + expect(mockTeamService.getTeamByToken).toHaveBeenCalledWith('invalid-token') + expect(mockTeamService.getTeamByToken).toHaveBeenCalledTimes(1) }) }) describe('batch processing', () => { - beforeEach(() => { - teamService.getTeamByToken.mockResolvedValue(validTeam) - }) - it('processes multiple messages in order', async () => { const timestamp = Date.now() const messages = [ - createSessionRecordingMessage('token1', timestamp), - createSessionRecordingMessage('token2', timestamp + 1), + createMessage('token1', { metadata: { timestamp } }), + createMessage('token2', { metadata: { timestamp: timestamp + 1 } }), ] - const parsedMessages = [createParsedMessage(0, timestamp), createParsedMessage(1, timestamp + 1)] - - kafkaParser.parseMessage.mockResolvedValueOnce(parsedMessages[0]).mockResolvedValueOnce(parsedMessages[1]) + mockTeamService.getTeamByToken.mockResolvedValue(validTeam) - const result = await teamFilter.parseBatch(messages) + const result = await teamFilter.filterBatch(messages) expect(result).toEqual([ - { team: validTeam, message: parsedMessages[0] }, - { team: validTeam, message: parsedMessages[1] }, + { team: validTeam, message: messages[0] }, + { team: validTeam, message: messages[1] }, ]) - expect(teamService.getTeamByToken).toHaveBeenCalledWith('token1') - expect(teamService.getTeamByToken).toHaveBeenCalledWith('token2') - expect(teamService.getTeamByToken).toHaveBeenCalledTimes(2) + expect(mockTeamService.getTeamByToken).toHaveBeenCalledWith('token1') + expect(mockTeamService.getTeamByToken).toHaveBeenCalledWith('token2') + expect(mockTeamService.getTeamByToken).toHaveBeenCalledTimes(2) }) it('processes messages with different teams', async () => { const timestamp = Date.now() const messages = [ - createSessionRecordingMessage('token1', timestamp), - createSessionRecordingMessage('token2', timestamp + 1), + createMessage('token1', { metadata: { timestamp } }), + createMessage('token2', { metadata: { timestamp: timestamp + 1 } }), ] - const parsedMessages = [createParsedMessage(0, timestamp), createParsedMessage(1, timestamp + 1)] - const team2 = { ...validTeam, teamId: 2 } + mockTeamService.getTeamByToken.mockResolvedValueOnce(validTeam).mockResolvedValueOnce(team2) - teamService.getTeamByToken.mockResolvedValueOnce(validTeam).mockResolvedValueOnce(team2) - - kafkaParser.parseMessage.mockResolvedValueOnce(parsedMessages[0]).mockResolvedValueOnce(parsedMessages[1]) - - const result = await teamFilter.parseBatch(messages) + const result = await teamFilter.filterBatch(messages) expect(result).toEqual([ - { team: validTeam, message: parsedMessages[0] }, - { team: team2, message: parsedMessages[1] }, + { team: validTeam, message: messages[0] }, + { team: team2, message: messages[1] }, ]) - expect(teamService.getTeamByToken).toHaveBeenCalledWith('token1') - expect(teamService.getTeamByToken).toHaveBeenCalledWith('token2') - expect(teamService.getTeamByToken).toHaveBeenCalledTimes(2) + expect(mockTeamService.getTeamByToken).toHaveBeenCalledWith('token1') + expect(mockTeamService.getTeamByToken).toHaveBeenCalledWith('token2') + expect(mockTeamService.getTeamByToken).toHaveBeenCalledTimes(2) }) it('handles mixed valid and invalid messages in batch', async () => { const messages = [ - createSessionRecordingMessage('token1'), - createSessionRecordingMessage(), // No token - createSessionRecordingMessage('token2'), + createMessage('token1'), + createMessage(), // No token + createMessage('token2'), ] - kafkaParser.parseMessage - .mockResolvedValueOnce(createParsedMessage(0)) - .mockResolvedValueOnce(createParsedMessage(2)) + mockTeamService.getTeamByToken.mockResolvedValue(validTeam) - const result = await teamFilter.parseBatch(messages) + const result = await teamFilter.filterBatch(messages) - expect(result).toHaveLength(2) - expect(teamService.getTeamByToken).toHaveBeenCalledWith('token1') - expect(teamService.getTeamByToken).toHaveBeenCalledWith('token2') - expect(teamService.getTeamByToken).toHaveBeenCalledTimes(2) + expect(result).toEqual([ + { team: validTeam, message: messages[0] }, + { team: validTeam, message: messages[2] }, + ]) + expect(mockTeamService.getTeamByToken).toHaveBeenCalledWith('token1') + expect(mockTeamService.getTeamByToken).toHaveBeenCalledWith('token2') + expect(mockTeamService.getTeamByToken).toHaveBeenCalledTimes(2) }) }) }) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording-v2/versions/lib-version-monitor.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording-v2/versions/lib-version-monitor.test.ts index 145cb7d0f633e..b77de2187d775 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording-v2/versions/lib-version-monitor.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording-v2/versions/lib-version-monitor.test.ts @@ -1,187 +1,79 @@ -import { Message, MessageHeader } from 'node-rdkafka' - import { MessageWithTeam } from '../../../../../src/main/ingestion-queues/session-recording-v2/teams/types' -import { - BatchMessageProcessor, - CaptureIngestionWarningFn, -} from '../../../../../src/main/ingestion-queues/session-recording-v2/types' import { LibVersionMonitor } from '../../../../../src/main/ingestion-queues/session-recording-v2/versions/lib-version-monitor' import { VersionMetrics } from '../../../../../src/main/ingestion-queues/session-recording-v2/versions/version-metrics' describe('LibVersionMonitor', () => { - let mockCaptureWarning: jest.MockedFunction - let mockMetrics: jest.Mocked - let mockSourceProcessor: jest.Mocked> - let monitor: LibVersionMonitor + let monitor: LibVersionMonitor + let mockCaptureWarning: jest.Mock + let mockVersionMetrics: jest.Mocked beforeEach(() => { - jest.clearAllMocks() mockCaptureWarning = jest.fn() - mockMetrics = { + mockVersionMetrics = { incrementLibVersionWarning: jest.fn(), - } as any - mockSourceProcessor = { - parseBatch: jest.fn(), - } - monitor = new LibVersionMonitor(mockSourceProcessor, mockCaptureWarning, mockMetrics) + } as unknown as jest.Mocked + monitor = new LibVersionMonitor(mockCaptureWarning, mockVersionMetrics) }) - describe('parseBatch', () => { - it('should process messages and return them unmodified', async () => { - const inputMessages: Message[] = [{ partition: 1 } as Message] - const processedMessages: MessageWithTeam[] = [ - { - team: { teamId: 1, consoleLogIngestionEnabled: true }, - message: { - distinct_id: 'test_id', - session_id: 'test_session', - eventsByWindowId: {}, - eventsRange: { start: 0, end: 0 }, - headers: [{ lib_version: '1.74.0' }] as MessageHeader[], - metadata: { - partition: 0, - topic: 'test', - rawSize: 0, - offset: 0, - timestamp: 0, - }, - }, - }, - ] - - mockSourceProcessor.parseBatch.mockResolvedValue(processedMessages) - - const result = await monitor.parseBatch(inputMessages) - expect(result).toBe(processedMessages) - expect(mockSourceProcessor.parseBatch).toHaveBeenCalledWith(inputMessages) - }) - - it('should trigger warning for old versions', async () => { - const inputMessages: Message[] = [{ partition: 1 } as Message] - const processedMessages: MessageWithTeam[] = [ - { - team: { teamId: 1, consoleLogIngestionEnabled: true }, - message: { - distinct_id: 'test_id', - session_id: 'test_session', - eventsByWindowId: {}, - eventsRange: { start: 0, end: 0 }, - headers: [{ lib_version: '1.74.0' }] as MessageHeader[], - metadata: { - partition: 0, - topic: 'test', - rawSize: 0, - offset: 0, - timestamp: 0, - }, - }, - }, - ] - - mockSourceProcessor.parseBatch.mockResolvedValue(processedMessages) - - await monitor.parseBatch(inputMessages) - - expect(mockMetrics.incrementLibVersionWarning).toHaveBeenCalled() - expect(mockCaptureWarning).toHaveBeenCalledWith( - 1, - 'replay_lib_version_too_old', - { - libVersion: '1.74.0', - parsedVersion: { major: 1, minor: 74 }, - }, - { key: '1.74.0' } - ) - }) - - it('should not trigger warning for newer versions', async () => { - const inputMessages: Message[] = [{ partition: 1 } as Message] - const processedMessages: MessageWithTeam[] = [ - { - team: { teamId: 1, consoleLogIngestionEnabled: true }, - message: { - distinct_id: 'test_id', - session_id: 'test_session', - eventsByWindowId: {}, - eventsRange: { start: 0, end: 0 }, - headers: [{ lib_version: '1.76.0' }] as MessageHeader[], - metadata: { - partition: 0, - topic: 'test', - rawSize: 0, - offset: 0, - timestamp: 0, - }, - }, - }, - ] - - mockSourceProcessor.parseBatch.mockResolvedValue(processedMessages) - - await monitor.parseBatch(inputMessages) - - expect(mockMetrics.incrementLibVersionWarning).not.toHaveBeenCalled() - expect(mockCaptureWarning).not.toHaveBeenCalled() - }) - - it('should handle invalid version formats', async () => { - const inputMessages: Message[] = [{ partition: 1 } as Message] - const processedMessages: MessageWithTeam[] = [ - { - team: { teamId: 1, consoleLogIngestionEnabled: true }, - message: { - distinct_id: 'test_id', - session_id: 'test_session', - eventsByWindowId: {}, - eventsRange: { start: 0, end: 0 }, - headers: [{ lib_version: 'invalid' }] as MessageHeader[], - metadata: { - partition: 0, - topic: 'test', - rawSize: 0, - offset: 0, - timestamp: 0, - }, - }, - }, - ] + const createMessage = (headers: any[] = []): MessageWithTeam => ({ + team: { teamId: 1, consoleLogIngestionEnabled: false }, + message: { + metadata: { + partition: 1, + topic: 'test-topic', + offset: 1, + timestamp: 1234567890, + rawSize: 100, + }, + headers, + distinct_id: 'distinct_id', + session_id: 'session1', + eventsByWindowId: { window1: [] }, + eventsRange: { start: 0, end: 0 }, + }, + }) - mockSourceProcessor.parseBatch.mockResolvedValue(processedMessages) + it('should warn on old lib version (< 1.75.0)', async () => { + const message = createMessage([{ lib_version: '1.74.0' }]) + const result = await monitor.processBatch([message]) + + expect(result).toEqual([message]) + expect(mockVersionMetrics.incrementLibVersionWarning).toHaveBeenCalled() + expect(mockCaptureWarning).toHaveBeenCalledWith( + 1, + 'replay_lib_version_too_old', + { + libVersion: '1.74.0', + parsedVersion: { major: 1, minor: 74 }, + }, + { key: '1.74.0' } + ) + }) - await monitor.parseBatch(inputMessages) + it('should not warn on new lib version (>= 1.75.0)', async () => { + const message = createMessage([{ lib_version: '1.75.0' }]) + const result = await monitor.processBatch([message]) - expect(mockMetrics.incrementLibVersionWarning).not.toHaveBeenCalled() - expect(mockCaptureWarning).not.toHaveBeenCalled() - }) + expect(result).toEqual([message]) + expect(mockVersionMetrics.incrementLibVersionWarning).not.toHaveBeenCalled() + expect(mockCaptureWarning).not.toHaveBeenCalled() + }) - it('should handle missing version header', async () => { - const inputMessages: Message[] = [{ partition: 1 } as Message] - const processedMessages: MessageWithTeam[] = [ - { - team: { teamId: 1, consoleLogIngestionEnabled: true }, - message: { - distinct_id: 'test_id', - session_id: 'test_session', - eventsByWindowId: {}, - eventsRange: { start: 0, end: 0 }, - headers: [] as MessageHeader[], - metadata: { - partition: 0, - topic: 'test', - rawSize: 0, - offset: 0, - timestamp: 0, - }, - }, - }, - ] + it('should handle invalid lib version', async () => { + const message = createMessage([{ lib_version: 'invalid' }]) + const result = await monitor.processBatch([message]) - mockSourceProcessor.parseBatch.mockResolvedValue(processedMessages) + expect(result).toEqual([message]) + expect(mockVersionMetrics.incrementLibVersionWarning).not.toHaveBeenCalled() + expect(mockCaptureWarning).not.toHaveBeenCalled() + }) - await monitor.parseBatch(inputMessages) + it('should handle missing lib version', async () => { + const message = createMessage() + const result = await monitor.processBatch([message]) - expect(mockMetrics.incrementLibVersionWarning).not.toHaveBeenCalled() - expect(mockCaptureWarning).not.toHaveBeenCalled() - }) + expect(result).toEqual([message]) + expect(mockVersionMetrics.incrementLibVersionWarning).not.toHaveBeenCalled() + expect(mockCaptureWarning).not.toHaveBeenCalled() }) })