Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPO3DPKRT-802/large model zip failure #600

Merged
merged 8 commits into from
Jun 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ function DetailsThumbnail(props: DetailsThumbnailProps): React.ReactElement {
const [rootStoryLink, setRootStoryLink] = useState('');
const [documentLink, setDocumentLink] = useState('');
const [openVoyagerStory, setOpenVoyagerStory] = React.useState(false);
const [showVoyagerExplorer, setShowVoyagerExplorer] = React.useState(true);

// helper function to wait X ms
const delay = (ms) => {
Expand Down Expand Up @@ -215,6 +216,7 @@ function DetailsThumbnail(props: DetailsThumbnailProps): React.ReactElement {

// attach an event listener for Voyager 'Exit'
await addVoyagerExitListener();
setShowVoyagerExplorer(false);
};
const handleCloseVoyagerStory = async () => {
console.log('[PACKRAT] Closing Voyager Story...');
Expand All @@ -226,6 +228,7 @@ function DetailsThumbnail(props: DetailsThumbnailProps): React.ReactElement {
if(!removeVoyagerStoryElement())
console.log('[PACKRAT: ERROR] Failed to remove Voyager Story');

setShowVoyagerExplorer(true);
setOpenVoyagerStory(false);
};

Expand All @@ -239,12 +242,14 @@ function DetailsThumbnail(props: DetailsThumbnailProps): React.ReactElement {
{objectType !== eSystemObjectType.eScene && thumbnailContent}
{(objectType === eSystemObjectType.eScene || objectType === eSystemObjectType.eModel) && rootExplorerLink.length > 0 && documentLink.length > 0 && (
<React.Fragment>
<voyager-explorer
id='Voyager-Explorer'
root={rootExplorerLink}
document={encodeURIComponent(documentLink)}
style={{ width: '100%', height: '500px', display: 'block', position: 'relative' }}
/>
{ showVoyagerExplorer && (
<voyager-explorer
id='Voyager-Explorer'
root={rootExplorerLink}
document={encodeURIComponent(documentLink)}
style={{ width: '100%', height: '500px', display: 'block', position: 'relative' }}
/>
)}
<Button
className={classes.editButton}
variant='contained'
Expand Down
28 changes: 19 additions & 9 deletions server/graphql/schema/asset/resolvers/mutations/uploadAsset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import * as DBAPI from '../../../../../db';
import * as WF from '../../../../../workflow/interface';
import * as REP from '../../../../../report/interface';
import { RouteBuilder, eHrefMode } from '../../../../../http/routes/routeBuilder';
import { ASL, LocalStore } from '../../../../../utils/localStore';
import { ASL, ASR, LocalStore } from '../../../../../utils/localStore';
import { AuditFactory } from '../../../../../audit/interface/AuditFactory';
import { eEventKey } from '../../../../../event/interface/EventEnums';
import * as COMMON from '@dpo-packrat/common';
Expand Down Expand Up @@ -86,7 +86,7 @@ class UploadAssetWorker extends ResolverBase {
const storage: STORE.IStorage | null = await STORE.StorageFactory.getInstance(); /* istanbul ignore next */
if (!storage) {
LOG.error('uploadAsset unable to retrieve Storage Implementation from StorageFactory.getInstance()', LOG.LS.eGQL);
return { status: UploadStatus.Failed, error: 'Storage unavailable' };
return { status: UploadStatus.Failed, error: 'Storage system unavailable' };
}

// get a write stream for us to store the incoming stream
Expand All @@ -104,24 +104,26 @@ class UploadAssetWorker extends ResolverBase {

try {
// write our incoming stream of bytes to a file in local storage (staging)
// TODO: use ASR.bind(async () => {});
const fileStream = createReadStream();
const stream = fileStream.pipe(writeStream);
LOG.info(`UploadAssetWorker.uploadWorker writing stream to Staging (filename: ${filename} | streamPath: ${fileStream.path})`,LOG.LS.eDEBUG);

return new Promise(resolve => {
fileStream.on('error', (error) => {
fileStream.on('error', ASR.bind((error) => {
LOG.error('uploadAsset', LOG.LS.eGQL, error);
stream.emit('error', error);
});
}));

stream.on('finish', async () => {
stream.on('finish', ASR.bind(async () => {
resolve(this.uploadWorkerOnFinish(storageKey, filename, vocabulary.idVocabulary));
});
}));

stream.on('error', async (error) => {
stream.on('error', ASR.bind(async (error) => {
await this.appendToWFReport(`uploadAsset Upload failed (${error.message})`, true, true);
await storage.discardWriteStream({ storageKey });
resolve({ status: UploadStatus.Failed, error: `Upload failed (${error.message})` });
});
}));

// stream.on('close', async () => { });
});
Expand All @@ -133,12 +135,17 @@ class UploadAssetWorker extends ResolverBase {

private async uploadWorkerOnFinish(storageKey: string, filename: string, idVocabulary: number): Promise<UploadAssetResult> {

// grab our local storage
LOG.info(`UploadAssetWorker.uploadWorkerOnFinish upload finished (storageKey: ${storageKey} | filename: ${filename} | idVocabulary: ${idVocabulary})`,LOG.LS.eDEBUG);

// grab our local storage and log context in case it's lost
const LSLocal: LocalStore | undefined = ASL.getStore();
ASL.checkLocalStore('UploadAssetVersion.uploadWorkerOnFinish',true);
if (LSLocal)
return await this.uploadWorkerOnFinishWorker(storageKey, filename, idVocabulary);

// if we can't get the local storage system we will use the cache
// TODO: do we want this as it occurs when the context is lost for ASL. when is the cache set?
// if CACHE is used how does it sync back with the main storage system?
if (this.LS) {
LOG.info('uploadAsset missing LocalStore, using cached value', LOG.LS.eGQL);
return ASL.run(this.LS, async () => {
Expand Down Expand Up @@ -175,6 +182,7 @@ class UploadAssetWorker extends ResolverBase {
DateCreated: new Date()
};

LOG.info(`UploadAssetWorker.uploadWorkerOnFinishWorker committing new asset (asset: ${H.Helpers.JSONStringify(ASCNAI)})`,LOG.LS.eDEBUG);
commitResult = await STORE.AssetStorageAdapter.commitNewAsset(ASCNAI);
} else { // update existing asset with new asset version
const asset: DBAPI.Asset | null = await DBAPI.Asset.fetch(this.idAsset);
Expand All @@ -201,6 +209,8 @@ class UploadAssetWorker extends ResolverBase {
opInfo,
DateCreated: new Date()
};

LOG.info(`UploadAssetWorker.uploadWorkerOnFinishWorker committing new asset version (assetVersion: ${H.Helpers.JSONStringify(ASCNAVI)})`,LOG.LS.eDEBUG);
commitResult = await STORE.AssetStorageAdapter.commitNewAssetVersion(ASCNAVI);
}

Expand Down
1 change: 1 addition & 0 deletions server/http/routes/api/generateDownloads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export async function generateDownloads(req: Request, res: Response): Promise<vo
// if we only want the status then we need to do some quick checks ourself instead of WorkflowEngine
if(statusOnly === true) {
// see if scene is valid
// TODO: shouldn't be an error if first run by page but only when responding to user action
const isSceneValid: boolean = (scene.PosedAndQCd)?true:false;
if(isSceneValid === false) {
LOG.error(`API.generateDownloads failed. scene is not QC'd. (id:${idSystemObject} | scene:${scene.idScene})`,LOG.LS.eHTTP);
Expand Down
65 changes: 47 additions & 18 deletions server/job/impl/Cook/JobCookSIPackratInspect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@ import * as REP from '../../../report/interface';
import * as H from '../../../utils/helpers';
import { eEventKey } from '../../../event/interface/EventEnums';
import { IZip } from '../../../utils/IZip';
import { ZipStream } from '../../../utils/zipStream';
import { ZipFile } from '../../../utils/zipFile';
import { maybe, maybeString } from '../../../utils/types';

import { isArray } from 'lodash';
import * as path from 'path';
import tmp from 'tmp-promise';

export class JobCookSIPackratInspectParameters {
/** Specify sourceMeshStream when we have the stream for sourceMeshFile in hand (e.g. during upload fo a scene zip that contains this model) */
Expand Down Expand Up @@ -693,6 +691,7 @@ export class JobCookSIPackratInspectOutput implements H.IOResults {
export class JobCookSIPackratInspect extends JobCook<JobCookSIPackratInspectParameters> {
private parameters: JobCookSIPackratInspectParameters;
private sourceMeshStream: NodeJS.ReadableStream | undefined;
private tempFilePath: string | undefined = undefined;

constructor(jobEngine: JOB.IJobEngine, idAssetVersions: number[] | null, report: REP.IReport | null,
parameters: JobCookSIPackratInspectParameters, dbJobRun: DBAPI.JobRun) {
Expand All @@ -709,6 +708,13 @@ export class JobCookSIPackratInspect extends JobCook<JobCookSIPackratInspectPara
}

async cleanupJob(): Promise<H.IOResults> {
// if we have a temp file path, see if it exists, and clean it up.
if(this.tempFilePath) {
LOG.info(`JobCookSIPackratInspect.cleanupJob removing temp file (PATH: ${this.tempFilePath})`,LOG.LS.eDEBUG);
await H.Helpers.removeFile(this.tempFilePath);
this.tempFilePath = undefined;
}

return { success: true };
}

Expand Down Expand Up @@ -749,33 +755,40 @@ export class JobCookSIPackratInspect extends JobCook<JobCookSIPackratInspectPara
}

const ZS: IZip | null = await this.fetchZip(assetVersion);
if (!ZS)
if (!ZS) {
LOG.error(`JobCookSIPackratInspect.testZipOrStream failed (assetVersion: ${assetVersion.FileName}:${assetVersion.idAssetVersion})`,LOG.LS.eDEBUG);
return false;
}

const zipRes: H.IOResults = await ZS.load();
if (!zipRes.success) {
LOG.error(`JobCookSIPackratInspect.testForZipOrStream unable to read asset version ${this._idAssetVersions[0]}: ${zipRes.error}`, LOG.LS.eJOB);
return false;
}

// grab our list of files in the ZIP and cycle through each streaming them in
let sourceMeshFile: string | undefined = undefined;
const files: string[] = await ZS.getJustFiles(null);
const RSRs: STORE.ReadStreamResult[] = [];
LOG.info(`JobCookSIPackratInspect.testForZipOrStream processing files (assetVersion: ${assetVersion.idAssetVersion} | ${files.join('|')})`,LOG.LS.eDEBUG);
for (const file of files) {
// figure out our type based on the file's extension
const eVocabID: COMMON.eVocabularyID | undefined = CACHE.VocabularyCache.mapModelFileByExtensionID(file);
const extension: string = path.extname(file).toLowerCase() || file.toLowerCase();
// LOG.info(`JobCookSIPackratInspect.testForZipOrStream considering zip file entry ${file}, extension ${extension}, VocabID ${eVocabID ? COMMON.eVocabularyID[eVocabID] : 'undefined'}`, LOG.LS.eJOB);
LOG.info(`JobCookSIPackratInspect.testForZipOrStream considering zip file entry ${file}, extension ${extension}, VocabID ${eVocabID ? COMMON.eVocabularyID[eVocabID] : 'undefined'}`, LOG.LS.eJOB);

// for the time being, only handle model geometry files, OBJ .mtl files, and GLTF .bin files
if (eVocabID === undefined && extension !== '.mtl' && extension !== '.bin')
continue;

// stream our content in
const readStream: NodeJS.ReadableStream | null = await ZS.streamContent(file);
if (!readStream) {
LOG.error(`JobCookSIPackratInspect.testForZipOrStream unable to fetch read stream for ${file} in zip of idAssetVersion ${this._idAssetVersions[0]}`, LOG.LS.eJOB);
return false;
}

// store the stream for later reference
LOG.info(`JobCookSIPackratInspect.testForZipOrStream creating stream override for zip file entry ${file}`, LOG.LS.eJOB);
RSRs.push({
readStream,
Expand All @@ -785,53 +798,69 @@ export class JobCookSIPackratInspect extends JobCook<JobCookSIPackratInspectPara
});

// If we haven't yet defined the source mesh and we are processing a geometry file (eVocabID is defined), use this file as our source mesh:
// TEST: multiple models in the same zip. may need validation/rejection to avoid confusion
if (!sourceMeshFile && eVocabID !== undefined)
sourceMeshFile = path.basename(file);
}

// if we found a mesh file to use as the source we update our JobRun in the db so it knows about it
// when feeding parameters to the Cook recipe.
if (sourceMeshFile) {
this.parameters.sourceMeshFile = sourceMeshFile;
this._dbJobRun.Parameters = JSON.stringify(this.parameters, H.Helpers.saferStringify);
if (!await this._dbJobRun.update())
LOG.error(`JobCookSIPackratInspect.testForZipOrStream failed to update JobRun.parameters for ${JSON.stringify(this._dbJobRun, H.Helpers.saferStringify)}`, LOG.LS.eJOB);
}

// if we have at least one stream store them in our overrideMap, which is used to keep track of
// streams/files for processing
if (RSRs.length > 0) {
// LOG.info(`JobCookSIPackratInspect.testForZipOrStream recording ${RSRs.length} stream overrides for idAssetVersion ${this._idAssetVersions[0]}`, LOG.LS.eJOB);
this._streamOverrideMap.set(this._idAssetVersions[0], RSRs);
return true;
}

// no streams found so we return
LOG.error (`JobCookSIPackratInspect.testForZipOrStream no streams found (assetVersion: ${assetVersion.idAssetVersion})`,LOG.LS.eDEBUG);
return false;
}

private async fetchZip(assetVersion: DBAPI.AssetVersion): Promise<IZip | null> {
// LOG.info(`JobCookSIPackratInspect.testForZipOrStream processing zip file ${RSR.fileName}`, LOG.LS.eJOB);

LOG.info(`JobCookSIPackratInspect.fetchZip fetching ZIP file (${H.Helpers.JSONStringify(assetVersion)})`,LOG.LS.eDEBUG);
const RSR: STORE.ReadStreamResult = await STORE.AssetStorageAdapter.readAssetVersionByID(assetVersion.idAssetVersion);
if (!RSR.success || !RSR.readStream) {
LOG.error(`JobCookSIPackratInspect.fetchZip unable to read asset version ${assetVersion.idAssetVersion}: ${RSR.error}`, LOG.LS.eJOB);
if (!RSR.success || !RSR.readStream || !RSR.fileName) {
LOG.error(`JobCookSIPackratInspect.fetchZip unable to read asset version ${assetVersion.idAssetVersion}: ${RSR.error} (${RSR.success} | ${RSR.readStream ? true:false} | ${ RSR.fileName ? true:false})`, LOG.LS.eJOB);
return null;
} else {
LOG.info(`JobCookSIPackratInspect.fetchZip processing zip file ${RSR.fileName}`, LOG.LS.eJOB);
}

if (assetVersion.StorageSize <= BigInt(500 * 1024 * 1024))
return new ZipStream(RSR.readStream);

// if our zipped asset is larger than 500MB, copy it locally so that we can avoid loading the full zip into memory
// copy our zip locally so that we can avoid loading the full zip into memory and use ZipFile
// This also avoids an issue we're experiencing (as of 8/1/2022) with JSZip not emitting "end" events
// when we've fully read a (very large) zip entry with its nodeStream method
const tempFile: tmp.FileResult = await tmp.file({ mode: 0o666, postfix: '.zip' });

// if we have a temp file already, destroy it.
if(this.tempFilePath) {
await H.Helpers.removeFile(this.tempFilePath);
this.tempFilePath = undefined;
}

// construct our full path with a random filename to avoid collisions
// and then write our stream to that location.
this.tempFilePath = path.join(Config.storage.rootStaging,'tmp', H.Helpers.randomFilename('',RSR.fileName));
try {
const res: H.IOResults = await H.Helpers.writeStreamToFile(RSR.readStream, tempFile.path);
const res: H.IOResults = await H.Helpers.writeStreamToFile(RSR.readStream, this.tempFilePath);
if (!res.success) {
LOG.error(`JobCookSIPackratInspect.fetchZip unable to copy asset version ${assetVersion.idAssetVersion} locally to ${tempFile.path}: ${res.error}`, LOG.LS.eJOB);
LOG.error(`JobCookSIPackratInspect.fetchZip unable to copy asset version ${assetVersion.idAssetVersion} locally to ${this.tempFilePath}: ${res.error}`, LOG.LS.eJOB);
return null;
}
return new ZipFile(tempFile.path);

LOG.info(`JobCookSIPackratInspect.fetchZip stream stored at: ${this.tempFilePath} (${H.Helpers.JSONStringify(res)})`,LOG.LS.eDEBUG);
return new ZipFile(this.tempFilePath);
} catch (err) {
LOG.error(`JobCookSIPackratInspect.fetchZip unable to copy asset version ${assetVersion.idAssetVersion} locally to ${tempFile.path}`, LOG.LS.eJOB, err);
LOG.error(`JobCookSIPackratInspect.fetchZip unable to copy asset version ${assetVersion.idAssetVersion} locally to ${this.tempFilePath}`, LOG.LS.eJOB, err);
return null;
} finally {
await tempFile.cleanup();
}
}
}
Expand Down
1 change: 1 addition & 0 deletions server/storage/impl/LocalStorage/LocalStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ export class LocalStorage implements STORE.IStorage {
return retValue;
}

LOG.info(`LocalStorage.writeStream writing to disk (res: ${H.Helpers.JSONStringify(res)})`,LOG.LS.eDEBUG);
try {
retValue.writeStream = fs.createWriteStream(res.locationPrivate);
retValue.storageKey = res.locationPublic;
Expand Down
Loading
Loading