Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TM-1652] Add Workdays and new columns for some tables. #44

Merged
merged 9 commits into from
Jan 22, 2025
45 changes: 30 additions & 15 deletions apps/unified-database-service/src/airtable/airtable.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,34 @@ import { ConfigService } from "@nestjs/config";
import Airtable from "airtable";
import {
ApplicationEntity,
DemographicEntity,
NurseryEntity,
NurseryReportEntity,
OrganisationEntity,
ProjectEntity,
ProjectReportEntity,
RestorationPartnerEntity,
SiteEntity,
SiteReportEntity,
TreeSpeciesEntity
TreeSpeciesEntity,
WorkdayEntity
} from "./entities";
import * as Sentry from "@sentry/node";
import { SlackService } from "nestjs-slack";

export const AIRTABLE_ENTITIES = {
application: ApplicationEntity,
demographic: DemographicEntity,
nursery: NurseryEntity,
"nursery-report": NurseryReportEntity,
organisation: OrganisationEntity,
project: ProjectEntity,
"project-report": ProjectReportEntity,
"restoration-partner": RestorationPartnerEntity,
site: SiteEntity,
"site-report": SiteReportEntity,
"tree-species": TreeSpeciesEntity
"tree-species": TreeSpeciesEntity,
workday: WorkdayEntity
};

export type EntityType = keyof typeof AIRTABLE_ENTITIES;
Expand Down Expand Up @@ -64,26 +70,28 @@ export class AirtableProcessor extends WorkerHost {
}

async process(job: Job) {
switch (job.name) {
const { name, data } = job;
await this.sendSlackUpdate(`:construction_worker: Beginning job: ${JSON.stringify({ name, data })}`);
switch (name) {
case "updateEntities":
return await this.updateEntities(job.data as UpdateEntitiesData);
return await this.updateEntities(data as UpdateEntitiesData);

case "deleteEntities":
return await this.deleteEntities(job.data as DeleteEntitiesData);
return await this.deleteEntities(data as DeleteEntitiesData);

case "updateAll":
return await this.updateAll(job.data as UpdateAllData);
return await this.updateAll(data as UpdateAllData);

default:
throw new NotImplementedException(`Unknown job type: ${job.name}`);
throw new NotImplementedException(`Unknown job type: ${name}`);
}
}

@OnWorkerEvent("failed")
async onFailed(job: Job, error: Error) {
Sentry.captureException(error);
this.logger.error(`Worker event failed: ${JSON.stringify(job)}`, error.stack);
this.sendSlackUpdate(`:warning: ERROR: Job processing failed: ${JSON.stringify(job)}`);
await this.sendSlackUpdate(`:warning: ERROR: Job processing failed: ${JSON.stringify(job)}`);
}

private async updateEntities({ entityType, startPage, updatedSince }: UpdateEntitiesData) {
Expand All @@ -98,7 +106,7 @@ export class AirtableProcessor extends WorkerHost {
await entity.updateBase(this.base, { startPage, updatedSince });

this.logger.log(`Completed entity update: ${JSON.stringify({ entityType, updatedSince })}`);
this.sendSlackUpdate(`Completed updating table "${entity.TABLE_NAME}" [updatedSince: ${updatedSince}]`);
await this.sendSlackUpdate(`Completed updating table "${entity.TABLE_NAME}" [updatedSince: ${updatedSince}]`);
}

private async deleteEntities({ entityType, deletedSince }: DeleteEntitiesData) {
Expand All @@ -113,23 +121,30 @@ export class AirtableProcessor extends WorkerHost {
await entity.deleteStaleRecords(this.base, deletedSince);

this.logger.log(`Completed entity delete: ${JSON.stringify({ entityType, deletedSince })}`);
this.sendSlackUpdate(`Completed deleting rows from table "${entity.TABLE_NAME}" [deletedSince: ${deletedSince}]`);
await this.sendSlackUpdate(
`Completed deleting rows from table "${entity.TABLE_NAME}" [deletedSince: ${deletedSince}]`
);
}

private async updateAll({ updatedSince }: UpdateAllData) {
this.sendSlackUpdate(`:white_check_mark: Beginning sync of all data [changedSince: ${updatedSince}]`);
await this.sendSlackUpdate(`:white_check_mark: Beginning sync of all data [changedSince: ${updatedSince}]`);
for (const entityType of ENTITY_TYPES) {
await this.updateEntities({ entityType, updatedSince });
await this.deleteEntities({ entityType, deletedSince: updatedSince });
}
this.sendSlackUpdate(`:100: Completed sync of all data [changedSince: ${updatedSince}]`);
await this.sendSlackUpdate(`:100: Completed sync of all data [changedSince: ${updatedSince}]`);
}

private sendSlackUpdate(message: string) {
private async sendSlackUpdate(message: string) {
const channel = this.config.get("UDB_SLACK_CHANNEL");
if (channel == null) return;

// Ignore promise; we don't want the process to fail if comms with Slack break down.
this.slack.sendText(`[${process.env.DEPLOY_ENV}]: ${message}`, { channel });
await this.slack
.sendText(`[${process.env.DEPLOY_ENV}]: ${message}`, { channel })
// Don't allow a failure in slack sending to hose our process, but do log it and send it to Sentry
.catch(error => {
Sentry.captureException(error);
this.logger.error("Send to slack failed", error.stack);
});
}
}
Loading
Loading