From 124f8603e983f2670c0763b873c6af1edba67c7f Mon Sep 17 00:00:00 2001 From: Gabriel Mata Date: Wed, 6 Nov 2024 14:09:18 -0500 Subject: [PATCH 01/10] feat: Add handling of message sending limits by size in bytes --- packages/server/src/config/app.config.ts | 7 ++ .../websocket/schemas/StoreQueuedMessage.ts | 8 ++ .../websocket/services/MessagePersister.ts | 1 + .../server/src/websocket/websocket.service.ts | 82 ++++++++++++------- 4 files changed, 68 insertions(+), 30 deletions(-) diff --git a/packages/server/src/config/app.config.ts b/packages/server/src/config/app.config.ts index 5a2c484..c86dd1c 100644 --- a/packages/server/src/config/app.config.ts +++ b/packages/server/src/config/app.config.ts @@ -65,4 +65,11 @@ export default registerAs('appConfig', () => ({ *Allows set threshold time to execute messagePersist module on milisecond */ thresholdTimestamp: parseInt(process.env.THRESHOLD_TIMESTAMP) || 60000, + + /** + * The maximum total message size in bytes. + * Defaults to 1 MB (1 * 1024 * 1024 bytes) if MAX_TOTAL_MESSAGE_SIZE_MB is not set in the environment variables. + * @type {number} + */ + maxMessageSizeBytes: (parseInt(process.env.MAX_MESSAGE_SIZE_BYTES, 10) || 1) * 1024 * 1024, })) diff --git a/packages/server/src/websocket/schemas/StoreQueuedMessage.ts b/packages/server/src/websocket/schemas/StoreQueuedMessage.ts index 4905b05..810829f 100644 --- a/packages/server/src/websocket/schemas/StoreQueuedMessage.ts +++ b/packages/server/src/websocket/schemas/StoreQueuedMessage.ts @@ -31,6 +31,13 @@ export class StoreQueuedMessage extends Document { @Prop({ type: Object, required: true }) encryptedMessage: EncryptedMessage + /** + * The size Encrypted Message store in collection + * @type {number} + */ + @Prop() + encryptedMessageSize?: number + /** * The recipient keys (DIDs or other identifiers) associated with the message. * @type {string[]} @@ -44,6 +51,7 @@ export class StoreQueuedMessage extends Document { */ @Prop() state?: string + /** * The timestamp when the message was created. * Mongoose automatically creates this field when `timestamps: true` is set in the schema. diff --git a/packages/server/src/websocket/services/MessagePersister.ts b/packages/server/src/websocket/services/MessagePersister.ts index 647d777..53eb99c 100644 --- a/packages/server/src/websocket/services/MessagePersister.ts +++ b/packages/server/src/websocket/services/MessagePersister.ts @@ -62,6 +62,7 @@ export class MessagePersister { connectionId: message.connectionId, recipientKeys: message.recipientDids, encryptedMessage: message.encryptedMessage, + encryptedMessageSize: message.encryptedMessageSize, state: message.state, createdAt: new Date(message.receivedAt), }) diff --git a/packages/server/src/websocket/websocket.service.ts b/packages/server/src/websocket/websocket.service.ts index 729c99d..a3e10a9 100644 --- a/packages/server/src/websocket/websocket.service.ts +++ b/packages/server/src/websocket/websocket.service.ts @@ -65,54 +65,70 @@ export class WebsocketService { * @returns {Promise} - A promise that resolves to an array of queued messages. */ async takeFromQueue(dto: TakeFromQueueDto): Promise { - const { connectionId, limit = 10, recipientDid } = dto + const { connectionId, recipientDid } = dto + const maxMessageSizeBytes = this.configService.get('appConfig.maxMessageSizeBytes') + let currentSize = 0 // Accumulated size of messages in bytes + const combinedMessages: QueuedMessage[] = [] this.logger.debug('[takeFromQueue] Method called with DTO:', dto) try { - // Retrieve messages from Redis - const redisMessagesRaw = await this.redis.lrange(`connectionId:${connectionId}:queuemessages`, 0, limit - 1) - const redisMessages: QueuedMessage[] = redisMessagesRaw.map((message) => { - const parsedMessage = JSON.parse(message) - - // Map Redis data to QueuedMessage type - return { - id: parsedMessage.messageId, - receivedAt: new Date(parsedMessage.receivedAt), - encryptedMessage: parsedMessage.encryptedMessage, - } - }) - - this.logger.debug( - `[takeFromQueue] Fetched ${redisMessages.length} messages from Redis for connectionId ${connectionId}`, - ) - - // Query MongoDB with the provided connectionId or recipientDid, and state 'pending' + // Step 1: Retrieve messages from MongoDB, respecting the accumulated size limit const mongoMessages = await this.queuedMessage .find({ $or: [{ connectionId }, { recipientKeys: recipientDid }], state: 'pending', }) .sort({ createdAt: 1 }) - .limit(limit) - .select({ messageId: 1, encryptedMessage: 1, createdAt: 1 }) + .select({ messageId: 1, encryptedMessage: 1, createdAt: 1, encryptedMessageSize: 1 }) .lean() .exec() - const mongoMappedMessages: QueuedMessage[] = mongoMessages.map((msg) => ({ - id: msg.messageId, - receivedAt: msg.createdAt, - encryptedMessage: msg.encryptedMessage, - })) + for (const msg of mongoMessages) { + const messageSize = msg.encryptedMessageSize || Buffer.byteLength(JSON.stringify(msg.encryptedMessage), 'utf8') + + // Check if adding this message would exceed the max message size limit + if (currentSize + messageSize > maxMessageSizeBytes) break + + // Add message to the result and update the accumulated size + combinedMessages.push({ + id: msg.messageId, + receivedAt: msg.createdAt, + encryptedMessage: msg.encryptedMessage, + }) + currentSize += messageSize + } + + // Step 2: Retrieve messages from Redis + const redisMessagesRaw = await this.redis.lrange(`connectionId:${connectionId}:queuemessages`, 0, -1) + for (const message of redisMessagesRaw) { + const parsedMessage = JSON.parse(message) + const messageSize = + parsedMessage.sizeInBytes || Buffer.byteLength(JSON.stringify(parsedMessage.encryptedMessage), 'utf8') + + // Check if adding this message would exceed the max message size limit + if (currentSize + messageSize > maxMessageSizeBytes) break + + // Add message to the result and update the accumulated size + combinedMessages.push({ + id: parsedMessage.messageId, + receivedAt: new Date(parsedMessage.receivedAt), + encryptedMessage: parsedMessage.encryptedMessage, + }) + currentSize += messageSize + } this.logger.debug( - `[takeFromQueue] Fetched ${mongoMappedMessages.length} messages from MongoDB for connectionId ${connectionId}`, + `[takeFromQueue] Fetched ${combinedMessages.length} messages from Redis for connectionId ${connectionId}`, ) - // Combine messages from Redis and MongoDB - const combinedMessages: QueuedMessage[] = [...redisMessages, ...mongoMappedMessages] - this.logger.debug(`[takeFromQueue] combinedMessages for connectionId ${connectionId}: ${combinedMessages}`) + this.logger.debug( + `[takeFromQueue] Fetched ${combinedMessages.length} total messages (from Redis and MongoDB) in ${currentSize} MB for connectionId ${connectionId}`, + ) + this.logger.debug( + `[takeFromQueue] Total message size to be sent for connectionId ${connectionId}: ${currentSize} bytes`, + ) return combinedMessages } catch (error) { this.logger.error('[takeFromQueue] Error retrieving messages from Redis and MongoDB:', { @@ -178,6 +194,11 @@ export class WebsocketService { messageId = new ObjectId().toString() receivedAt = new Date() + // Calculate the size in bytes of the encrypted message to add database + const encryptedMessageSize = Buffer.byteLength(JSON.stringify(payload), 'utf8') + + this.logger.debug(`[addMessage] Size Encrypted Message ${encryptedMessageSize} `) + // Create a message object to store in Redis const messageData = { messageId, @@ -185,6 +206,7 @@ export class WebsocketService { recipientDids, encryptedMessage: payload, state: MessageState.pending, + encryptedMessageSize, receivedAt, } From d9bb53fce05e04d3408c389b543224b5f3d6d9be Mon Sep 17 00:00:00 2001 From: Gabriel Mata Date: Wed, 6 Nov 2024 16:21:28 -0500 Subject: [PATCH 02/10] feat: Add handling of message limits based on message size --- .../server/src/websocket/websocket.service.ts | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/packages/server/src/websocket/websocket.service.ts b/packages/server/src/websocket/websocket.service.ts index a3e10a9..67ba882 100644 --- a/packages/server/src/websocket/websocket.service.ts +++ b/packages/server/src/websocket/websocket.service.ts @@ -88,7 +88,9 @@ export class WebsocketService { const messageSize = msg.encryptedMessageSize || Buffer.byteLength(JSON.stringify(msg.encryptedMessage), 'utf8') // Check if adding this message would exceed the max message size limit - if (currentSize + messageSize > maxMessageSizeBytes) break + if (currentSize + messageSize > maxMessageSizeBytes) { + return combinedMessages + } // Add message to the result and update the accumulated size combinedMessages.push({ @@ -101,13 +103,16 @@ export class WebsocketService { // Step 2: Retrieve messages from Redis const redisMessagesRaw = await this.redis.lrange(`connectionId:${connectionId}:queuemessages`, 0, -1) + for (const message of redisMessagesRaw) { const parsedMessage = JSON.parse(message) const messageSize = parsedMessage.sizeInBytes || Buffer.byteLength(JSON.stringify(parsedMessage.encryptedMessage), 'utf8') // Check if adding this message would exceed the max message size limit - if (currentSize + messageSize > maxMessageSizeBytes) break + if (currentSize + messageSize > maxMessageSizeBytes) { + return combinedMessages + } // Add message to the result and update the accumulated size combinedMessages.push({ @@ -119,11 +124,7 @@ export class WebsocketService { } this.logger.debug( - `[takeFromQueue] Fetched ${combinedMessages.length} messages from Redis for connectionId ${connectionId}`, - ) - - this.logger.debug( - `[takeFromQueue] Fetched ${combinedMessages.length} total messages (from Redis and MongoDB) in ${currentSize} MB for connectionId ${connectionId}`, + `[takeFromQueue] Fetched ${combinedMessages.length} total messages for connectionId ${connectionId}`, ) this.logger.debug( From 47f917b752356076b26ec5e12105dc8af5eb28f4 Mon Sep 17 00:00:00 2001 From: Gabriel Mata Date: Wed, 6 Nov 2024 16:43:59 -0500 Subject: [PATCH 03/10] fix: fix unit test takeFromQueue method --- packages/server/src/websocket/websocket.service.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/server/src/websocket/websocket.service.spec.ts b/packages/server/src/websocket/websocket.service.spec.ts index 039df06..0a4b3f1 100644 --- a/packages/server/src/websocket/websocket.service.spec.ts +++ b/packages/server/src/websocket/websocket.service.spec.ts @@ -76,7 +76,7 @@ describe('WebsocketService', () => { jest.spyOn(redisMock, 'lrange').mockResolvedValue([ JSON.stringify({ id: '1', // Usar 'id' en lugar de 'messageId' - encryptedMessage: 'test-message-1', + encryptedMessage: 'test-message-2', receivedAt: new Date().toISOString(), }), ]) @@ -85,7 +85,7 @@ describe('WebsocketService', () => { storeQueuedMessageMock.exec.mockResolvedValue([ { id: '2', // MongoDB usa _id por defecto - encryptedMessage: 'test-message-2', + encryptedMessage: 'test-message-1', createdAt: new Date(), }, ]) From ca76b4dba5a2740225b15bf02e14c6e99a9f8949 Mon Sep 17 00:00:00 2001 From: Gabriel Mata Date: Thu, 7 Nov 2024 15:01:18 -0500 Subject: [PATCH 04/10] feat: feat: separate message retrieval into limit-based and size-based functions --- .../dto/messagerepository-websocket.dto.ts | 6 +- .../src/websocket/websocket.service.spec.ts | 41 +++- .../server/src/websocket/websocket.service.ts | 219 ++++++++++++------ 3 files changed, 194 insertions(+), 72 deletions(-) diff --git a/packages/server/src/websocket/dto/messagerepository-websocket.dto.ts b/packages/server/src/websocket/dto/messagerepository-websocket.dto.ts index b65dce8..3a669f4 100644 --- a/packages/server/src/websocket/dto/messagerepository-websocket.dto.ts +++ b/packages/server/src/websocket/dto/messagerepository-websocket.dto.ts @@ -25,7 +25,11 @@ export class TakeFromQueueDto { @IsOptional() @IsInt() - limit: number + limit?: number + + @IsOptional() + @IsInt() + limitSize?: number @IsOptional() @IsBoolean() diff --git a/packages/server/src/websocket/websocket.service.spec.ts b/packages/server/src/websocket/websocket.service.spec.ts index 0a4b3f1..94f6cc8 100644 --- a/packages/server/src/websocket/websocket.service.spec.ts +++ b/packages/server/src/websocket/websocket.service.spec.ts @@ -71,7 +71,46 @@ describe('WebsocketService', () => { expect(service).toBeDefined() }) - it('should takeFromQueue (Redis and MongoDB)', async () => { + it('should takeFromQueue (Redis and MongoDB) with size message', async () => { + // Mock configuration for Redis + jest.spyOn(redisMock, 'lrange').mockResolvedValue([ + JSON.stringify({ + id: '1', // Usar 'id' en lugar de 'messageId' + encryptedMessage: 'test-message-2', + receivedAt: new Date().toISOString(), + }), + ]) + + // Mock configuration for MongoDB + storeQueuedMessageMock.exec.mockResolvedValue([ + { + id: '2', // MongoDB usa _id por defecto + encryptedMessage: 'test-message-1', + createdAt: new Date(), + }, + ]) + + // Execute the takeFromQueue method + const result = await service.takeFromQueue({ + connectionId: 'test-connection-id', + id: '', + limitSize: 1, + }) + + // Verify that Redis and MongoDB calls were made + expect(redisMock.lrange).toHaveBeenCalledWith('connectionId:test-connection-id:queuemessages', 0, -1) + expect(storeQueuedMessageMock.find).toHaveBeenCalledWith({ + $or: [{ connectionId: 'test-connection-id' }, { recipientKeys: undefined }], + state: 'pending', + }) + + // Verify the combined result from Redis and MongoDB + expect(result).toHaveLength(2) + expect(result[0].encryptedMessage).toBe('test-message-1') // Verifica por 'id' + expect(result[1].encryptedMessage).toBe('test-message-2') // Verifica por 'id' + }) + + it('should takeFromQueue (Redis and MongoDB) with limit message', async () => { // Mock configuration for Redis jest.spyOn(redisMock, 'lrange').mockResolvedValue([ JSON.stringify({ diff --git a/packages/server/src/websocket/websocket.service.ts b/packages/server/src/websocket/websocket.service.ts index 67ba882..ac7ad09 100644 --- a/packages/server/src/websocket/websocket.service.ts +++ b/packages/server/src/websocket/websocket.service.ts @@ -65,79 +65,11 @@ export class WebsocketService { * @returns {Promise} - A promise that resolves to an array of queued messages. */ async takeFromQueue(dto: TakeFromQueueDto): Promise { - const { connectionId, recipientDid } = dto - const maxMessageSizeBytes = this.configService.get('appConfig.maxMessageSizeBytes') - let currentSize = 0 // Accumulated size of messages in bytes - const combinedMessages: QueuedMessage[] = [] + const { limitSize } = dto this.logger.debug('[takeFromQueue] Method called with DTO:', dto) - try { - // Step 1: Retrieve messages from MongoDB, respecting the accumulated size limit - const mongoMessages = await this.queuedMessage - .find({ - $or: [{ connectionId }, { recipientKeys: recipientDid }], - state: 'pending', - }) - .sort({ createdAt: 1 }) - .select({ messageId: 1, encryptedMessage: 1, createdAt: 1, encryptedMessageSize: 1 }) - .lean() - .exec() - - for (const msg of mongoMessages) { - const messageSize = msg.encryptedMessageSize || Buffer.byteLength(JSON.stringify(msg.encryptedMessage), 'utf8') - - // Check if adding this message would exceed the max message size limit - if (currentSize + messageSize > maxMessageSizeBytes) { - return combinedMessages - } - - // Add message to the result and update the accumulated size - combinedMessages.push({ - id: msg.messageId, - receivedAt: msg.createdAt, - encryptedMessage: msg.encryptedMessage, - }) - currentSize += messageSize - } - - // Step 2: Retrieve messages from Redis - const redisMessagesRaw = await this.redis.lrange(`connectionId:${connectionId}:queuemessages`, 0, -1) - - for (const message of redisMessagesRaw) { - const parsedMessage = JSON.parse(message) - const messageSize = - parsedMessage.sizeInBytes || Buffer.byteLength(JSON.stringify(parsedMessage.encryptedMessage), 'utf8') - - // Check if adding this message would exceed the max message size limit - if (currentSize + messageSize > maxMessageSizeBytes) { - return combinedMessages - } - - // Add message to the result and update the accumulated size - combinedMessages.push({ - id: parsedMessage.messageId, - receivedAt: new Date(parsedMessage.receivedAt), - encryptedMessage: parsedMessage.encryptedMessage, - }) - currentSize += messageSize - } - - this.logger.debug( - `[takeFromQueue] Fetched ${combinedMessages.length} total messages for connectionId ${connectionId}`, - ) - - this.logger.debug( - `[takeFromQueue] Total message size to be sent for connectionId ${connectionId}: ${currentSize} bytes`, - ) - return combinedMessages - } catch (error) { - this.logger.error('[takeFromQueue] Error retrieving messages from Redis and MongoDB:', { - connectionId, - error: error.message, - }) - return [] - } + return limitSize ? await this.takeMessagesWithSize(dto) : await this.takeMessagesWithLimit(dto) } /** @@ -721,4 +653,151 @@ export class WebsocketService { }) } } + + /** + * Retrieves messages from both Redis and MongoDB up to a specified message count limit. + * + * @param {TakeFromQueueDto} dto - Data transfer object containing query parameters. + * @returns {Promise} - A promise that resolves to an array of queued messages. + */ + private async takeMessagesWithLimit(dto: TakeFromQueueDto): Promise { + const { connectionId, limit, recipientDid } = dto + + this.logger.debug('[takeMessagesWithLimit] Method called with DTO:', dto) + + try { + // Query MongoDB with the provided connectionId or recipientDid, and state 'pending' + const mongoMessages = await this.queuedMessage + .find({ + $or: [{ connectionId }, { recipientKeys: recipientDid }], + state: 'pending', + }) + .sort({ createdAt: 1 }) + .limit(limit) + .select({ messageId: 1, encryptedMessage: 1, createdAt: 1 }) + .lean() + .exec() + + const mongoMappedMessages: QueuedMessage[] = mongoMessages.map((msg) => ({ + id: msg.messageId, + receivedAt: msg.createdAt, + encryptedMessage: msg.encryptedMessage, + })) + + this.logger.debug( + `[takeMessagesWithLimit] Fetched ${mongoMappedMessages.length} messages from MongoDB for connectionId ${connectionId}`, + ) + + // Retrieve messages from Redis + const redisMessagesRaw = await this.redis.lrange(`connectionId:${connectionId}:queuemessages`, 0, limit - 1) + const redisMessages: QueuedMessage[] = redisMessagesRaw.map((message) => { + const parsedMessage = JSON.parse(message) + + // Map Redis data to QueuedMessage type + return { + id: parsedMessage.messageId, + receivedAt: new Date(parsedMessage.receivedAt), + encryptedMessage: parsedMessage.encryptedMessage, + } + }) + + this.logger.debug( + `[takeMessagesWithLimit] Fetched ${redisMessages.length} messages from Redis for connectionId ${connectionId}`, + ) + // Combine messages from Redis and MongoDB + const combinedMessages: QueuedMessage[] = [...mongoMappedMessages, ...redisMessages] + + this.logger.debug( + `[takeMessagesWithLimit] combinedMessages for connectionId ${connectionId}: ${combinedMessages}`, + ) + + return combinedMessages + } catch (error) { + this.logger.error('[takeMessagesWithLimit] Error retrieving messages from Redis and MongoDB:', { + connectionId, + error: error.message, + }) + return [] + } + } + + /** + * Retrieves messages from both Redis and MongoDB up to a specified total size in bytes. + * + * @param {TakeFromQueueDto} dto - Data transfer object containing query parameters. + * @returns {Promise} - A promise that resolves to an array of queued messages. + */ + private async takeMessagesWithSize(dto: TakeFromQueueDto): Promise { + const { connectionId, recipientDid, limitSize } = dto + const maxMessageSizeBytes = limitSize * 1024 * 1024 + let currentSize = 0 + const combinedMessages: QueuedMessage[] = [] + + try { + // Step 1: Retrieve messages from MongoDB with size limit + const mongoMessages = await this.queuedMessage + .find({ + $or: [{ connectionId }, { recipientKeys: recipientDid }], + state: 'pending', + }) + .sort({ createdAt: 1 }) + .select({ messageId: 1, encryptedMessage: 1, createdAt: 1, encryptedMessageSize: 1 }) + .lean() + .exec() + + for (const msg of mongoMessages) { + const messageSize = msg.encryptedMessageSize || Buffer.byteLength(JSON.stringify(msg.encryptedMessage), 'utf8') + + if (currentSize + messageSize > maxMessageSizeBytes) break + + combinedMessages.push({ + id: msg.messageId, + receivedAt: msg.createdAt, + encryptedMessage: msg.encryptedMessage, + }) + currentSize += messageSize + } + + // Skip Redis if size limit reached + if (currentSize >= maxMessageSizeBytes) { + this.logger.debug( + `[takeMessagesWithSize] Size limit reached with MongoDB messages for connectionId ${connectionId}`, + ) + return combinedMessages + } + + // Step 2: Retrieve messages from Redis with size limit + const redisMessagesRaw = await this.redis.lrange(`connectionId:${connectionId}:queuemessages`, 0, -1) + + for (const message of redisMessagesRaw) { + const parsedMessage = JSON.parse(message) + const messageSize = + parsedMessage.sizeInBytes || Buffer.byteLength(JSON.stringify(parsedMessage.encryptedMessage), 'utf8') + + if (currentSize + messageSize > maxMessageSizeBytes) break + + combinedMessages.push({ + id: parsedMessage.messageId, + receivedAt: new Date(parsedMessage.receivedAt), + encryptedMessage: parsedMessage.encryptedMessage, + }) + currentSize += messageSize + } + + this.logger.debug( + `[takeMessagesWithSize] Fetched ${combinedMessages.length} total messages for connectionId ${connectionId}`, + ) + + this.logger.debug( + `[takeMessagesWithSize] Total message size to be sent for connectionId ${connectionId}: ${currentSize} bytes`, + ) + + return combinedMessages + } catch (error) { + this.logger.error( + `[takeMessagesWithSize] Error retrieving messages for connectionId ${connectionId}: ${error.message}`, + ) + return [] + } + } } From 61c80fcae12bc2eb2f97225d32804f89b71b7e46 Mon Sep 17 00:00:00 2001 From: Gabriel Mata Date: Fri, 8 Nov 2024 09:54:46 -0500 Subject: [PATCH 05/10] Update packages/server/src/websocket/dto/messagerepository-websocket.dto.ts Co-authored-by: Ariel Gentile --- .../server/src/websocket/dto/messagerepository-websocket.dto.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/server/src/websocket/dto/messagerepository-websocket.dto.ts b/packages/server/src/websocket/dto/messagerepository-websocket.dto.ts index 3a669f4..61ba074 100644 --- a/packages/server/src/websocket/dto/messagerepository-websocket.dto.ts +++ b/packages/server/src/websocket/dto/messagerepository-websocket.dto.ts @@ -29,7 +29,7 @@ export class TakeFromQueueDto { @IsOptional() @IsInt() - limitSize?: number + limitBytes?: number @IsOptional() @IsBoolean() From d3e04d2d21eec9937e9e7e350aa1656a5aaee8bd Mon Sep 17 00:00:00 2001 From: Gabriel Mata Date: Fri, 8 Nov 2024 09:54:54 -0500 Subject: [PATCH 06/10] Update packages/server/src/websocket/schemas/StoreQueuedMessage.ts Co-authored-by: Ariel Gentile --- packages/server/src/websocket/schemas/StoreQueuedMessage.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/server/src/websocket/schemas/StoreQueuedMessage.ts b/packages/server/src/websocket/schemas/StoreQueuedMessage.ts index 810829f..f3b8ceb 100644 --- a/packages/server/src/websocket/schemas/StoreQueuedMessage.ts +++ b/packages/server/src/websocket/schemas/StoreQueuedMessage.ts @@ -36,7 +36,7 @@ export class StoreQueuedMessage extends Document { * @type {number} */ @Prop() - encryptedMessageSize?: number + encryptedMessageByteCount?: number /** * The recipient keys (DIDs or other identifiers) associated with the message. From 7c6493eaf78df43cac81d97a5b6cb735f2515484 Mon Sep 17 00:00:00 2001 From: Gabriel Mata Date: Fri, 8 Nov 2024 09:55:10 -0500 Subject: [PATCH 07/10] Update packages/server/src/websocket/websocket.service.ts Co-authored-by: Ariel Gentile --- packages/server/src/websocket/websocket.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/server/src/websocket/websocket.service.ts b/packages/server/src/websocket/websocket.service.ts index ac7ad09..2cc485d 100644 --- a/packages/server/src/websocket/websocket.service.ts +++ b/packages/server/src/websocket/websocket.service.ts @@ -660,7 +660,7 @@ export class WebsocketService { * @param {TakeFromQueueDto} dto - Data transfer object containing query parameters. * @returns {Promise} - A promise that resolves to an array of queued messages. */ - private async takeMessagesWithLimit(dto: TakeFromQueueDto): Promise { + private async takeMessagesWithMessageCountLimit(dto: TakeFromQueueDto): Promise { const { connectionId, limit, recipientDid } = dto this.logger.debug('[takeMessagesWithLimit] Method called with DTO:', dto) From 4fdee2e1f72f7809857ec039cafbd4b301c8a505 Mon Sep 17 00:00:00 2001 From: Gabriel Mata Date: Fri, 8 Nov 2024 09:55:22 -0500 Subject: [PATCH 08/10] Update packages/server/src/websocket/websocket.service.ts Co-authored-by: Ariel Gentile --- packages/server/src/websocket/websocket.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/server/src/websocket/websocket.service.ts b/packages/server/src/websocket/websocket.service.ts index 2cc485d..2caa1e7 100644 --- a/packages/server/src/websocket/websocket.service.ts +++ b/packages/server/src/websocket/websocket.service.ts @@ -727,7 +727,7 @@ export class WebsocketService { * @param {TakeFromQueueDto} dto - Data transfer object containing query parameters. * @returns {Promise} - A promise that resolves to an array of queued messages. */ - private async takeMessagesWithSize(dto: TakeFromQueueDto): Promise { + private async takeMessagesWithByteCountLimit(dto: TakeFromQueueDto): Promise { const { connectionId, recipientDid, limitSize } = dto const maxMessageSizeBytes = limitSize * 1024 * 1024 let currentSize = 0 From 1ea7b55415e55b870c40be3a7e9bf9bbbc965ce6 Mon Sep 17 00:00:00 2001 From: Gabriel Mata Date: Fri, 8 Nov 2024 12:58:22 -0500 Subject: [PATCH 09/10] feat: add changes requested about MPR client and use maxReceiveBytes and limitBytes --- docs/message-pickup-repository-client.md | 1 + .../src/MessagePickupRepositoryClient.ts | 34 +++++++++++++------ packages/client/src/interfaces.ts | 6 +++- packages/server/src/config/app.config.ts | 7 ---- .../src/websocket/websocket.service.spec.ts | 2 +- .../server/src/websocket/websocket.service.ts | 34 +++++++++++++------ 6 files changed, 54 insertions(+), 30 deletions(-) diff --git a/docs/message-pickup-repository-client.md b/docs/message-pickup-repository-client.md index 7eaf41b..79f1810 100644 --- a/docs/message-pickup-repository-client.md +++ b/docs/message-pickup-repository-client.md @@ -63,6 +63,7 @@ Retrieves messages from the queue. - `connectionId: string`: ID of the connection. - `recipientDid?: string`: Optional DID of the recipient. - `limit?: number`: Optional limit on the number of messages. + - `limitBytes?: number`: Optional byte size limit for retrieving messages - `deleteMessages?: boolean`: Whether to delete the messages after retrieval. - **Returns**: `Promise` diff --git a/packages/client/src/MessagePickupRepositoryClient.ts b/packages/client/src/MessagePickupRepositoryClient.ts index 636e2ab..e68b69f 100644 --- a/packages/client/src/MessagePickupRepositoryClient.ts +++ b/packages/client/src/MessagePickupRepositoryClient.ts @@ -5,6 +5,7 @@ import { ConnectionIdOptions, AddLiveSessionOptions, MessagesReceivedCallbackParams, + ExtendedTakeFromQueueOptions, } from './interfaces' import { AddMessageOptions, @@ -12,7 +13,6 @@ import { MessagePickupRepository, QueuedMessage, RemoveMessagesOptions, - TakeFromQueueOptions, } from '@credo-ts/core' log.setLevel('info') @@ -22,7 +22,10 @@ export class MessagePickupRepositoryClient implements MessagePickupRepository { private readonly logger = log private messagesReceivedCallback: ((data: MessagesReceivedCallbackParams) => void) | null = null - constructor(private readonly url: string) {} + constructor( + private readonly url: string, + private readonly maxReceiveBytes?: number, + ) {} /** * Connect to the WebSocket server. @@ -90,21 +93,32 @@ export class MessagePickupRepositoryClient implements MessagePickupRepository { } /** - * Call the 'takeFromQueue' RPC method. - * This method sends a request to the WebSocket server to take messages from the queue. - * It expects the response to be an array of `QueuedMessage` objects. + * Calls the 'takeFromQueue' RPC method on the WebSocket server. + * This method sends a request to retrieve messages from the queue for the specified connection. + * It can retrieve messages up to a specified byte limit (`limitBytes`) or by a count limit (`limit`). + * The response is expected to be an array of `QueuedMessage` objects. * - * @param {TakeFromQueueOptions} params - The parameters to pass to the 'takeFromQueue' method, including: + * @param {ExtendedTakeFromQueueOptions} params - The parameters to pass to the 'takeFromQueue' method, including: * @property {string} connectionId - The ID of the connection from which to take messages. * @property {string} [recipientDid] - Optional DID of the recipient to filter messages by. - * @property {number} [limit] - Optional maximum number of messages to take from the queue. + * @property {number} [limit] - Optional maximum number of messages to take from the queue. Ignored if `limitBytes` is set. + * @property {number} [limitBytes] - Optional maximum cumulative byte size of messages to retrieve. * @property {boolean} [deleteMessages] - Optional flag indicating whether to delete the messages after retrieving them. - * @returns {Promise} - The result from the WebSocket server, expected to be an array of `QueuedMessage`. - * @throws Will throw an error if the result is not an array or if there's any issue with the WebSocket call. + * If provided, limits the retrieval by the total byte size of messages rather than by count. + * + * @returns {Promise} - A promise that resolves to an array of `QueuedMessage` objects from the WebSocket server. + * @throws {Error} Will throw an error if the result is not an array of `QueuedMessage` objects, + * or if any issue occurs with the WebSocket call. */ - async takeFromQueue(params: TakeFromQueueOptions): Promise { + async takeFromQueue(params: ExtendedTakeFromQueueOptions): Promise { try { const client = this.checkClient() + + // Add limitBytes to params if maxReceiveBytes is set + if (this.maxReceiveBytes) { + params = { ...params, limitBytes: this.maxReceiveBytes } + } + // Call the RPC method and store the result as 'unknown' type initially const result: unknown = await client.call('takeFromQueue', params, 2000) diff --git a/packages/client/src/interfaces.ts b/packages/client/src/interfaces.ts index 4270b27..95ef12f 100644 --- a/packages/client/src/interfaces.ts +++ b/packages/client/src/interfaces.ts @@ -1,4 +1,4 @@ -import { QueuedMessage } from '@credo-ts/core' +import { QueuedMessage, TakeFromQueueOptions } from '@credo-ts/core' export interface RemoveAllMessagesOptions { connectionId: string @@ -18,3 +18,7 @@ export interface MessagesReceivedCallbackParams { connectionId: string messages: QueuedMessage[] } + +export interface ExtendedTakeFromQueueOptions extends TakeFromQueueOptions { + limitBytes?: number +} diff --git a/packages/server/src/config/app.config.ts b/packages/server/src/config/app.config.ts index c86dd1c..5a2c484 100644 --- a/packages/server/src/config/app.config.ts +++ b/packages/server/src/config/app.config.ts @@ -65,11 +65,4 @@ export default registerAs('appConfig', () => ({ *Allows set threshold time to execute messagePersist module on milisecond */ thresholdTimestamp: parseInt(process.env.THRESHOLD_TIMESTAMP) || 60000, - - /** - * The maximum total message size in bytes. - * Defaults to 1 MB (1 * 1024 * 1024 bytes) if MAX_TOTAL_MESSAGE_SIZE_MB is not set in the environment variables. - * @type {number} - */ - maxMessageSizeBytes: (parseInt(process.env.MAX_MESSAGE_SIZE_BYTES, 10) || 1) * 1024 * 1024, })) diff --git a/packages/server/src/websocket/websocket.service.spec.ts b/packages/server/src/websocket/websocket.service.spec.ts index 94f6cc8..7c861fa 100644 --- a/packages/server/src/websocket/websocket.service.spec.ts +++ b/packages/server/src/websocket/websocket.service.spec.ts @@ -94,7 +94,7 @@ describe('WebsocketService', () => { const result = await service.takeFromQueue({ connectionId: 'test-connection-id', id: '', - limitSize: 1, + limitBytes: 1000000, }) // Verify that Redis and MongoDB calls were made diff --git a/packages/server/src/websocket/websocket.service.ts b/packages/server/src/websocket/websocket.service.ts index 2caa1e7..79de19b 100644 --- a/packages/server/src/websocket/websocket.service.ts +++ b/packages/server/src/websocket/websocket.service.ts @@ -55,21 +55,32 @@ export class WebsocketService { /** * Retrieves messages from both Redis and MongoDB based on the provided criteria. - * This method retrieves messages from Redis for the specified connection ID, as well as messages stored in MongoDB. + * Depending on the specified criteria, this method will retrieve messages either + * by a byte size limit or by a count limit, pulling messages from both Redis and MongoDB. + * + * If `limitBytes` is defined in the DTO, messages will be retrieved up to the specified byte limit + * using `takeMessagesWithByteCountLimit`. Otherwise, messages will be retrieved by a count limit + * using `takeMessagesWithMessageCountLimit`. * * @param {TakeFromQueueDto} dto - Data transfer object containing the query parameters. * @param {string} dto.connectionId - The unique identifier of the connection. - * @param {number} [dto.limit] - Optional limit on the number of messages to retrieve. + * @param {number} [dto.limit] - Optional limit on the number of messages to retrieve if `limitBytes` is not specified. + * @param {number} [dto.limitBytes] - Optional byte size limit for retrieving messages. * @param {boolean} [dto.deleteMessages] - Optional flag to determine if messages should be deleted after retrieval. * @param {string} [dto.recipientDid] - Optional recipient identifier for filtering messages. + * When set, retrieval is based on the cumulative byte size of messages rather than the count. + * * @returns {Promise} - A promise that resolves to an array of queued messages. + * The array will contain messages retrieved either by byte size or by count, based on the criteria provided. */ async takeFromQueue(dto: TakeFromQueueDto): Promise { - const { limitSize } = dto + const { limitBytes } = dto this.logger.debug('[takeFromQueue] Method called with DTO:', dto) - return limitSize ? await this.takeMessagesWithSize(dto) : await this.takeMessagesWithLimit(dto) + return limitBytes + ? await this.takeMessagesWithByteCountLimit(dto) + : await this.takeMessagesWithMessageCountLimit(dto) } /** @@ -128,9 +139,9 @@ export class WebsocketService { receivedAt = new Date() // Calculate the size in bytes of the encrypted message to add database - const encryptedMessageSize = Buffer.byteLength(JSON.stringify(payload), 'utf8') + const encryptedMessageByteCount = Buffer.byteLength(JSON.stringify(payload), 'utf8') - this.logger.debug(`[addMessage] Size Encrypted Message ${encryptedMessageSize} `) + this.logger.debug(`[addMessage] Size Encrypted Message ${encryptedMessageByteCount} `) // Create a message object to store in Redis const messageData = { @@ -139,7 +150,7 @@ export class WebsocketService { recipientDids, encryptedMessage: payload, state: MessageState.pending, - encryptedMessageSize, + encryptedMessageByteCount, receivedAt, } @@ -728,8 +739,8 @@ export class WebsocketService { * @returns {Promise} - A promise that resolves to an array of queued messages. */ private async takeMessagesWithByteCountLimit(dto: TakeFromQueueDto): Promise { - const { connectionId, recipientDid, limitSize } = dto - const maxMessageSizeBytes = limitSize * 1024 * 1024 + const { connectionId, recipientDid, limitBytes } = dto + const maxMessageSizeBytes = limitBytes let currentSize = 0 const combinedMessages: QueuedMessage[] = [] @@ -741,12 +752,13 @@ export class WebsocketService { state: 'pending', }) .sort({ createdAt: 1 }) - .select({ messageId: 1, encryptedMessage: 1, createdAt: 1, encryptedMessageSize: 1 }) + .select({ messageId: 1, encryptedMessage: 1, createdAt: 1, encryptedMessageByteCount: 1 }) .lean() .exec() for (const msg of mongoMessages) { - const messageSize = msg.encryptedMessageSize || Buffer.byteLength(JSON.stringify(msg.encryptedMessage), 'utf8') + const messageSize = + msg.encryptedMessageByteCount || Buffer.byteLength(JSON.stringify(msg.encryptedMessage), 'utf8') if (currentSize + messageSize > maxMessageSizeBytes) break From 2db9406d8f68077020dfe6da9d2192e55d3d9996 Mon Sep 17 00:00:00 2001 From: Gabriel Mata Date: Fri, 8 Nov 2024 14:37:37 -0500 Subject: [PATCH 10/10] feat: update constructor to use options object for initialization - Modified the constructor of `MessagePickupRepositoryClient` to accept a single `options` object. - Refactored `url` and `maxReceiveBytes` to be properties within the `options` object for more flexible parameter handling. - Ensured `url` and `maxReceiveBytes` remain `readonly` properties in the class. --- packages/client/src/MessagePickupRepositoryClient.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/client/src/MessagePickupRepositoryClient.ts b/packages/client/src/MessagePickupRepositoryClient.ts index e68b69f..63d8d58 100644 --- a/packages/client/src/MessagePickupRepositoryClient.ts +++ b/packages/client/src/MessagePickupRepositoryClient.ts @@ -21,11 +21,13 @@ export class MessagePickupRepositoryClient implements MessagePickupRepository { private client?: Client private readonly logger = log private messagesReceivedCallback: ((data: MessagesReceivedCallbackParams) => void) | null = null + private readonly url: string + private readonly maxReceiveBytes?: number - constructor( - private readonly url: string, - private readonly maxReceiveBytes?: number, - ) {} + constructor(options: { url: string; maxReceiveBytes?: number }) { + this.url = options.url + this.maxReceiveBytes = options.maxReceiveBytes + } /** * Connect to the WebSocket server.