diff --git a/packages/server/modules/blobstorage/domain/operations.ts b/packages/server/modules/blobstorage/domain/operations.ts
new file mode 100644
index 0000000000..072d6cac1c
--- /dev/null
+++ b/packages/server/modules/blobstorage/domain/operations.ts
@@ -0,0 +1,17 @@
+import {
+  BlobStorageItem,
+  BlobStorageItemInput
+} from '@/modules/blobstorage/domain/types'
+import { MaybeNullOrUndefined } from '@speckle/shared'
+
+export type GetBlobs = (params: {
+  streamId?: MaybeNullOrUndefined<string>
+  blobIds: string[]
+}) => Promise<BlobStorageItem[]>
+
+export type UpsertBlob = (item: BlobStorageItemInput) => Promise<BlobStorageItem>
+
+export type UpdateBlob = (params: {
+  id: string
+  item: Partial<BlobStorageItem>
+}) => Promise<BlobStorageItem>
diff --git a/packages/server/modules/blobstorage/domain/operationts.ts b/packages/server/modules/blobstorage/domain/operationts.ts
deleted file mode 100644
index 2dae5bb062..0000000000
--- a/packages/server/modules/blobstorage/domain/operationts.ts
+++ /dev/null
@@ -1,7 +0,0 @@
-import { BlobStorageItem } from '@/modules/blobstorage/domain/types'
-import { MaybeNullOrUndefined } from '@speckle/shared'
-
-export type GetBlobs = (params: {
-  streamId?: MaybeNullOrUndefined<string>
-  blobIds: string[]
-}) => Promise<BlobStorageItem[]>
diff --git a/packages/server/modules/blobstorage/domain/types.ts b/packages/server/modules/blobstorage/domain/types.ts
index c8ec6247a2..e2422ffffa 100644
--- a/packages/server/modules/blobstorage/domain/types.ts
+++ b/packages/server/modules/blobstorage/domain/types.ts
@@ -1,4 +1,5 @@
 import { Nullable } from '@speckle/shared'
+import { SetOptional } from 'type-fest'
 
 export type BlobStorageItem = {
   id: string
@@ -13,3 +14,8 @@
   createdAt: Date
   fileHash: Nullable<string>
 }
+
+export type BlobStorageItemInput = SetOptional<
+  BlobStorageItem,
+  'fileSize' | 'uploadStatus' | 'uploadError' | 'createdAt' | 'fileHash'
+>
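A quick note on the new input type, since it drives the rest of the patch: `SetOptional` from type-fest keeps every field of `BlobStorageItem` required except the listed keys, so `BlobStorageItemInput` describes exactly what the upload path can know up front, before storage fills in size, status and hash. A minimal sketch of a valid value (the field values here are made up; the shape mirrors the `dbFile` object built in `services/upload.ts` below):

```ts
import type { BlobStorageItemInput } from '@/modules/blobstorage/domain/types'

// fileSize, uploadStatus, uploadError, createdAt and fileHash can be omitted:
// column defaults and the later markUpload* / updateBlob calls fill them in
const input: BlobStorageItemInput = {
  id: 'abc123def4', // 10-char ids, matching the length checks in uploadFileStream
  streamId: 'stream1234',
  userId: 'user123456',
  objectKey: 'assets/stream1234/abc123def4',
  fileName: 'model.ifc',
  fileType: 'ifc'
}
```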
diff --git a/packages/server/modules/blobstorage/index.js b/packages/server/modules/blobstorage/index.js
index 6e710f52d4..090ea23ff7 100644
--- a/packages/server/modules/blobstorage/index.js
+++ b/packages/server/modules/blobstorage/index.js
@@ -17,7 +17,6 @@ const crs = require('crypto-random-string')
 const { authMiddlewareCreator } = require('@/modules/shared/middleware')
 
 const {
-  uploadFileStream,
   getFileStream,
   markUploadError,
   markUploadSuccess,
@@ -36,10 +35,19 @@
 const { BadRequestError } = require('@/modules/shared/errors')
 const { moduleLogger, logger } = require('@/logging/logging')
 
-const { getAllStreamBlobIdsFactory } = require('@/modules/blobstorage/repositories')
+const {
+  getAllStreamBlobIdsFactory,
+  upsertBlobFactory,
+  updateBlobFactory
+} = require('@/modules/blobstorage/repositories')
 const { db } = require('@/db/knex')
+const { uploadFileStreamFactory } = require('@/modules/blobstorage/services/upload')
 
 const getAllStreamBlobIds = getAllStreamBlobIdsFactory({ db })
+const uploadFileStream = uploadFileStreamFactory({
+  upsertBlob: upsertBlobFactory({ db }),
+  updateBlob: updateBlobFactory({ db })
+})
 
 const ensureConditions = async () => {
   if (process.env.DISABLE_FILE_UPLOADS) {
diff --git a/packages/server/modules/blobstorage/repositories/index.ts b/packages/server/modules/blobstorage/repositories/index.ts
index 3eaf5c8c41..ecf300c085 100644
--- a/packages/server/modules/blobstorage/repositories/index.ts
+++ b/packages/server/modules/blobstorage/repositories/index.ts
@@ -1,5 +1,12 @@
-import { GetBlobs } from '@/modules/blobstorage/domain/operationts'
-import { BlobStorageItem } from '@/modules/blobstorage/domain/types'
+import {
+  GetBlobs,
+  UpdateBlob,
+  UpsertBlob
+} from '@/modules/blobstorage/domain/operations'
+import {
+  BlobStorageItem,
+  BlobStorageItemInput
+} from '@/modules/blobstorage/domain/types'
 import { buildTableHelper } from '@/modules/core/dbSchema'
 import { Knex } from 'knex'
 
@@ -43,3 +50,26 @@
   const res = await tables.blobStorage(deps.db).where({ streamId }).select('id')
   return res
 }
+
+export const upsertBlobFactory =
+  (deps: { db: Knex }): UpsertBlob =>
+  async (item: BlobStorageItemInput) => {
+    const [res] = await tables
+      .blobStorage(deps.db)
+      .insert(item)
+      .onConflict(['id', 'streamId'])
+      .ignore()
+      .returning('*')
+    return res
+  }
+
+export const updateBlobFactory =
+  (deps: { db: Knex }): UpdateBlob =>
+  async (params: { id: string; item: Partial<BlobStorageItem> }) => {
+    const { id, item } = params
+    const [res] = await tables
+      .blobStorage(deps.db)
+      .where(BlobStorage.col.id, id)
+      .update(item, '*')
+    return res
+  }
diff --git a/packages/server/modules/blobstorage/services.js b/packages/server/modules/blobstorage/services.js
index b6303d33ed..0e10baf23f 100644
--- a/packages/server/modules/blobstorage/services.js
+++ b/packages/server/modules/blobstorage/services.js
@@ -10,34 +10,6 @@ const BlobStorage = () => knex('blob_storage')
 const blobLookup = ({ blobId, streamId }) =>
   BlobStorage().where({ id: blobId, streamId })
 
-const uploadFileStream = async (
-  storeFileStream,
-  { streamId, userId },
-  { blobId, fileName, fileType, fileStream }
-) => {
-  if (streamId.length !== 10)
-    throw new BadRequestError('The stream id has to be of length 10')
-  if (userId.length !== 10)
-    throw new BadRequestError('The user id has to be of length 10')
-  const objectKey = `assets/${streamId}/${blobId}`
-  const dbFile = {
-    id: blobId,
-    streamId,
-    userId,
-    objectKey,
-    fileName,
-    fileType
-  }
-  // need to insert the upload data before starting otherwise the upload finished
-  // even might fire faster, than the db insert, causing missing asset data in the db
-  await BlobStorage().insert(dbFile).onConflict(['id', 'streamId']).ignore()
-
-  const { fileHash } = await storeFileStream({ objectKey, fileStream })
-  // here we should also update the blob db record with the fileHash
-  await BlobStorage().where({ id: blobId }).update({ fileHash })
-  return { blobId, fileName, fileHash }
-}
-
 /**
  * @returns {import('@/modules/blobstorage/helpers/types').BlobStorageRecord | null}
  */
@@ -152,7 +124,6 @@ module.exports = {
   cursorFromRows,
   decodeCursor,
   getBlobMetadata,
-  uploadFileStream,
   markUploadSuccess,
   markUploadOverFileSizeLimit,
   markUploadError,
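One `upsertBlobFactory` behavior worth flagging in review: knex's `.onConflict([...]).ignore()` compiles to Postgres `ON CONFLICT (...) DO NOTHING`, and `RETURNING *` yields no rows for an insert that was skipped, so on a duplicate id/streamId pair the function resolves to `undefined` at runtime even though `UpsertBlob` is typed as `Promise<BlobStorageItem>`. The old inline code had the same semantics; the new signature just hides it. A sketch of the edge case (assuming a Postgres-backed `db`; ids and file values are made up):

```ts
import { db } from '@/db/knex'
import { upsertBlobFactory } from '@/modules/blobstorage/repositories'

const upsertBlob = upsertBlobFactory({ db })

const dbFile = {
  id: 'abc123def4',
  streamId: 'stream1234',
  userId: 'user123456',
  objectKey: 'assets/stream1234/abc123def4',
  fileName: 'model.ifc',
  fileType: 'ifc'
}

const first = await upsertBlob(dbFile) // inserted row comes back
const second = await upsertBlob(dbFile) // conflict ignored, `second` is undefined
```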
diff --git a/packages/server/modules/blobstorage/services/upload.ts b/packages/server/modules/blobstorage/services/upload.ts
new file mode 100644
index 0000000000..36d0a103be
--- /dev/null
+++ b/packages/server/modules/blobstorage/services/upload.ts
@@ -0,0 +1,41 @@
+import { UpdateBlob, UpsertBlob } from '@/modules/blobstorage/domain/operations'
+import { BadRequestError } from '@/modules/shared/errors'
+
+export const uploadFileStreamFactory =
+  (deps: { upsertBlob: UpsertBlob; updateBlob: UpdateBlob }) =>
+  async (
+    storeFileStream: (params: {
+      objectKey: string
+      fileStream: Buffer
+    }) => Promise<{ fileHash: string }>,
+    params1: { streamId: string; userId: string },
+    params2: { blobId: string; fileName: string; fileType: string; fileStream: Buffer }
+  ) => {
+    const { streamId, userId } = params1
+    const { blobId, fileName, fileType, fileStream } = params2
+
+    if (streamId.length !== 10)
+      throw new BadRequestError('The stream id has to be of length 10')
+    if (userId.length !== 10)
+      throw new BadRequestError('The user id has to be of length 10')
+
+    const objectKey = `assets/${streamId}/${blobId}`
+    const dbFile = {
+      id: blobId,
+      streamId,
+      userId,
+      objectKey,
+      fileName,
+      fileType
+    }
+    // need to insert the upload data before starting, otherwise the upload-finished
+    // event might fire faster than the db insert, causing missing asset data in the db
+    await deps.upsertBlob(dbFile)
+
+    const { fileHash } = await storeFileStream({ objectKey, fileStream })
+
+    // update the blob db record with the fileHash
+    await deps.updateBlob({ id: blobId, item: { fileHash } })
+
+    return { blobId, fileName, fileHash }
+  }
diff --git a/packages/server/modules/blobstorage/tests/blobstorage.spec.js b/packages/server/modules/blobstorage/tests/blobstorage.spec.js
index 26c11bbd9c..ef2fba7e2c 100644
--- a/packages/server/modules/blobstorage/tests/blobstorage.spec.js
+++ b/packages/server/modules/blobstorage/tests/blobstorage.spec.js
@@ -1,7 +1,6 @@
 const expect = require('chai').expect
 const { beforeEachContext } = require('@/test/hooks')
 const {
-  uploadFileStream,
   getBlobMetadata,
   getBlobMetadataCollection,
   cursorFromRows,
@@ -19,8 +18,18 @@ const {
 } = require('@/modules/shared/errors')
 const { range } = require('lodash')
 const { fakeIdGenerator, createBlobs } = require('@/modules/blobstorage/tests/helpers')
+const { uploadFileStreamFactory } = require('@/modules/blobstorage/services/upload')
+const {
+  upsertBlobFactory,
+  updateBlobFactory
+} = require('@/modules/blobstorage/repositories')
+const { db } = require('@/db/knex')
 
 const fakeFileStreamStore = (fakeHash) => async () => ({ fileHash: fakeHash })
+const uploadFileStream = uploadFileStreamFactory({
+  upsertBlob: upsertBlobFactory({ db }),
+  updateBlob: updateBlobFactory({ db })
+})
 
 describe('Blob storage @blobstorage', () => {
   before(async () => {
diff --git a/packages/server/modules/gendo/services/index.ts b/packages/server/modules/gendo/services/index.ts
index 347f23b97f..cea85e6bda 100644
--- a/packages/server/modules/gendo/services/index.ts
+++ b/packages/server/modules/gendo/services/index.ts
@@ -5,7 +5,17 @@ import { GendoAIRenderRecord } from '@/modules/gendo/helpers/types'
 import { ProjectSubscriptions, publish } from '@/modules/shared/utils/subscriptions'
 import { Merge } from 'type-fest'
 import { storeFileStream } from '@/modules/blobstorage/objectStorage'
-import { uploadFileStream } from '@/modules/blobstorage/services'
+import { uploadFileStreamFactory } from '@/modules/blobstorage/services/upload'
+import {
+  updateBlobFactory,
+  upsertBlobFactory
+} from '@/modules/blobstorage/repositories'
+import { db } from '@/db/knex'
+
+const uploadFileStream = uploadFileStreamFactory({
+  upsertBlob: upsertBlobFactory({ db }),
+  updateBlob: updateBlobFactory({ db })
+})
 
 export async function createGendoAIRenderRequest(
   input: GendoAiRenderInput & {
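For reference, this is roughly how the extracted service is now composed and called. The wiring below matches what `index.js`, the spec and the gendo module do above; the ids and file values are made up for illustration:

```ts
import { db } from '@/db/knex'
import { storeFileStream } from '@/modules/blobstorage/objectStorage'
import { uploadFileStreamFactory } from '@/modules/blobstorage/services/upload'
import {
  upsertBlobFactory,
  updateBlobFactory
} from '@/modules/blobstorage/repositories'

// composition root: inject the real repositories (the spec swaps in fakes instead)
const uploadFileStream = uploadFileStreamFactory({
  upsertBlob: upsertBlobFactory({ db }),
  updateBlob: updateBlobFactory({ db })
})

// upsert the blob row, stream the file to storage, then persist the hash
const { blobId, fileName, fileHash } = await uploadFileStream(
  storeFileStream,
  { streamId: 'stream1234', userId: 'user123456' }, // both must be 10 chars
  {
    blobId: 'abc123def4',
    fileName: 'model.ifc',
    fileType: 'ifc',
    fileStream: Buffer.from('...')
  }
)
```

Since `storeFileStream` is still passed as a positional argument rather than injected alongside the repositories, a follow-up could fold it into the factory's `deps` as well.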