diff --git a/packages/archival/README.md b/packages/archival/README.md
index e9a43e768d..b2adb34c8f 100644
--- a/packages/archival/README.md
+++ b/packages/archival/README.md
@@ -18,8 +18,7 @@ $ npm install @sourceloop/entity-archival
 
 ## Overview
 
-`@sourceloop/entity-archival` package is a powerful LoopBack 4 extension designed to seamlessly
-implement entity level archival for your loopback applications. With this extension you can archive entries of any model to an external system. Many times we dont wish to delete data but considering the latency it brings with it we can save that to an external system and use it at our convenience. For that we have a retrieval process that helps us to fetch that data from external system and use it.
+The `@sourceloop/entity-archival` package is a powerful LoopBack 4 extension designed to implement entity-level archival for your LoopBack applications. With this extension you can archive entries of any model to an external system. Often we do not want to delete data outright, but keeping it around adds latency, so we can move it to an external system and use it at our convenience. For that, the package provides a retrieval process that fetches the archived data back from the external system so it can be used again.
 
 Here we have a repository level mixin that overrides the `deleteAll()` method - this method first saves the data to the external system, maintain a entry in the mapping table and then permanently delete the data from the system.
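+
+For example, once a repository mixes in `ArchivalRepositoryMixin` (see the `ProductRepository` example later in this README), an ordinary `deleteAll()` call is enough to trigger archival. A rough sketch (the `price` field is only a placeholder):
+
+```ts
+// deleteAll() on a mixed-in repository first copies the matching entries to the
+// configured external system and records them in the mapping table, and only then
+// deletes them permanently from the database.
+await productRepository.deleteAll({price: {gt: 100}});
+
+// Pass {skipArchive: true} to delete without archiving (covered later in this README).
+await productRepository.deleteAll({price: {gt: 100}}, {skipArchive: true});
+```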
@@ -40,12 +39,10 @@ export class MyApplication extends BootMixin(ServiceMixin(RepositoryMixin(RestApp
     const opts: ArchivalComponentOptions = DEFAULT_ARCHIVAL_OPTIONS;
     this.configure(ArchivalComponentBindings.COMPONENT).to(opts); // Put the configuration options here
-  });
+  };
   this.component(ArchivalComponent);
   // ...
 }
-  // ...
-}
 ```
 
 - Add Datasource
@@ -86,7 +83,7 @@ export class AuditDataSource
 
 - This extension already has the required models and repositories.
 
-- The component exposes a mixin for your repository classes. Just extend your repository class with `ArchivalRepositoryMixin`, for all those repositories where you need archive data. See an example below. For a model `Product`, here we are extending the `ProductRepository` with `ArchivalRepositoryMixin`.
+- The component exposes a mixin for your repository classes. Just extend your repository class with `ArchivalRepositoryMixin` for all those repositories where you need to archive data. See the example below. For a model `Product`, here we are extending the `ProductRepository` with `ArchivalRepositoryMixin`.
 
 ```ts
 import {Getter, inject} from '@loopback/core';
@@ -130,7 +127,7 @@ Make sure you provide `getCurrentUser` and `getRepository` Getter functions in c
 deleteAll(data, {skipArchive: true});
 ```
 
-- The Actor field is now configurable and can save any string type value in the field.
+- The Actor field is configurable and can save any string type value in the field.
   Though the default value will be userId a developer can save any string field from the current User that is being passed.
 
 ```ts
@@ -168,11 +165,11 @@ this.bind(AuthServiceBindings.ActorIdKey).to('username');
 ```
 
 public actorIdKey?: ActorId,
 ```
 
-- To implement the importing of entries from external system
+
 
 ## Providers and Services
 
@@ -196,7 +193,7 @@ this.bind(ArchivalComponentBindings.IMPORT_ARCHIVE_DATA).toProvider(
 );
 ```
 
-- ImportArchivedDataService uses a ProcessImportedDataProvider that takes the json data as input. You can implement it for data processing.
+- ImportArchivedDataService uses a ProcessRetrievedDataProvider that takes the JSON data as input. You can implement it to process the retrieved data and save it to the desired system; a minimal sketch is shown below.
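+
+A minimal sketch of a custom `ProcessRetrievedDataProvider` (the class name is a placeholder, and it assumes the `ProcessRetrievedData` type is exported from the package root):
+
+```ts
+import {
+  BindingScope,
+  injectable,
+  Provider,
+  ValueOrPromise,
+} from '@loopback/core';
+import {AnyObject} from '@loopback/repository';
+import {ProcessRetrievedData} from '@sourceloop/entity-archival';
+
+@injectable({scope: BindingScope.TRANSIENT})
+export class MyProcessRetrievedDataProvider
+  implements Provider<ProcessRetrievedData>
+{
+  value(): ValueOrPromise<ProcessRetrievedData> {
+    // ProcessRetrievedData is (retrievedData: AnyObject[]) => Promise<void>
+    return async (retrievedData: AnyObject[]) => {
+      // Process the retrieved entries and save them to the desired system,
+      // e.g. write them back to the primary database or push them to a queue.
+      console.log(`Retrieved ${retrievedData.length} archived records`);
+    };
+  }
+}
+```
+
+Bind your implementation in the application so it replaces the default provider (which only throws):
+
+```ts
+this.bind(ArchivalComponentBindings.PROCESS_RETRIEVED_DATA).toProvider(
+  MyProcessRetrievedDataProvider,
+);
+```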
 
 - BuildWhereConditionService is to save the filter column of archive_mapping model.
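+
+To retrieve archived entries, `ImportArchivedDataService` exposes an `import(entityName, filter)` method that records a retrieval job and returns its id. A rough sketch of calling it from a controller (the controller, route, and `Product` model are placeholders; it also assumes the service and `JobResponse` type are exported from the package root and that the service is registered in your application, for example via `this.service(ImportArchivedDataService)`):
+
+```ts
+import {service} from '@loopback/core';
+import {Filter} from '@loopback/repository';
+import {get, param} from '@loopback/rest';
+import {ImportArchivedDataService, JobResponse} from '@sourceloop/entity-archival';
+import {Product} from '../models';
+
+export class ProductArchiveController {
+  constructor(
+    @service(ImportArchivedDataService)
+    private readonly importArchivedDataService: ImportArchivedDataService,
+  ) {}
+
+  @get('/products/retrieve-archived')
+  async retrieveArchived(
+    @param.filter(Product) filter?: Filter,
+  ): Promise<JobResponse> {
+    // Creates an entry in the retrieval job details table and starts the import;
+    // the returned job id can be used to look up the job status and result later.
+    return this.importArchivedDataService.import('Product', filter ?? {});
+  }
+}
+```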
diff --git a/packages/archival/src/component.ts b/packages/archival/src/component.ts
index c843efba5b..76fc06602a 100644
--- a/packages/archival/src/component.ts
+++ b/packages/archival/src/component.ts
@@ -14,10 +14,7 @@ import {Class, Repository} from '@loopback/repository';
 import {Model, RestApplication} from '@loopback/rest';
 import {CoreComponent} from '@sourceloop/core';
 import {ArchivalComponentBindings} from './keys';
-import {
-  ProcessRetrievedDataProvider,
-  RetrieveArchivedDataProvider,
-} from './providers';
+import {ProcessRetrievedDataProvider} from './providers';
 import {
   ArchivalMappingRepository,
   RetrievalJobDetailsRepository,
@@ -38,10 +35,10 @@ export class ArchivalComponent implements Component {
     this.providers = {};
 
     this.application.component(CoreComponent);
 
-    this.providers[ArchivalComponentBindings.PROCESS_IMPORT_DATA.key] =
+    this.providers[ArchivalComponentBindings.PROCESS_RETRIEVED_DATA.key] =
       ProcessRetrievedDataProvider;
-    this.providers[ArchivalComponentBindings.GET_ARCHIVED_DATA_JOB.key] =
-      RetrieveArchivedDataProvider;
+    // this.providers[ArchivalComponentBindings.GET_ARCHIVED_DATA_JOB.key] =
+    //   GetRetrievalJobDetailsProvider;
 
     this.application
       .bind('services.BuildWhereConditionService')
diff --git a/packages/archival/src/keys.ts b/packages/archival/src/keys.ts
index a6c624a80b..422ecda542 100644
--- a/packages/archival/src/keys.ts
+++ b/packages/archival/src/keys.ts
@@ -4,9 +4,8 @@ import {BINDING_PREFIX} from '@sourceloop/core';
 import {ArchivalComponent} from './component';
 import {
   ExportDataExternalSystem,
-  GetJobDetailsFn,
   ImportDataExternalSystem,
-  ProcessImportedData,
+  ProcessRetrievedData,
 } from './types';
 
 /**
@@ -27,15 +26,10 @@
       `${BINDING_PREFIX}.entity.archive.import`,
     );
 
-  export const PROCESS_IMPORT_DATA =
-    BindingKey.create<ProcessImportedData>(
+  export const PROCESS_RETRIEVED_DATA =
+    BindingKey.create<ProcessRetrievedData>(
       `${BINDING_PREFIX}.entity.import`,
     );
-
-  export const GET_ARCHIVED_DATA_JOB =
-    BindingKey.create<GetJobDetailsFn>(
-      `${BINDING_PREFIX}.get.entity.archive`,
-    );
 }
 
 export namespace AWSS3Bindings {
diff --git a/packages/archival/src/providers/index.ts b/packages/archival/src/providers/index.ts
index 50ab6c9bd0..fc05631a75 100644
--- a/packages/archival/src/providers/index.ts
+++ b/packages/archival/src/providers/index.ts
@@ -1,2 +1,2 @@
+// export * from './get-retrieval-job-details.provider';
 export * from './process-retrieved-data.provider';
-export * from './retrieve-archived-data.provider';
diff --git a/packages/archival/src/providers/process-retrieved-data.provider.ts b/packages/archival/src/providers/process-retrieved-data.provider.ts
index dfc0fb700c..cf5b15b0ef 100644
--- a/packages/archival/src/providers/process-retrieved-data.provider.ts
+++ b/packages/archival/src/providers/process-retrieved-data.provider.ts
@@ -4,26 +4,17 @@ import {
   Provider,
   ValueOrPromise,
 } from '@loopback/core';
-import {ProcessImportedData} from '../types';
+import {ProcessRetrievedData} from '../types';
 
 @injectable({scope: BindingScope.TRANSIENT})
 export class ProcessRetrievedDataProvider
-  implements Provider<ProcessImportedData>
+  implements Provider<ProcessRetrievedData>
 {
   /**Implement this provider to process and save the records do desired system
    * @param importedData - The data that is imported from the external system
    * that now needs to be processed and saved to the desired system
    */
-  value(): ValueOrPromise<ProcessImportedData> {
-    throw new Error('Method not implemented.');
+  value(): ValueOrPromise<ProcessRetrievedData> {
+    throw new Error('Process Retrieved Data not implemented.');
   }
-  // value(): ValueOrPromise<ProcessImportedData> {
-  //   return async (importedData: AnyObject[]) => {
-  //     return this.processData(importedData);
-  //   };
-  // }
-  // async processData(importedData: AnyObject[]): Promise<void> {
-  //   console.log('Processing Imported Data');
-  //   console.log(importedData);
-  // }
 }
diff --git a/packages/archival/src/providers/retrieve-archived-data.provider.ts b/packages/archival/src/providers/retrieve-archived-data.provider.ts
deleted file mode 100644
index b147dfe118..0000000000
--- a/packages/archival/src/providers/retrieve-archived-data.provider.ts
+++ /dev/null
@@ -1,32 +0,0 @@
-import {
-  BindingScope,
-  Provider,
-  ValueOrPromise,
-  injectable,
-} from '@loopback/core';
-import {Filter, repository} from '@loopback/repository';
-import {RetrievalJobDetails} from '../models';
-import {RetrievalJobDetailsRepository} from '../repositories';
-import {GetJobDetailsFn, JobResponse, JobStatus} from '../types';
-
-@injectable({scope: BindingScope.TRANSIENT})
-export class RetrieveArchivedDataProvider implements Provider<GetJobDetailsFn> {
-  constructor(
-    @repository(RetrievalJobDetailsRepository)
-    public jobDetailsRepo: RetrievalJobDetailsRepository,
-  ) {}
-  value(): ValueOrPromise<GetJobDetailsFn> {
-    return async (entityName: string, filter?: Filter) => {
-      let jobResponse: JobResponse = {jobId: '0'};
-      const job = new RetrievalJobDetails({
-        status: JobStatus.IN_PROGRESS,
-        filter,
-        entity: entityName,
-      });
-
-      const jobDetails = await this.jobDetailsRepo.create(job);
-      jobResponse = {jobId: jobDetails.id ?? '0'};
-      return jobResponse;
-    };
-  }
-}
diff --git a/packages/archival/src/services/import-archived-data.service.ts b/packages/archival/src/services/import-archived-data.service.ts
index cdbbf2cdc6..91d29ffcf2 100644
--- a/packages/archival/src/services/import-archived-data.service.ts
+++ b/packages/archival/src/services/import-archived-data.service.ts
@@ -17,7 +17,7 @@ import {
 import {DefaultUserModifyCrudRepository} from '@sourceloop/core';
 import {ArchivalApplication} from '../application';
 import {ArchivalComponentBindings} from '../keys';
-import {ArchiveMapping} from '../models';
+import {ArchiveMapping, RetrievalJobDetails} from '../models';
 import {
   ArchivalMappingRepository,
   RetrievalJobDetailsRepository,
@@ -25,8 +25,9 @@ import {
 import {
   IBuildWhereConditionService,
   ImportDataExternalSystem,
+  JobResponse,
   JobStatus,
-  ProcessImportedData,
+  ProcessRetrievedData,
 } from '../types';
 
 @injectable({scope: BindingScope.TRANSIENT})
@@ -35,7 +36,7 @@ export class ImportArchivedDataService {
   repo: any; //NOSONAR
   constructor(
     @repository(RetrievalJobDetailsRepository)
-    public jobDetailsRepo: RetrievalJobDetailsRepository,
+    public retrievalJobDetailsRepo: RetrievalJobDetailsRepository,
     @repository(ArchivalMappingRepository)
     public archivalMappingRepo: ArchivalMappingRepository,
     @inject(CoreBindings.APPLICATION_INSTANCE)
@@ -45,35 +46,61 @@ export class ImportArchivedDataService {
     private buildWhereConditionService: IBuildWhereConditionService,
     @inject(ArchivalComponentBindings.IMPORT_ARCHIVE_DATA)
     private importArchiveData: ImportDataExternalSystem,
-    @inject(ArchivalComponentBindings.PROCESS_IMPORT_DATA)
-    private processImportedData: ProcessImportedData,
+    @inject(ArchivalComponentBindings.PROCESS_RETRIEVED_DATA)
+    private processRetrievedData: ProcessRetrievedData,
   ) {}
 
-  // 1. add entry in the job processing table return that
-
   /**
-   * start scanning the mapping table and get all the filename matching,
-   * read the file get all the data
-   * temporarily save the data to the in-memory source
-   * filter the required data from there
-   * return the json data to user
+   * This method imports data from the external system.
+   * 1. It inserts the job details in the job processing table.
+   * 2. It then filters the data retrieved from the external system.
+   * @param entityName - name of the entity whose data needs to be imported
+   * @param filter - filter to be applied on the data to be imported
+   * @returns - job id for the import process
    */
-  async import(jobId: string) {
-    const jobDetails = await this.jobDetailsRepo.findById(jobId);
-    const modelName = jobDetails.entity;
-    const filter = jobDetails.filter;
+  async import(entityName: string, filter: Filter): Promise<JobResponse> {
+    const jobResponse = await this.insertToRetrievalJobDetails(
+      entityName,
+      filter,
+    );
+    this.filterData(jobResponse.jobId, entityName, filter);
+    return jobResponse;
+  }
+
+  private async insertToRetrievalJobDetails(
+    entityName: string,
+    filter: Filter,
+  ): Promise<JobResponse> {
+    const job = new RetrievalJobDetails({
+      status: JobStatus.IN_PROGRESS,
+      filter,
+      entity: entityName,
+    });
+    let jobResponse: JobResponse = {jobId: '0'};
+    const jobDetails = await this.retrievalJobDetailsRepo.create(job);
+    jobResponse = {jobId: jobDetails.id ?? '0'};
+    return jobResponse;
+  }
+
+  /**
+   * Builds a custom where condition from the filter provided to
+   * find the possible list of files that need to be imported,
+   * gets the data from the files and saves it into the in-memory source temporarily,
+   * filters the data based on the filter provided,
+   * updates the job status based on the success or failure of the import process,
+   * and passes the filtered data on for further processing.
+   */
+  private async filterData(jobId: string, entityName: string, filter: Filter) {
     const archiveFilter: Filter =
       await this.buildWhereConditionService.buildConditionForFetch(
         filter,
-        modelName,
+        entityName,
       );
     const archivedEntries = await this.archivalMappingRepo.find(archiveFilter);
     const data: AnyObject[] = [];
-
     for (const entry of archivedEntries) {
       const fileContent = await this.importArchiveData(entry.key);
       data.push(...fileContent);
     }
@@ -84,31 +111,49 @@ export class ImportArchivedDataService {
       name: dsName,
       connector: 'memory',
     });
-    await csvDataSource.connect();
-    this.application.dataSource(csvDataSource, dsName);
-    this.repo = await this.getRepositoryByModelName(modelName);
-    this.repo.dataSource = csvDataSource;
-    // Fill in the json returned from the csv
-    await this.repo.createAll(data); //jsondata
-    const isSFRepo = this.repo instanceof DefaultUserModifyCrudRepository;
-    let allRecords: AnyObject[];
-    /**save the records with us and
-     * delete the records to clear up the memory
-     */
-    if (isSFRepo) {
-      allRecords = await this.repo.findAll(filter);
-      await this.repo.deleteAll(undefined, {skipArchive: true});
-    } else {
-      allRecords = await this.repo.find(filter);
-      await this.repo.deleteAll(undefined, {skipArchive: true});
+    try {
+      await csvDataSource.connect();
+      this.application.dataSource(csvDataSource, dsName);
+
+      this.repo = await this.getRepositoryByModelName(entityName);
+      this.repo.dataSource = csvDataSource;
+
+      // Fill in the json returned from the csv
+      await this.repo.createAll(data); // jsondata
+      const isSFRepo = this.repo instanceof DefaultUserModifyCrudRepository;
+      let allRecords: AnyObject[];
+      /** Save the records with us and
+       * delete the records to clear up the memory
+       */
+      if (isSFRepo) {
+        allRecords = await this.repo.findAll(filter);
+        await this.repo.deleteAll(undefined, {skipArchive: true});
+      } else {
+        allRecords = await this.repo.find(filter);
+        await this.repo.deleteAll(undefined, {skipArchive: true});
+      }
+
+      // Update the respective job status
+      await this.retrievalJobDetailsRepo.updateById(jobId, {
+        status: JobStatus.SUCCESS,
+        result: JSON.stringify(allRecords),
+      });
+
+      await this.processRetrievedData(allRecords);
+    } catch (error) {
+      // Log or handle the error appropriately
+      console.error('Error during import:', error);
+      // Optionally, update the job status to failure
+      await this.retrievalJobDetailsRepo.updateById(jobId, {
+        status: JobStatus.FAILED,
+        result: JSON.stringify({error: error.message}),
+      });
+      throw error;
+    } finally {
+      // Close the data source connection
+      await csvDataSource.disconnect();
     }
-    //update the respective job status
-    await this.jobDetailsRepo.updateById(jobId, {
-      status: JobStatus.SUCCESS,
-      result: JSON.stringify(allRecords),
-    });
-    await this.processImportedData(allRecords);
   }
 
   // sonarignore:start
diff --git a/packages/archival/src/types.ts b/packages/archival/src/types.ts
index 25fc72add1..fa14ffd26b 100644
--- a/packages/archival/src/types.ts
+++ b/packages/archival/src/types.ts
@@ -87,12 +84,9 @@ export type ImportDataExternalSystem = (
   fileName: string,
 ) => Promise<AnyObject[]>;
 
-export type ProcessImportedData = (importedData: AnyObject[]) => Promise<void>;
-
-export type GetJobDetailsFn = (
-  entityName: string,
-  filter?: Filter,
-) => Promise<JobResponse>;
+export type ProcessRetrievedData = (
+  retrievedData: AnyObject[],
+) => Promise<void>;
 
 export const ArchivalDbSourceName = 'ArchivalDB';