Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add standalone MPR postgres module #41

Merged
merged 18 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
71eb74e
feat: Initial commit add standalone MPR postgress module
gabrielmatau79 Dec 18, 2024
c8d3cf8
refactor: Resolve the requested changes regarding module configuratio…
gabrielmatau79 Dec 18, 2024
e3a532d
refactor: Resolve the requested changes regarding the refactoring of …
gabrielmatau79 Dec 18, 2024
99f4fb2
feat: Resolve the requested changes regarding the introduction of a c…
gabrielmatau79 Dec 18, 2024
a20acf9
fix: fix export class into package/postgress/index.ts
gabrielmatau79 Dec 18, 2024
3d66932
fix: fix name packages postgres
gabrielmatau79 Dec 19, 2024
23852ae
refactor: Resolve the requested changes in variable names and type ex…
gabrielmatau79 Dec 19, 2024
a57ab7e
feat: add release configuration and encapsulated in an options object…
gabrielmatau79 Dec 19, 2024
2decf51
docs: Add documentation for using the PostgresMessagePickupRepository
gabrielmatau79 Dec 19, 2024
d07f5bc
fix: fix request changes about package.json and remove fcmServiceBaseUrl
gabrielmatau79 Dec 19, 2024
739e45f
fix: Fix description package.json
gabrielmatau79 Dec 19, 2024
1aceb9c
fix: fix querys sql into dbcollections.ts
gabrielmatau79 Dec 20, 2024
2061999
fix: remove conditionial 'this.instance' to publish message when live…
gabrielmatau79 Dec 20, 2024
fe221a6
refactor: The table names are changed to livesession and queuedmessage.
gabrielmatau79 Dec 20, 2024
4621818
fix: fix include indexMessageTable in buildPgDatabase
gabrielmatau79 Dec 20, 2024
8f86e72
chore: fix names, simplify example in README
genaris Dec 24, 2024
7493cbe
style: ignore CHANGELOG
genaris Dec 24, 2024
e2ae0ad
revert: changes in CHANGELOG
genaris Dec 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions packages/postgres/config/dbCollections.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export const messagesTableName = 'storequeuedMessage'
export const messagesTableName = 'storequeuedmessage'

export const createTableMessage = `
CREATE TABLE IF NOT EXISTS ${messagesTableName} (
Expand All @@ -9,9 +9,6 @@ CREATE TABLE IF NOT EXISTS ${messagesTableName} (
state VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS "${messagesTableName}_connectionId_index" ON "queuedmessages" (connectionId);
CREATE INDEX IF NOT EXISTS "${messagesTableName}_created_at_index" ON "queuedmessages" (created_at);
`

