feat(entity-archival): file renamed
yeshamavani committed Dec 10, 2024
1 parent 0a50c23 commit 1c5361d
Showing 8 changed files with 108 additions and 119 deletions.
17 changes: 7 additions & 10 deletions packages/archival/README.md
@@ -18,8 +18,7 @@ $ npm install @sourceloop/entity-archival

## Overview

`@sourceloop/entity-archival` package is a powerful LoopBack 4 extension designed to seamlessly
implement entity level archival for your loopback applications. With this extension you can archive entries of any model to an external system. Many times we dont wish to delete data but considering the latency it brings with it we can save that to an external system and use it at our convenience. For that we have a retrieval process that helps us to fetch that data from external system and use it.
The `@sourceloop/entity-archival` package is a powerful LoopBack 4 extension designed to implement entity-level archival for your LoopBack applications. With this extension you can archive entries of any model to an external system. Often we don't wish to delete data outright, but given the latency it brings with it, we can save it to an external system and use it at our convenience. For that, a retrieval process helps us fetch that data back from the external system and use it.

Here we have a repository-level mixin that overrides the `deleteAll()` method - this method first saves the data to the external system, maintains an entry in the mapping table, and then permanently deletes the data from the system.
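
The archive-then-delete flow described above can be sketched roughly as follows. This is a minimal, self-contained illustration with in-memory stand-ins for the external system and the mapping table — `ArchivingRepository`, `externalStore`, and `mappingTable` are hypothetical names for illustration, not the extension's actual API:

```typescript
// Sketch of the archive-then-delete flow: before rows are permanently
// deleted, they are copied to an external store and an entry is kept in
// a mapping table so they can be located and retrieved later.
type Row = {id: string; [key: string]: unknown};

class ArchivingRepository {
  private rows: Row[];
  // In-memory stand-ins for the external system and the mapping table.
  externalStore: Row[][] = [];
  mappingTable: {key: number; entity: string}[] = [];

  constructor(private entityName: string, seed: Row[]) {
    this.rows = [...seed];
  }

  async deleteAll(options?: {skipArchive?: boolean}): Promise<number> {
    if (!options?.skipArchive) {
      // 1. Save the data to the external system...
      const key = this.externalStore.push([...this.rows]) - 1;
      // 2. ...maintain an entry in the mapping table...
      this.mappingTable.push({key, entity: this.entityName});
    }
    // 3. ...then permanently delete the data.
    const deleted = this.rows.length;
    this.rows = [];
    return deleted;
  }

  find(): Row[] {
    return this.rows;
  }
}
```

Passing `{skipArchive: true}` deletes without archiving, mirroring the `skipArchive` option the README documents.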

@@ -40,12 +39,10 @@ export class MyApplication extends BootMixin(ServiceMixin(RepositoryMixin(RestAp
const opts: ArchivalComponentOptions = DEFAULT_ARCHIVAL_OPTIONS;
this.configure(ArchivalComponentBindings.COMPONENT).to(opts);
// Put the configuration options here
});
};
this.component(ArchivalComponent);
// ...
}
// ...
}
```

- Add Datasource
@@ -86,7 +83,7 @@ export class AuditDataSource

- This extension already has the required models and repositories.

- The component exposes a mixin for your repository classes. Just extend your repository class with `ArchivalRepositoryMixin`, for all those repositories where you need archive data. See an example below. For a model `Product`, here we are extending the `ProductRepository` with `ArchivalRepositoryMixin`.
- The component exposes a mixin for your repository classes. Just extend your repository class with `ArchivalRepositoryMixin` for every repository whose data you need to archive. See the example below: for a model `Product`, we extend the `ProductRepository` with `ArchivalRepositoryMixin`.

```ts
import {Getter, inject} from '@loopback/core';
@@ -130,7 +127,7 @@ Make sure you provide `getCurrentUser` and `getRepository` Getter functions in c
deleteAll(data, {skipArchive: true});
```

- The Actor field is now configurable and can save any string type value in the field.
- The Actor field is configurable and can store any string value.
  Though the default is `userId`, a developer can save any string field from the current user that is passed.

```ts
this.bind(AuthServiceBindings.ActorIdKey).to('username');
public actorIdKey?: ActorId,
```

- To implement the importing of entries from external system
<!-- - To implement the importing of entries from external system
- Create a job and return its Id to the user
- Call the import function of the ImportArchivedDataService and pass the jobId to it.
This function asynchronously fetch the data from external system and returns the json data.
This function asynchronously fetch the data from external system and returns the json data. -->
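
The retrieval flow this commit moves toward — insert a job row, return its id immediately, and filter the archived data asynchronously while the caller polls the job status — can be sketched like this. It is a simplified model with an in-memory job table; the real service persists jobs via `RetrievalJobDetailsRepository`, and `importArchived` is an illustrative name:

```typescript
// Simplified model of the asynchronous retrieval flow: importArchived()
// records an IN_PROGRESS job and returns its id right away, while the
// actual fetch/filter work updates the job status when it finishes.
type Filter = {[key: string]: unknown};
type JobResponse = {jobId: string};

enum JobStatus {
  IN_PROGRESS = 'InProgress',
  SUCCESS = 'Success',
  FAILED = 'Failed',
}

const jobs = new Map<string, {status: JobStatus; entity: string}>();
let nextId = 0;

async function importArchived(
  entityName: string,
  filter: Filter,
): Promise<JobResponse> {
  const jobId = String(++nextId);
  jobs.set(jobId, {status: JobStatus.IN_PROGRESS, entity: entityName});
  // Kick off the real work without awaiting it; the caller polls the job.
  void filterData(jobId, entityName, filter);
  return {jobId};
}

async function filterData(
  jobId: string,
  entityName: string,
  filter: Filter,
): Promise<void> {
  try {
    // ...fetch the archived files, load them into a temporary datasource,
    // and apply the filter here...
    jobs.get(jobId)!.status = JobStatus.SUCCESS;
  } catch {
    jobs.get(jobId)!.status = JobStatus.FAILED;
  }
}
```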

## Providers and Services

@@ -196,7 +193,7 @@ this.bind(ArchivalComponentBindings.IMPORT_ARCHIVE_DATA).toProvider(
);
```

- ImportArchivedDataService uses a ProcessImportedDataProvider that takes the json data as input. You can implement it for data processing.
- ImportArchivedDataService uses a ProcessRetrievedDataProvider that takes the JSON data as input. You can implement it to process the retrieved data and save it to the desired system.

- BuildWhereConditionService is used to save the filter column of the `archive_mapping` model.
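
An application-side implementation of the `ProcessRetrievedData` handler that the provider supplies might look roughly like this — a sketch under the assumption that you simply want to transform and persist the records; `restoredRows` is a hypothetical stand-in for your actual target system:

```typescript
// Sketch of a ProcessRetrievedData-style handler: it receives the JSON
// records fetched back from the external system and saves them to
// whatever system the application chooses.
type AnyObject = {[key: string]: unknown};
type ProcessRetrievedData = (retrievedData: AnyObject[]) => Promise<void>;

// Hypothetical sink standing in for the "desired system".
const restoredRows: AnyObject[] = [];

const processRetrievedData: ProcessRetrievedData = async retrievedData => {
  for (const row of retrievedData) {
    // Transform or validate each record before saving it.
    restoredRows.push({...row, restoredAt: new Date().toISOString()});
  }
};
```

The handler's signature mirrors the `ProcessRetrievedData` type exported from the package's `types.ts`.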

11 changes: 4 additions & 7 deletions packages/archival/src/component.ts
@@ -14,10 +14,7 @@ import {Class, Repository} from '@loopback/repository';
import {Model, RestApplication} from '@loopback/rest';
import {CoreComponent} from '@sourceloop/core';
import {ArchivalComponentBindings} from './keys';
import {
ProcessRetrievedDataProvider,
RetrieveArchivedDataProvider,
} from './providers';
import {ProcessRetrievedDataProvider} from './providers';
import {
ArchivalMappingRepository,
RetrievalJobDetailsRepository,
@@ -38,10 +35,10 @@ export class ArchivalComponent implements Component {
this.providers = {};
this.application.component(CoreComponent);

this.providers[ArchivalComponentBindings.PROCESS_IMPORT_DATA.key] =
this.providers[ArchivalComponentBindings.PROCESS_RETRIEVED_DATA.key] =
ProcessRetrievedDataProvider;
this.providers[ArchivalComponentBindings.GET_ARCHIVED_DATA_JOB.key] =
RetrieveArchivedDataProvider;
// this.providers[ArchivalComponentBindings.GET_ARCHIVED_DATA_JOB.key] =
// GetRetrievalJobDetailsProvider;

this.application
.bind('services.BuildWhereConditionService')
12 changes: 3 additions & 9 deletions packages/archival/src/keys.ts
@@ -4,9 +4,8 @@ import {BINDING_PREFIX} from '@sourceloop/core';
import {ArchivalComponent} from './component';
import {
ExportDataExternalSystem,
GetJobDetailsFn,
ImportDataExternalSystem,
ProcessImportedData,
ProcessRetrievedData,
} from './types';

/**
@@ -27,15 +26,10 @@ export namespace ArchivalComponentBindings {
`${BINDING_PREFIX}.entity.archive.import`,
);

export const PROCESS_IMPORT_DATA =
BindingKey.create<ProcessImportedData | null>(
export const PROCESS_RETRIEVED_DATA =
BindingKey.create<ProcessRetrievedData | null>(
`${BINDING_PREFIX}.entity.import`,
);

export const GET_ARCHIVED_DATA_JOB =
BindingKey.create<GetJobDetailsFn | null>(
`${BINDING_PREFIX}.get.entity.archive`,
);
}

export namespace AWSS3Bindings {
2 changes: 1 addition & 1 deletion packages/archival/src/providers/index.ts
@@ -1,2 +1,2 @@
// export * from './get-retrieval-job-details.provider';
export * from './process-retrieved-data.provider';
export * from './retrieve-archived-data.provider';
17 changes: 4 additions & 13 deletions packages/archival/src/providers/process-retrieved-data.provider.ts
@@ -4,26 +4,17 @@ import {
Provider,
ValueOrPromise,
} from '@loopback/core';
import {ProcessImportedData} from '../types';
import {ProcessRetrievedData} from '../types';

@injectable({scope: BindingScope.TRANSIENT})
export class ProcessRetrievedDataProvider
implements Provider<ProcessImportedData>
implements Provider<ProcessRetrievedData>
{
/** Implement this provider to process and save the records to the desired system
* @param retrievedData - The data that is retrieved from the external system
* that now needs to be processed and saved to the desired system
*/
value(): ValueOrPromise<ProcessImportedData> {
throw new Error('Method not implemented.');
value(): ValueOrPromise<ProcessRetrievedData> {
throw new Error('Process Retrieved Data not implemented.');
}
// value(): ValueOrPromise<ProcessImportedData> {
// return async (importedData: AnyObject[]) => {
// return this.processData(importedData);
// };
// }
// async processData(importedData: AnyObject[]): Promise<void> {
// console.log('Processing Imported Data');
// console.log(importedData);
// }
}
32 changes: 0 additions & 32 deletions packages/archival/src/providers/retrieve-archived-data.provider.ts

This file was deleted.

127 changes: 86 additions & 41 deletions packages/archival/src/services/import-archived-data.service.ts
@@ -17,16 +17,17 @@ import {
import {DefaultUserModifyCrudRepository} from '@sourceloop/core';
import {ArchivalApplication} from '../application';
import {ArchivalComponentBindings} from '../keys';
import {ArchiveMapping} from '../models';
import {ArchiveMapping, RetrievalJobDetails} from '../models';
import {
ArchivalMappingRepository,
RetrievalJobDetailsRepository,
} from '../repositories';
import {
IBuildWhereConditionService,
ImportDataExternalSystem,
JobResponse,
JobStatus,
ProcessImportedData,
ProcessRetrievedData,
} from '../types';

@injectable({scope: BindingScope.TRANSIENT})
@@ -35,7 +36,7 @@ export class ImportArchivedDataService {
repo: any; //NOSONAR
constructor(
@repository(RetrievalJobDetailsRepository)
public jobDetailsRepo: RetrievalJobDetailsRepository,
public retrievalJobDetailsRepo: RetrievalJobDetailsRepository,
@repository(ArchivalMappingRepository)
public archivalMappingRepo: ArchivalMappingRepository,
@inject(CoreBindings.APPLICATION_INSTANCE)
@@ -45,35 +46,61 @@ export class ImportArchivedDataService {
private buildWhereConditionService: IBuildWhereConditionService,
@inject(ArchivalComponentBindings.IMPORT_ARCHIVE_DATA)
private importArchiveData: ImportDataExternalSystem,
@inject(ArchivalComponentBindings.PROCESS_IMPORT_DATA)
private processImportedData: ProcessImportedData,
@inject(ArchivalComponentBindings.PROCESS_RETRIEVED_DATA)
private processRetrievedData: ProcessRetrievedData,
) {}

// 1. add entry in the job processing table return that

/**
* start scanning the mapping table and get all the filename matching,
* read the file get all the data
* temporarily save the data to the in-memory source
* filter the required data from there
* return the json data to user
*This particular method is used to import the data from the external system
* 1. inserts the job details in the job processing table
* 2. Then it filters the data from the external system
* @param entityName - name of the entity on which the data needs to be imported
* @param filter - filter to be applied on the data to be imported
* @returns - job id for the import process
*/

async import(jobId: string) {
const jobDetails = await this.jobDetailsRepo.findById(jobId);
const modelName = jobDetails.entity;
const filter = jobDetails.filter;
async import(entityName: string, filter: Filter): Promise<JobResponse> {
const jobResponse = await this.insertToRetrievalJobDetails(
entityName,
filter,
);
this.filterData(jobResponse.jobId, entityName, filter);
return jobResponse;
}

private async insertToRetrievalJobDetails(
entityName: string,
filter: Filter,
): Promise<JobResponse> {
const job = new RetrievalJobDetails({
status: JobStatus.IN_PROGRESS,
filter,
entity: entityName,
});
let jobResponse: JobResponse = {jobId: '0'};
const jobDetails = await this.retrievalJobDetailsRepo.create(job);
jobResponse = {jobId: jobDetails.id ?? '0'};
return jobResponse;
}

/**
* Builds a custom where condition from the filter provided to
* find the possible list of files that need to be imported
* gets the data from the files and saves it into the in-memory source temporarily
* and filters the data based on the filter provided
* updates the job status based on the success or failure of the import process
* passes the filtered data for further processing
*/
private async filterData(jobId: string, entityName: string, filter: Filter) {
const archiveFilter: Filter<ArchiveMapping> =
await this.buildWhereConditionService.buildConditionForFetch(
filter,
modelName,
entityName,
);

const archivedEntries = await this.archivalMappingRepo.find(archiveFilter);

const data: AnyObject[] = [];

for (const entry of archivedEntries) {
const fileContent = await this.importArchiveData(entry.key);
data.push(...fileContent);
@@ -84,31 +111,49 @@
name: dsName,
connector: 'memory',
});
await csvDataSource.connect();
this.application.dataSource(csvDataSource, dsName);

this.repo = await this.getRepositoryByModelName<Entity>(modelName);
this.repo.dataSource = csvDataSource;
// Fill in the json returned from the csv
await this.repo.createAll(data); //jsondata
const isSFRepo = this.repo instanceof DefaultUserModifyCrudRepository;
let allRecords: AnyObject[];
/**save the records with us and
* delete the records to clear up the memory
*/
if (isSFRepo) {
allRecords = await this.repo.findAll(filter);
await this.repo.deleteAll(undefined, {skipArchive: true});
} else {
allRecords = await this.repo.find(filter);
await this.repo.deleteAll(undefined, {skipArchive: true});
try {
await csvDataSource.connect();
this.application.dataSource(csvDataSource, dsName);

this.repo = await this.getRepositoryByModelName<Entity>(entityName);
this.repo.dataSource = csvDataSource;

// Fill in the json returned from the csv
await this.repo.createAll(data); // jsondata
const isSFRepo = this.repo instanceof DefaultUserModifyCrudRepository;
let allRecords: AnyObject[];
/** Save the records with us and
* delete the records to clear up the memory
*/
if (isSFRepo) {
allRecords = await this.repo.findAll(filter);
await this.repo.deleteAll(undefined, {skipArchive: true});
} else {
allRecords = await this.repo.find(filter);
await this.repo.deleteAll(undefined, {skipArchive: true});
}

// Update the respective job status
await this.retrievalJobDetailsRepo.updateById(jobId, {
status: JobStatus.SUCCESS,
result: JSON.stringify(allRecords),
});

await this.processRetrievedData(allRecords);
} catch (error) {
// Log or handle the error appropriately
console.error('Error during import:', error);
// Optionally, update the job status to failure
await this.retrievalJobDetailsRepo.updateById(jobId, {
status: JobStatus.FAILED,
result: JSON.stringify({error: error.message}),
});
throw error;
} finally {
// Close the data source connection
await csvDataSource.disconnect();
}
//update the respective job status
await this.jobDetailsRepo.updateById(jobId, {
status: JobStatus.SUCCESS,
result: JSON.stringify(allRecords),
});
await this.processImportedData(allRecords);
}

// sonarignore:start
9 changes: 3 additions & 6 deletions packages/archival/src/types.ts
@@ -87,12 +87,9 @@ export type ImportDataExternalSystem = (
fileName: string,
) => Promise<AnyObject[]>;

export type ProcessImportedData = (importedData: AnyObject[]) => Promise<void>;

export type GetJobDetailsFn = (
entityName: string,
filter?: Filter,
) => Promise<JobResponse>;
export type ProcessRetrievedData = (
retrievedData: AnyObject[],
) => Promise<void>;

export const ArchivalDbSourceName = 'ArchivalDB';

