Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): tenanted webhooks #3317

Draft
wants to merge 10 commits into
base: 2893/multi-tenancy-v1
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.up = function (knex) {
return knex.schema
.alterTable('webhookEvents', (table) => {
table.uuid('tenantId').references('tenants.id').index()
})
.then(() => {
return knex.raw(
`UPDATE "webhookEvents" SET "tenantId" = (SELECT id from "tenants" LIMIT 1)`
)
})
.then(() => {
return knex.schema.alterTable('webhookEvents', (table) => {
table.uuid('tenantId').notNullable().alter()
})
})
}

/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.down = function (knex) {
return knex.schema.alterTable('webhookEvents', function (table) {
table.dropColumn('tenantId')
})
}
1 change: 1 addition & 0 deletions packages/backend/src/accounting/psql/balance.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ describe('Balances', (): void => {
deps = initIocContainer({ ...Config, useTigerBeetle: false })
appContainer = await createTestApp(deps)
serviceDeps = {
config: await deps.use('config'), // TODO: remove once tenanted peers are added
logger: await deps.use('logger'),
knex: await deps.use('knex'),
telemetry: await deps.use('telemetry')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ describe('Ledger Account', (): void => {
const deps = initIocContainer({ ...Config, useTigerBeetle: false })
appContainer = await createTestApp(deps)
serviceDeps = {
config: await deps.use('config'), // TODO: remove once tenanted peers are in
logger: await deps.use('logger'),
knex: await deps.use('knex'),
telemetry: await deps.use('telemetry')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ describe('Ledger Transfer', (): void => {
const deps = initIocContainer({ ...Config, useTigerBeetle: false })
appContainer = await createTestApp(deps)
serviceDeps = {
config: await deps.use('config'), // TODO: remove once tenanted peers are in
logger: await deps.use('logger'),
knex: await deps.use('knex'),
telemetry: await deps.use('telemetry')
Expand Down
2 changes: 2 additions & 0 deletions packages/backend/src/accounting/psql/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ import {
} from './ledger-transfer'
import { LedgerTransfer, LedgerTransferType } from './ledger-transfer/model'
import { TelemetryService } from '../../telemetry/service'
import { IAppConfig } from '../../config/app'

export interface ServiceDependencies extends BaseService {
knex: TransactionOrKnex
telemetry: TelemetryService
withdrawalThrottleDelay?: number
config: IAppConfig // TODO: remove when peers are tenanted
}

export function createAccountingService(
Expand Down
7 changes: 6 additions & 1 deletion packages/backend/src/accounting/service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { TransactionOrKnex } from 'objection'
import { BaseService } from '../shared/baseService'
import { TransferError, isTransferError } from './errors'
import { IAppConfig } from '../config/app'

export enum LiquidityAccountType {
ASSET = 'ASSET',
Expand Down Expand Up @@ -44,6 +45,7 @@ export interface OnCreditOptions {

export interface OnDebitOptions {
balance: bigint
tenantId: string
}

export interface BaseTransfer {
Expand Down Expand Up @@ -134,6 +136,7 @@ export interface TransferToCreate {

export interface BaseAccountingServiceDependencies extends BaseService {
withdrawalThrottleDelay?: number
config: IAppConfig // TODO: Remove when peers are tenanted
}

interface CreateAccountToAccountTransferArgs {
Expand Down Expand Up @@ -201,8 +204,10 @@ export async function createAccountToAccountTransfer(
const balance = await getAccountBalance(account.id)
if (balance === undefined) throw new Error('undefined account balance')

const { config } = deps
await account.onDebit({
balance
balance,
tenantId: config.operatorTenantId
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions packages/backend/src/accounting/tigerbeetle/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
} from './transfers'
import { toTigerBeetleId } from './utils'
import { TelemetryService } from '../../telemetry/service'
import { IAppConfig } from '../../config/app'

export enum TigerBeetleAccountCode {
LIQUIDITY_WEB_MONETIZATION = 1,
Expand Down Expand Up @@ -71,6 +72,7 @@ export interface ServiceDependencies extends BaseService {
tigerBeetle: Client
telemetry: TelemetryService
withdrawalThrottleDelay?: number
config: IAppConfig // TODO: remove when peers are tenanted
}

export function createAccountingService(
Expand Down
4 changes: 2 additions & 2 deletions packages/backend/src/asset/model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ describe('Models', (): void => {
`(
'creates webhook event if balance=$balance <= liquidityThreshold',
async ({ balance }): Promise<void> => {
await asset.onDebit({ balance })
await asset.onDebit({ balance, tenantId: asset.tenantId }) // TODO: remove once tenanted peers are in
const event = (
await AssetEvent.query(knex).where(
'type',
Expand All @@ -77,7 +77,7 @@ describe('Models', (): void => {
}
)
test('does not create webhook event if balance > liquidityThreshold', async (): Promise<void> => {
await asset.onDebit({ balance: BigInt(110) })
await asset.onDebit({ balance: BigInt(110), tenantId: asset.tenantId }) // TODO: remove once tenanted peers are in
await expect(
AssetEvent.query(knex).where('type', AssetEventType.LiquidityLow)
).resolves.toEqual([])
Expand Down
3 changes: 2 additions & 1 deletion packages/backend/src/asset/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ export class Asset extends BaseModel implements LiquidityAccount {
},
liquidityThreshold: this.liquidityThreshold,
balance
}
},
tenantId: this.tenantId
})
}
}
Expand Down
25 changes: 17 additions & 8 deletions packages/backend/src/graphql/resolvers/liquidity.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1796,7 +1796,8 @@ describe('Liquidity Resolvers', (): void => {
data: payment.toData({
amountSent: BigInt(0),
balance: BigInt(0)
})
}),
tenantId: Config.operatorTenantId
})
})

Expand Down Expand Up @@ -1956,6 +1957,7 @@ describe('Liquidity Resolvers', (): void => {
incomingPaymentId?: string
outgoingPaymentId?: string
walletAddressId?: string
tenantId?: string
}

const isIncomingPaymentEventType = (
Expand Down Expand Up @@ -2011,7 +2013,8 @@ describe('Liquidity Resolvers', (): void => {
accountId: liquidityAccount.id,
assetId: liquidityAccount.asset.id,
amount
}
},
tenantId: Config.operatorTenantId
}

if (resourceId) {
Expand Down Expand Up @@ -2230,7 +2233,8 @@ describe('Liquidity Resolvers', (): void => {
accountId: incomingPayment.id,
assetId: incomingPayment.asset.id,
amount
}
},
tenantId: Config.operatorTenantId
})

const response = await appContainer.apolloClient
Expand Down Expand Up @@ -2278,7 +2282,8 @@ describe('Liquidity Resolvers', (): void => {
accountId: incomingPayment.id,
assetId: incomingPayment.asset.id,
amount
}
},
tenantId: Config.operatorTenantId
})
let error
try {
Expand Down Expand Up @@ -2376,7 +2381,8 @@ describe('Liquidity Resolvers', (): void => {
accountId: incomingPayment.id,
assetId: incomingPayment.asset.id,
amount
}
},
tenantId: Config.operatorTenantId
})
await expect(
accountingService.createWithdrawal({
Expand Down Expand Up @@ -2463,7 +2469,8 @@ describe('Liquidity Resolvers', (): void => {
accountId: outgoingPayment.id,
assetId: outgoingPayment.asset.id,
amount
}
},
tenantId: Config.operatorTenantId
})

const response = await appContainer.apolloClient
Expand Down Expand Up @@ -2604,7 +2611,8 @@ describe('Liquidity Resolvers', (): void => {
accountId: outgoingPayment.id,
assetId: outgoingPayment.asset.id,
amount
}
},
tenantId: Config.operatorTenantId
})
await expect(
accountingService.createWithdrawal({
Expand Down Expand Up @@ -2673,7 +2681,8 @@ describe('Liquidity Resolvers', (): void => {
data: outgoingPayment.toData({
amountSent: BigInt(0),
balance: BigInt(0)
})
}),
tenantId: Config.operatorTenantId
})
})

Expand Down
3 changes: 3 additions & 0 deletions packages/backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ export function initIocContainer(
return createTigerbeetleAccountingService({
logger,
knex,
config,
tigerBeetle,
withdrawalThrottleDelay: config.withdrawalThrottleDelay,
telemetry
Expand All @@ -366,6 +367,7 @@ export function initIocContainer(
return createPsqlAccountingService({
logger,
knex,
config,
withdrawalThrottleDelay: config.withdrawalThrottleDelay,
telemetry
})
Expand Down Expand Up @@ -395,6 +397,7 @@ export function initIocContainer(
})
container.singleton('webhookService', async (deps) => {
return createWebhookService({
tenantSettingService: await deps.use('tenantSettingService'),
config: await deps.use('config'),
knex: await deps.use('knex'),
logger: await deps.use('logger')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ describe('Incoming Payment Service', (): void => {
const incomingPayment = await incomingPaymentService.create({
walletAddressId,
...options,
incomingAmount: options.incomingAmount ? amount : undefined
incomingAmount: options.incomingAmount ? amount : undefined,
tenantId: Config.operatorTenantId
})
assert.ok(!isIncomingPaymentError(incomingPayment))
expect(incomingPayment).toMatchObject({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ async function createIncomingPayment(
await IncomingPaymentEvent.query(trx || deps.knex).insert({
incomingPaymentId: incomingPayment.id,
type: IncomingPaymentEventType.IncomingPaymentCreated,
data: incomingPayment.toData(0n)
data: incomingPayment.toData(0n),
tenantId: incomingPayment.tenantId
})

incomingPayment = await addReceivedAmount(deps, incomingPayment, BigInt(0))
Expand Down Expand Up @@ -364,7 +365,8 @@ async function handleDeactivated(
accountId: incomingPayment.id,
assetId: incomingPayment.assetId,
amount: amountReceived
}
},
tenantId: incomingPayment.tenantId
})

await incomingPayment.$query(deps.knex).patch({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ export async function sendWebhookEvent(
outgoingPaymentId: payment.id,
type,
data: payment.toData({ amountSent, balance }),
withdrawal
withdrawal,
tenantId: payment.tenantId
})
stopTimer()
}
Expand Down
6 changes: 4 additions & 2 deletions packages/backend/src/open_payments/wallet_address/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ async function getOrPollByUrl(
type: WalletAddressEventType.WalletAddressNotFound,
data: {
walletAddressUrl: url
}
},
tenantId: deps.config.operatorTenantId
})

deps.logger.debug(
Expand Down Expand Up @@ -458,7 +459,8 @@ async function createWithdrawalEvent(
accountId: walletAddress.id,
assetId: walletAddress.assetId,
amount
}
},
tenantId: walletAddress.tenantId
})

await walletAddress.$query(deps.knex).patch({
Expand Down
10 changes: 7 additions & 3 deletions packages/backend/src/payment-method/ilp/peer/model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ describe('Models', (): void => {
`(
'creates webhook event if balance=$balance <= liquidityThreshold',
async ({ balance }): Promise<void> => {
await peer.onDebit({ balance })
await peer.onDebit({ balance, tenantId: Config.operatorTenantId })
const event = (
await PeerEvent.query(knex).where(
'type',
Expand All @@ -91,12 +91,16 @@ describe('Models', (): void => {
},
liquidityThreshold: peer.liquidityThreshold?.toString(),
balance: balance.toString()
}
},
tenantId: Config.operatorTenantId
})
}
)
test('does not create webhook event if balance > liquidityThreshold', async (): Promise<void> => {
await peer.onDebit({ balance: BigInt(110) })
await peer.onDebit({
balance: BigInt(110),
tenantId: Config.operatorTenantId
})
await expect(
PeerEvent.query(knex).where('type', PeerEventType.LiquidityLow)
).resolves.toEqual([])
Expand Down
5 changes: 3 additions & 2 deletions packages/backend/src/payment-method/ilp/peer/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export class Peer

public name?: string

public async onDebit({ balance }: OnDebitOptions): Promise<Peer> {
public async onDebit({ balance, tenantId }: OnDebitOptions): Promise<Peer> {
if (this.liquidityThreshold !== null) {
if (balance <= this.liquidityThreshold) {
await PeerEvent.query().insert({
Expand All @@ -68,7 +68,8 @@ export class Peer
},
liquidityThreshold: this.liquidityThreshold,
balance
}
},
tenantId // TODO: update when peers are tenanted
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now this is using the operator tenant ID, but will be replaced with the peer's own tenant id once #3129 is merged in.

})
}
}
Expand Down
Loading
Loading