export const liveSessionTableName = 'storelivesession'
Expand All @@ -24,6 +21,8 @@ CREATE TABLE IF NOT EXISTS ${liveSessionTableName} (
role VARCHAR(50),
instance VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
);`

export const indexMessageTable = `CREATE INDEX IF NOT EXISTS "${messagesTableName}_connectionId_index" ON "${messagesTableName}" (connectionId);`

CREATE INDEX IF NOT EXISTS "${liveSessionTableName}_connectionid" ON "${liveSessionTableName}" USING btree ("connectionid");`
export const indexLiveSessionTable = `CREATE INDEX IF NOT EXISTS "${liveSessionTableName}_connectionid" ON "${liveSessionTableName}" USING btree ("connectionid");`
4 changes: 2 additions & 2 deletions packages/postgres/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@2060.io/credo-ts-message-pickup-repository-pg",
"main": "build/index",
"types": "build/index",
"main": "build/src/index.js",
"types": "build/src/index.d.ts",
"version": "0.0.1",
"files": [
"build"
Expand Down
55 changes: 31 additions & 24 deletions packages/postgres/src/PostgresMessagePickupRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@ import {
import { Pool, Client } from 'pg'
import PGPubsub from 'pg-pubsub'
import * as os from 'os'
import { createTableMessage, createTableLive, messagesTableName, liveSessionTableName } from '../config/dbCollections'
import {
createTableMessage,
createTableLive,
messagesTableName,
liveSessionTableName,
indexLiveSessionTable,
} from '../config/dbCollections'
import { ConnectionInfo, PostgresMessagePickupRepositoryConfig } from './interfaces'
import { MessagePickupSession } from '@credo-ts/core/build/modules/message-pickup/MessagePickupSession'
import axios from 'axios'
Expand Down Expand Up @@ -56,7 +62,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
*/
public async initialize(options: {
agent: Agent
connectionInfoCallback: (connectionId: string) => Promise<ConnectionInfo | undefined>
connectionInfoCallback?: (connectionId: string) => Promise<ConnectionInfo | undefined>
}): Promise<void> {
try {
// Initialize the database
Expand Down Expand Up @@ -142,7 +148,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
// Query to fetch messages from the database
const query = `
SELECT id, encryptedmessage, state
FROM queuedmessages
FROM ${messagesTableName}
WHERE (connectionid = $1 OR $2 = ANY (recipientkeys)) AND state = 'pending'
ORDER BY created_at
LIMIT $3
Expand All @@ -159,7 +165,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository

// Update message states to 'sending' if deleteMessages is false
if (!deleteMessages && messagesToUpdateIds.length > 0) {
const updateQuery = `UPDATE queuedmessages SET state = 'sending' WHERE id = ANY($1)`
const updateQuery = `UPDATE ${messagesTableName} SET state = 'sending' WHERE id = ANY($1)`
const updateResult = await this.messagesCollection?.query(updateQuery, [messagesToUpdateIds])

if (updateResult?.rowCount !== result.rows.length) {
Expand Down Expand Up @@ -198,7 +204,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
// Query to count pending messages for the specified connection ID
const query = `
SELECT COUNT(*) AS count
FROM queuedmessages
FROM ${messagesTableName}
WHERE connectionid = $1 AND state = 'pending'
`
const params = [connectionId]
Expand Down Expand Up @@ -245,7 +251,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository

// Insert the message into the database
const query = `
INSERT INTO queuedmessages(connectionid, recipientKeys, encryptedmessage, state)
INSERT INTO ${messagesTableName}(connectionid, recipientKeys, encryptedmessage, state)
VALUES($1, $2, $3, $4)
RETURNING id
`
Expand Down Expand Up @@ -277,7 +283,6 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
}
} else {
this.logger?.error(`connectionInfoCallback is not defined`)
throw new Error(`connectionInfoCallback is not defined`)
}
} else if (this.dbListener) {
// Publish to the Pub/Sub channel if a live session exists on another instance
Expand Down Expand Up @@ -322,7 +327,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
const placeholders = messageIds.map((_, index) => `$${index + 2}`).join(', ')

// Construct the SQL DELETE query
const query = `DELETE FROM queuedmessages WHERE connectionid = $1 AND id IN (${placeholders})`
const query = `DELETE FROM ${messagesTableName} WHERE connectionid = $1 AND id IN (${placeholders})`

// Combine connectionId with messageIds as query parameters
const queryParams = [connectionId, ...messageIds]
Expand Down Expand Up @@ -428,11 +433,12 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
try {
await dbClient.connect()

// Check if the 'queuedmessages' table exists.
// Check if the 'messagesTableName' table exists.
const messageTableResult = await dbClient.query(`SELECT to_regclass('${messagesTableName}')`)
if (!messageTableResult.rows[0].to_regclass) {
// If it doesn't exist, create the 'storequeuedmessage' table.
// If it doesn't exist, create the table.
await dbClient.query(createTableMessage)
await dbClient.query(indexLiveSessionTable)
this.logger?.info(`[buildPgDatabase] PostgresDbService Table "${messagesTableName}" created.`)
}

Expand All @@ -441,6 +447,7 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
if (!liveTableResult.rows[0].to_regclass) {
// If it doesn't exist, create the table.
await dbClient.query(createTableLive)
await dbClient.query(indexLiveSessionTable)
this.logger?.info(`[buildPgDatabase] PostgresDbService Table "${liveSessionTableName}" created.`)
} else {
// If the table exists, clean it (truncate or delete, depending on your requirements).
Expand All @@ -467,13 +474,13 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
try {
this.logger?.debug(`[checkQueueMessages] Init verify messages state 'sending'`)
const messagesToSend = await this.messagesCollection?.query(
'SELECT * FROM queuedmessages WHERE state = $1 and connectionid = $2',
`SELECT * FROM ${messagesTableName} WHERE state = $1 and connectionid = $2`,
['sending', connectionID],
)
if (messagesToSend && messagesToSend.rows.length > 0) {
for (const message of messagesToSend.rows) {
// Update the message state to 'pending'
await this.messagesCollection?.query('UPDATE queuedmessages SET state = $1 WHERE id = $2', [
await this.messagesCollection?.query(`UPDATE ${messagesTableName} SET state = $1 WHERE id = $2`, [
'pending',
message.id,
])
Expand Down Expand Up @@ -510,19 +517,19 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
* @returns liveSession object or false
*/
private async findLiveSessionInDb(connectionId: string): Promise<MessagePickupSession | undefined> {
this.logger?.debug(`[getLiveSessionFromDB] initializing find registry for connectionId ${connectionId}`)
this.logger?.debug(`[findLiveSessionInDb] initializing find registry for connectionId ${connectionId}`)
if (!connectionId) throw new Error('connectionId is not defined')
try {
const queryLiveSession = await this.messagesCollection?.query(
`SELECT sessionid, connectionid, protocolVersion, role FROM storelivesession WHERE connectionid = $1 LIMIT $2`,
`SELECT sessionid, connectionid, protocolVersion, role FROM ${liveSessionTableName} WHERE connectionid = $1 LIMIT $2`,
[connectionId, 1],
)
// Check if liveSession is not empty (record found)
const recordFound = queryLiveSession && queryLiveSession.rows && queryLiveSession.rows.length > 0
this.logger?.debug(`[getLiveSessionFromDB] record found status ${recordFound} to connectionId ${connectionId}`)
this.logger?.debug(`[findLiveSessionInDb] record found status ${recordFound} to connectionId ${connectionId}`)
return recordFound ? queryLiveSession.rows[0] : undefined
} catch (error) {
this.logger?.debug(`[getLiveSessionFromDB] Error find to connectionId ${connectionId}`)
this.logger?.debug(`[findLiveSessionInDb] Error find to connectionId ${connectionId}`)
return undefined // Return false in case of an error
}
}
Expand All @@ -534,17 +541,17 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
*/
private async addLiveSessionOnDb(session: MessagePickupSession, instance: string): Promise<void> {
const { id, connectionId, protocolVersion, role } = session
this.logger?.debug(`[addLiveSessionFromDb] initializing add LiveSession DB to connectionId ${connectionId}`)
this.logger?.debug(`[addLiveSessionOnDb] initializing add LiveSession DB to connectionId ${connectionId}`)
if (!session) throw new Error('session is not defined')
try {
const insertMessageDB = await this.messagesCollection?.query(
'INSERT INTO storelivesession (sessionid, connectionid, protocolVersion, role, instance) VALUES($1, $2, $3, $4, $5) RETURNING sessionid',
`INSERT INTO ${liveSessionTableName} (sessionid, connectionid, protocolVersion, role, instance) VALUES($1, $2, $3, $4, $5) RETURNING sessionid`,
[id, connectionId, protocolVersion, role, instance],
)
const liveSessionId = insertMessageDB?.rows[0].sessionid
this.logger?.debug(`[addLiveSessionFromDb] add liveSession to ${connectionId} and result ${liveSessionId}`)
this.logger?.debug(`[addLiveSessionOnDb] add liveSession to ${connectionId} and result ${liveSessionId}`)
} catch (error) {
this.logger?.debug(`[addLiveSessionFromDb] error add liveSession DB ${connectionId}`)
this.logger?.debug(`[addLiveSessionOnDb] error add liveSession DB ${connectionId}`)
}
}

Expand All @@ -553,20 +560,20 @@ export class PostgresMessagePickupRepository implements MessagePickupRepository
* @param connectionId
*/
private async removeLiveSessionOnDb(connectionId: string): Promise<void> {
this.logger?.debug(`[removeLiveSessionFromDb] initializing remove LiveSession to connectionId ${connectionId}`)
this.logger?.debug(`[removeLiveSessionOnDb] initializing remove LiveSession to connectionId ${connectionId}`)
if (!connectionId) throw new Error('connectionId is not defined')
try {
// Construct the SQL query with the placeholders
const query = `DELETE FROM storelivesession WHERE connectionid = $1`
const query = `DELETE FROM ${liveSessionTableName} WHERE connectionid = $1`

// Add connectionId for query parameters
const queryParams = [connectionId]

await this.messagesCollection?.query(query, queryParams)

this.logger?.debug(`[removeLiveSessionFromDb] removed LiveSession to connectionId ${connectionId}`)
this.logger?.debug(`[removeLiveSessionOnDb] removed LiveSession to connectionId ${connectionId}`)
} catch (error) {
this.logger?.error(`[removeLiveSessionFromDb] Error removing LiveSession: ${error}`)
this.logger?.error(`[removeLiveSessionOnDb] Error removing LiveSession: ${error}`)
}
}
}
Loading