Skip to content

Commit

Permalink
Add billing meter event service (twentyhq#9865)
Browse files Browse the repository at this point in the history
Solves :
twentyhq/private-issues#241
twentyhq/private-issues#254

**TLDR:**

- Add BillingMeterEventService and StripeBillingMeterEventService in
order to send billing meter events to stripe.
- Plugged the service into workflow node execution for testing purposes
(more improvements on this area will be done in the next PR's)

**In order to test:**

- Have the environment variable IS_BILLING_ENABLED set to true and add
the other required environment variables for Billing to work
- Do a database reset (to ensure that the new feature flag is properly
added and that the billing tables are created)
- Run the command: npx nx run twenty-server:command
billing:sync-plans-data (if you don't do that the products and prices
will not be present in the database)
- Run the server , the frontend, the worker, and the stripe listen
command (stripe listen --forward-to
http://localhost:3000/billing/webhooks)
- Buy a subscription for the Acme workspace 
- Create a workflow and run it
- After the run has been finished check in sprite the quantity of events
in the CreditMeter, you should see that there is a new occurence with
value one.

**Take into consideration:**

- I used an eventName that I have made a long time ago, so it hasn't a
significant naming. I'm updating the meters and associated prices in
stripe to use the correct meter with a more clearer eventName.
- I put some error handling in the execution of the workflow nodes, this
is still incomplete and needs some refinement, I would like the feedback
of the workflows track for a more cleaner approach
  • Loading branch information
anamarn authored Jan 29, 2025
1 parent 1b3181b commit 0d6f4a3
Show file tree
Hide file tree
Showing 14 changed files with 251 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ export enum BillingExceptionCode {
BILLING_PRODUCT_NOT_FOUND = 'BILLING_PRODUCT_NOT_FOUND',
BILLING_PRICE_NOT_FOUND = 'BILLING_PRICE_NOT_FOUND',
BILLING_SUBSCRIPTION_EVENT_WORKSPACE_NOT_FOUND = 'BILLING_SUBSCRIPTION_EVENT_WORKSPACE_NOT_FOUND',
BILLING_ACTIVE_SUBSCRIPTION_NOT_FOUND = 'BILLING_ACTIVE_SUBSCRIPTION_NOT_FOUND',
BILLING_METER_EVENT_FAILED = 'BILLING_METER_EVENT_FAILED',
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import { BillingProduct } from 'src/engine/core-modules/billing/entities/billing
import { BillingSubscriptionItem } from 'src/engine/core-modules/billing/entities/billing-subscription-item.entity';
import { BillingSubscription } from 'src/engine/core-modules/billing/entities/billing-subscription.entity';
import { BillingRestApiExceptionFilter } from 'src/engine/core-modules/billing/filters/billing-api-exception.filter';
import { BillingExecuteBilledFunctionListener } from 'src/engine/core-modules/billing/listeners/billing-execute-billed-function.listener';
import { BillingWorkspaceMemberListener } from 'src/engine/core-modules/billing/listeners/billing-workspace-member.listener';
import { BillingPlanService } from 'src/engine/core-modules/billing/services/billing-plan.service';
import { BillingPortalWorkspaceService } from 'src/engine/core-modules/billing/services/billing-portal.workspace-service';
import { BillingProductService } from 'src/engine/core-modules/billing/services/billing-product.service';
import { BillingSubscriptionService } from 'src/engine/core-modules/billing/services/billing-subscription.service';
import { BillingUsageService } from 'src/engine/core-modules/billing/services/billing-usage.service';
import { BillingService } from 'src/engine/core-modules/billing/services/billing.service';
import { StripeModule } from 'src/engine/core-modules/billing/stripe/stripe.module';
import { BillingWebhookEntitlementService } from 'src/engine/core-modules/billing/webhooks/services/billing-webhook-entitlement.service';
Expand Down Expand Up @@ -63,17 +65,20 @@ import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
BillingResolver,
BillingPlanService,
BillingWorkspaceMemberListener,
BillingExecuteBilledFunctionListener,
BillingService,
BillingWebhookProductService,
BillingWebhookPriceService,
BillingRestApiExceptionFilter,
BillingSyncCustomerDataCommand,
BillingSyncPlansDataCommand,
BillingUsageService,
],
exports: [
BillingSubscriptionService,
BillingPortalWorkspaceService,
BillingService,
BillingUsageService,
],
})
export class BillingModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export const BILLING_EXECUTE_BILLED_FUNCTION =
'billing_execute_billed_function';
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export enum BillingMeterEventName {
WORKFLOW_NODE_RUN = 'creditexecutiontest1',
}
//this is a test event name (no conventions) would you want camel case?, snake case, or all caps?
//Something like workflowNodeRunBillingMeterEvent ?
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { Injectable } from '@nestjs/common';

import { OnCustomBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-custom-batch-event.decorator';
import { BILLING_EXECUTE_BILLED_FUNCTION } from 'src/engine/core-modules/billing/constants/billing-execute-billed-function.constant';
import { BillingUsageService } from 'src/engine/core-modules/billing/services/billing-usage.service';
import { BillingUsageEvent } from 'src/engine/core-modules/billing/types/billing-usage-event.type';
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';

@Injectable()
export class BillingExecuteBilledFunctionListener {
constructor(
private readonly billingUsageService: BillingUsageService,
private readonly environmentService: EnvironmentService,
) {}

@OnCustomBatchEvent(BILLING_EXECUTE_BILLED_FUNCTION)
async handleExecuteBilledFunctionEvent(
payload: WorkspaceEventBatch<BillingUsageEvent>,
) {
if (!this.environmentService.get('IS_BILLING_ENABLED')) {
return;
}

const canExecuteBilledFunction =
await this.billingUsageService.canExecuteBilledFunction(
payload.workspaceId,
);

if (!canExecuteBilledFunction) {
return;
}

await this.billingUsageService.billUsage({
workspaceId: payload.workspaceId,
billingEvents: payload.events,
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ export class BillingPlanService {
}
const { baseProduct, meteredProducts, otherLicensedProducts } = plan;
const baseProductPrice = baseProduct.billingPrices.find(
(price) => price.interval === interval,
(price) => price.interval === interval && price.active,
);

if (!baseProductPrice) {
throw new BillingException(
'Base product price not found for given interval',
'Base product active price not found for given interval',
BillingExceptionCode.BILLING_PRICE_NOT_FOUND,
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ export class BillingProductService {
billingProductsByPlan: BillingProduct[];
}): BillingPrice[] {
const billingPrices = billingProductsByPlan.flatMap((product) =>
product.billingPrices.filter((price) => price.interval === interval),
product.billingPrices.filter(
(price) => price.interval === interval && price.active,
),
);

return billingPrices;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';

import { Repository } from 'typeorm';

import {
BillingException,
BillingExceptionCode,
} from 'src/engine/core-modules/billing/billing.exception';
import { BillingCustomer } from 'src/engine/core-modules/billing/entities/billing-customer.entity';
import { BillingSubscriptionService } from 'src/engine/core-modules/billing/services/billing-subscription.service';
import { StripeBillingMeterEventService } from 'src/engine/core-modules/billing/stripe/services/stripe-billing-meter-event.service';
import { BillingUsageEvent } from 'src/engine/core-modules/billing/types/billing-usage-event.type';
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum';
import { FeatureFlagService } from 'src/engine/core-modules/feature-flag/services/feature-flag.service';

@Injectable()
export class BillingUsageService {
protected readonly logger = new Logger(BillingUsageService.name);
constructor(
@InjectRepository(BillingCustomer, 'core')
private readonly billingCustomerRepository: Repository<BillingCustomer>,
private readonly featureFlagService: FeatureFlagService,
private readonly billingSubscriptionService: BillingSubscriptionService,
private readonly environmentService: EnvironmentService,
private readonly stripeBillingMeterEventService: StripeBillingMeterEventService,
) {}

async canExecuteBilledFunction(workspaceId: string): Promise<boolean> {
const isBillingEnabled = this.environmentService.get('IS_BILLING_ENABLED');
const isBillingPlansEnabled =
await this.featureFlagService.isFeatureEnabled(
FeatureFlagKey.IsBillingPlansEnabled,
workspaceId,
);

if (!isBillingPlansEnabled || !isBillingEnabled) {
return true;
}

const billingSubscription =
await this.billingSubscriptionService.getCurrentBillingSubscriptionOrThrow(
{
workspaceId,
},
);

if (!billingSubscription) {
return false;
}

return true;
}

async billUsage({
workspaceId,
billingEvents,
}: {
workspaceId: string;
billingEvents: BillingUsageEvent[];
}) {
const workspaceStripeCustomer =
await this.billingCustomerRepository.findOne({
where: {
workspaceId,
},
});

if (!workspaceStripeCustomer) {
throw new BillingException(
'Stripe customer not found',
BillingExceptionCode.BILLING_CUSTOMER_NOT_FOUND,
);
}

try {
await this.stripeBillingMeterEventService.sendBillingMeterEvent({
eventName: billingEvents[0].eventName,
value: billingEvents[0].value,
stripeCustomerId: workspaceStripeCustomer.stripeCustomerId,
});
} catch (error) {
throw new BillingException(
'Failed to send billing meter events to Cache Service',
BillingExceptionCode.BILLING_METER_EVENT_FAILED,
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { Injectable, Logger } from '@nestjs/common';

import Stripe from 'stripe';

import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names';
import { StripeSDKService } from 'src/engine/core-modules/billing/stripe/stripe-sdk/services/stripe-sdk.service';
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';

@Injectable()
export class StripeBillingMeterEventService {
protected readonly logger = new Logger(StripeBillingMeterEventService.name);
private readonly stripe: Stripe;

constructor(
private readonly environmentService: EnvironmentService,
private readonly stripeSDKService: StripeSDKService,
) {
if (!this.environmentService.get('IS_BILLING_ENABLED')) {
return;
}
this.stripe = this.stripeSDKService.getStripe(
this.environmentService.get('BILLING_STRIPE_API_KEY'),
);
}

async sendBillingMeterEvent({
eventName,
value,
stripeCustomerId,
}: {
eventName: BillingMeterEventName;
value: number;
stripeCustomerId: string;
}) {
await this.stripe.billing.meterEvents.create({
event_name: eventName,
payload: {
value: value.toString(),
stripe_customer_id: stripeCustomerId,
},
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Module } from '@nestjs/common';

import { StripeBillingMeterEventService } from 'src/engine/core-modules/billing/stripe/services/stripe-billing-meter-event.service';
import { StripeBillingMeterService } from 'src/engine/core-modules/billing/stripe/services/stripe-billing-meter.service';
import { StripeBillingPortalService } from 'src/engine/core-modules/billing/stripe/services/stripe-billing-portal.service';
import { StripeCheckoutService } from 'src/engine/core-modules/billing/stripe/services/stripe-checkout.service';
Expand All @@ -24,6 +25,7 @@ import { DomainManagerModule } from 'src/engine/core-modules/domain-manager/doma
StripeCustomerService,
StripePriceService,
StripeProductService,
StripeBillingMeterEventService,
],
exports: [
StripeWebhookService,
Expand All @@ -35,6 +37,7 @@ import { DomainManagerModule } from 'src/engine/core-modules/domain-manager/doma
StripeSubscriptionItemService,
StripeSubscriptionService,
StripeProductService,
StripeBillingMeterEventService,
],
})
export class StripeModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { NonNegative } from 'type-fest';

import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names';

export type BillingUsageEvent = {
eventName: BillingMeterEventName;
value: NonNegative<number>;
};
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { Injectable, Logger } from '@nestjs/common';

import { BILLING_EXECUTE_BILLED_FUNCTION } from 'src/engine/core-modules/billing/constants/billing-execute-billed-function.constant';
import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names';
import { BillingUsageEvent } from 'src/engine/core-modules/billing/types/billing-usage-event.type';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import {
WorkflowRunOutput,
WorkflowRunStatus,
Expand All @@ -19,7 +24,11 @@ export type WorkflowExecutorOutput = {
@Injectable()
export class WorkflowExecutorWorkspaceService {
private readonly logger = new Logger(WorkflowExecutorWorkspaceService.name);
constructor(private readonly workflowActionFactory: WorkflowActionFactory) {}
constructor(
private readonly workflowActionFactory: WorkflowActionFactory,
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
) {}

async execute({
currentStepIndex,
Expand Down Expand Up @@ -64,6 +73,10 @@ export class WorkflowExecutorWorkspaceService {
result.error?.errorMessage ??
(result.result ? undefined : 'Execution result error, no data or error');

if (!error) {
this.sendUsageEvent();
}

const updatedStepOutput = {
id: step.id,
name: step.name,
Expand Down Expand Up @@ -122,4 +135,20 @@ export class WorkflowExecutorWorkspaceService {

return { ...updatedOutput, status: WorkflowRunStatus.FAILED };
}

async sendUsageEvent() {
const workspaceId =
this.scopedWorkspaceContextFactory.create().workspaceId ?? '';

this.workspaceEventEmitter.emitCustomBatchEvent<BillingUsageEvent>(
BILLING_EXECUTE_BILLED_FUNCTION,
[
{
eventName: BillingMeterEventName.WORKFLOW_NODE_RUN,
value: 1,
},
],
workspaceId,
);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Module } from '@nestjs/common';

import { BillingModule } from 'src/engine/core-modules/billing/billing.module';
import { ThrottlerModule } from 'src/engine/core-modules/throttler/throttler.module';
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
import { WorkflowExecutorModule } from 'src/modules/workflow/workflow-executor/workflow-executor.module';
Expand All @@ -8,7 +9,12 @@ import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runne
import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service';

@Module({
imports: [WorkflowCommonModule, WorkflowExecutorModule, ThrottlerModule],
imports: [
WorkflowCommonModule,
WorkflowExecutorModule,
ThrottlerModule,
BillingModule,
],
providers: [
WorkflowRunnerWorkspaceService,
WorkflowRunWorkspaceService,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';

import { BillingUsageService } from 'src/engine/core-modules/billing/services/billing-usage.service';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
Expand All @@ -12,10 +13,12 @@ import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runne

@Injectable()
export class WorkflowRunnerWorkspaceService {
private readonly logger = new Logger(WorkflowRunnerWorkspaceService.name);
constructor(
private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService,
@InjectMessageQueue(MessageQueue.workflowQueue)
private readonly messageQueueService: MessageQueueService,
private readonly billingUsageService: BillingUsageService,
) {}

async run(
Expand All @@ -24,6 +27,14 @@ export class WorkflowRunnerWorkspaceService {
payload: object,
source: ActorMetadata,
) {
const canExecuteBilledFunction =
await this.billingUsageService.canExecuteBilledFunction(workspaceId);

if (!canExecuteBilledFunction) {
this.logger.log(
'Cannot execute billed function, there is no subscription for this workspace',
);
}
const workflowRunId =
await this.workflowRunWorkspaceService.createWorkflowRun(
workflowVersionId,
Expand Down

0 comments on commit 0d6f4a3

Please sign in to comment.