Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add improvements to the Postgres package #55

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions packages/postgres/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ This package provides a simple but efficient Message Pickup Repository implement

- Pluggable support for Push notifications: provide your own callback that be called whenever a message has arrived for an offline user (not connected to any instance)


## How does it work?

`PostgresMessagePickupRepository` creates two tables in its PostgreSQL database: one to store the queued messages, and another one to keep track of Live sessions when clients connect and disconnect from it. It also registers to a [PG PubSub](https://github.com/voxpelli/node-pg-pubsub) channel to be notified when a message arrives (for any connection).
Expand All @@ -36,7 +35,6 @@ The following diagrams show the operation when messages arrive in both online an

![Message arrived for an online client](./docs/diagrams/message-arrived-online-client.png)


![Message arrived for an offline client](./docs/diagrams/message-arrived-offline-client.png)

## Installation
Expand All @@ -48,8 +46,10 @@ To use it, install package in your DIDComm Mediator application. For example:
```bash
npm i @2060.io/credo-ts-message-pickup-repository-pg
```
or
```

or

```bash
yarn add @2060.io/credo-ts-message-pickup-repository-pg
```

Expand All @@ -61,7 +61,6 @@ Setting up PostgresMessagePickupRepository is quite simple if you have some prio

You need to instance `PostgresMessagePickupRepository` with explicit database configuration (remember: it could be the same used for Credo wallet). If `postgresDatabaseName` is not specified, default `messagepickuprepository` will be used (if it does not exist, it will try to automatically create it using the provided credentials).


```ts
const messageRepository = new PostgresMessagePickupRepository({
logger: yourLoggerInstance,
Expand All @@ -85,7 +84,7 @@ const connectionInfoCallback = async (connectionId) => {
const connectionRecord = await this.agent.connections.findById(connectionId)
const token = connectionRecord?.getTag('device_token') as string | null
return {
sendPushNotification: token ? (messageId) => { this.notificationSender.send(token, messageId }: undefined,
handleNotificationEvent: token ? (messageId) => { this.notificationSender.send(token, messageId }: undefined,
}
}
await messagePickupRepository.initialize({ agent, connectionInfoCallback })
Expand Down Expand Up @@ -124,7 +123,7 @@ const connectionInfoCallback = async (connectionId: string) => {
const token = connectionRecord?.getTag('device_token') as string | null

return {
sendPushNotification: token
handleNotificationEvent: token
? (messageId: string) => {
notificationSender.send(token, messageId)
}
Expand Down
9 changes: 4 additions & 5 deletions packages/postgres/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@2060.io/credo-ts-message-pickup-repository-pg",
"main": "build/src/index.js",
"types": "build/src/index.d.ts",
"main": "build/index.js",
"types": "build/index.d.ts",
"version": "0.0.7",
"files": [
"build"
Expand All @@ -25,8 +25,6 @@
"test": "jest"
},
"dependencies": {
"@credo-ts/core": "^0.5.11",
"loglevel": "^1.8.0",
"pg": "^8.11.3",
"pg-pubsub": "^0.8.1",
"typescript": "^4.0.0"
Expand All @@ -38,7 +36,8 @@
"ts-jest": "^27.0.0",
"ts-loader": "^9.0.0",
"webpack": "^5.0.0",
"webpack-cli": "^4.0.0"
"webpack-cli": "^4.0.0",
"@credo-ts/core": "^0.5.11"
},
"peerDependencies": {
"@credo-ts/core": "^0.5.11"
Expand Down
93 changes: 63 additions & 30 deletions packages/postgres/src/PostgresMessagePickupRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import {
liveSessionTableName,
liveSessionTableIndex,
messageTableIndex,
} from '../config/dbCollections'
message_state_type,
} from './config/dbCollections'
import { ConnectionInfo, PostgresMessagePickupRepositoryConfig } from './interfaces'
import { MessagePickupSession } from '@credo-ts/core/build/modules/message-pickup/MessagePickupSession'
import { randomUUID } from 'crypto'

@injectable()
export class PostgresMessagePickupRepository implements MessagePickupRepository {
Expand Down Expand Up @@ -87,7 +89,10 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository

// Set instance variables
this.agent = options.agent
this.instanceName = os.hostname() // Retrieve hostname for instance identification

this.instanceName = `${os.hostname()}-${process.pid}-${randomUUID()}`
this.logger?.info(`[initialize] Instance identifier set to: ${this.instanceName}`)

this.connectionInfoCallback = options.connectionInfoCallback

// Register event handlers
Expand Down Expand Up @@ -142,44 +147,60 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
this.logger?.info(`[takeFromQueue] Initializing method for ConnectionId: ${connectionId}, Limit: ${limit}`)

try {
// Query to fetch messages from the database
// If deleteMessages is true, just fetch messages without updating their state
if (deleteMessages) {
const query = `
SELECT id, encryptedmessage, state
FROM ${messagesTableName}
WHERE (connectionid = $1 OR $2 = ANY (recipientKeysBase58)) AND state = 'pending'
ORDER BY created_at
LIMIT $3
`
const params = [connectionId, recipientDid, limit ?? 0]
const result = await this.messagesCollection?.query(query, params)

if (!result || result.rows.length === 0) {
this.logger?.debug(`[takeFromQueue] No messages found for ConnectionId: ${connectionId}`)
return []
}

return result.rows.map((message) => ({
id: message.id,
encryptedMessage: message.encryptedmessage,
state: message.state,
}))
}

// Use UPDATE and RETURNING to fetch and update messages in one step
const query = `
SELECT id, encryptedmessage, state
FROM ${messagesTableName}
WHERE (connectionid = $1 OR $2 = ANY (recipientkeys)) AND state = 'pending'
ORDER BY created_at
LIMIT $3
UPDATE ${messagesTableName}
SET state = 'sending'
WHERE id IN (
SELECT id
FROM ${messagesTableName}
WHERE (connectionid = $1 OR $2 = ANY (recipientKeysBase58))
AND state = 'pending'
ORDER BY created_at
LIMIT $3
)
RETURNING id, encryptedmessage, state;
`
const params = [connectionId, recipientDid, limit ?? 0]
const result = await this.messagesCollection?.query(query, params)

if (!result || result.rows.length === 0) {
this.logger?.debug(`[takeFromQueue] No messages found for ConnectionId: ${connectionId}`)
this.logger?.debug(`[takeFromQueue] No messages updated for ConnectionId: ${connectionId}`)
return []
}

const messagesToUpdateIds = result.rows.map((message) => message.id)

// Update message states to 'sending' if deleteMessages is false
if (!deleteMessages && messagesToUpdateIds.length > 0) {
const updateQuery = `UPDATE ${messagesTableName} SET state = 'sending' WHERE id = ANY($1)`
const updateResult = await this.messagesCollection?.query(updateQuery, [messagesToUpdateIds])
this.logger?.debug(`[takeFromQueue] ${result.rows.length} messages updated to "sending" state.`)

if (updateResult?.rowCount !== result.rows.length) {
this.logger?.debug(`[takeFromQueue] Not all messages were updated to "sending" state.`)
} else {
this.logger?.debug(`[takeFromQueue] ${updateResult.rowCount} messages updated to "sending" state.`)
}
}

// Map database rows to QueuedMessage objects
const queuedMessages: QueuedMessage[] = result.rows.map((message) => ({
// Return the messages as QueuedMessage objects
return result.rows.map((message) => ({
id: message.id,
encryptedMessage: message.encryptedmessage,
state: !deleteMessages ? 'sending' : message.state,
state: 'sending',
}))

return queuedMessages
} catch (error) {
this.logger?.error(`[takeFromQueue] Error: ${error}`)
return []
Expand Down Expand Up @@ -248,7 +269,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository

// Insert the message into the database
const query = `
INSERT INTO ${messagesTableName}(connectionid, recipientKeys, encryptedmessage, state)
INSERT INTO ${messagesTableName}(connectionid, recipientKeysBase58, encryptedmessage, state)
VALUES($1, $2, $3, $4)
RETURNING id
`
Expand All @@ -275,8 +296,8 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
if (this.connectionInfoCallback) {
const connectionInfo = await this.connectionInfoCallback(connectionId)

if (connectionInfo?.sendPushNotification) {
await connectionInfo.sendPushNotification(messageId)
if (connectionInfo?.handleNotificationEvent) {
await connectionInfo.handleNotificationEvent(messageId)
}
} else {
this.logger?.error(`connectionInfoCallback is not defined`)
Expand Down Expand Up @@ -410,6 +431,9 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
try {
await client.connect()

// Use advisory lock to prevent
await client.query('SELECT pg_advisory_lock(99998)')

// Check if the database already exists.
const result = await client.query('SELECT 1 FROM pg_database WHERE datname = $1', [this.postgresDatabaseName])
this.logger?.debug(`[buildPgDatabase] PostgresDbService exist ${result.rowCount}`)
Expand All @@ -420,16 +444,22 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
this.logger?.info(`[buildPgDatabase] PostgresDbService Database "${this.postgresDatabaseName}" created.`)
}

await client.query('SELECT pg_advisory_unlock(99998)')

// Create a new client connected to the specific database.
const dbClient = new Client(poolConfig)

try {
await dbClient.connect()

// Use advisory lock to prevent race conditions
await client.query('SELECT pg_advisory_lock(99999)')

// Check if the 'messagesTableName' table exists.
const messageTableResult = await dbClient.query(`SELECT to_regclass('${messagesTableName}')`)
if (!messageTableResult.rows[0].to_regclass) {
// If it doesn't exist, create the table.
await dbClient.query(message_state_type)
await dbClient.query(createTableMessage)
await dbClient.query(messageTableIndex)
this.logger?.info(`[buildPgDatabase] PostgresDbService Table "${messagesTableName}" created.`)
Expand All @@ -447,6 +477,9 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
await dbClient.query(`TRUNCATE TABLE ${liveSessionTableName}`)
this.logger?.info(`[buildPgDatabase] PostgresDbService Table "${liveSessionTableName}" cleared.`)
}

// Unlock after table creation
await dbClient.query('SELECT pg_advisory_unlock(99999)')
} finally {
await dbClient.end()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
export const messagesTableName = 'queuedmessage'

export const message_state_type = `CREATE TYPE message_state AS ENUM ('pending', 'sending');`

export const createTableMessage = `
CREATE TABLE IF NOT EXISTS ${messagesTableName} (
id VARCHAR(20) DEFAULT substr(md5(random()::text), 1, 20) PRIMARY KEY,
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
connectionId VARCHAR(255),
recipientKeys TEXT[],
recipientKeysBase58 TEXT[],
encryptedMessage JSONB,
state VARCHAR(50),
state message_state NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
`
Expand All @@ -19,7 +21,7 @@ CREATE TABLE IF NOT EXISTS ${liveSessionTableName} (
connectionid VARCHAR(50),
protocolVersion VARCHAR(50),
role VARCHAR(50),
instance VARCHAR(50),
instance VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);`

Expand Down
2 changes: 1 addition & 1 deletion packages/postgres/src/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Logger } from '@credo-ts/core'

export interface ConnectionInfo {
sendPushNotification?: (messageId: string) => Promise<void>
handleNotificationEvent?: (messageId: string) => Promise<void>
}

export interface PostgresMessagePickupRepositoryConfig {
Expand Down
20 changes: 16 additions & 4 deletions packages/postgres/tsconfig.build.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
{
"extends": "../../tsconfig.build.json",

"compilerOptions": {
"outDir": "./build",
"rootDir": "./src",
"module": "CommonJS",
"target": "ES2017",
"declaration": true,
"sourceMap": true,
"strict": true,
"noEmitOnError": true,
"lib": ["ES2017"],
"esModuleInterop": true,
"allowSyntheticDefaultImports": true,
"resolveJsonModule": true,
"experimentalDecorators": true,
"emitDecoratorMetadata": true,
"skipLibCheck": true,
"types": ["node"]
},

"include": ["src/**/*"]
"include": ["src/**/*"],
"exclude": ["node_modules", "build", "**/*.test.ts", "**/__tests__/**/*", "**/__mocks__/**/*"]
}
11 changes: 7 additions & 4 deletions packages/postgres/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
{
"extends": "../../tsconfig.json",
"extends": "./tsconfig.build.json",
"compilerOptions": {
"types": ["jest"]
},
"@2060.io/credo-ts-message-pickup-repository-pg": ["packages/postgres/src"]
"types": ["jest", "node"],
"baseUrl": ".",
"paths": {
"@2060.io/credo-ts-message-pickup-repository-pg": ["packages/postgres/src"]
}
}
}
Loading