Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: escape destroyed objects on workers #9719

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import { Injectable } from '@nestjs/common';

import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { ObjectRecordCreateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-create.event';
import { ObjectRecordDeleteEvent } from 'src/engine/core-modules/event-emitter/types/object-record-delete.event';
import { ObjectRecordDestroyEvent } from 'src/engine/core-modules/event-emitter/types/object-record-destroy.event';
import { ObjectRecordEvent } from 'src/engine/core-modules/event-emitter/types/object-record-event.event';
import { ObjectRecordNonDestructiveEvent } from 'src/engine/core-modules/event-emitter/types/object-record-non-destructive-event';
import { ObjectRecordRestoreEvent } from 'src/engine/core-modules/event-emitter/types/object-record-restore.event';
import { ObjectRecordUpdateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-update.event';
import { ObjectRecordBaseEvent } from 'src/engine/core-modules/event-emitter/types/object-record.base.event';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { CreateAuditLogFromInternalEvent } from 'src/modules/timeline/jobs/create-audit-log-from-internal-event';
import { UpsertTimelineActivityFromInternalEvent } from 'src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { CallWebhookJobsJob } from 'src/modules/webhook/jobs/call-webhook-jobs.job';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';

@Injectable()
export class EntityEventsToDbListener {
Expand All @@ -24,47 +28,66 @@ export class EntityEventsToDbListener {

@OnDatabaseBatchEvent('*', DatabaseEventAction.CREATED)
async handleCreate(batchEvent: WorkspaceEventBatch<ObjectRecordCreateEvent>) {
return this.handle(batchEvent);
return this.handleEvent(batchEvent, DatabaseEventAction.CREATED);
}

@OnDatabaseBatchEvent('*', DatabaseEventAction.UPDATED)
async handleUpdate(batchEvent: WorkspaceEventBatch<ObjectRecordUpdateEvent>) {
return this.handle(batchEvent);
return this.handleEvent(batchEvent, DatabaseEventAction.UPDATED);
}

@OnDatabaseBatchEvent('*', DatabaseEventAction.DELETED)
async handleDelete(batchEvent: WorkspaceEventBatch<ObjectRecordUpdateEvent>) {
return this.handle(batchEvent);
async handleDelete(batchEvent: WorkspaceEventBatch<ObjectRecordDeleteEvent>) {
return this.handleEvent(batchEvent, DatabaseEventAction.DELETED);
}

@OnDatabaseBatchEvent('*', DatabaseEventAction.RESTORED)
async handleRestore(
batchEvent: WorkspaceEventBatch<ObjectRecordRestoreEvent>,
) {
return this.handleEvent(batchEvent, DatabaseEventAction.RESTORED);
}

@OnDatabaseBatchEvent('*', DatabaseEventAction.DESTROYED)
async handleDestroy(
batchEvent: WorkspaceEventBatch<ObjectRecordUpdateEvent>,
batchEvent: WorkspaceEventBatch<ObjectRecordDestroyEvent>,
) {
return this.handle(batchEvent);
return this.handleEvent(batchEvent, DatabaseEventAction.DESTROYED);
}

private async handle(batchEvent: WorkspaceEventBatch<ObjectRecordBaseEvent>) {
private async handleEvent<T extends ObjectRecordEvent>(
batchEvent: WorkspaceEventBatch<T>,
action: DatabaseEventAction,
) {
const filteredEvents = batchEvent.events.filter(
(event) => event.objectMetadata?.isAuditLogged,
);

await this.entityEventsToDbQueueService.add<
WorkspaceEventBatch<ObjectRecordBaseEvent>
>(CreateAuditLogFromInternalEvent.name, {
...batchEvent,
events: filteredEvents,
});

await this.entityEventsToDbQueueService.add<
WorkspaceEventBatch<ObjectRecordBaseEvent>
>(UpsertTimelineActivityFromInternalEvent.name, {
...batchEvent,
events: filteredEvents,
});

await this.webhookQueueService.add<
WorkspaceEventBatch<ObjectRecordBaseEvent>
>(CallWebhookJobsJob.name, batchEvent, { retryLimit: 3 });
await Promise.all([
this.webhookQueueService.add<WorkspaceEventBatch<T>>(
CallWebhookJobsJob.name,
batchEvent,
{
retryLimit: 3,
},
),
this.entityEventsToDbQueueService.add<WorkspaceEventBatch<T>>(
CreateAuditLogFromInternalEvent.name,
{
...batchEvent,
events: filteredEvents,
},
),
...(action !== DatabaseEventAction.DESTROYED
? [
this.entityEventsToDbQueueService.add<
WorkspaceEventBatch<ObjectRecordNonDestructiveEvent>
>(UpsertTimelineActivityFromInternalEvent.name, {
...batchEvent,
events: filteredEvents,
}),
]
: []),
]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { MicrosoftAPIsService } from 'src/engine/core-modules/auth/services/micr
// import { OAuthService } from 'src/engine/core-modules/auth/services/oauth.service';
import { ResetPasswordService } from 'src/engine/core-modules/auth/services/reset-password.service';
import { SignInUpService } from 'src/engine/core-modules/auth/services/sign-in-up.service';
import { SocialSsoService } from 'src/engine/core-modules/auth/services/social-sso.service';
import { SamlAuthStrategy } from 'src/engine/core-modules/auth/strategies/saml.auth.strategy';
import { AccessTokenService } from 'src/engine/core-modules/auth/token/services/access-token.service';
import { LoginTokenService } from 'src/engine/core-modules/auth/token/services/login-token.service';
Expand All @@ -43,7 +44,6 @@ import { DataSourceModule } from 'src/engine/metadata-modules/data-source/data-s
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { WorkspaceManagerModule } from 'src/engine/workspace-manager/workspace-manager.module';
import { ConnectedAccountModule } from 'src/modules/connected-account/connected-account.module';
import { SocialSsoService } from 'src/engine/core-modules/auth/services/social-sso.service';

import { AuthResolver } from './auth.resolver';

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { ObjectRecordCreateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-create.event';
import { ObjectRecordDeleteEvent } from 'src/engine/core-modules/event-emitter/types/object-record-delete.event';
import { ObjectRecordDestroyEvent } from 'src/engine/core-modules/event-emitter/types/object-record-destroy.event';
import { ObjectRecordRestoreEvent } from 'src/engine/core-modules/event-emitter/types/object-record-restore.event';
import { ObjectRecordUpdateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-update.event';

export type ObjectRecordEvent<T = object> =
| ObjectRecordUpdateEvent<T>
| ObjectRecordDeleteEvent<T>
| ObjectRecordCreateEvent<T>
| ObjectRecordDestroyEvent<T>
| ObjectRecordRestoreEvent<T>;
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { ObjectRecordCreateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-create.event';
import { ObjectRecordDeleteEvent } from 'src/engine/core-modules/event-emitter/types/object-record-delete.event';
import { ObjectRecordRestoreEvent } from 'src/engine/core-modules/event-emitter/types/object-record-restore.event';
import { ObjectRecordUpdateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-update.event';

export type ObjectRecordNonDestructiveEvent =
| ObjectRecordCreateEvent
| ObjectRecordUpdateEvent
| ObjectRecordDeleteEvent
| ObjectRecordRestoreEvent;
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { ObjectRecordBaseEvent } from 'src/engine/core-modules/event-emitter/types/object-record.base.event';
import { ObjectRecordCreateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-create.event';

export class ObjectRecordRestoreEvent<
T = object,
> extends ObjectRecordBaseEvent<T> {
> extends ObjectRecordCreateEvent<T> {
properties: {
before: T;
after: T;
};
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { ObjectRecordBaseEvent } from 'src/engine/core-modules/event-emitter/types/object-record.base.event';
import { ObjectRecordDiff } from 'src/engine/core-modules/event-emitter/types/object-record-diff';
import { ObjectRecordBaseEvent } from 'src/engine/core-modules/event-emitter/types/object-record.base.event';

export class ObjectRecordUpdateEvent<
T = object,
> extends ObjectRecordBaseEvent<T> {
properties: {
updatedFields?: string[];
diff?: Partial<ObjectRecordDiff<T>>;
before: T;
after: T;
diff?: Partial<ObjectRecordDiff<T>>;
};
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Injectable } from '@nestjs/common';

import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { ObjectRecordCreateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-create.event';
import { ObjectRecordUpdateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-update.event';
import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperties } from 'src/engine/core-modules/event-emitter/utils/object-record-changed-properties.util';
Expand All @@ -16,8 +18,6 @@ import {
MessageParticipantUnmatchParticipantJobData,
} from 'src/modules/messaging/message-participant-manager/jobs/message-participant-unmatch-participant.job';
import { PersonWorkspaceEntity } from 'src/modules/person/standard-objects/person.workspace-entity';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';

@Injectable()
export class MessageParticipantPersonListener {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ObjectRecordBaseEvent } from 'src/engine/core-modules/event-emitter/types/object-record.base.event';
import { ObjectRecordEvent } from 'src/engine/core-modules/event-emitter/types/object-record-event.event';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
Expand All @@ -20,7 +20,7 @@ export class CreateAuditLogFromInternalEvent {

@Process(CreateAuditLogFromInternalEvent.name)
async handle(
workspaceEventBatch: WorkspaceEventBatch<ObjectRecordBaseEvent>,
workspaceEventBatch: WorkspaceEventBatch<ObjectRecordEvent>,
): Promise<void> {
for (const eventData of workspaceEventBatch.events) {
let workspaceMemberId: string | null = null;
Expand All @@ -34,16 +34,14 @@ export class CreateAuditLogFromInternalEvent {
workspaceMemberId = workspaceMember.id;
}

if (eventData.properties.diff) {
// we remove "before" and "after" property for a cleaner/slimmer event payload
eventData.properties = {
diff: eventData.properties.diff,
};
}

await this.auditLogRepository.insert(
workspaceEventBatch.name,
eventData.properties,
'diff' in eventData.properties
? {
// we remove "before" and "after" property for a cleaner/slimmer event payload
diff: eventData.properties.diff,
}
: eventData.properties,
workspaceMemberId,
workspaceEventBatch.name.split('.')[0],
eventData.objectMetadata.id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ObjectRecordBaseEvent } from 'src/engine/core-modules/event-emitter/types/object-record.base.event';
import { ObjectRecordNonDestructiveEvent } from 'src/engine/core-modules/event-emitter/types/object-record-non-destructive-event';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
Expand All @@ -7,7 +7,6 @@ import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/wo
import { TimelineActivityService } from 'src/modules/timeline/services/timeline-activity.service';
import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository';
import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity';
import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity';

@Processor(MessageQueue.entityEventsToDbQueue)
export class UpsertTimelineActivityFromInternalEvent {
Expand All @@ -19,9 +18,7 @@ export class UpsertTimelineActivityFromInternalEvent {

@Process(UpsertTimelineActivityFromInternalEvent.name)
async handle(
workspaceEventBatch: WorkspaceEventBatch<
ObjectRecordBaseEvent<TimelineActivityWorkspaceEntity>
>,
workspaceEventBatch: WorkspaceEventBatch<ObjectRecordNonDestructiveEvent>,
): Promise<void> {
for (const eventData of workspaceEventBatch.events) {
if (eventData.userId) {
Expand All @@ -33,13 +30,6 @@ export class UpsertTimelineActivityFromInternalEvent {
eventData.workspaceMemberId = workspaceMember.id;
}

if (eventData.properties.diff) {
// we remove "before" and "after" property for a cleaner/slimmer event payload
eventData.properties = {
diff: eventData.properties.diff,
};
}

// Temporary
// We ignore every that is not a LinkedObject or a Business Object
if (
Expand All @@ -51,7 +41,16 @@ export class UpsertTimelineActivityFromInternalEvent {
}

await this.timelineActivityService.upsertEvent({
event: eventData,
event:
// we remove "before" and "after" property for a cleaner/slimmer event payload
'diff' in eventData.properties && eventData.properties.diff
? {
...eventData,
properties: {
diff: eventData.properties.diff,
},
}
: eventData,
eventName: workspaceEventBatch.name,
workspaceId: workspaceEventBatch.workspaceId,
});
Expand Down
Loading
Loading