Skip to content

Commit

Permalink
Merge pull request #18 from getlarge/17-fix-improve-communication-bet…
Browse files Browse the repository at this point in the history
…ween-services

refactor: improve communication between services
  • Loading branch information
getlarge authored Jan 7, 2024
2 parents a08723b + 2e0a3e2 commit 8771dd3
Show file tree
Hide file tree
Showing 18 changed files with 195 additions and 179 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,10 @@ yarn docker:deps:up
# start Nginx Proxy (for backend services and frontend app)
yarn docker:proxy:up

# Generate Ory network configuration from .env
yarn ory:generate:kratos
yarn ory:generate:keto

# start Ory network (Kratos and Keto with database migrations)
yarn docker:ory:up

Expand Down
50 changes: 26 additions & 24 deletions apps/expiration/src/app/orders/orders-ms.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
Logger,
UseFilters,
ValidationPipe,
ValidationPipeOptions,
} from '@nestjs/common';
import {
Ctx,
Expand All @@ -22,6 +23,14 @@ import type { Message } from 'amqplib';

import { OrderService } from './orders.service';

const validationPipeOptions: ValidationPipeOptions = {
transform: true,
transformOptions: { enableImplicitConversion: true },
exceptionFactory: requestValidationErrorFactory,
forbidUnknownValues: true,
whitelist: true,
};

@UseFilters(GlobalErrorFilter)
@Controller()
export class OrdersMSController {
Expand All @@ -34,58 +43,51 @@ export class OrdersMSController {
@ApiExcludeEndpoint()
@EventPattern(Patterns.OrderCreated, Transport.RMQ)
async onCreated(
@Payload(
new ValidationPipe({
transform: true,
transformOptions: { enableImplicitConversion: true },
exceptionFactory: requestValidationErrorFactory,
forbidUnknownValues: true,
whitelist: true,
}),
)
@Payload(new ValidationPipe(validationPipeOptions))
data: Order,
@Ctx() context: RmqContext,
): Promise<void> {
): Promise<{
ok: boolean;
}> {
const channel = context.getChannelRef() as Channel;
const message = context.getMessage() as Message;
const pattern = context.getPattern();
this.logger.debug(`received message on ${pattern}`, {
data,
});
// TODO: conditional ack

try {
await this.orderService.createJob(data);
} finally {
channel.ack(message);
return { ok: true };
} catch (e) {
channel.nack(message, false, false);
throw e;
}
}

@ApiExcludeEndpoint()
@EventPattern(Patterns.OrderCancelled, Transport.RMQ)
async onCancelled(
@Payload(
new ValidationPipe({
transform: true,
transformOptions: { enableImplicitConversion: true },
exceptionFactory: requestValidationErrorFactory,
forbidUnknownValues: true,
whitelist: true,
}),
)
@Payload(new ValidationPipe(validationPipeOptions))
data: Order,
@Ctx() context: RmqContext,
): Promise<void> {
): Promise<{
ok: boolean;
}> {
const channel = context.getChannelRef() as Channel;
const message = context.getMessage() as Message;
const pattern = context.getPattern();
this.logger.debug(`received message on ${pattern}`, {
data,
});
// TODO: conditional ack
try {
await this.orderService.cancelJob(data);
} finally {
channel.ack(message);
return { ok: true };
} catch (e) {
channel.nack(message, false, false);
throw e;
}
}
}
10 changes: 4 additions & 6 deletions apps/expiration/src/app/orders/orders.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ export class OrdersProcessor {
const { data } = job;
this.logger.debug(`Expire order ${data.id}`);
await lastValueFrom(
this.client
.emit<
ExpirationCompletedEvent['name'],
ExpirationCompletedEvent['data']
>(Patterns.ExpirationCompleted, data)
.pipe(),
this.client.emit<
ExpirationCompletedEvent['name'],
ExpirationCompletedEvent['data']
>(Patterns.ExpirationCompleted, data),
);
}

Expand Down
37 changes: 23 additions & 14 deletions apps/orders/src/app/orders/orders-ms.controller.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import { Controller, Inject, Logger, ValidationPipe } from '@nestjs/common';
import {
Controller,
Inject,
Logger,
ValidationPipe,
ValidationPipeOptions,
} from '@nestjs/common';
import {
Ctx,
EventPattern,
MessagePattern,
Payload,
RmqContext,
Transport,
Expand All @@ -16,8 +23,16 @@ import { Payment } from '@ticketing/shared/models';
import type { Channel } from 'amqp-connection-manager';
import type { Message } from 'amqplib';

import { OrderDto } from './models';
import { OrdersService } from './orders.service';

const validationPipeOptions: ValidationPipeOptions = {
transform: true,
transformOptions: { enableImplicitConversion: true },
exceptionFactory: requestValidationErrorFactory,
forbidUnknownValues: true,
};

@Controller()
export class OrdersMSController {
readonly logger = new Logger(OrdersMSController.name);
Expand All @@ -43,37 +58,31 @@ export class OrdersMSController {
channel.ack(message);
} catch (e) {
// TODO: requeue when error is timeout or connection error
channel.nack(message);
channel.nack(message, false, false);
throw e;
}
}

@ApiExcludeEndpoint()
@EventPattern(Patterns.PaymentCreated, Transport.RMQ)
@MessagePattern(Patterns.PaymentCreated, Transport.RMQ)
async onPaymentCreated(
@Payload(
new ValidationPipe({
transform: true,
transformOptions: { enableImplicitConversion: true },
exceptionFactory: requestValidationErrorFactory,
forbidUnknownValues: true,
}),
)
@Payload(new ValidationPipe(validationPipeOptions))
data: Payment,
@Ctx() context: RmqContext,
): Promise<void> {
): Promise<OrderDto> {
const channel = context.getChannelRef() as Channel;
const message = context.getMessage() as Message;
const pattern = context.getPattern();
this.logger.debug(`received message on ${pattern}`, {
data,
});
try {
await this.ordersService.complete(data);
const order = await this.ordersService.complete(data);
channel.ack(message);
return order;
} catch (e) {
// TODO: requeue when error is timeout or connection error
channel.nack(message);
channel.nack(message, false, false);
throw e;
}
}
Expand Down
2 changes: 1 addition & 1 deletion apps/orders/src/app/orders/orders.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class OrdersController {
return relationTupleToString({
namespace: PermissionNamespaces[Resources.TICKETS],
object: resourceId,
relation: 'owners',
relation: 'order',
subjectIdOrSet: {
namespace: PermissionNamespaces[Resources.USERS],
object: currentUserId,
Expand Down
24 changes: 12 additions & 12 deletions apps/orders/src/app/orders/orders.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import { PermissionNamespaces } from '@ticketing/microservices/shared/models';
import { transactionManager } from '@ticketing/microservices/shared/mongo';
import { RelationTuple } from '@ticketing/microservices/shared/relation-tuple-parser';
import { Resources } from '@ticketing/shared/constants';
import { User } from '@ticketing/shared/models';
import { Ticket, User } from '@ticketing/shared/models';
import { isEmpty } from 'lodash-es';
import { Model } from 'mongoose';
import { lastValueFrom, Observable, zip } from 'rxjs';
import { lastValueFrom, Observable, timeout, zip } from 'rxjs';

import type { EnvironmentVariables } from '../env';
import {
Expand Down Expand Up @@ -56,14 +56,14 @@ export class OrdersService {
);
}

emitEvent(
private sendEvent(
pattern: Patterns.OrderCreated | Patterns.OrderCancelled,
event: OrderCreatedEvent['data'] | OrderCancelledEvent['data'],
): Observable<[string, string, string]> {
): Observable<[Ticket, { ok: boolean }, { ok: boolean }]> {
return zip(
this.ticketsClient.emit<string, typeof event>(pattern, event),
this.expirationClient.emit<string, typeof event>(pattern, event),
this.paymentsClient.emit<string, typeof event>(pattern, event),
this.ticketsClient.send(pattern, event).pipe(timeout(5000)),
this.expirationClient.send(pattern, event).pipe(timeout(5000)),
this.paymentsClient.send(pattern, event).pipe(timeout(5000)),
);
}

Expand Down Expand Up @@ -153,7 +153,7 @@ export class OrdersService {
await this.createRelationShip(relationTupleWithUser);
this.logger.debug(`Created relation ${relationTupleWithUser.toString()}`);
// 7. Publish an event
await lastValueFrom(this.emitEvent(Patterns.OrderCreated, order));
await lastValueFrom(this.sendEvent(Patterns.OrderCreated, order));
this.logger.debug(`Sent event ${Patterns.OrderCreated}`);
return order;
});
Expand Down Expand Up @@ -207,7 +207,7 @@ export class OrdersService {
await this.deleteRelationShip(relationTuple);

await lastValueFrom(
this.emitEvent(Patterns.OrderCancelled, updatedOrder),
this.sendEvent(Patterns.OrderCancelled, updatedOrder),
);
return updatedOrder;
});
Expand Down Expand Up @@ -235,7 +235,7 @@ export class OrdersService {

const relationTuple = new RelationTuple(
PermissionNamespaces[Resources.ORDERS],
order.id,
updatedOrder.id,
'parents',
{
namespace: PermissionNamespaces[Resources.TICKETS],
Expand All @@ -245,7 +245,7 @@ export class OrdersService {
await this.deleteRelationShip(relationTuple);

await lastValueFrom(
this.emitEvent(Patterns.OrderCancelled, updatedOrder),
this.sendEvent(Patterns.OrderCancelled, updatedOrder),
);
return updatedOrder;
});
Expand All @@ -263,7 +263,7 @@ export class OrdersService {
order.set({ status: OrderStatus.Complete });
await order.save();
const result = order.toJSON<Order>();
//? TODO: this.emitEvent(Patterns.OrderComplete, result);
//? TODO: this.sendEvent(Patterns.OrderComplete, result);
return result;
}
}
Loading

0 comments on commit 8771dd3

Please sign in to comment.