diff --git a/docs/message-pickup-repository-client.md b/docs/message-pickup-repository-client.md index 7eaf41b..79f1810 100644 --- a/docs/message-pickup-repository-client.md +++ b/docs/message-pickup-repository-client.md @@ -63,6 +63,7 @@ Retrieves messages from the queue. - `connectionId: string`: ID of the connection. - `recipientDid?: string`: Optional DID of the recipient. - `limit?: number`: Optional limit on the number of messages. + - `limitBytes?: number`: Optional byte size limit for retrieving messages - `deleteMessages?: boolean`: Whether to delete the messages after retrieval. - **Returns**: `Promise` diff --git a/packages/client/src/MessagePickupRepositoryClient.ts b/packages/client/src/MessagePickupRepositoryClient.ts index 636e2ab..63d8d58 100644 --- a/packages/client/src/MessagePickupRepositoryClient.ts +++ b/packages/client/src/MessagePickupRepositoryClient.ts @@ -5,6 +5,7 @@ import { ConnectionIdOptions, AddLiveSessionOptions, MessagesReceivedCallbackParams, + ExtendedTakeFromQueueOptions, } from './interfaces' import { AddMessageOptions, @@ -12,7 +13,6 @@ import { MessagePickupRepository, QueuedMessage, RemoveMessagesOptions, - TakeFromQueueOptions, } from '@credo-ts/core' log.setLevel('info') @@ -21,8 +21,13 @@ export class MessagePickupRepositoryClient implements MessagePickupRepository { private client?: Client private readonly logger = log private messagesReceivedCallback: ((data: MessagesReceivedCallbackParams) => void) | null = null + private readonly url: string + private readonly maxReceiveBytes?: number - constructor(private readonly url: string) {} + constructor(options: { url: string; maxReceiveBytes?: number }) { + this.url = options.url + this.maxReceiveBytes = options.maxReceiveBytes + } /** * Connect to the WebSocket server. @@ -90,21 +95,32 @@ export class MessagePickupRepositoryClient implements MessagePickupRepository { } /** - * Call the 'takeFromQueue' RPC method. - * This method sends a request to the WebSocket server to take messages from the queue. - * It expects the response to be an array of `QueuedMessage` objects. + * Calls the 'takeFromQueue' RPC method on the WebSocket server. + * This method sends a request to retrieve messages from the queue for the specified connection. + * It can retrieve messages up to a specified byte limit (`limitBytes`) or by a count limit (`limit`). + * The response is expected to be an array of `QueuedMessage` objects. * - * @param {TakeFromQueueOptions} params - The parameters to pass to the 'takeFromQueue' method, including: + * @param {ExtendedTakeFromQueueOptions} params - The parameters to pass to the 'takeFromQueue' method, including: * @property {string} connectionId - The ID of the connection from which to take messages. * @property {string} [recipientDid] - Optional DID of the recipient to filter messages by. - * @property {number} [limit] - Optional maximum number of messages to take from the queue. + * @property {number} [limit] - Optional maximum number of messages to take from the queue. Ignored if `limitBytes` is set. + * @property {number} [limitBytes] - Optional maximum cumulative byte size of messages to retrieve. * @property {boolean} [deleteMessages] - Optional flag indicating whether to delete the messages after retrieving them. - * @returns {Promise} - The result from the WebSocket server, expected to be an array of `QueuedMessage`. - * @throws Will throw an error if the result is not an array or if there's any issue with the WebSocket call. + * If provided, limits the retrieval by the total byte size of messages rather than by count. + * + * @returns {Promise} - A promise that resolves to an array of `QueuedMessage` objects from the WebSocket server. + * @throws {Error} Will throw an error if the result is not an array of `QueuedMessage` objects, + * or if any issue occurs with the WebSocket call. */ - async takeFromQueue(params: TakeFromQueueOptions): Promise { + async takeFromQueue(params: ExtendedTakeFromQueueOptions): Promise { try { const client = this.checkClient() + + // Add limitBytes to params if maxReceiveBytes is set + if (this.maxReceiveBytes) { + params = { ...params, limitBytes: this.maxReceiveBytes } + } + // Call the RPC method and store the result as 'unknown' type initially const result: unknown = await client.call('takeFromQueue', params, 2000) diff --git a/packages/client/src/interfaces.ts b/packages/client/src/interfaces.ts index 4270b27..95ef12f 100644 --- a/packages/client/src/interfaces.ts +++ b/packages/client/src/interfaces.ts @@ -1,4 +1,4 @@ -import { QueuedMessage } from '@credo-ts/core' +import { QueuedMessage, TakeFromQueueOptions } from '@credo-ts/core' export interface RemoveAllMessagesOptions { connectionId: string @@ -18,3 +18,7 @@ export interface MessagesReceivedCallbackParams { connectionId: string messages: QueuedMessage[] } + +export interface ExtendedTakeFromQueueOptions extends TakeFromQueueOptions { + limitBytes?: number +} diff --git a/packages/server/src/websocket/dto/messagerepository-websocket.dto.ts b/packages/server/src/websocket/dto/messagerepository-websocket.dto.ts index b65dce8..61ba074 100644 --- a/packages/server/src/websocket/dto/messagerepository-websocket.dto.ts +++ b/packages/server/src/websocket/dto/messagerepository-websocket.dto.ts @@ -25,7 +25,11 @@ export class TakeFromQueueDto { @IsOptional() @IsInt() - limit: number + limit?: number + + @IsOptional() + @IsInt() + limitBytes?: number @IsOptional() @IsBoolean() diff --git a/packages/server/src/websocket/schemas/StoreQueuedMessage.ts b/packages/server/src/websocket/schemas/StoreQueuedMessage.ts index 4905b05..f3b8ceb 100644 --- a/packages/server/src/websocket/schemas/StoreQueuedMessage.ts +++ b/packages/server/src/websocket/schemas/StoreQueuedMessage.ts @@ -31,6 +31,13 @@ export class StoreQueuedMessage extends Document { @Prop({ type: Object, required: true }) encryptedMessage: EncryptedMessage + /** + * The size Encrypted Message store in collection + * @type {number} + */ + @Prop() + encryptedMessageByteCount?: number + /** * The recipient keys (DIDs or other identifiers) associated with the message. * @type {string[]} @@ -44,6 +51,7 @@ export class StoreQueuedMessage extends Document { */ @Prop() state?: string + /** * The timestamp when the message was created. * Mongoose automatically creates this field when `timestamps: true` is set in the schema. diff --git a/packages/server/src/websocket/services/MessagePersister.ts b/packages/server/src/websocket/services/MessagePersister.ts index 647d777..53eb99c 100644 --- a/packages/server/src/websocket/services/MessagePersister.ts +++ b/packages/server/src/websocket/services/MessagePersister.ts @@ -62,6 +62,7 @@ export class MessagePersister { connectionId: message.connectionId, recipientKeys: message.recipientDids, encryptedMessage: message.encryptedMessage, + encryptedMessageSize: message.encryptedMessageSize, state: message.state, createdAt: new Date(message.receivedAt), }) diff --git a/packages/server/src/websocket/websocket.service.spec.ts b/packages/server/src/websocket/websocket.service.spec.ts index 039df06..7c861fa 100644 --- a/packages/server/src/websocket/websocket.service.spec.ts +++ b/packages/server/src/websocket/websocket.service.spec.ts @@ -71,12 +71,12 @@ describe('WebsocketService', () => { expect(service).toBeDefined() }) - it('should takeFromQueue (Redis and MongoDB)', async () => { + it('should takeFromQueue (Redis and MongoDB) with size message', async () => { // Mock configuration for Redis jest.spyOn(redisMock, 'lrange').mockResolvedValue([ JSON.stringify({ id: '1', // Usar 'id' en lugar de 'messageId' - encryptedMessage: 'test-message-1', + encryptedMessage: 'test-message-2', receivedAt: new Date().toISOString(), }), ]) @@ -85,7 +85,46 @@ describe('WebsocketService', () => { storeQueuedMessageMock.exec.mockResolvedValue([ { id: '2', // MongoDB usa _id por defecto + encryptedMessage: 'test-message-1', + createdAt: new Date(), + }, + ]) + + // Execute the takeFromQueue method + const result = await service.takeFromQueue({ + connectionId: 'test-connection-id', + id: '', + limitBytes: 1000000, + }) + + // Verify that Redis and MongoDB calls were made + expect(redisMock.lrange).toHaveBeenCalledWith('connectionId:test-connection-id:queuemessages', 0, -1) + expect(storeQueuedMessageMock.find).toHaveBeenCalledWith({ + $or: [{ connectionId: 'test-connection-id' }, { recipientKeys: undefined }], + state: 'pending', + }) + + // Verify the combined result from Redis and MongoDB + expect(result).toHaveLength(2) + expect(result[0].encryptedMessage).toBe('test-message-1') // Verifica por 'id' + expect(result[1].encryptedMessage).toBe('test-message-2') // Verifica por 'id' + }) + + it('should takeFromQueue (Redis and MongoDB) with limit message', async () => { + // Mock configuration for Redis + jest.spyOn(redisMock, 'lrange').mockResolvedValue([ + JSON.stringify({ + id: '1', // Usar 'id' en lugar de 'messageId' encryptedMessage: 'test-message-2', + receivedAt: new Date().toISOString(), + }), + ]) + + // Mock configuration for MongoDB + storeQueuedMessageMock.exec.mockResolvedValue([ + { + id: '2', // MongoDB usa _id por defecto + encryptedMessage: 'test-message-1', createdAt: new Date(), }, ]) diff --git a/packages/server/src/websocket/websocket.service.ts b/packages/server/src/websocket/websocket.service.ts index 729c99d..79de19b 100644 --- a/packages/server/src/websocket/websocket.service.ts +++ b/packages/server/src/websocket/websocket.service.ts @@ -55,72 +55,32 @@ export class WebsocketService { /** * Retrieves messages from both Redis and MongoDB based on the provided criteria. - * This method retrieves messages from Redis for the specified connection ID, as well as messages stored in MongoDB. + * Depending on the specified criteria, this method will retrieve messages either + * by a byte size limit or by a count limit, pulling messages from both Redis and MongoDB. + * + * If `limitBytes` is defined in the DTO, messages will be retrieved up to the specified byte limit + * using `takeMessagesWithByteCountLimit`. Otherwise, messages will be retrieved by a count limit + * using `takeMessagesWithMessageCountLimit`. * * @param {TakeFromQueueDto} dto - Data transfer object containing the query parameters. * @param {string} dto.connectionId - The unique identifier of the connection. - * @param {number} [dto.limit] - Optional limit on the number of messages to retrieve. + * @param {number} [dto.limit] - Optional limit on the number of messages to retrieve if `limitBytes` is not specified. + * @param {number} [dto.limitBytes] - Optional byte size limit for retrieving messages. * @param {boolean} [dto.deleteMessages] - Optional flag to determine if messages should be deleted after retrieval. * @param {string} [dto.recipientDid] - Optional recipient identifier for filtering messages. + * When set, retrieval is based on the cumulative byte size of messages rather than the count. + * * @returns {Promise} - A promise that resolves to an array of queued messages. + * The array will contain messages retrieved either by byte size or by count, based on the criteria provided. */ async takeFromQueue(dto: TakeFromQueueDto): Promise { - const { connectionId, limit = 10, recipientDid } = dto + const { limitBytes } = dto this.logger.debug('[takeFromQueue] Method called with DTO:', dto) - try { - // Retrieve messages from Redis - const redisMessagesRaw = await this.redis.lrange(`connectionId:${connectionId}:queuemessages`, 0, limit - 1) - const redisMessages: QueuedMessage[] = redisMessagesRaw.map((message) => { - const parsedMessage = JSON.parse(message) - - // Map Redis data to QueuedMessage type - return { - id: parsedMessage.messageId, - receivedAt: new Date(parsedMessage.receivedAt), - encryptedMessage: parsedMessage.encryptedMessage, - } - }) - - this.logger.debug( - `[takeFromQueue] Fetched ${redisMessages.length} messages from Redis for connectionId ${connectionId}`, - ) - - // Query MongoDB with the provided connectionId or recipientDid, and state 'pending' - const mongoMessages = await this.queuedMessage - .find({ - $or: [{ connectionId }, { recipientKeys: recipientDid }], - state: 'pending', - }) - .sort({ createdAt: 1 }) - .limit(limit) - .select({ messageId: 1, encryptedMessage: 1, createdAt: 1 }) - .lean() - .exec() - - const mongoMappedMessages: QueuedMessage[] = mongoMessages.map((msg) => ({ - id: msg.messageId, - receivedAt: msg.createdAt, - encryptedMessage: msg.encryptedMessage, - })) - - this.logger.debug( - `[takeFromQueue] Fetched ${mongoMappedMessages.length} messages from MongoDB for connectionId ${connectionId}`, - ) - // Combine messages from Redis and MongoDB - const combinedMessages: QueuedMessage[] = [...redisMessages, ...mongoMappedMessages] - - this.logger.debug(`[takeFromQueue] combinedMessages for connectionId ${connectionId}: ${combinedMessages}`) - - return combinedMessages - } catch (error) { - this.logger.error('[takeFromQueue] Error retrieving messages from Redis and MongoDB:', { - connectionId, - error: error.message, - }) - return [] - } + return limitBytes + ? await this.takeMessagesWithByteCountLimit(dto) + : await this.takeMessagesWithMessageCountLimit(dto) } /** @@ -178,6 +138,11 @@ export class WebsocketService { messageId = new ObjectId().toString() receivedAt = new Date() + // Calculate the size in bytes of the encrypted message to add database + const encryptedMessageByteCount = Buffer.byteLength(JSON.stringify(payload), 'utf8') + + this.logger.debug(`[addMessage] Size Encrypted Message ${encryptedMessageByteCount} `) + // Create a message object to store in Redis const messageData = { messageId, @@ -185,6 +150,7 @@ export class WebsocketService { recipientDids, encryptedMessage: payload, state: MessageState.pending, + encryptedMessageByteCount, receivedAt, } @@ -698,4 +664,152 @@ export class WebsocketService { }) } } + + /** + * Retrieves messages from both Redis and MongoDB up to a specified message count limit. + * + * @param {TakeFromQueueDto} dto - Data transfer object containing query parameters. + * @returns {Promise} - A promise that resolves to an array of queued messages. + */ + private async takeMessagesWithMessageCountLimit(dto: TakeFromQueueDto): Promise { + const { connectionId, limit, recipientDid } = dto + + this.logger.debug('[takeMessagesWithLimit] Method called with DTO:', dto) + + try { + // Query MongoDB with the provided connectionId or recipientDid, and state 'pending' + const mongoMessages = await this.queuedMessage + .find({ + $or: [{ connectionId }, { recipientKeys: recipientDid }], + state: 'pending', + }) + .sort({ createdAt: 1 }) + .limit(limit) + .select({ messageId: 1, encryptedMessage: 1, createdAt: 1 }) + .lean() + .exec() + + const mongoMappedMessages: QueuedMessage[] = mongoMessages.map((msg) => ({ + id: msg.messageId, + receivedAt: msg.createdAt, + encryptedMessage: msg.encryptedMessage, + })) + + this.logger.debug( + `[takeMessagesWithLimit] Fetched ${mongoMappedMessages.length} messages from MongoDB for connectionId ${connectionId}`, + ) + + // Retrieve messages from Redis + const redisMessagesRaw = await this.redis.lrange(`connectionId:${connectionId}:queuemessages`, 0, limit - 1) + const redisMessages: QueuedMessage[] = redisMessagesRaw.map((message) => { + const parsedMessage = JSON.parse(message) + + // Map Redis data to QueuedMessage type + return { + id: parsedMessage.messageId, + receivedAt: new Date(parsedMessage.receivedAt), + encryptedMessage: parsedMessage.encryptedMessage, + } + }) + + this.logger.debug( + `[takeMessagesWithLimit] Fetched ${redisMessages.length} messages from Redis for connectionId ${connectionId}`, + ) + // Combine messages from Redis and MongoDB + const combinedMessages: QueuedMessage[] = [...mongoMappedMessages, ...redisMessages] + + this.logger.debug( + `[takeMessagesWithLimit] combinedMessages for connectionId ${connectionId}: ${combinedMessages}`, + ) + + return combinedMessages + } catch (error) { + this.logger.error('[takeMessagesWithLimit] Error retrieving messages from Redis and MongoDB:', { + connectionId, + error: error.message, + }) + return [] + } + } + + /** + * Retrieves messages from both Redis and MongoDB up to a specified total size in bytes. + * + * @param {TakeFromQueueDto} dto - Data transfer object containing query parameters. + * @returns {Promise} - A promise that resolves to an array of queued messages. + */ + private async takeMessagesWithByteCountLimit(dto: TakeFromQueueDto): Promise { + const { connectionId, recipientDid, limitBytes } = dto + const maxMessageSizeBytes = limitBytes + let currentSize = 0 + const combinedMessages: QueuedMessage[] = [] + + try { + // Step 1: Retrieve messages from MongoDB with size limit + const mongoMessages = await this.queuedMessage + .find({ + $or: [{ connectionId }, { recipientKeys: recipientDid }], + state: 'pending', + }) + .sort({ createdAt: 1 }) + .select({ messageId: 1, encryptedMessage: 1, createdAt: 1, encryptedMessageByteCount: 1 }) + .lean() + .exec() + + for (const msg of mongoMessages) { + const messageSize = + msg.encryptedMessageByteCount || Buffer.byteLength(JSON.stringify(msg.encryptedMessage), 'utf8') + + if (currentSize + messageSize > maxMessageSizeBytes) break + + combinedMessages.push({ + id: msg.messageId, + receivedAt: msg.createdAt, + encryptedMessage: msg.encryptedMessage, + }) + currentSize += messageSize + } + + // Skip Redis if size limit reached + if (currentSize >= maxMessageSizeBytes) { + this.logger.debug( + `[takeMessagesWithSize] Size limit reached with MongoDB messages for connectionId ${connectionId}`, + ) + return combinedMessages + } + + // Step 2: Retrieve messages from Redis with size limit + const redisMessagesRaw = await this.redis.lrange(`connectionId:${connectionId}:queuemessages`, 0, -1) + + for (const message of redisMessagesRaw) { + const parsedMessage = JSON.parse(message) + const messageSize = + parsedMessage.sizeInBytes || Buffer.byteLength(JSON.stringify(parsedMessage.encryptedMessage), 'utf8') + + if (currentSize + messageSize > maxMessageSizeBytes) break + + combinedMessages.push({ + id: parsedMessage.messageId, + receivedAt: new Date(parsedMessage.receivedAt), + encryptedMessage: parsedMessage.encryptedMessage, + }) + currentSize += messageSize + } + + this.logger.debug( + `[takeMessagesWithSize] Fetched ${combinedMessages.length} total messages for connectionId ${connectionId}`, + ) + + this.logger.debug( + `[takeMessagesWithSize] Total message size to be sent for connectionId ${connectionId}: ${currentSize} bytes`, + ) + + return combinedMessages + } catch (error) { + this.logger.error( + `[takeMessagesWithSize] Error retrieving messages for connectionId ${connectionId}: ${error.message}`, + ) + return [] + } + } }