Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Handling messages limit based on messages size #32

Merged
merged 10 commits into from
Nov 8, 2024
Merged
1 change: 1 addition & 0 deletions docs/message-pickup-repository-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Retrieves messages from the queue.
- `connectionId: string`: ID of the connection.
- `recipientDid?: string`: Optional DID of the recipient.
- `limit?: number`: Optional limit on the number of messages.
- `limitBytes?: number`: Optional byte size limit for retrieving messages
- `deleteMessages?: boolean`: Whether to delete the messages after retrieval.

- **Returns**: `Promise<QueuedMessage[]>`
Expand Down
36 changes: 26 additions & 10 deletions packages/client/src/MessagePickupRepositoryClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import {
ConnectionIdOptions,
AddLiveSessionOptions,
MessagesReceivedCallbackParams,
ExtendedTakeFromQueueOptions,
} from './interfaces'
import {
AddMessageOptions,
GetAvailableMessageCountOptions,
MessagePickupRepository,
QueuedMessage,
RemoveMessagesOptions,
TakeFromQueueOptions,
} from '@credo-ts/core'

log.setLevel('info')
Expand All @@ -21,8 +21,13 @@ export class MessagePickupRepositoryClient implements MessagePickupRepository {
private client?: Client
private readonly logger = log
private messagesReceivedCallback: ((data: MessagesReceivedCallbackParams) => void) | null = null
private readonly url: string
private readonly maxReceiveBytes?: number

constructor(private readonly url: string) {}
constructor(options: { url: string; maxReceiveBytes?: number }) {
this.url = options.url
this.maxReceiveBytes = options.maxReceiveBytes
}

/**
* Connect to the WebSocket server.
Expand Down Expand Up @@ -90,21 +95,32 @@ export class MessagePickupRepositoryClient implements MessagePickupRepository {
}

/**
* Call the 'takeFromQueue' RPC method.
* This method sends a request to the WebSocket server to take messages from the queue.
* It expects the response to be an array of `QueuedMessage` objects.
* Calls the 'takeFromQueue' RPC method on the WebSocket server.
* This method sends a request to retrieve messages from the queue for the specified connection.
* It can retrieve messages up to a specified byte limit (`limitBytes`) or by a count limit (`limit`).
* The response is expected to be an array of `QueuedMessage` objects.
*
* @param {TakeFromQueueOptions} params - The parameters to pass to the 'takeFromQueue' method, including:
* @param {ExtendedTakeFromQueueOptions} params - The parameters to pass to the 'takeFromQueue' method, including:
* @property {string} connectionId - The ID of the connection from which to take messages.
* @property {string} [recipientDid] - Optional DID of the recipient to filter messages by.
* @property {number} [limit] - Optional maximum number of messages to take from the queue.
* @property {number} [limit] - Optional maximum number of messages to take from the queue. Ignored if `limitBytes` is set.
* @property {number} [limitBytes] - Optional maximum cumulative byte size of messages to retrieve.
* @property {boolean} [deleteMessages] - Optional flag indicating whether to delete the messages after retrieving them.
* @returns {Promise<QueuedMessage[]>} - The result from the WebSocket server, expected to be an array of `QueuedMessage`.
* @throws Will throw an error if the result is not an array or if there's any issue with the WebSocket call.
* If provided, limits the retrieval by the total byte size of messages rather than by count.
*
* @returns {Promise<QueuedMessage[]>} - A promise that resolves to an array of `QueuedMessage` objects from the WebSocket server.
* @throws {Error} Will throw an error if the result is not an array of `QueuedMessage` objects,
* or if any issue occurs with the WebSocket call.
*/
async takeFromQueue(params: TakeFromQueueOptions): Promise<QueuedMessage[]> {
async takeFromQueue(params: ExtendedTakeFromQueueOptions): Promise<QueuedMessage[]> {
try {
const client = this.checkClient()

// Add limitBytes to params if maxReceiveBytes is set
if (this.maxReceiveBytes) {
params = { ...params, limitBytes: this.maxReceiveBytes }
}

// Call the RPC method and store the result as 'unknown' type initially
const result: unknown = await client.call('takeFromQueue', params, 2000)

Expand Down
6 changes: 5 additions & 1 deletion packages/client/src/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { QueuedMessage } from '@credo-ts/core'
import { QueuedMessage, TakeFromQueueOptions } from '@credo-ts/core'

export interface RemoveAllMessagesOptions {
connectionId: string
Expand All @@ -18,3 +18,7 @@ export interface MessagesReceivedCallbackParams {
connectionId: string
messages: QueuedMessage[]
}

export interface ExtendedTakeFromQueueOptions extends TakeFromQueueOptions {
limitBytes?: number
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ export class TakeFromQueueDto {

@IsOptional()
@IsInt()
limit: number
limit?: number

@IsOptional()
@IsInt()
limitBytes?: number

@IsOptional()
@IsBoolean()
Expand Down
8 changes: 8 additions & 0 deletions packages/server/src/websocket/schemas/StoreQueuedMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ export class StoreQueuedMessage extends Document {
@Prop({ type: Object, required: true })
encryptedMessage: EncryptedMessage

/**
* The size Encrypted Message store in collection
* @type {number}
*/
@Prop()
encryptedMessageByteCount?: number

/**
* The recipient keys (DIDs or other identifiers) associated with the message.
* @type {string[]}
Expand All @@ -44,6 +51,7 @@ export class StoreQueuedMessage extends Document {
*/
@Prop()
state?: string

/**
* The timestamp when the message was created.
* Mongoose automatically creates this field when `timestamps: true` is set in the schema.
Expand Down
1 change: 1 addition & 0 deletions packages/server/src/websocket/services/MessagePersister.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export class MessagePersister {
connectionId: message.connectionId,
recipientKeys: message.recipientDids,
encryptedMessage: message.encryptedMessage,
encryptedMessageSize: message.encryptedMessageSize,
state: message.state,
createdAt: new Date(message.receivedAt),
})
Expand Down
43 changes: 41 additions & 2 deletions packages/server/src/websocket/websocket.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ describe('WebsocketService', () => {
expect(service).toBeDefined()
})

it('should takeFromQueue (Redis and MongoDB)', async () => {
it('should takeFromQueue (Redis and MongoDB) with size message', async () => {
// Mock configuration for Redis
jest.spyOn(redisMock, 'lrange').mockResolvedValue([
JSON.stringify({
id: '1', // Usar 'id' en lugar de 'messageId'
encryptedMessage: 'test-message-1',
encryptedMessage: 'test-message-2',
receivedAt: new Date().toISOString(),
}),
])
Expand All @@ -85,7 +85,46 @@ describe('WebsocketService', () => {
storeQueuedMessageMock.exec.mockResolvedValue([
{
id: '2', // MongoDB usa _id por defecto
encryptedMessage: 'test-message-1',
createdAt: new Date(),
},
])

// Execute the takeFromQueue method
const result = await service.takeFromQueue({
connectionId: 'test-connection-id',
id: '',
limitBytes: 1000000,
})

// Verify that Redis and MongoDB calls were made
expect(redisMock.lrange).toHaveBeenCalledWith('connectionId:test-connection-id:queuemessages', 0, -1)
expect(storeQueuedMessageMock.find).toHaveBeenCalledWith({
$or: [{ connectionId: 'test-connection-id' }, { recipientKeys: undefined }],
state: 'pending',
})

// Verify the combined result from Redis and MongoDB
expect(result).toHaveLength(2)
expect(result[0].encryptedMessage).toBe('test-message-1') // Verifica por 'id'
expect(result[1].encryptedMessage).toBe('test-message-2') // Verifica por 'id'
})

it('should takeFromQueue (Redis and MongoDB) with limit message', async () => {
// Mock configuration for Redis
jest.spyOn(redisMock, 'lrange').mockResolvedValue([
JSON.stringify({
id: '1', // Usar 'id' en lugar de 'messageId'
encryptedMessage: 'test-message-2',
receivedAt: new Date().toISOString(),
}),
])

// Mock configuration for MongoDB
storeQueuedMessageMock.exec.mockResolvedValue([
{
id: '2', // MongoDB usa _id por defecto
encryptedMessage: 'test-message-1',
createdAt: new Date(),
},
])
Expand Down
Loading
Loading