diff --git a/migrations/multitenant/0006-add-tenants-external-credentials.sql b/migrations/multitenant/0006-add-tenants-external-credentials.sql
deleted file mode 100644
index 2a759462..00000000
--- a/migrations/multitenant/0006-add-tenants-external-credentials.sql
+++ /dev/null
@@ -1,38 +0,0 @@
-create extension if not exists "uuid-ossp";
-
-CREATE TABLE tenants_external_credentials (
-    "id" uuid NOT NULL DEFAULT uuid_generate_v4(),
-    name text NOT NULL unique,
-    tenant_id text NOT NULL references tenants (id),
-    provider text NOT NULL default 's3',
-    access_key text NULL,
-    secret_key text NULL,
-    role text null,
-    region text not null,
-    endpoint text NULL,
-    force_path_style boolean NOT NULL default false,
-    PRIMARY KEY (id)
-);
-
-create index external_buckets_tenant_id_idx on tenants_external_credentials (tenant_id);
-
-CREATE FUNCTION tenants_external_credentials_update_notify_trigger ()
-    RETURNS TRIGGER
-AS $$
-BEGIN
-    PERFORM
-        pg_notify('tenants_external_credentials_update', '"' || NEW.id || ':' || NEW.tenant_id || '"');
-    RETURN NULL;
-END;
-$$
-    LANGUAGE plpgsql;
-
-CREATE TRIGGER tenants_external_credentials_notify_trigger
-    AFTER DELETE ON tenants_external_credentials
-    FOR EACH ROW
-EXECUTE PROCEDURE tenants_external_credentials_update_notify_trigger();
-
-CREATE TRIGGER tenants_external_credentials_notify_trigger
-    AFTER UPDATE ON tenants_external_credentials
-    FOR EACH ROW
-EXECUTE PROCEDURE tenants_external_credentials_update_notify_trigger();
\ No newline at end of file
diff --git a/migrations/tenant/0017-add_owner_id_column_deprecate_owner.sql b/migrations/tenant/0017-add_owner_id_column_deprecate_owner.sql
new file mode 100644
index 00000000..1c492de9
--- /dev/null
+++ b/migrations/tenant/0017-add_owner_id_column_deprecate_owner.sql
@@ -0,0 +1,8 @@
+alter table storage.objects add column if not exists owner_id text default null;
+alter table storage.buckets add column if not exists owner_id text default null;
+
+comment on column storage.objects.owner is 'Field is deprecated, use owner_id instead';
+comment on column storage.buckets.owner is 'Field is deprecated, use owner_id instead';
+
+ALTER TABLE storage.buckets
+    DROP CONSTRAINT IF EXISTS buckets_owner_fkey;
diff --git a/migrations/tenant/0018-external-buckets.sql b/migrations/tenant/0018-external-buckets.sql
new file mode 100644
index 00000000..d49e67d6
--- /dev/null
+++ b/migrations/tenant/0018-external-buckets.sql
@@ -0,0 +1,14 @@
+CREATE TABLE bucket_credentials (
+    "id" uuid NOT NULL DEFAULT extensions.uuid_generate_v4(),
+    name text NOT NULL unique,
+    access_key text NULL,
+    secret_key text NULL,
+    role text null,
+    region text not null,
+    endpoint text NULL,
+    force_path_style boolean NOT NULL default false,
+    PRIMARY KEY (id)
+);
+
+ALTER TABLE storage.buckets ADD COLUMN credential_id uuid DEFAULT NULL;
+ALTER TABLE storage.buckets ADD CONSTRAINT fk_bucket_credential FOREIGN KEY (credential_id) REFERENCES bucket_credentials(id);
\ No newline at end of file
diff --git a/src/app.ts b/src/app.ts
index 7793daf8..213a3402 100644
--- a/src/app.ts
+++ b/src/app.ts
@@ -60,6 +60,7 @@ const build = (opts: buildOpts = {}): FastifyInstance => {
   app.register(plugins.logTenantId)
   app.register(plugins.logRequest({ excludeUrls: ['/status', '/metrics'] }))
   app.register(routes.multiPart, { prefix: 'upload/resumable' })
+  app.register(routes.credentials, { prefix: 'credentials' })
   app.register(routes.bucket, { prefix: 'bucket' })
   app.register(routes.object, { prefix: 'object' })
   app.register(routes.render, { prefix: 'render/image' })
diff --git a/src/auth/jwt.ts b/src/auth/jwt.ts
index ce5e7101..6960ea66 100644
--- a/src/auth/jwt.ts
+++ b/src/auth/jwt.ts
@@ -1,8 +1,10 @@
-import { getJwtSecret as getJwtSecretForTenant } from '../database/tenant'
+import { getJwtSecret as getJwtSecretForTenant, getTenantConfig } from '../database/tenant'
 import jwt from 'jsonwebtoken'
+import crypto from 'crypto'
 import { getConfig } from '../config'
+import { StorageBackendError } from '../storage'
 
-const { isMultitenant, jwtSecret, jwtAlgorithm } = getConfig()
+const { isMultitenant, jwtSecret, jwtAlgorithm, serviceKey } = getConfig()
 
 interface jwtInterface {
   sub?: string
@@ -21,6 +23,21 @@ export type SignedUploadToken = {
   exp: number
 }
 
+export async function compareServiceKey(tenantId: string, jwt: string) {
+  if (isMultitenant) {
+    const { serviceKey } = await getTenantConfig(tenantId)
+    return serviceKey.length === jwt.length && crypto.timingSafeEqual(Buffer.from(serviceKey), Buffer.from(jwt)) // timingSafeEqual throws if lengths differ
+  }
+
+  return serviceKey.length === jwt.length && crypto.timingSafeEqual(Buffer.from(serviceKey), Buffer.from(jwt))
+}
+
+export async function mustBeServiceKey(tenantId: string, jwt: string) {
+  if (!(await compareServiceKey(tenantId, jwt))) {
+    throw new StorageBackendError('unauthorized', 401, 'Unauthorized')
+  }
+}
+
 /**
  * Gets the JWT secret key from the env PGRST_JWT_SECRET when running in single-tenant
  * or querying the multi-tenant database by the given tenantId
diff --git a/src/config.ts b/src/config.ts
index 2c3ccff5..c92b9a98 100644
--- a/src/config.ts
+++ b/src/config.ts
@@ -119,7 +119,7 @@ export function getConfig(): StorageConfigType {
       10
     ),
     databaseConnectionTimeout: parseInt(
-      getOptionalConfigFromEnv('DATABASE_CONNECTION_TIMEOUT') || '30000',
+      getOptionalConfigFromEnv('DATABASE_CONNECTION_TIMEOUT') || '3000',
       10
     ),
     region: getConfigFromEnv('REGION'),
diff --git a/src/database/connection.ts b/src/database/connection.ts
index 6429ec37..20070219 100644
--- a/src/database/connection.ts
+++ b/src/database/connection.ts
@@ -2,7 +2,6 @@ import pg from 'pg'
 import { Knex, knex } from 'knex'
 import { JwtPayload } from 'jsonwebtoken'
 import { getConfig } from '../config'
-import { logger } from '../monitoring'
 import { DbActiveConnection, DbActivePool } from '../monitoring/metrics'
 
 // https://github.com/knex/knex/issues/387#issuecomment-51554522
@@ -30,6 +29,7 @@ export interface User {
 }
 
 export const connections = new Map<string, Knex>()
+const searchPath = ['storage', 'public', 'extensions']
 
 export class TenantConnection {
   public readonly role: string
@@ -63,13 +63,13 @@
 
     knexPool = knex({
       client: 'pg',
-      searchPath: ['public', 'storage', 'extensions'],
+      searchPath: isExternalPool ? undefined : searchPath,
       pool: {
         min: 0,
         max: isExternalPool ? 1 : options.maxConnections || databaseMaxConnections,
         propagateCreateError: false,
        acquireTimeoutMillis: databaseConnectionTimeout,
-        idleTimeoutMillis: isExternalPool ? 100 : undefined,
+        idleTimeoutMillis: isExternalPool ? 100 : databaseFreePoolAfterInactivity,
         reapIntervalMillis: isExternalPool ? 110 : undefined,
       },
       connection: connectionString,
@@ -97,38 +97,12 @@
     })
 
     if (!isExternalPool) {
-      let freePoolIntervalFn: NodeJS.Timeout | undefined
-
       knexPool.client.pool.on('poolDestroySuccess', () => {
-        if (freePoolIntervalFn) {
-          clearTimeout(freePoolIntervalFn)
-        }
-
        if (connections.get(connectionString) === knexPool) {
          connections.delete(connectionString)
        }
      })
 
-      knexPool.client.pool.on('stopReaping', () => {
-        if (freePoolIntervalFn) {
-          clearTimeout(freePoolIntervalFn)
-        }
-
-        freePoolIntervalFn = setTimeout(async () => {
-          connections.delete(connectionString)
-          knexPool?.destroy().catch((e) => {
-            logger.error(e, 'Error destroying pool')
-          })
-          clearTimeout(freePoolIntervalFn)
-        }, databaseFreePoolAfterInactivity)
-      })
-
-      knexPool.client.pool.on('startReaping', () => {
-        if (freePoolIntervalFn) {
-          clearTimeout(freePoolIntervalFn)
-          freePoolIntervalFn = undefined
-        }
-      })
 
       connections.set(connectionString, knexPool)
     }
@@ -141,10 +115,16 @@
     }
   }
 
-  transaction(isolation?: Knex.IsolationLevels, instance?: Knex) {
-    return (instance || this.pool).transactionProvider({
-      isolationLevel: isolation,
-    })
+  transaction(instance?: Knex): Knex.TransactionProvider {
+    return async () => {
+      const pool = instance || this.pool
+      const tnx = await pool.transaction()
+
+      if (!instance) {
+        await tnx.raw(`set search_path to ${searchPath.join(', ')}`)
+      }
+      return tnx
+    }
   }
 
   asSuperUser() {
diff --git a/src/database/tenant.ts b/src/database/tenant.ts
index f25f6a26..c8f16982 100644
--- a/src/database/tenant.ts
+++ b/src/database/tenant.ts
@@ -5,6 +5,7 @@ import { runMigrationsOnTenant } from './migrate'
 import { knex } from './multitenant-db'
 import { StorageBackendError } from '../storage'
 import { JwtPayload } from 'jsonwebtoken'
+import { Credential } from '../storage/schemas'
 
 interface TenantConfig {
   anonKey: string
diff --git a/src/http/error-handler.ts b/src/http/error-handler.ts
index df43e4a9..df9a3db2 100644
--- a/src/http/error-handler.ts
+++ b/src/http/error-handler.ts
@@ -1,6 +1,9 @@
 import { FastifyInstance } from 'fastify'
 import { isRenderableError } from '../storage'
 import { FastifyError } from '@fastify/error'
+import { getConfig } from '../config'
+
+const { tusPath } = getConfig()
 
 /**
  * The global error handler for all the uncaught exceptions within a request.
@@ -20,7 +23,8 @@ export const setErrorHandler = (app: FastifyInstance) => {
 
     if (isRenderableError(error)) {
       const renderableError = error.render()
-      return reply.status(renderableError.statusCode === '500' ? 500 : 400).send(renderableError)
+      const body = request.routerPath.includes(tusPath) ? renderableError.error : renderableError
+      return reply.status(renderableError.statusCode === '500' ? 500 : 400).send(body)
     }
 
     // Fastify errors
diff --git a/src/http/plugins/bucket.ts b/src/http/plugins/bucket.ts
new file mode 100644
index 00000000..df751853
--- /dev/null
+++ b/src/http/plugins/bucket.ts
@@ -0,0 +1,41 @@
+import fastifyPlugin from 'fastify-plugin'
+import { RouteGenericInterface } from 'fastify/types/route'
+import { BucketWithCredentials } from '../../storage/schemas'
+import { StorageBackendError } from '../../storage'
+
+declare module 'fastify' {
+  interface FastifyRequest {
+    bucket: BucketWithCredentials
+  }
+
+  interface FastifyContextConfig {
+    getParentBucketId?: ((request: FastifyRequest) => string) | false
+  }
+}
+
+export const parentBucket = fastifyPlugin(async (fastify) => {
+  fastify.decorateRequest('bucket', undefined)
+  fastify.addHook('preHandler', async (request) => {
+    if (typeof request.routeConfig.getParentBucketId === 'undefined') {
+      throw new Error(
+        `getParentBucketId not defined in route ${request.routerPath} config`
+      )
+    }
+
+    if (request.routeConfig.getParentBucketId === false) {
+      return
+    }
+
+    const bucketId = request.routeConfig.getParentBucketId(request)
+
+    if (!bucketId) {
+      throw new StorageBackendError('invalid_bucket', 400, 'bucket name is invalid or not provided')
+    }
+
+    const bucket = await request.db.asSuperUser().findBucketById(bucketId, '*', {
+      includeCredentials: true,
+    })
+
+    request.bucket = bucket
+  })
+})
diff --git a/src/http/plugins/db.ts b/src/http/plugins/db.ts
index 1662f3fd..2ac6d453 100644
--- a/src/http/plugins/db.ts
+++ b/src/http/plugins/db.ts
@@ -3,20 +3,24 @@ import { TenantConnection } from '../../database/connection'
 import { getServiceKeyUser } from '../../database/tenant'
 import { getPostgresConnection } from '../../database'
 import { verifyJWT } from '../../auth'
+import { Database, StorageKnexDB } from '../../storage/database'
 
 declare module 'fastify' {
   interface FastifyRequest {
-    db: TenantConnection
+    dbConnection: TenantConnection
+    db: Database
   }
 }
 
 export const db = fastifyPlugin(async (fastify) => {
   fastify.decorateRequest('db', null)
+  fastify.decorateRequest('dbConnection', null)
+
   fastify.addHook('preHandler', async (request) => {
     const adminUser = await getServiceKeyUser(request.tenantId)
     const userPayload = await verifyJWT<{ role?: string }>(request.jwt, adminUser.jwtSecret)
 
-    request.db = await getPostgresConnection({
+    request.dbConnection = await getPostgresConnection({
       user: {
         payload: userPayload,
         jwt: request.jwt,
@@ -28,11 +32,16 @@ export const db = fastifyPlugin(async (fastify) => {
       path: request.url,
       method: request.method,
     })
+
+    request.db = new StorageKnexDB(request.dbConnection, {
+      tenantId: request.tenantId,
+      host: request.headers['x-forwarded-host'] as string,
+    })
   })
 
   fastify.addHook('onSend', async (request, reply, payload) => {
     if (request.db) {
-      request.db.dispose().catch((e) => {
+      request.dbConnection.dispose().catch((e) => {
         request.log.error(e, 'Error disposing db connection')
       })
     }
@@ -41,13 +50,13 @@ export const db = fastifyPlugin(async (fastify) => {
 
   fastify.addHook('onTimeout', async (request) => {
     if (request.db) {
-      await request.db.dispose()
+      await request.dbConnection.dispose()
     }
   })
 
   fastify.addHook('onRequestAbort', async (request) => {
     if (request.db) {
-      await request.db.dispose()
+      await request.dbConnection.dispose()
     }
   })
 })
@@ -58,7 +67,7 @@ export const dbSuperUser = fastifyPlugin(async (fastify) => {
   fastify.addHook('preHandler', async (request) => {
     const adminUser = await getServiceKeyUser(request.tenantId)
 
-    request.db = await getPostgresConnection({
+    request.dbConnection = await getPostgresConnection({
       user: adminUser,
       superUser: adminUser,
       tenantId: request.tenantId,
@@ -67,11 +76,16 @@ export const dbSuperUser = fastifyPlugin(async (fastify) => {
       method: request.method,
       headers: request.headers,
     })
+
+    request.db = new StorageKnexDB(request.dbConnection, {
+      tenantId: request.tenantId,
+      host: request.headers['x-forwarded-host'] as string,
+    })
   })
 
   fastify.addHook('onSend', async (request, reply, payload) => {
     if (request.db) {
-      request.db.dispose().catch((e) => {
+      request.dbConnection.dispose().catch((e) => {
         request.log.error(e, 'Error disposing db connection')
       })
     }
@@ -81,13 +95,13 @@ export const dbSuperUser = fastifyPlugin(async (fastify) => {
 
   fastify.addHook('onTimeout', async (request) => {
     if (request.db) {
-      await request.db.dispose()
+      await request.dbConnection.dispose()
     }
   })
 
   fastify.addHook('onRequestAbort', async (request) => {
     if (request.db) {
-      await request.db.dispose()
+      await request.dbConnection.dispose()
     }
   })
 })
diff --git a/src/http/plugins/index.ts b/src/http/plugins/index.ts
index bf8bdf3d..a0947b15 100644
--- a/src/http/plugins/index.ts
+++ b/src/http/plugins/index.ts
@@ -6,4 +6,5 @@ export * from './db'
 export * from './storage'
 export * from './tenant-id'
 export * from './tenant-feature'
+export * from './bucket'
 export * from './metrics'
diff --git a/src/http/plugins/jwt.ts b/src/http/plugins/jwt.ts
index de4b8c2a..6d3fd518 100644
--- a/src/http/plugins/jwt.ts
+++ b/src/http/plugins/jwt.ts
@@ -1,6 +1,6 @@
 import fastifyPlugin from 'fastify-plugin'
 import { createResponse } from '../generic-routes'
-import { getJwtSecret, getOwner } from '../../auth'
+import { getJwtSecret, getOwner, mustBeServiceKey } from '../../auth'
 
 declare module 'fastify' {
   interface FastifyRequest {
@@ -24,3 +24,12 @@ export const jwt = fastifyPlugin(async (fastify) => {
     }
   })
 })
+
+export const jwtServiceKey = fastifyPlugin(async (fastify) => {
+  fastify.decorateRequest('jwt', '')
+  fastify.addHook('preHandler', async (request, reply) => {
+    request.jwt = (request.headers.authorization || '').substring('Bearer '.length)
+
+    await mustBeServiceKey(request.tenantId, request.jwt)
+  })
+})
diff --git a/src/http/plugins/storage.ts b/src/http/plugins/storage.ts
index f894c352..7d0a2cdc 100644
--- a/src/http/plugins/storage.ts
+++ b/src/http/plugins/storage.ts
@@ -1,25 +1,48 @@
 import fastifyPlugin from 'fastify-plugin'
 import { StorageBackendAdapter, createStorageBackend } from '../../storage/backend'
 import { Storage } from '../../storage'
-import { StorageKnexDB } from '../../storage/database'
+import { RouteGenericInterface } from 'fastify/types/route'
+import { decrypt } from '../../auth'
+import { BucketWithCredentials } from '../../storage/schemas'
+import { getConfig } from '../../config'
 
 declare module 'fastify' {
-  interface FastifyRequest {
+  interface FastifyRequest<RouteGeneric extends RouteGenericInterface = RouteGenericInterface> {
     storage: Storage
     backend: StorageBackendAdapter
   }
 }
 
-export const storage = fastifyPlugin(async (fastify) => {
-  const storageBackend = createStorageBackend()
+const { region, globalS3Endpoint, globalS3Bucket, globalS3ForcePathStyle } = getConfig()
+
+export const storage = fastifyPlugin(async (fastify) => {
   fastify.decorateRequest('storage', undefined)
   fastify.addHook('preHandler', async (request) => {
-    const database = new StorageKnexDB(request.db, {
-      tenantId: request.tenantId,
-      host: request.headers['x-forwarded-host'] as string,
-    })
+    let storageBackend: StorageBackendAdapter | undefined = undefined
+
+    const parentBucket: BucketWithCredentials | undefined = request.bucket
+
+    if (parentBucket && parentBucket.credential_id) {
+      storageBackend = createStorageBackend({
+        bucket: parentBucket.id,
+        role: parentBucket.role,
+        endpoint: parentBucket.endpoint,
+        region: parentBucket.region,
+        forcePathStyle: parentBucket.force_path_style,
+        accessKey: request.bucket.access_key ? decrypt(request.bucket.access_key) : undefined,
+        secretKey: request.bucket.secret_key ? decrypt(request.bucket.secret_key) : undefined,
+      })
+    } else {
+      storageBackend = createStorageBackend({
+        prefix: request.tenantId,
+        bucket: globalS3Bucket,
+        endpoint: globalS3Endpoint,
+        region,
+        forcePathStyle: globalS3ForcePathStyle,
+      })
+    }
     request.backend = storageBackend
-    request.storage = new Storage(storageBackend, database)
+    request.storage = new Storage(request.backend, request.db)
   })
 })
diff --git a/src/http/routes/bucket/createBucket.ts b/src/http/routes/bucket/createBucket.ts
index 530d74b6..de29b350 100644
--- a/src/http/routes/bucket/createBucket.ts
+++ b/src/http/routes/bucket/createBucket.ts
@@ -2,6 +2,7 @@ import { FastifyInstance } from 'fastify'
 import { FromSchema } from 'json-schema-to-ts'
 import { createDefaultSchema } from '../../generic-routes'
 import { AuthenticatedRequest } from '../../request'
+import { mustBeServiceKey } from '../../../auth'
 
 const createBucketBodySchema = {
   type: 'object',
@@ -9,6 +10,7 @@ const createBucketBodySchema = {
     name: { type: 'string', examples: ['avatars'] },
     id: { type: 'string', examples: ['avatars'] },
     public: { type: 'boolean', examples: [false] },
+    credential_id: { type: 'string' },
     file_size_limit: {
       anyOf: [
         { type: 'integer', examples: [1000], nullable: true, minimum: 0 },
@@ -58,14 +60,20 @@ export default async function routes(fastify: FastifyInstance) {
         id,
         allowed_mime_types,
         file_size_limit,
+        credential_id,
       } = request.body
 
+      if (credential_id) {
+        await mustBeServiceKey(request.tenantId, request.jwt)
+      }
+
       const bucket = await request.storage.createBucket({
         id: id ?? bucketName,
         name: bucketName,
         owner,
         public: isPublic ?? false,
         fileSizeLimit: file_size_limit,
+        credentialId: credential_id,
         allowedMimeTypes: allowed_mime_types
           ? allowed_mime_types?.filter((mime) => mime)
           : allowed_mime_types,
diff --git a/src/http/routes/bucket/updateBucket.ts b/src/http/routes/bucket/updateBucket.ts
index 9bf6e78d..f8be2e3f 100644
--- a/src/http/routes/bucket/updateBucket.ts
+++ b/src/http/routes/bucket/updateBucket.ts
@@ -2,11 +2,13 @@ import { FastifyInstance } from 'fastify'
 import { FromSchema } from 'json-schema-to-ts'
 import { createDefaultSchema, createResponse } from '../../generic-routes'
 import { AuthenticatedRequest } from '../../request'
+import { mustBeServiceKey } from '../../../auth'
 
 const updateBucketBodySchema = {
   type: 'object',
   properties: {
     public: { type: 'boolean', examples: [false] },
+    credential_id: { type: 'string' },
     file_size_limit: {
       anyOf: [
         { type: 'integer', examples: [1000], nullable: true, minimum: 0 },
@@ -55,7 +57,11 @@ export default async function routes(fastify: FastifyInstance) {
     async (request, response) => {
       const { bucketId } = request.params
 
-      const { public: isPublic, file_size_limit, allowed_mime_types } = request.body
+      const { public: isPublic, file_size_limit, allowed_mime_types, credential_id } = request.body
+
+      if (credential_id) {
+        await mustBeServiceKey(request.tenantId, request.jwt)
+      }
 
       await request.storage.updateBucket(bucketId, {
         public: isPublic,
@@ -63,6 +69,7 @@
         allowedMimeTypes: allowed_mime_types
           ? allowed_mime_types?.filter((mime) => mime)
           : allowed_mime_types,
+        credentialId: credential_id,
       })
 
       return response.status(200).send(createResponse('Successfully updated'))
diff --git a/src/http/routes/credentials/index.ts b/src/http/routes/credentials/index.ts
new file mode 100644
index 00000000..d2422802
--- /dev/null
+++ b/src/http/routes/credentials/index.ts
@@ -0,0 +1,66 @@
+import { FastifyInstance, RequestGenericInterface } from 'fastify'
+import { FromSchema } from 'json-schema-to-ts'
+import { db, jwtServiceKey, storage } from '../../plugins'
+
+const postSchema = {
+  body: {
+    type: 'object',
+    properties: {
+      name: { type: 'string' },
+      access_key: { type: 'string' },
+      secret_key: { type: 'string' },
+      role: { type: 'string' },
+      endpoint: { type: 'string' },
+      force_path_style: { type: 'boolean' },
+      region: {
+        type: 'string',
+        // oneOf: ['us-east-2', 'us-east-1', 'us-west-1', 'us-west-2', 'af-south-1'], // TODO: add more
+      },
+    },
+    required: ['name', 'region'],
+  },
+} as const
+
+interface createCredentialsSchema extends RequestGenericInterface {
+  Body: FromSchema<typeof postSchema.body>
+}
+
+interface deleteCredentialsSchema extends RequestGenericInterface {
+  Params: {
+    credentialId: string
+  }
+}
+
+export default async function routes(fastify: FastifyInstance) {
+  fastify.register(jwtServiceKey)
+  fastify.register(db)
+  fastify.register(storage)
+
+  fastify.get('/', async (request, reply) => {
+    const credentials = await request.storage.listCredentials()
+
+    reply.status(200).send(credentials)
+  })
+
+  fastify.post<createCredentialsSchema>('/', async (request, reply) => {
+    const { name, access_key, secret_key, role, endpoint, region, force_path_style } = request.body
+
+    const credential = await request.storage.createCredential({
+      name,
+      access_key,
+      secret_key,
+      role,
+      endpoint,
+      region,
+      force_path_style: Boolean(force_path_style),
+    })
+
+    reply.status(201).send(credential)
+  })
+
+  fastify.delete<deleteCredentialsSchema>('/:credentialId', async (request, reply) => {
+    await request.storage.deleteCredential(request.params.credentialId)
+
+    reply.code(204).send()
+  })
+}
diff --git a/src/http/routes/index.ts b/src/http/routes/index.ts
index e0a5eb3c..d8960c6f 100644
--- a/src/http/routes/index.ts
+++ b/src/http/routes/index.ts
@@ -2,4 +2,5 @@ export { default as bucket } from './bucket'
 export { default as object } from './object'
 export { default as render } from './render'
 export { default as tenant } from './tenant'
+export { default as credentials } from './credentials'
 export { default as multiPart } from './tus'
diff --git a/src/http/routes/object/copyObject.ts b/src/http/routes/object/copyObject.ts
index 60eed6bb..591be194 100644
--- a/src/http/routes/object/copyObject.ts
+++ b/src/http/routes/object/copyObject.ts
@@ -1,4 +1,4 @@
-import { FastifyInstance } from 'fastify'
+import { FastifyInstance, FastifyRequest } from 'fastify'
 import { FromSchema } from 'json-schema-to-ts'
 import { createDefaultSchema } from '../../generic-routes'
 import { AuthenticatedRequest } from '../../request'
@@ -36,6 +36,11 @@ export default async function routes(fastify: FastifyInstance) {
     '/copy',
     {
       schema,
+      config: {
+        getParentBucketId: (request: FastifyRequest) => {
+          return request.body.bucketId
+        },
+      },
     },
     async (request, response) => {
       const { sourceKey, destinationKey, bucketId } = request.body
@@ -47,7 +52,7 @@ export default async function routes(fastify: FastifyInstance) {
       )
 
       const result = await request.storage
-        .from(bucketId)
+        .from(request.bucket)
         .copyObject(sourceKey, destinationKey, request.owner)
 
       return response.status(result.httpStatusCode ?? 200).send({
diff --git a/src/http/routes/object/createObject.ts b/src/http/routes/object/createObject.ts
index 529dd571..9bb871bf 100644
--- a/src/http/routes/object/createObject.ts
+++ b/src/http/routes/object/createObject.ts
@@ -1,4 +1,4 @@
-import { FastifyInstance, RequestGenericInterface } from 'fastify'
+import { FastifyInstance, FastifyRequest, RequestGenericInterface } from 'fastify'
 import { FromSchema } from 'json-schema-to-ts'
 
 import { createDefaultSchema } from '../../generic-routes'
@@ -53,19 +53,23 @@ export default async function routes(fastify: FastifyInstance) {
     '/:bucketName/*',
     {
       schema,
+      config: {
+        getParentBucketId: (request: FastifyRequest) => {
+          return request.params.bucketName
+        },
+      },
     },
     async (request, response) => {
       const contentType = request.headers['content-type']
       request.log.info(`content-type is ${contentType}`)
 
-      const { bucketName } = request.params
       const objectName = request.params['*']
       const isUpsert = request.headers['x-upsert'] === 'true'
       const owner = request.owner as string
 
       const { objectMetadata, path, id } = await request.storage
-        .from(bucketName)
+        .from(request.bucket)
         .uploadNewObject(request, {
           objectName,
           owner,
diff --git a/src/http/routes/object/deleteObject.ts b/src/http/routes/object/deleteObject.ts
index a4c09a15..05544943 100644
--- a/src/http/routes/object/deleteObject.ts
+++ b/src/http/routes/object/deleteObject.ts
@@ -1,4 +1,4 @@
-import { FastifyInstance } from 'fastify'
+import { FastifyInstance, FastifyRequest } from 'fastify'
 import { FromSchema } from 'json-schema-to-ts'
 import { createDefaultSchema, createResponse } from '../../generic-routes'
 import { AuthenticatedRequest } from '../../request'
@@ -34,12 +34,16 @@ export default async function routes(fastify: FastifyInstance) {
     '/:bucketName/*',
     {
       schema,
+      config: {
+        getParentBucketId: (request: FastifyRequest) => {
+          return request.params.bucketName
+        },
+      },
     },
     async (request, response) => {
-      const { bucketName } = request.params
       const objectName = request.params['*']
 
-      await request.storage.from(bucketName).deleteObject(objectName)
+      await request.storage.from(request.bucket).deleteObject(objectName)
 
       return response.status(200).send(createResponse('Successfully deleted'))
     }
diff --git a/src/http/routes/object/deleteObjects.ts b/src/http/routes/object/deleteObjects.ts
index 14016ee5..a7ce29c3 100644
--- a/src/http/routes/object/deleteObjects.ts
+++ b/src/http/routes/object/deleteObjects.ts
@@ -1,8 +1,8 @@
-import { FastifyInstance } from 'fastify'
+import { FastifyInstance, FastifyRequest } from 'fastify'
 import { FromSchema } from 'json-schema-to-ts'
 import { createDefaultSchema } from '../../generic-routes'
 import { AuthenticatedRequest } from '../../request'
-import { objectSchema } from '../../../storage/schemas/object'
+import { objectSchema } from '../../../storage/schemas'
 const deleteObjectsParamsSchema = {
   type: 'object',
   properties: {
@@ -45,12 +45,16 @@ export default async function routes(fastify: FastifyInstance) {
     '/:bucketName',
     {
       schema,
+      config: {
+        getParentBucketId: (request: FastifyRequest) => {
+          return request.params.bucketName
+        },
+      },
     },
     async (request, response) => {
-      const { bucketName } = request.params
       const prefixes = request.body['prefixes']
 
-      const results = await request.storage.from(bucketName).deleteObjects(prefixes)
+      const results = await request.storage.from(request.bucket).deleteObjects(prefixes)
 
       return response.status(200).send(results)
     }
diff --git a/src/http/routes/object/getObject.ts b/src/http/routes/object/getObject.ts
index d323a385..9e99fdc7 100644
--- a/src/http/routes/object/getObject.ts
+++ b/src/http/routes/object/getObject.ts
@@ -4,8 +4,6 @@ import { IncomingMessage, Server, ServerResponse } from 'http'
 import { getConfig } from '../../../config'
 import { AuthenticatedRangeRequest } from '../../request'
 
-const { globalS3Bucket } = getConfig()
-
 const getObjectParamsSchema = {
   type: 'object',
   properties: {
@@ -37,18 +35,15 @@ async function requestHandler(
     unknown
   >
 ) {
-  const { bucketName } = request.params
   const { download } = request.query
   const objectName = request.params['*']
 
-  const obj = await request.storage.from(bucketName).findObject(objectName, 'id, version')
+  const obj = await request.storage.from(request.bucket).findObject(objectName, 'id, version')
 
   // send the object from s3
-  const s3Key = `${request.tenantId}/${bucketName}/${objectName}`
-  request.log.info(s3Key)
+  const s3Key = request.storage.from(request.bucket).computeObjectPath(objectName)
 
   return request.storage.renderer('asset').render(request, response, {
-    bucket: globalS3Bucket,
     key: s3Key,
     version: obj.version,
    download,
@@ -69,6 +64,11 @@ export default async function routes(fastify: FastifyInstance) {
         response: { '4xx': { $ref: 'errorSchema#', description: 'Error response' } },
         tags: ['object'],
       },
+      config: {
+        getParentBucketId: (request: FastifyRequest) => {
+          return request.params.bucketName
+        },
+      },
     },
     async (request, response) => {
       return requestHandler(request, response)
@@ -88,6 +88,11 @@ export default async function routes(fastify: FastifyInstance) {
         response: { '4xx': { $ref: 'errorSchema#' } },
         tags: ['deprecated'],
       },
+      config: {
+        getParentBucketId: (request: FastifyRequest) => {
+          return request.params.bucketName
+        },
+      },
     },
     async (request, response) => {
       return requestHandler(request, response)
diff --git a/src/http/routes/object/getObjectInfo.ts b/src/http/routes/object/getObjectInfo.ts
index 51e55f0b..080041d4 100644
--- a/src/http/routes/object/getObjectInfo.ts
+++ b/src/http/routes/object/getObjectInfo.ts
@@ -5,8 +5,6 @@ import { getConfig } from '../../../config'
 import { AuthenticatedRangeRequest } from '../../request'
 import { Obj } from '../../../storage/schemas'
 
-const { globalS3Bucket } = getConfig()
-
 const getObjectParamsSchema = {
   type: 'object',
   properties: {
@@ -31,20 +29,21 @@ async function requestHandler(
   >,
   publicRoute = false
 ) {
-  const { bucketName } = request.params
   const objectName = request.params['*']
 
-  const s3Key = `${request.tenantId}/${bucketName}/${objectName}`
+  const s3Key = request.storage.from(request.bucket).computeObjectPath(objectName)
 
   let obj: Obj
   if (publicRoute) {
-    obj = await request.storage.asSuperUser().from(bucketName).findObject(objectName, 'id,version')
+    obj = await request.storage
+      .asSuperUser()
+      .from(request.bucket)
+      .findObject(objectName, 'id,version')
   } else {
-    obj = await request.storage.from(bucketName).findObject(objectName, 'id,version')
+    obj = await request.storage.from(request.bucket).findObject(objectName, 'id,version')
   }
 
   return request.storage.renderer('head').render(request, response, {
-    bucket: globalS3Bucket,
     key: s3Key,
     version: obj.version,
   })
@@ -61,6 +60,11 @@ export async function publicRoutes(fastify: FastifyInstance) {
         tags: ['object'],
         response: { '4xx': { $ref: 'errorSchema#' } },
       },
+      config: {
+        getParentBucketId: (request: FastifyRequest) => {
+          return request.params.bucketName
+        },
+      },
     },
     async (request, response) => {
       return requestHandler(request, response, true)
@@ -78,6 +82,11 @@ export async function publicRoutes(fastify: FastifyInstance) {
         tags: ['object'],
         response: { '4xx': { $ref: 'errorSchema#' } },
       },
+      config: {
+        getParentBucketId: (request: FastifyRequest) => {
+          return request.params.bucketName
+        },
+      },
     },
     async (request, response) => {
       return requestHandler(request, response, true)
@@ -97,6 +106,11 @@ export async function authenticatedRoutes(fastify: FastifyInstance) {
         response: { '4xx': { $ref: 'errorSchema#', description: 'Error response' } },
         tags: ['object'],
       },
+      config: {
+        getParentBucketId: (request: FastifyRequest) => {
+          return request.params.bucketName
+        },
+      },
     },
     async (request, response) => {
       return requestHandler(request, response)
@@ -113,6 +127,11 @@ export async function authenticatedRoutes(fastify: FastifyInstance) {
         response: { '4xx': { $ref: 'errorSchema#', description: 'Error response' } },
         tags: ['object'],
       },
+      config: {
+        getParentBucketId: (request: FastifyRequest) => {
+          return request.params.bucketName
+        },
+      },
     },
     async (request, response) => {
       return requestHandler(request, response)
@@ -147,6 +166,11 @@ export async function authenticatedRoutes(fastify: FastifyInstance) {
         response: { '4xx': { $ref: 'errorSchema#' } },
         tags: ['deprecated'],
       },
+      config: {
+        getParentBucketId: (request: FastifyRequest) => {
+          return request.params.bucketName
+        },
+      },
     },
     async (request, response) => {
       return requestHandler(request, response)
diff --git a/src/http/routes/object/getPublicObject.ts b/src/http/routes/object/getPublicObject.ts
index c21e4478..13eb66d2 100644
--- a/src/http/routes/object/getPublicObject.ts
+++ b/src/http/routes/object/getPublicObject.ts
@@ -1,8 +1,6 @@
-import { FastifyInstance } from 'fastify'
+import { FastifyInstance, FastifyRequest } from 'fastify'
 import { FromSchema } from 'json-schema-to-ts'
-import { getConfig } from '../../../config'
-
-const { globalS3Bucket } = getConfig()
+import { StorageBackendError } from '../../../storage'
 
 const getPublicObjectParamsSchema = {
   type: 'object',
@@ -41,25 +39,29 @@ export default async function routes(fastify: FastifyInstance) {
         response: { '4xx': { $ref: 'errorSchema#', description: 'Error response' } },
         tags: ['object'],
       },
+      config: {
+        getParentBucketId: (request: FastifyRequest) => {
+          return request.params.bucketName
+        },
+      },
     },
     async (request, response) => {
-      const { bucketName } = request.params
       const objectName = request.params['*']
       const { download } = request.query
 
-      const [, obj] = await Promise.all([
-        request.storage.asSuperUser().findBucket(bucketName, 'id,public', {
-          isPublic: true,
-        }),
-        request.storage.asSuperUser().from(bucketName).findObject(objectName, 'id,version'),
-      ])
+      if (!request.bucket.public) {
+        throw new StorageBackendError('not_found', 400, 'Object not found')
+      }
+
+      const obj = await request.storage
+        .asSuperUser()
+        .from(request.bucket)
+        .findObject(objectName, 'id,version')
 
       // send the object from s3
-      const s3Key = `${request.tenantId}/${bucketName}/${objectName}`
-      request.log.info(s3Key)
+      const s3Key = request.storage.from(request.bucket).computeObjectPath(objectName)
 
       return request.storage.renderer('asset').render(request, response, {
-        bucket: globalS3Bucket,
         key: s3Key,
         version: obj.version,
         download,
diff --git a/src/http/routes/object/getSignedObject.ts b/src/http/routes/object/getSignedObject.ts
index 8b6b40bb..bd0172b2 100644
--- a/src/http/routes/object/getSignedObject.ts
+++ b/src/http/routes/object/getSignedObject.ts
@@ -1,11 +1,9 @@
-import { FastifyInstance } from 'fastify'
+import { FastifyInstance, FastifyRequest } from 'fastify'
 import { FromSchema } from 'json-schema-to-ts'
 import { getConfig } from '../../../config'
 import { getJwtSecret, SignedToken, verifyJWT } from '../../../auth'
 import { StorageBackendError } from '../../../storage'
 
-const { globalS3Bucket } = getConfig()
-
 const getSignedObjectParamsSchema = {
   type: 'object',
   properties: {
@@ -50,6 +48,11 @@ export default async function routes(fastify: FastifyInstance) {
         response: { '4xx': { $ref: 'errorSchema#', description: 'Error response' } },
         tags: ['object'],
       },
+      config: {
+        getParentBucketId: (request: FastifyRequest) => {
+          return request.params.bucketName
+        },
+      },
     },
     async (request, response) => {
       const { token } = request.query
@@ -72,17 +75,15 @@ export default async function routes(fastify: FastifyInstance) {
         throw new StorageBackendError('InvalidSignature', 400, 'The url do not match the signature')
       }
 
-      const s3Key = `${request.tenantId}/${url}`
-      request.log.info(s3Key)
-
-      const [bucketName, ...objParts] = url.split('/')
+      const [, ...objParts] = url.split('/')
       const obj = await request.storage
         .asSuperUser()
-        .from(bucketName)
-        .findObject(objParts.join('/'), 'id,version')
+        .from(request.bucket)
+        .findObject(objParts.join('/'), 'id,name,version')
+
+      const s3Key = request.storage.from(request.bucket).computeObjectPath(obj.name)
 
       return request.storage.renderer('asset').render(request, response, {
-        bucket: globalS3Bucket,
         key: s3Key,
         version: obj.version,
         download,
diff --git a/src/http/routes/object/getSignedURL.ts b/src/http/routes/object/getSignedURL.ts
index 8185679d..3976b52c 100644
--- a/src/http/routes/object/getSignedURL.ts
+++ b/src/http/routes/object/getSignedURL.ts
@@ -1,4 +1,4 @@
-import { FastifyInstance } from 'fastify'
+import { FastifyInstance, FastifyRequest } from 'fastify'
 import { FromSchema } from 'json-schema-to-ts'
 import { createDefaultSchema } from '../../generic-routes'
 import { AuthenticatedRequest } from '../../request'
@@ -57,9 +57,13 @@ export default async function routes(fastify: FastifyInstance) {
     '/sign/:bucketName/*',
     {
       schema,
+      config: {
+        getParentBucketId: (request: FastifyRequest) => {
+          return request.params.bucketName
+        },
+      },
     },
     async (request, response) => {
-      const { bucketName } = request.params
       const objectName = request.params['*']
 
       const { expiresIn } = request.body
@@ -77,7 +81,7 @@ export default async function routes(fastify: FastifyInstance) {
           : undefined
 
       const signedURL = await request.storage
-        .from(bucketName)
+        .from(request.bucket)
         .signObjectUrl(objectName, urlPath as string, expiresIn, transformationOptions)
 
       return response.status(200).send({ signedURL })
diff --git a/src/http/routes/object/getSignedURLs.ts b/src/http/routes/object/getSignedURLs.ts
index 249aeb3e..2d8d3291 100644
--- a/src/http/routes/object/getSignedURLs.ts
+++ b/src/http/routes/object/getSignedURLs.ts
@@ -1,4 +1,4 @@
-import { FastifyInstance } from 'fastify'
+import { FastifyInstance, FastifyRequest } from 'fastify'
 import { FromSchema } from 'json-schema-to-ts'
 import { createDefaultSchema } from '../../generic-routes'
 import { AuthenticatedRequest } from '../../request'
@@ -65,12 +65,15 @@ export default async function routes(fastify: FastifyInstance) {
     '/sign/:bucketName',
     {
       schema,
+      config: {
+        getParentBucketId: (request: FastifyRequest) =>
+          request.params.bucketName,
+      },
     },
     async (request, response) => {
-      const { bucketName } = request.params
       const { expiresIn, paths } = request.body
 
-      const signedURLs = await request.storage.from(bucketName).signObjectUrls(paths, expiresIn)
+      const signedURLs = await request.storage.from(request.bucket).signObjectUrls(paths, expiresIn)
 
       return response.status(200).send(signedURLs)
     }
diff --git a/src/http/routes/object/getSignedUploadURL.ts b/src/http/routes/object/getSignedUploadURL.ts
index cf595dbf..713bc889 100644
--- a/src/http/routes/object/getSignedUploadURL.ts
+++ b/src/http/routes/object/getSignedUploadURL.ts
@@ -1,4 +1,4 @@
-import { FastifyInstance } from 'fastify'
+import { FastifyInstance, FastifyRequest } from 'fastify'
 import { FromSchema } from 'json-schema-to-ts'
 import { createDefaultSchema } from '../../generic-routes'
 import { AuthenticatedRequest } from '../../request'
@@ -44,16 +44,19 @@ export default async function routes(fastify: FastifyInstance) {
     '/upload/sign/:bucketName/*',
     {
       schema,
+      config: {
+        getParentBucketId: (request: FastifyRequest) =>
+          request.params.bucketName,
+      },
     },
     async (request, response) => {
-      const { bucketName } = request.params
       const objectName = request.params['*']
       const owner = request.owner
 
       const urlPath = request.url.split('?').shift()
 
       const signedUploadURL = await request.storage
-        .from(bucketName)
+        .from(request.bucket)
         .signUploadObjectUrl(objectName, urlPath as string, signedUploadUrlExpirationTime, owner)
 
       return response.status(200).send({ url: signedUploadURL })
diff --git a/src/http/routes/object/index.ts b/src/http/routes/object/index.ts
index c6ccf19b..27a8ef40 100644
--- a/src/http/routes/object/index.ts
+++ b/src/http/routes/object/index.ts
@@ -1,5 +1,5 @@
 import { FastifyInstance } from 'fastify'
-import { jwt, storage, dbSuperUser, db } from '../../plugins'
+import { jwt, storage, dbSuperUser, db, parentBucket } from '../../plugins'
 import copyObject from './copyObject'
 import createObject from './createObject'
 import deleteObject from './deleteObject'
@@ -23,6 +23,7 @@ export default async function routes(fastify: FastifyInstance) {
   fastify.register(async function authorizationContext(fastify) {
     fastify.register(jwt)
     fastify.register(db)
+    fastify.register(parentBucket)
     fastify.register(storage)
 
     fastify.register(deleteObject)
@@ -41,6 +42,7 @@ export default async function routes(fastify: FastifyInstance) {
   fastify.register(async (fastify) => {
     fastify.register(dbSuperUser)
+    fastify.register(parentBucket)
     fastify.register(storage)
     fastify.register(getPublicObject)
     fastify.register(getSignedObject)
diff --git a/src/http/routes/object/listObjects.ts b/src/http/routes/object/listObjects.ts
index 237b0089..be25f484 100644
--- a/src/http/routes/object/listObjects.ts
+++ b/src/http/routes/object/listObjects.ts
@@ -1,4 +1,4 @@
-import { FastifyInstance } from 'fastify'
+import { FastifyInstance, FastifyRequest } from 'fastify'
 import { FromSchema } from 'json-schema-to-ts'
 import { createDefaultSchema } from '../../generic-routes'
 import { AuthenticatedRequest } from '../../request'
@@ -54,12 +54,15 @@ export default async function routes(fastify: FastifyInstance) {
     '/list/:bucketName',
     {
       schema,
+      config: {
+        getParentBucketId: (request: FastifyRequest) =>
+          request.params.bucketName,
+      },
     },
     async (request, response) => {
-      const { bucketName } = request.params
       const { limit, offset, sortBy, search, prefix } = request.body
 
-      const results = await request.storage.from(bucketName).searchObjects(prefix, {
+      const results = await request.storage.from(request.bucket).searchObjects(prefix, {
         limit,
         offset,
         search,
diff --git a/src/http/routes/object/moveObject.ts b/src/http/routes/object/moveObject.ts
index 0b10ae96..6d8272b0 100644
--- a/src/http/routes/object/moveObject.ts
+++ b/src/http/routes/object/moveObject.ts
@@ -1,4 +1,4 @@
-import { FastifyInstance } from 'fastify'
+import { FastifyInstance, FastifyRequest } from 'fastify'
 import { FromSchema } from 'json-schema-to-ts'
 import { createDefaultSchema, createResponse } from '../../generic-routes'
 import { AuthenticatedRequest } from '../../request'
@@ -36,11 +36,18 @@ export default async function routes(fastify: FastifyInstance) {
     '/move',
     {
       schema,
+      config: {
+        getParentBucketId: (request: FastifyRequest) => {
+          return request.body.bucketId
+        },
+      },
     },
     async (request, response) => {
-      const { destinationKey, sourceKey, bucketId } = request.body
+      const { destinationKey, sourceKey } = request.body
 
-      await request.storage.from(bucketId).moveObject(sourceKey, destinationKey, request.owner)
+      await request.storage
+        .from(request.bucket)
+        .moveObject(sourceKey, destinationKey, request.owner)
 
       return response.status(200).send(createResponse('Successfully moved'))
     }
diff --git a/src/http/routes/object/updateObject.ts b/src/http/routes/object/updateObject.ts
index 23d3b96b..30ededb9 100644
--- a/src/http/routes/object/updateObject.ts
+++ b/src/http/routes/object/updateObject.ts
@@ -1,4 +1,4 @@
-import { FastifyInstance, RequestGenericInterface } from 'fastify'
+import { FastifyInstance, FastifyRequest, RequestGenericInterface } from 'fastify'
 import { FromSchema } from 'json-schema-to-ts'
 
 import { createDefaultSchema } from '../../generic-routes'
@@ -50,17 +50,21 @@ export default async function routes(fastify: FastifyInstance) {
     '/:bucketName/*',
     {
       schema,
+      config: {
+        getParentBucketId: (request: FastifyRequest) => {
+          return request.params.bucketName
+        },
+      },
     },
     async (request, response) => {
       const contentType = request.headers['content-type']
       request.log.info(`content-type is ${contentType}`)
 
-      const { bucketName } = request.params
       const objectName = request.params['*']
       const owner = request.owner as string
 
       const { objectMetadata, path, id } = await request.storage
-        .from(bucketName)
+        .from(request.bucket)
         .uploadOverridingObject(request, {
           owner,
           objectName: objectName,
diff --git a/src/http/routes/object/uploadSignedObject.ts b/src/http/routes/object/uploadSignedObject.ts
--git a/src/http/routes/object/uploadSignedObject.ts b/src/http/routes/object/uploadSignedObject.ts index 394745d3..47bdfab2 100644 --- a/src/http/routes/object/uploadSignedObject.ts +++ b/src/http/routes/object/uploadSignedObject.ts @@ -1,4 +1,4 @@ -import { FastifyInstance } from 'fastify' +import { FastifyInstance, FastifyRequest } from 'fastify' import { FromSchema } from 'json-schema-to-ts' import { getJwtSecret, SignedUploadToken, verifyJWT } from '../../../auth' import { StorageBackendError } from '../../../storage' @@ -65,6 +65,11 @@ export default async function routes(fastify: FastifyInstance) { }, tags: ['object'], }, + config: { + getParentBucketId: (request: FastifyRequest) => { + return request.params.bucketName + }, + }, }, async (request, response) => { // Validate sender @@ -97,7 +102,7 @@ export default async function routes(fastify: FastifyInstance) { const { objectMetadata, path } = await request.storage .asSuperUser() - .from(bucketName) + .from(request.bucket) .uploadNewObject(request, { owner, objectName, diff --git a/src/http/routes/render/index.ts b/src/http/routes/render/index.ts index ec8e9e4d..9d3aee78 100644 --- a/src/http/routes/render/index.ts +++ b/src/http/routes/render/index.ts @@ -2,7 +2,7 @@ import { FastifyInstance } from 'fastify' import renderPublicImage from './renderPublicImage' import renderAuthenticatedImage from './renderAuthenticatedImage' import renderSignedImage from './renderSignedImage' -import { jwt, storage, requireTenantFeature, db, dbSuperUser } from '../../plugins' +import { jwt, storage, requireTenantFeature, db, dbSuperUser, parentBucket } from '../../plugins' import { getConfig } from '../../../config' import { rateLimiter } from './rate-limiter' @@ -22,6 +22,7 @@ export default async function routes(fastify: FastifyInstance) { fastify.register(jwt) fastify.register(db) + fastify.register(parentBucket) fastify.register(storage) fastify.register(renderAuthenticatedImage) }) @@ -34,6 +35,7 @@ export default async function routes(fastify: FastifyInstance) { } fastify.register(dbSuperUser) + fastify.register(parentBucket) fastify.register(storage) fastify.register(renderSignedImage) fastify.register(renderPublicImage) diff --git a/src/http/routes/render/renderAuthenticatedImage.ts b/src/http/routes/render/renderAuthenticatedImage.ts index e5a54176..72139db9 100644 --- a/src/http/routes/render/renderAuthenticatedImage.ts +++ b/src/http/routes/render/renderAuthenticatedImage.ts @@ -1,11 +1,8 @@ -import { getConfig } from '../../../config' import { FromSchema } from 'json-schema-to-ts' -import { FastifyInstance } from 'fastify' +import { FastifyInstance, FastifyRequest } from 'fastify' import { ImageRenderer } from '../../../storage/renderer' import { transformationOptionsSchema } from '../../schemas/transformations' -const { globalS3Bucket } = getConfig() - const renderAuthenticatedImageParamsSchema = { type: 'object', properties: { @@ -40,20 +37,24 @@ export default async function routes(fastify: FastifyInstance) { response: { '4xx': { $ref: 'errorSchema#', description: 'Error response' } }, tags: ['object'], }, + config: { + getParentBucketId: (request: FastifyRequest) => { + return request.params.bucketName + }, + }, }, async (request, response) => { const { download } = request.query const { bucketName } = request.params const objectName = request.params['*'] - const obj = await request.storage.from(bucketName).findObject(objectName, 'id,version') + const obj = await request.storage.from(request.bucket).findObject(objectName, 
'id,version') const s3Key = `${request.tenantId}/${bucketName}/${objectName}` const renderer = request.storage.renderer('image') as ImageRenderer return renderer.setTransformations(request.query).render(request, response, { - bucket: globalS3Bucket, key: s3Key, version: obj.version, download, diff --git a/src/http/routes/render/renderPublicImage.ts b/src/http/routes/render/renderPublicImage.ts index 92e2fd2f..97adfbe6 100644 --- a/src/http/routes/render/renderPublicImage.ts +++ b/src/http/routes/render/renderPublicImage.ts @@ -1,10 +1,8 @@ -import { getConfig } from '../../../config' import { FromSchema } from 'json-schema-to-ts' -import { FastifyInstance } from 'fastify' +import { FastifyInstance, FastifyRequest } from 'fastify' import { ImageRenderer } from '../../../storage/renderer' import { transformationOptionsSchema } from '../../schemas/transformations' - -const { globalS3Bucket } = getConfig() +import { StorageBackendError } from '../../../storage' const renderPublicImageParamsSchema = { type: 'object', @@ -40,25 +38,30 @@ export default async function routes(fastify: FastifyInstance) { response: { '4xx': { $ref: 'errorSchema#', description: 'Error response' } }, tags: ['object'], }, + config: { + getParentBucketId: (request: FastifyRequest) => { + return request.params.bucketName + }, + }, }, async (request, response) => { const { download } = request.query - const { bucketName } = request.params const objectName = request.params['*'] - const [, obj] = await Promise.all([ - request.storage.asSuperUser().findBucket(bucketName, 'id,public', { - isPublic: true, - }), - request.storage.asSuperUser().from(bucketName).findObject(objectName, 'id,version'), - ]) + if (!request.bucket.public) { + throw new StorageBackendError('not_found', 400, 'Object not found') + } + + const obj = await request.storage + .asSuperUser() + .from(request.bucket) + .findObject(objectName, 'id,version') - const s3Key = `${request.tenantId}/${bucketName}/${objectName}` + const s3Key = request.storage.from(request.bucket).computeObjectPath(objectName) const renderer = request.storage.renderer('image') as ImageRenderer return renderer.setTransformations(request.query).render(request, response, { - bucket: globalS3Bucket, key: s3Key, version: obj.version, download, diff --git a/src/http/routes/render/renderSignedImage.ts b/src/http/routes/render/renderSignedImage.ts index f29c8627..0f60ab55 100644 --- a/src/http/routes/render/renderSignedImage.ts +++ b/src/http/routes/render/renderSignedImage.ts @@ -1,12 +1,9 @@ -import { getConfig } from '../../../config' import { FromSchema } from 'json-schema-to-ts' -import { FastifyInstance } from 'fastify' +import { FastifyInstance, FastifyRequest } from 'fastify' import { ImageRenderer } from '../../../storage/renderer' import { getJwtSecret, SignedToken, verifyJWT } from '../../../auth' import { StorageBackendError } from '../../../storage' -const { globalS3Bucket } = getConfig() - const renderAuthenticatedImageParamsSchema = { type: 'object', properties: { @@ -47,6 +44,11 @@ export default async function routes(fastify: FastifyInstance) { response: { '4xx': { $ref: 'errorSchema#', description: 'Error response' } }, tags: ['object'], }, + config: { + getParentBucketId: (request: FastifyRequest) => { + return request.params.bucketName + }, + }, }, async (request, response) => { const { token } = request.query @@ -70,20 +72,18 @@ export default async function routes(fastify: FastifyInstance) { throw new StorageBackendError('InvalidSignature', 400, 'The url do not match the 
signature') } - const s3Key = `${request.tenantId}/${url}` - request.log.info(s3Key) - - const [bucketName, ...objParts] = url.split('/') + const objectName = request.params['*'] const obj = await request.storage .asSuperUser() - .from(bucketName) - .findObject(objParts.join('/'), 'id,version') + .from(request.bucket) + .findObject(objectName, 'id,version') + + const s3Key = request.storage.from(request.bucket).computeObjectPath(obj.name) const renderer = request.storage.renderer('image') as ImageRenderer return renderer .setTransformationsFromString(transformations || '') .render(request, response, { - bucket: globalS3Bucket, key: s3Key, version: obj.version, download, diff --git a/src/http/routes/tus/file-store.ts b/src/http/routes/tus/file-store.ts index 86d003c1..03862aee 100644 --- a/src/http/routes/tus/file-store.ts +++ b/src/http/routes/tus/file-store.ts @@ -3,6 +3,7 @@ import { Upload } from '@tus/server' import fsExtra from 'fs-extra' import path from 'path' import { FileBackend } from '../../../storage/backend' +import { getConfig } from '../../../config' type Store = { get(key: string): Upload | undefined @@ -17,12 +18,16 @@ type FileStoreOptions = { expirationPeriodInMilliseconds?: number } +const { globalS3Bucket } = getConfig() + export class FileStore extends TusFileStore { protected fileAdapter: FileBackend constructor(protected readonly options: FileStoreOptions) { super(options) - this.fileAdapter = new FileBackend() + this.fileAdapter = new FileBackend({ + bucket: globalS3Bucket, + }) } async create(file: Upload): Promise { diff --git a/src/http/routes/tus/handlers.ts b/src/http/routes/tus/handlers.ts index d4b96901..160f4249 100644 --- a/src/http/routes/tus/handlers.ts +++ b/src/http/routes/tus/handlers.ts @@ -7,22 +7,24 @@ import { isRenderableError, Storage } from '../../../storage' import { MultiPartRequest } from './lifecycle' import { Database } from '../../../storage/database' import { OptionsHandler } from '@tus/server/handlers/OptionsHandler' -import { UploadId } from './upload-id' +import { UploadId, getFileIdFromRequest } from './upload-id' import { getFileSizeLimit } from '../../../storage/limits' import { Uploader } from '../../../storage/uploader' import { logger } from '../../../monitoring' -const reExtractFileID = /([^/]+)\/?$/ - export class Patch extends PatchHandler { getFileIdFromRequest(rawRwq: http.IncomingMessage) { - return getFileIdFromRequest(rawRwq, this.options.path) + const req = rawRwq as MultiPartRequest + return getFileIdFromRequest(rawRwq, this.options.path, { + isExternalBucket: Boolean(req.upload.bucket?.credential_id), + }) } async send(rawReq: http.IncomingMessage, res: http.ServerResponse) { const req = rawReq as MultiPartRequest - const id = this.getFileIdFromRequest(req) + const id = getFileIdFromRequest(req, this.options.path) + if (id === false) { throw ERRORS.FILE_NOT_FOUND } @@ -64,13 +66,16 @@ export class Patch extends PatchHandler { export class Head extends HeadHandler { getFileIdFromRequest(rawRwq: http.IncomingMessage) { - return getFileIdFromRequest(rawRwq, this.options.path) + const req = rawRwq as MultiPartRequest + return getFileIdFromRequest(rawRwq, this.options.path, { + isExternalBucket: Boolean(req.upload.bucket?.credential_id), + }) } async send(rawReq: http.IncomingMessage, res: http.ServerResponse) { const req = rawReq as MultiPartRequest - const id = this.getFileIdFromRequest(req) + const id = getFileIdFromRequest(req, this.options.path) if (id === false) { throw ERRORS.FILE_NOT_FOUND } @@ -116,7 +121,10 @@ 
export class Head extends HeadHandler { export class Post extends PostHandler { getFileIdFromRequest(rawRwq: http.IncomingMessage) { - return getFileIdFromRequest(rawRwq, this.options.path) + const req = rawRwq as MultiPartRequest + return getFileIdFromRequest(rawRwq, this.options.path, { + isExternalBucket: Boolean(req.upload.bucket?.credential_id), + }) } async send(rawReq: http.IncomingMessage, res: http.ServerResponse) { @@ -137,7 +145,9 @@ export class Post extends PostHandler { try { const id = this.options.namingFunction(req) - const uploadID = UploadId.fromString(id) + const uploadID = req.upload.bucket.credential_id + ? UploadId.fromString(`${req.upload.tenantId}/${req.upload.bucket.id}/${id}`) + : UploadId.fromString(id) return await lock(uploadID, req.upload.storage, (db) => { req.upload.storage = new Storage(req.upload.storage.backend, db) @@ -163,13 +173,20 @@ export class Post extends PostHandler { generateUrl(rawReq: http.IncomingMessage, id: string) { const req = rawReq as MultiPartRequest - id = id.split('/').slice(1).join('/') // Enforce https in production if (process.env.NODE_ENV === 'production') { req.headers['x-forwarded-proto'] = 'https' } + if (req.upload.bucket.credential_id) { + // add bucket id + id = `${req.upload.bucket.id}/${id}` + } else { + // remove tenant id + id = id.split('/').slice(1).join('/') + } + id = Buffer.from(id, 'utf-8').toString('base64url') return super.generateUrl(req, id) } @@ -177,7 +194,10 @@ export class Post extends PostHandler { export class Options extends OptionsHandler { getFileIdFromRequest(rawRwq: http.IncomingMessage) { - return getFileIdFromRequest(rawRwq, this.options.path) + const req = rawRwq as MultiPartRequest + return getFileIdFromRequest(rawRwq, this.options.path, { + isExternalBucket: Boolean(req.upload.bucket?.credential_id), + }) } async send(rawReq: http.IncomingMessage, res: http.ServerResponse) { @@ -201,7 +221,7 @@ export class Options extends OptionsHandler { const pathParts = this.options.path.split('/') if (urlParts.length > pathParts.length) { - const id = this.getFileIdFromRequest(rawReq) + const id = getFileIdFromRequest(rawReq, this.options.path) if (!id) { throw ERRORS.FILE_NOT_FOUND @@ -217,9 +237,7 @@ export class Options extends OptionsHandler { return this.write(res, 204) } - const bucket = await req.upload.storage - .asSuperUser() - .findBucket(uploadID.bucket, 'id, file_size_limit') + const bucket = await req.upload.bucket if (bucket.file_size_limit && bucket.file_size_limit > fileSizeLimit) { res.setHeader('Tus-Max-Size', fileSizeLimit.toString()) @@ -252,18 +270,6 @@ export class Options extends OptionsHandler { } } -function getFileIdFromRequest(rawRwq: http.IncomingMessage, path: string) { - const req = rawRwq as MultiPartRequest - const match = reExtractFileID.exec(req.url as string) - - if (!match || path.includes(match[1])) { - return false - } - - const idMatch = Buffer.from(match[1], 'base64url').toString('utf-8') - return req.upload.tenantId + '/' + idMatch -} - async function lock any>( id: UploadId, storage: Storage, diff --git a/src/http/routes/tus/index.ts b/src/http/routes/tus/index.ts index 40fce9ae..59b062b9 100644 --- a/src/http/routes/tus/index.ts +++ b/src/http/routes/tus/index.ts @@ -1,17 +1,19 @@ import { FastifyInstance } from 'fastify' -import { Server } from '@tus/server' -import { jwt, storage, db, dbSuperUser } from '../../plugins' +import { Metadata, Server } from '@tus/server' +import { jwt, storage, db, dbSuperUser, parentBucket } from '../../plugins' import { getConfig 
diff --git a/src/http/routes/tus/index.ts b/src/http/routes/tus/index.ts
index 40fce9ae..59b062b9 100644
--- a/src/http/routes/tus/index.ts
+++ b/src/http/routes/tus/index.ts
@@ -1,17 +1,19 @@
 import { FastifyInstance } from 'fastify'
-import { Server } from '@tus/server'
-import { jwt, storage, db, dbSuperUser } from '../../plugins'
+import { Metadata, Server } from '@tus/server'
+import { jwt, storage, db, dbSuperUser, parentBucket } from '../../plugins'
 import { getConfig } from '../../../config'
 import * as http from 'http'
-import { Storage } from '../../../storage'
+import { Storage, StorageBackendError } from '../../../storage'
 import { S3Store } from './s3-store'
 import { Head, Options, Patch, Post } from './handlers'
 import { namingFunction, onCreate, onUploadFinish } from './lifecycle'
 import { ServerOptions } from '@tus/server/types'
 import { DataStore } from '@tus/server/models'
 import { getFileSizeLimit } from '../../../storage/limits'
-import { UploadId } from './upload-id'
 import { FileStore } from './file-store'
+import { BucketWithCredentials } from '../../../storage/schemas'
+import { decrypt } from '../../../auth'
+import { getFileIdFromRequest, UploadId } from './upload-id'
 import { TenantConnection } from '../../../database/connection'

 const {
@@ -32,31 +34,53 @@ type MultiPartRequest = http.IncomingMessage & {
     owner?: string
     tenantId: string
     db: TenantConnection
+    bucket: BucketWithCredentials
   }
 }

-function createTusStore() {
-  if (storageBackendType === 's3') {
+const defaultTusStore = new S3Store({
+  partSize: 6 * 1024 * 1024, // Each uploaded part will have ~6MB,
+  uploadExpiryMilliseconds: tusUrlExpiryMs,
+  s3ClientConfig: {
+    bucket: globalS3Bucket,
+    region: region,
+    endpoint: globalS3Endpoint,
+    sslEnabled: globalS3Protocol !== 'http',
+    s3ForcePathStyle: globalS3ForcePathStyle,
+  },
+})
+
+function createTusStore(bucket: BucketWithCredentials) {
+  if (bucket.credential_id) {
     return new S3Store({
       partSize: 6 * 1024 * 1024, // Each uploaded part will have ~6MB,
       uploadExpiryMilliseconds: tusUrlExpiryMs,
       s3ClientConfig: {
-        bucket: globalS3Bucket,
-        region: region,
-        endpoint: globalS3Endpoint,
-        sslEnabled: globalS3Protocol !== 'http',
-        s3ForcePathStyle: globalS3ForcePathStyle,
+        bucket: bucket.id,
+        endpoint: bucket.endpoint,
+        region: bucket.region,
+        s3ForcePathStyle: Boolean(bucket.force_path_style),
+        credentials:
+          bucket.access_key && bucket.secret_key
+            ? {
+                accessKeyId: decrypt(bucket.access_key),
+                secretAccessKey: decrypt(bucket.secret_key),
+              }
+            : undefined,
       },
     })
   }

+  if (storageBackendType === 's3') {
+    return defaultTusStore
+  }
+
   return new FileStore({
     directory: fileStoragePath + '/' + globalS3Bucket,
   })
 }

-function createTusServer() {
-  const datastore = createTusStore()
+function createTusServer(datastore: DataStore) {
   const serverOptions: ServerOptions & {
     datastore: DataStore
   } = {
@@ -69,11 +93,7 @@ function createTusServer() {
     maxFileSize: async (id, rawReq) => {
       const req = rawReq as MultiPartRequest

-      const resourceId = UploadId.fromString(id)
-
-      const bucket = await req.upload.storage
-        .asSuperUser()
-        .findBucket(resourceId.bucket, 'id,file_size_limit')
+      const bucket = await req.upload.bucket

       const globalFileLimit = await getFileSizeLimit(req.upload.tenantId)
@@ -96,15 +116,20 @@ function createTusServer() {
 }

 export default async function routes(fastify: FastifyInstance) {
-  const tusServer = createTusServer()
-
   fastify.register(async function authorizationContext(fastify) {
     fastify.addContentTypeParser('application/offset+octet-stream', (request, payload, done) =>
       done(null)
     )

+    fastify.addHook('preHandler', async (req) => {
+      ;(req.raw as any).upload = {
+        tenantId: req.tenantId,
+      }
+    })
+
     fastify.register(jwt)
     fastify.register(db)
+    fastify.register(parentBucket)
     fastify.register(storage)

     fastify.addHook('preHandler', async (req) => {
@@ -112,22 +137,57 @@ export default async function routes(fastify: FastifyInstance) {
         storage: req.storage,
         owner: req.owner,
         tenantId: req.tenantId,
-        db: req.db,
+        db: req.dbConnection,
+        bucket: req.bucket,
       }
     })

     fastify.post(
       '/',
-      { schema: { summary: 'Handle POST request for TUS Resumable uploads', tags: ['object'] } },
+      {
+        schema: { summary: 'Handle POST request for TUS Resumable uploads', tags: ['object'] },
+        config: {
+          getParentBucketId: (req) => {
+            const metadataHeader = req.headers['upload-metadata']
+
+            if (typeof metadataHeader !== 'string') {
+              throw new StorageBackendError('invalid_metadata', 400, 'invalid metadata')
+            }
+
+            const params = Metadata.parse(metadataHeader)
+            return params.bucketName || ''
+          },
+        },
+      },
       (req, res) => {
+        const datastore = createTusStore(req.bucket)
+        const tusServer = createTusServer(datastore)
+
         tusServer.handle(req.raw, res.raw)
       }
     )

     fastify.post(
       '/*',
-      { schema: { summary: 'Handle POST request for TUS Resumable uploads', tags: ['object'] } },
+      {
+        schema: { summary: 'Handle POST request for TUS Resumable uploads', tags: ['object'] },
+        config: {
+          getParentBucketId: (req) => {
+            const metadataHeader = req.headers['upload-metadata']
+
+            if (typeof metadataHeader !== 'string') {
+              throw new StorageBackendError('invalid_metadata', 400, 'invalid metadata')
+            }
+
+            const params = Metadata.parse(metadataHeader)
+            return params.bucketName || ''
+          },
+        },
+      },
       (req, res) => {
+        const datastore = createTusStore(req.bucket)
+        const tusServer = createTusServer(datastore)
+
         tusServer.handle(req.raw, res.raw)
       }
     )

@@ -136,20 +196,52 @@ export default async function routes(fastify: FastifyInstance) {
       '/*',
       { schema: { summary: 'Handle PUT request for TUS Resumable uploads', tags: ['object'] } },
       (req, res) => {
+        const datastore = createTusStore(req.bucket)
+        const tusServer = createTusServer(datastore)
         tusServer.handle(req.raw, res.raw)
       }
     )
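Every route above resolves its parent bucket before a datastore can be chosen. For POST requests the bucket comes from the TUS `Upload-Metadata` header, parsed with `Metadata.parse` from `@tus/server`; the header carries comma-separated `key base64value` pairs. Roughly, the decoding step looks like this (a simplified stand-in, shown only for clarity):

```ts
// Simplified stand-in for Metadata.parse from @tus/server: 'Upload-Metadata'
// is a list of "key base64value" pairs.
function parseUploadMetadata(header: string): Record<string, string> {
  const out: Record<string, string> = {}
  for (const pair of header.split(',')) {
    const [key, value] = pair.trim().split(' ')
    if (key) out[key] = value ? Buffer.from(value, 'base64').toString('utf-8') : ''
  }
  return out
}

// parseUploadMetadata('bucketName YnVja2V0').bucketName === 'bucket'
```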
     fastify.patch(
       '/*',
-      { schema: { summary: 'Handle PATCH request for TUS Resumable uploads', tags: ['object'] } },
+      {
+        schema: { summary: 'Handle PATCH request for TUS Resumable uploads', tags: ['object'] },
+        config: {
+          getParentBucketId: (req) => {
+            const id = getFileIdFromRequest(req.raw, tusPath)
+
+            if (!id) {
+              throw new StorageBackendError('invalid_id', 400, 'invalid id')
+            }
+            const uploadId = UploadId.fromString(id)
+            return uploadId.bucket
+          },
+        },
+      },
       (req, res) => {
+        const datastore = createTusStore(req.bucket)
+        const tusServer = createTusServer(datastore)
         tusServer.handle(req.raw, res.raw)
       }
     )

     fastify.head(
       '/*',
-      { schema: { summary: 'Handle HEAD request for TUS Resumable uploads', tags: ['object'] } },
+      {
+        schema: { summary: 'Handle HEAD request for TUS Resumable uploads', tags: ['object'] },
+        config: {
+          getParentBucketId: (req) => {
+            const id = getFileIdFromRequest(req.raw, tusPath)
+
+            if (!id) {
+              throw new StorageBackendError('invalid_id', 400, 'invalid id')
+            }
+            const uploadId = UploadId.fromString(id)
+            return uploadId.bucket
+          },
+        },
+      },
       (req, res) => {
+        const datastore = createTusStore(req.bucket)
+        const tusServer = createTusServer(datastore)
         tusServer.handle(req.raw, res.raw)
       }
     )
@@ -160,15 +252,23 @@ export default async function routes(fastify: FastifyInstance) {
       done(null)
     )

+    fastify.addHook('preHandler', async (req) => {
+      ;(req.raw as any).upload = {
+        tenantId: req.tenantId,
+      }
+    })
+
     fastify.register(dbSuperUser)
+    fastify.register(parentBucket)
     fastify.register(storage)

     fastify.addHook('preHandler', async (req) => {
-      ;(req.raw as MultiPartRequest).upload = {
+      ;(req.raw as Omit<MultiPartRequest, 'isNew'>).upload = {
         storage: req.storage,
         owner: req.owner,
         tenantId: req.tenantId,
-        db: req.db,
+        db: req.dbConnection,
+        bucket: req.bucket,
       }
     })

@@ -180,8 +280,13 @@ export default async function routes(fastify: FastifyInstance) {
           summary: 'Handle OPTIONS request for TUS Resumable uploads',
           description: 'Handle OPTIONS request for TUS Resumable uploads',
         },
+        config: {
+          getParentBucketId: false,
+        },
       },
       (req, res) => {
+        const datastore = createTusStore({} as BucketWithCredentials)
+        const tusServer = createTusServer(datastore)
         tusServer.handle(req.raw, res.raw)
       }
     )
@@ -194,8 +299,21 @@ export default async function routes(fastify: FastifyInstance) {
           summary: 'Handle OPTIONS request for TUS Resumable uploads',
           description: 'Handle OPTIONS request for TUS Resumable uploads',
         },
+        config: {
+          getParentBucketId: (req) => {
+            const id = getFileIdFromRequest(req.raw, tusPath)
+
+            if (!id) {
+              throw new StorageBackendError('invalid_id', 400, 'invalid id')
+            }
+            const uploadId = UploadId.fromString(id)
+            return uploadId.bucket
+          },
+        },
       },
       (req, res) => {
+        const datastore = createTusStore(req.bucket)
+        const tusServer = createTusServer(datastore)
         tusServer.handle(req.raw, res.raw)
       }
     )
diff --git a/src/http/routes/tus/lifecycle.ts b/src/http/routes/tus/lifecycle.ts
index bd1f180d..debb87f9 100644
--- a/src/http/routes/tus/lifecycle.ts
+++ b/src/http/routes/tus/lifecycle.ts
@@ -1,13 +1,11 @@
 import http from 'http'
 import { isRenderableError, Storage } from '../../../storage'
 import { Metadata, Upload } from '@tus/server'
-import { getConfig } from '../../../config'
 import { randomUUID } from 'crypto'
 import { UploadId } from './upload-id'
 import { Uploader } from '../../../storage/uploader'
 import { TenantConnection } from '../../../database/connection'
-
-const { globalS3Bucket } = getConfig()
+import { BucketWithCredentials } from '../../../storage/schemas'

 export type MultiPartRequest = http.IncomingMessage & {
   upload: {
@@ -16,6 +14,7 @@ export type MultiPartRequest = http.IncomingMessage & {
     owner?: string
     tenantId: string
     isNew: boolean
+    bucket: BucketWithCredentials
   }
 }

@@ -37,12 +36,18 @@ export function namingFunction(rawReq: http.IncomingMessage) {

     const version = randomUUID()

-    return new UploadId({
+    const uploadId = new UploadId({
       tenant: req.upload.tenantId,
       bucket: params.bucketName || '',
       objectName: params.objectName || '',
       version,
     }).toString()
+
+    if (req.upload.bucket?.credential_id) {
+      return uploadId.split('/').slice(2).join('/')
+    }
+
+    return uploadId
   } catch (e) {
     throw e
   }
@@ -54,15 +59,18 @@ export async function onCreate(
   upload: Upload
 ): Promise<Upload> {
   try {
-    const uploadID = UploadId.fromString(upload.id)
-
     const req = rawReq as MultiPartRequest
+
+    const id = req.upload.bucket.credential_id
+      ? `${req.upload.tenantId}/${req.upload.bucket.id}/${upload.id}`
+      : upload.id
+
+    const uploadID = UploadId.fromString(id)
+
     const isUpsert = req.headers['x-upsert'] === 'true'
     const storage = req.upload.storage

-    const bucket = await storage
-      .asSuperUser()
-      .findBucket(uploadID.bucket, 'id, file_size_limit, allowed_mime_types')
+    const bucket = await req.upload.bucket

     const uploader = new Uploader(storage.backend, storage.db)

@@ -101,16 +109,19 @@ export async function onUploadFinish(
   upload: Upload
 ) {
   const req = rawReq as MultiPartRequest
-  const resourceId = UploadId.fromString(upload.id)
+  const id = req.upload.bucket.credential_id
+    ? `${req.upload.tenantId}/${req.upload.bucket.id}/${upload.id}`
+    : upload.id
+
+  const resourceId = UploadId.fromString(id)

   const isUpsert = req.headers['x-upsert'] === 'true'

   try {
-    const s3Key = `${req.upload.tenantId}/${resourceId.bucket}/${resourceId.objectName}`
-    const metadata = await req.upload.storage.backend.headObject(
-      globalS3Bucket,
-      s3Key,
-      resourceId.version
-    )
+    const s3Key = req.upload.storage
+      .from(req.upload.bucket)
+      .computeObjectPath(resourceId.objectName)
+
+    const metadata = await req.upload.storage.backend.headObject(s3Key, resourceId.version)

     const uploader = new Uploader(req.upload.storage.backend, req.upload.storage.db)
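The lifecycle hooks keep `UploadId` strings in the canonical `tenant/bucket/objectName/version` shape internally, while external buckets persist only `objectName/version`: `namingFunction` strips the first two segments on the way out, and `onCreate`/`onUploadFinish` re-prefix before parsing. A worked example with illustrative values:

```ts
// Illustrative values only.
const tenantId = 'tenant-a'
const bucket = { id: 'customer-bucket', credential_id: 'cred-1' } // external bucket

// namingFunction builds the canonical id, then drops tenant/bucket for external buckets:
const canonical = `${tenantId}/${bucket.id}/folder/file.png/b54df260-0f29-4bfb-8ddd-8a0456c0a0d1`
const storedId = canonical.split('/').slice(2).join('/')
// storedId === 'folder/file.png/b54df260-0f29-4bfb-8ddd-8a0456c0a0d1'

// onCreate / onUploadFinish restore the canonical form before UploadId.fromString:
const restored = bucket.credential_id ? `${tenantId}/${bucket.id}/${storedId}` : storedId
// restored === canonical
```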
diff --git a/src/http/routes/tus/s3-store.ts b/src/http/routes/tus/s3-store.ts
index 40712055..b7b01f92 100644
--- a/src/http/routes/tus/s3-store.ts
+++ b/src/http/routes/tus/s3-store.ts
@@ -4,7 +4,6 @@ import fs from 'node:fs'
 import { Readable } from 'node:stream'
 import { TUS_RESUMABLE, Upload } from '@tus/server'
 import { S3UploadPart } from '../../../monitoring/metrics'
-import { UploadId } from './upload-id'

 interface Options {
   partSize?: number
@@ -53,11 +52,8 @@ export class S3Store extends BaseS3Store {
     const timer = S3UploadPart.startTimer()

     const result = await super.uploadPart(metadata, readStream, partNumber)
-    const resource = UploadId.fromString(metadata.file.id)

-    timer({
-      tenant_id: resource.tenant,
-    })
+    timer()

     return result
   }
diff --git a/src/http/routes/tus/upload-id.ts b/src/http/routes/tus/upload-id.ts
index 92777bad..49ecc66b 100644
--- a/src/http/routes/tus/upload-id.ts
+++ b/src/http/routes/tus/upload-id.ts
@@ -2,6 +2,8 @@ import { mustBeValidBucketName, mustBeValidKey } from '../../../storage/limits'
 import { StorageBackendError } from '../../../storage'
 import { getConfig } from '../../../config'
 import { FILE_VERSION_SEPARATOR, PATH_SEPARATOR, SEPARATOR } from '../../../storage/backend'
+import http from 'http'
+import { MultiPartRequest } from './lifecycle'

 interface ResourceIDOptions {
   tenant: string
@@ -12,6 +14,8 @@ interface ResourceIDOptions {

 const { tusUseFileVersionSeparator } = getConfig()

+const reExtractFileID = /([^/]+)\/?$/
+
 export class UploadId {
   public tenant: string
   public readonly bucket: string
@@ -48,6 +52,37 @@ export class UploadId {
   }
 }

+export function extractIdFromRequest(rawRwq: http.IncomingMessage, path: string) {
+  const req = rawRwq as MultiPartRequest
+  const match = reExtractFileID.exec(req.url as string)
+
+  if (!match || path.includes(match[1])) {
+    return false
+  }
+
+  return Buffer.from(match[1], 'base64url').toString('utf-8')
+}
+
+export function getFileIdFromRequest(
+  rawRwq: http.IncomingMessage,
+  path: string,
+  options?: { isExternalBucket?: boolean }
+) {
+  const req = rawRwq as MultiPartRequest
+
+  const idMatch = extractIdFromRequest(rawRwq, path)
+
+  if (!idMatch) {
+    return false
+  }
+
+  if (options?.isExternalBucket) {
+    return idMatch.split('/').slice(1).join('/')
+  }
+
+  return req.upload.tenantId + '/' + idMatch
+}
+
 function fromPathSeparator(id: string) {
   const idParts = id.split(PATH_SEPARATOR)
diff --git a/src/monitoring/logger.ts b/src/monitoring/logger.ts
index 651d846b..fc269105 100644
--- a/src/monitoring/logger.ts
+++ b/src/monitoring/logger.ts
@@ -63,6 +63,8 @@ const whitelistHeaders = (headers: Record<string, unknown>) => {
     'host',
     'user-agent',
     'x-forwarded-proto',
+    'x-forwarded-host',
+    'x-forwarded-port',
     'referer',
     'content-length',
     'x-real-ip',
diff --git a/src/monitoring/metrics.ts b/src/monitoring/metrics.ts
index ac090985..fa1c0817 100644
--- a/src/monitoring/metrics.ts
+++ b/src/monitoring/metrics.ts
@@ -60,7 +60,7 @@ export const QueueJobError = new client.Gauge({
 export const S3UploadPart = new client.Histogram({
   name: 'storage_api_s3_upload_part',
   help: 'S3 upload part performance',
-  labelNames: ['tenant_id', 'region'],
+  labelNames: ['region'],
 })

 export const DbActivePool = new client.Gauge({
diff --git a/src/queue/events/base-event.ts b/src/queue/events/base-event.ts
index 36edb568..4d46bc97 100644
--- a/src/queue/events/base-event.ts
+++ b/src/queue/events/base-event.ts
@@ -4,10 +4,11 @@ import { getServiceKeyUser } from '../../database/tenant'
 import { getPostgresConnection } from '../../database'
 import { Storage } from '../../storage'
 import { StorageKnexDB } from '../../storage/database'
-import { createStorageBackend } from '../../storage/backend'
+import { createStorageBackend, StorageBackendAdapter } from '../../storage/backend'
 import { getConfig } from '../../config'
 import { QueueJobScheduled, QueueJobSchedulingTime } from '../../monitoring/metrics'
 import { logger } from '../../monitoring'
+import { decrypt } from '../../auth'

 export interface BasePayload {
   $version: string
@@ -19,7 +20,8 @@ export interface BasePayload {

 export type StaticThis<T> = { new (...args: any): T }

-const { enableQueueEvents } = getConfig()
+const { enableQueueEvents, region, globalS3Endpoint, globalS3Bucket, globalS3ForcePathStyle } =
+  getConfig()

 export abstract class BaseEvent<T extends BasePayload> {
   public static readonly version: string = 'v1'
@@ -96,22 +98,55 @@ export abstract class BaseEvent<T extends BasePayload> {
     throw new Error('not implemented')
   }

+  static getBucketId(payload: any): string | undefined {
+    return
+  }
+
   protected static async createStorage(payload: BasePayload) {
     const adminUser = await getServiceKeyUser(payload.tenant.ref)

-    const client = await getPostgresConnection({
+    const connection = await getPostgresConnection({
       user: adminUser,
       superUser: adminUser,
       host: payload.tenant.host,
       tenantId: payload.tenant.ref,
     })

-    const db = new StorageKnexDB(client, {
+    const db = new StorageKnexDB(connection, {
       tenantId: payload.tenant.ref,
       host: payload.tenant.host,
     })

-    const storageBackend = createStorageBackend()
+    let storageBackend: StorageBackendAdapter | undefined
+    const bucketId = this.getBucketId(payload)
+
+    if (bucketId) {
+      const bucket = await db.asSuperUser().findBucketById(bucketId, 'name,credential_id', {
+        includeCredentials: true,
+      })
+
+      if (bucket.credential_id) {
+        storageBackend = createStorageBackend({
+          bucket: bucket.name,
+          role: bucket.role,
+          endpoint: bucket.endpoint,
+          region: bucket.region,
+          forcePathStyle: bucket.force_path_style,
+          accessKey: bucket.access_key ? decrypt(bucket.access_key) : undefined,
+          secretKey: bucket.secret_key ? decrypt(bucket.secret_key) : undefined,
+        })
+      }
+    }
+
+    if (!storageBackend) {
+      storageBackend = createStorageBackend({
+        prefix: payload.tenant.ref,
+        bucket: globalS3Bucket,
+        endpoint: globalS3Endpoint,
+        region,
+        forcePathStyle: globalS3ForcePathStyle,
+      })
+    }

     return new Storage(storageBackend, db)
   }
diff --git a/src/queue/events/multipart-upload-completed.ts b/src/queue/events/multipart-upload-completed.ts
index 31e0cf55..2bcd2b55 100644
--- a/src/queue/events/multipart-upload-completed.ts
+++ b/src/queue/events/multipart-upload-completed.ts
@@ -1,6 +1,5 @@
 import { BaseEvent, BasePayload } from './base-event'
 import { Job } from 'pg-boss'
-import { getConfig } from '../../config'
 import { S3Backend } from '../../storage/backend'
 import { isS3Error } from '../../storage'

@@ -10,20 +9,24 @@ interface UploadCompleted extends BasePayload {
   version: string
 }

-const { globalS3Bucket } = getConfig()
-
 export class MultiPartUploadCompleted extends BaseEvent<UploadCompleted> {
   static queueName = 'multipart:upload:completed'

+  static getBucketId(payload: UploadCompleted): string | undefined {
+    return payload.bucketName
+  }
+
   static async handle(job: Job<UploadCompleted>) {
     try {
       const storage = await this.createStorage(job.data)
       const version = job.data.version

-      const s3Key = `${job.data.tenant.ref}/${job.data.bucketName}/${job.data.objectName}/${version}`
+      const bucketStore = await storage.fromBucketId(job.data.bucketName)
+
+      const s3Key = `${bucketStore.computeObjectPath(job.data.objectName)}/${version}`

       if (storage.backend instanceof S3Backend) {
-        await storage.backend.setMetadataToCompleted(globalS3Bucket, s3Key)
+        await storage.backend.setMetadataToCompleted(s3Key)
       }
     } catch (e) {
       if (isS3Error(e) && e.$metadata.httpStatusCode === 404) {
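`MultiPartUploadCompleted` above is the template for the new backend selection: an event class exposes the bucket id from its payload via `getBucketId`, and `createStorage` uses it to decide between the tenant's default backend and a credentialed external one. Wiring a hypothetical new event into the same mechanism would look like:

```ts
import { BaseEvent, BasePayload } from './base-event'

// Hypothetical payload/event, shown only to illustrate the integration point.
interface ThumbnailCreatedPayload extends BasePayload {
  bucketId: string
  name: string
}

class ThumbnailCreated extends BaseEvent<ThumbnailCreatedPayload> {
  static queueName = 'thumbnail:created'

  // Returning undefined makes createStorage fall back to the default
  // (tenant-prefixed, global bucket) backend.
  static getBucketId(payload: ThumbnailCreatedPayload): string | undefined {
    return payload.bucketId
  }
}
```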
diff --git a/src/queue/events/object-admin-delete.ts b/src/queue/events/object-admin-delete.ts
index ad00d02f..ed11ed46 100644
--- a/src/queue/events/object-admin-delete.ts
+++ b/src/queue/events/object-admin-delete.ts
@@ -1,5 +1,4 @@
 import { BaseEvent, BasePayload } from './base-event'
-import { getConfig } from '../../config'
 import { Job } from 'pg-boss'
 import { withOptionalVersion } from '../../storage/backend'
 import { logger } from '../../monitoring'
@@ -10,21 +9,25 @@ export interface ObjectDeleteEvent extends BasePayload {
   version?: string
 }

-const { globalS3Bucket } = getConfig()
-
 export class ObjectAdminDelete extends BaseEvent<ObjectDeleteEvent> {
   static queueName = 'object:admin:delete'

+  static getBucketId(payload: ObjectDeleteEvent): string | undefined {
+    return payload.bucketId
+  }
+
   static async handle(job: Job<ObjectDeleteEvent>) {
     logger.info({ job: JSON.stringify(job) }, 'Handling ObjectAdminDelete')

     try {
       const storage = await this.createStorage(job.data)
+      const bucketStore = await storage.fromBucketId(job.data.bucketId)
+
       const version = job.data.version

-      const s3Key = `${job.data.tenant.ref}/${job.data.bucketId}/${job.data.name}`
+      const s3Key = bucketStore.computeObjectPath(job.data.name)

-      await storage.backend.deleteObjects(globalS3Bucket, [
+      await storage.backend.deleteObjects([
         withOptionalVersion(s3Key, version),
         withOptionalVersion(s3Key, version) + '.info',
       ])
diff --git a/src/storage/backend/file.ts b/src/storage/backend/file.ts
index 1be3df87..bcd7c5e2 100644
--- a/src/storage/backend/file.ts
+++ b/src/storage/backend/file.ts
@@ -10,6 +10,7 @@ import {
   ObjectMetadata,
   ObjectResponse,
   withOptionalVersion,
+  withPrefixAndVersion,
 } from './generic'
 import { StorageBackendError } from '../errors'
 const pipeline = promisify(stream.pipeline)
@@ -31,6 +32,11 @@ const METADATA_ATTR_KEYS = {
   },
 }

+export interface FileBackendOptions {
+  bucket: string
+  prefix?: string
+}
+
 /**
  * FileBackend
  * Interacts with the file system with this FileBackend adapter
@@ -38,27 +44,27 @@ const METADATA_ATTR_KEYS = {
 export class FileBackend implements StorageBackendAdapter {
   client = null
   filePath: string
+  protected prefix?: string

-  constructor() {
+  constructor(private readonly options: FileBackendOptions) {
     const { fileStoragePath } = getConfig()
     if (!fileStoragePath) {
       throw new Error('FILE_STORAGE_BACKEND_PATH env variable not set')
     }
     this.filePath = fileStoragePath
+    this.prefix = options.prefix
   }

   /**
    * Gets an object body and metadata
-   * @param bucketName
    * @param key
    * @param version
    */
-  async getObject(
-    bucketName: string,
-    key: string,
-    version: string | undefined
-  ): Promise<ObjectResponse> {
-    const file = path.resolve(this.filePath, withOptionalVersion(`${bucketName}/${key}`, version))
+  async getObject(key: string, version: string | undefined): Promise<ObjectResponse> {
+    const file = path.resolve(
+      this.filePath,
+      withPrefixAndVersion(`${this.options.bucket}/${key}`, this.prefix, version)
+    )
     const body = fs.createReadStream(file)
     const data = await fs.stat(file)
     const { cacheControl, contentType } = await this.getFileMetadata(file)
@@ -84,7 +90,6 @@ export class FileBackend implements StorageBackendAdapter {

   /**
    * Uploads and store an object
-   * @param bucketName
    * @param key
    * @param version
    * @param body
    * @param contentType
    * @param cacheControl
    */
   async uploadObject(
-    bucketName: string,
     key: string,
     version: string | undefined,
     body: NodeJS.ReadableStream,
@@ -100,7 +104,10 @@ export class FileBackend implements StorageBackendAdapter {
     cacheControl: string
   ): Promise<ObjectMetadata> {
     try {
-      const file = path.resolve(this.filePath, withOptionalVersion(`${bucketName}/${key}`, version))
+      const file = path.resolve(
+        this.filePath,
+        withPrefixAndVersion(`${this.options.bucket}/${key}`, this.prefix, version)
+      )
       await fs.ensureFile(file)
       const destFile = fs.createWriteStream(file)
       await pipeline(body, destFile)
@@ -110,7 +117,7 @@ export class FileBackend implements StorageBackendAdapter {
         cacheControl,
       })

-      const metadata = await this.headObject(bucketName, key, version)
+      const metadata = await this.headObject(key, version)

       return {
         ...metadata,
@@ -123,13 +130,15 @@ export class FileBackend implements StorageBackendAdapter {

   /**
    * Deletes an object from the file system
-   * @param bucket
    * @param key
    * @param version
    */
-  async deleteObject(bucket: string, key: string, version: string | undefined): Promise<void> {
+  async deleteObject(key: string, version: string | undefined): Promise<void> {
     try {
-      const file = path.resolve(this.filePath, withOptionalVersion(`${bucket}/${key}`, version))
+      const file = path.resolve(
+        this.filePath,
+        withPrefixAndVersion(`${this.options.bucket}/${key}`, this.prefix, version)
+      )
       await fs.remove(file)
     } catch (e) {
       if (e instanceof Error && 'code' in e) {
@@ -143,23 +152,24 @@ export class FileBackend implements StorageBackendAdapter {

   /**
    * Copies an existing object to the given location
-   * @param bucket
    * @param source
    * @param version
    * @param destination
    * @param destinationVersion
    */
   async copyObject(
-    bucket: string,
     source: string,
     version: string | undefined,
     destination: string,
     destinationVersion: string
   ): Promise<Pick<ObjectMetadata, 'httpStatusCode'>> {
-    const srcFile = path.resolve(this.filePath, withOptionalVersion(`${bucket}/${source}`, version))
+    const srcFile = path.resolve(
+      this.filePath,
+      withOptionalVersion(`${this.options.bucket}/${source}`, version)
+    )
     const destFile = path.resolve(
       this.filePath,
-      withOptionalVersion(`${bucket}/${destination}`, destinationVersion)
+      withOptionalVersion(`${this.options.bucket}/${destination}`, destinationVersion)
     )

     await fs.ensureFile(destFile)
@@ -174,12 +184,16 @@ export class FileBackend implements StorageBackendAdapter {

   /**
    * Deletes multiple objects
-   * @param bucket
    * @param prefixes
    */
-  async deleteObjects(bucket: string, prefixes: string[]): Promise<void> {
+  async deleteObjects(prefixes: string[]): Promise<void> {
     const promises = prefixes.map((prefix) => {
-      return fs.rm(path.resolve(this.filePath, bucket, prefix))
+      return fs.rm(
+        path.resolve(
+          this.filePath,
+          withPrefixAndVersion(path.join(this.options.bucket, prefix), this.prefix)
+        )
+      )
     })

     const results = await Promise.allSettled(promises)
@@ -195,16 +209,14 @@ export class FileBackend implements StorageBackendAdapter {

   /**
    * Returns metadata information of a specific object
-   * @param bucket
    * @param key
    * @param version
    */
-  async headObject(
-    bucket: string,
-    key: string,
-    version: string | undefined
-  ): Promise<ObjectMetadata> {
-    const file = path.join(this.filePath, withOptionalVersion(`${bucket}/${key}`, version))
+  async headObject(key: string, version: string | undefined): Promise<ObjectMetadata> {
+    const file = path.join(
+      this.filePath,
+      withPrefixAndVersion(`${this.options.bucket}/${key}`, this.prefix, version)
+    )

     const data = await fs.stat(file)
     const { cacheControl, contentType } = await this.getFileMetadata(file)
@@ -226,12 +238,17 @@ export class FileBackend implements StorageBackendAdapter {

   /**
    * Returns a private url that can only be accessed internally by the system
-   * @param bucket
    * @param key
    * @param version
    */
-  async privateAssetUrl(bucket: string, key: string, version: string | undefined): Promise<string> {
-    return 'local:///' + path.join(this.filePath, withOptionalVersion(`${bucket}/${key}`, version))
+  async privateAssetUrl(key: string, version: string | undefined): Promise<string> {
+    return (
+      'local:///' +
+      path.join(
+        this.filePath,
+        withPrefixAndVersion(`${this.options.bucket}/${key}`, this.prefix, version)
+      )
+    )
   }

   async setFileMetadata(file: string, { contentType, cacheControl }: FileMetadata) {
diff --git a/src/storage/backend/generic.ts b/src/storage/backend/generic.ts
index b9e35710..2975a90d 100644
--- a/src/storage/backend/generic.ts
+++ b/src/storage/backend/generic.ts
@@ -43,12 +43,10 @@ export abstract class StorageBackendAdapter {

   /**
    * Gets an object body and metadata
-   * @param bucketName
    * @param key
    * @param headers
    */
   async getObject(
-    bucketName: string,
     key: string,
     version: string | undefined,
     headers?: BrowserCacheHeaders
@@ -65,7 +63,6 @@ export abstract class StorageBackendAdapter {
    * @param cacheControl
    */
   async uploadObject(
-    bucketName: string,
     key: string,
     version: string | undefined,
     body: NodeJS.ReadableStream,
@@ -77,24 +74,21 @@ export abstract class StorageBackendAdapter {

   /**
    * Deletes an object
-   * @param bucket
    * @param key
    * @param version
    */
-  async deleteObject(bucket: string, key: string, version: string | undefined): Promise<void> {
+  async deleteObject(key: string, version: string | undefined): Promise<void> {
     throw new Error('deleteObject not implemented')
   }

   /**
    * Copies an existing object to the given location
-   * @param bucket
    * @param source
    * @param version
    * @param destination
    * @param destinationVersion
    */
   async copyObject(
-    bucket: string,
     source: string,
     version: string | undefined,
     destination: string,
@@ -105,34 +99,27 @@ export abstract class StorageBackendAdapter {

   /**
    * Deletes multiple objects
-   * @param bucket
    * @param prefixes
    */
-  async deleteObjects(bucket: string, prefixes: string[]): Promise<void> {
+  async deleteObjects(prefixes: string[]): Promise<void> {
     throw new Error('deleteObjects not implemented')
   }

   /**
    * Returns metadata information of a specific object
-   * @param bucket
    * @param key
    * @param version
    */
-  async headObject(
-    bucket: string,
-    key: string,
-    version: string | undefined
-  ): Promise<ObjectMetadata> {
+  async headObject(key: string, version: string | undefined): Promise<ObjectMetadata> {
     throw new Error('headObject not implemented')
   }

   /**
    * Returns a private url that can only be accessed internally by the system
-   * @param bucket
    * @param key
    * @param version
    */
-  async privateAssetUrl(bucket: string, key: string, version: string | undefined): Promise<string> {
+  async privateAssetUrl(key: string, version: string | undefined): Promise<string> {
     throw new Error('privateAssetUrl not implemented')
   }
 }
@@ -146,3 +133,7 @@ export const SEPARATOR = tusUseFileVersionSeparator ? FILE_VERSION_SEPARATOR : PATH_SEPARATOR

 export function withOptionalVersion(key: string, version?: string): string {
   return version ? `${key}${SEPARATOR}${version}` : key
 }
+
+export function withPrefixAndVersion(key: string, prefix?: string, version?: string): string {
+  return `${prefix ? prefix + '/' : ''}${withOptionalVersion(key, version)}`
+}
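`withPrefixAndVersion` is now the single place where tenant prefixes and versions are folded into object keys, replacing the ad-hoc `${tenant}/${bucket}/${key}` string-building. The strings it produces, assuming the default `/` separator (i.e. `tusUseFileVersionSeparator` disabled):

```ts
withPrefixAndVersion('bucket1/folder/a.png')                   // 'bucket1/folder/a.png'
withPrefixAndVersion('bucket1/folder/a.png', 'tenant-a')       // 'tenant-a/bucket1/folder/a.png'
withPrefixAndVersion('bucket1/folder/a.png', 'tenant-a', 'v2') // 'tenant-a/bucket1/folder/a.png/v2'
withPrefixAndVersion('folder/a.png', undefined, 'v2')          // 'folder/a.png/v2' (external bucket: no prefix)
```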
diff --git a/src/storage/backend/index.ts b/src/storage/backend/index.ts
index e438c08e..6d82dfcb 100644
--- a/src/storage/backend/index.ts
+++ b/src/storage/backend/index.ts
@@ -1,22 +1,23 @@
-import { StorageBackendAdapter } from './generic'
-import { FileBackend } from './file'
-import { S3Backend } from './s3'
+import { FileBackend, FileBackendOptions } from './file'
+import { S3Backend, S3Options } from './s3'
 import { getConfig } from '../../config'

 export * from './s3'
 export * from './file'
 export * from './generic'

-const { region, globalS3Endpoint, globalS3ForcePathStyle, storageBackendType } = getConfig()
+const { globalS3Bucket, storageBackendType } = getConfig()

-export function createStorageBackend() {
-  let storageBackend: StorageBackendAdapter
-
-  if (storageBackendType === 'file') {
-    storageBackend = new FileBackend()
-  } else {
-    storageBackend = new S3Backend(region, globalS3Endpoint, globalS3ForcePathStyle)
+export function createStorageBackend(options: S3Options | FileBackendOptions) {
+  switch (storageBackendType) {
+    case 'file':
+      return new FileBackend({
+        prefix: options.prefix,
+        bucket: globalS3Bucket,
+      })
+    case 's3':
+      return new S3Backend(options as S3Options)
+    default:
+      throw new Error(`unknown storage backend type ${storageBackendType}`)
   }
-
-  return storageBackend
 }
diff --git a/src/storage/backend/s3.ts b/src/storage/backend/s3.ts
index d1df5d22..7c6730a8 100644
--- a/src/storage/backend/s3.ts
+++ b/src/storage/backend/s3.ts
@@ -1,13 +1,18 @@
 import {
   CompleteMultipartUploadCommandOutput,
   CopyObjectCommand,
+  CreateBucketCommand,
+  CreateBucketCommandInput,
   DeleteObjectCommand,
   DeleteObjectsCommand,
   GetObjectCommand,
   GetObjectCommandInput,
+  HeadBucketCommand,
+  HeadBucketCommandInput,
   HeadObjectCommand,
   S3Client,
   S3ClientConfig,
+  S3ServiceException,
 } from '@aws-sdk/client-s3'
 import { Upload } from '@aws-sdk/lib-storage'
 import { NodeHttpHandler } from '@aws-sdk/node-http-handler'
@@ -16,7 +21,7 @@ import {
   BrowserCacheHeaders,
   ObjectMetadata,
   ObjectResponse,
-  withOptionalVersion,
+  withPrefixAndVersion,
 } from './generic'
 import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
 import { StorageBackendError } from '../errors'
@@ -25,6 +30,30 @@ import Agent, { HttpsAgent } from 'agentkeepalive'

 const { globalS3Protocol, globalS3MaxSockets } = getConfig()

+export interface S3Options {
+  bucket: string
+  prefix?: string
+  endpoint?: string
+  region?: string
+  forcePathStyle?: boolean
+  accessKey?: string
+  secretKey?: string
+  role?: string
+}
+
+function createAgent() {
+  const agentOptions = {
+    maxSockets: globalS3MaxSockets,
+    keepAlive: true,
+  }
+
+  return globalS3Protocol === 'http'
+    ? { httpAgent: new Agent(agentOptions) }
+    : { httpsAgent: new HttpsAgent(agentOptions) }
+}
+
+const defaultAgent = createAgent()
+
 /**
  * S3Backend
  * Interacts with an s3-compatible file system with this S3Adapter
@@ -32,50 +61,79 @@ const { globalS3Protocol, globalS3MaxSockets } = getConfig()
 export class S3Backend implements StorageBackendAdapter {
   client: S3Client

-  constructor(region: string, endpoint?: string | undefined, globalS3ForcePathStyle?: boolean) {
-    const agentOptions = {
-      maxSockets: globalS3MaxSockets,
-      keepAlive: true,
-    }
-
-    const agent =
-      globalS3Protocol === 'http'
-        ? { httpAgent: new Agent(agentOptions) }
-        : { httpsAgent: new HttpsAgent(agentOptions) }
-
+  constructor(private readonly options: S3Options) {
     const params: S3ClientConfig = {
-      region,
+      region: this.options.region,
       runtime: 'node',
       requestHandler: new NodeHttpHandler({
-        ...agent,
+        ...defaultAgent,
       }),
     }
-    if (endpoint) {
-      params.endpoint = endpoint
+    if (this.options.endpoint) {
+      params.endpoint = this.options.endpoint
     }
-    if (globalS3ForcePathStyle) {
+    if (this.options.forcePathStyle) {
       params.forcePathStyle = true
     }
+    if (this.options.accessKey && this.options.secretKey) {
+      params.credentials = {
+        accessKeyId: this.options.accessKey,
+        secretAccessKey: this.options.secretKey,
+      }
+    }
+
+    if (this.options.role) {
+      // TODO: assume role
+    }
+
     this.client = new S3Client(params)
   }

+  async createBucketIfDoesntExists(bucketName: string) {
+    const bucketExists = await this.checkBucketExists(bucketName)
+
+    if (bucketExists) {
+      return
+    }
+
+    const input: CreateBucketCommandInput = {
+      Bucket: bucketName,
+    }
+
+    return this.client.send(new CreateBucketCommand(input))
+  }
+
+  async checkBucketExists(bucketName: string) {
+    const input: HeadBucketCommandInput = {
+      Bucket: bucketName,
+    }
+
+    try {
+      await this.client.send(new HeadBucketCommand(input))
+
+      return true
+    } catch (e: unknown) {
+      if (e instanceof S3ServiceException && e.$metadata.httpStatusCode === 404) {
+        return false
+      }
+      throw e
+    }
+  }
+
   /**
    * Gets an object body and metadata
-   * @param bucketName
    * @param key
    * @param version
    * @param headers
    */
   async getObject(
-    bucketName: string,
     key: string,
     version: string | undefined,
     headers?: BrowserCacheHeaders
   ): Promise<ObjectResponse> {
     const input: GetObjectCommandInput = {
-      Bucket: bucketName,
+      Bucket: this.options.bucket,
       IfNoneMatch: headers?.ifNoneMatch,
-      Key: withOptionalVersion(key, version),
+      Key: withPrefixAndVersion(key, this.options.prefix, version),
       Range: headers?.range,
     }
     if (headers?.ifModifiedSince) {
@@ -101,7 +159,6 @@ export class S3Backend implements StorageBackendAdapter {

   /**
    * Uploads and store an object
-   * @param bucketName
    * @param key
    * @param version
    * @param body
    * @param contentType
    * @param cacheControl
    */
   async uploadObject(
-    bucketName: string,
     key: string,
     version: string | undefined,
     body: NodeJS.ReadableStream,
@@ -120,8 +176,8 @@ export class S3Backend implements StorageBackendAdapter {
       const paralellUploadS3 = new Upload({
         client: this.client,
         params: {
-          Bucket: bucketName,
-          Key: withOptionalVersion(key, version),
+          Bucket: this.options.bucket,
+          Key: withPrefixAndVersion(key, this.options.prefix, version),
           /* @ts-expect-error: https://github.com/aws/aws-sdk-js-v3/issues/2085 */
           Body: body,
           ContentType: contentType,
@@ -131,7 +187,7 @@ export class S3Backend implements StorageBackendAdapter {

       const data = (await paralellUploadS3.done()) as CompleteMultipartUploadCommandOutput

-      const metadata = await this.headObject(bucketName, key, version)
+      const metadata = await this.headObject(key, version)

       return {
         httpStatusCode: data.$metadata.httpStatusCode || metadata.httpStatusCode,
@@ -148,17 +204,17 @@ export class S3Backend implements StorageBackendAdapter {
     }
   }

-  async setMetadataToCompleted(bucketName: string, key: string) {
+  async setMetadataToCompleted(key: string) {
     const headObject = new HeadObjectCommand({
-      Bucket: bucketName,
+      Bucket: this.options.bucket,
       Key: `${key}.info`,
     })
     const findObjResp = await this.client.send(headObject)

     const copyCmd = new CopyObjectCommand({
-      Bucket: bucketName,
-      CopySource: `${bucketName}/${key}.info`,
-      Key: `${key}.info`,
+      Bucket: this.options.bucket,
+      CopySource: `${this.options.bucket}/${withPrefixAndVersion(key, this.options.prefix)}.info`,
+      Key: `${withPrefixAndVersion(key, this.options.prefix)}.info`,
       Metadata: {
         ...findObjResp.Metadata,
         tus_completed: 'true',
@@ -171,14 +227,13 @@ export class S3Backend implements StorageBackendAdapter {

   /**
    * Deletes an object
-   * @param bucket
    * @param key
    * @param version
    */
-  async deleteObject(bucket: string, key: string, version: string | undefined): Promise<void> {
+  async deleteObject(key: string, version: string | undefined): Promise<void> {
     const command = new DeleteObjectCommand({
-      Bucket: bucket,
-      Key: withOptionalVersion(key, version),
+      Bucket: this.options.bucket,
+      Key: withPrefixAndVersion(key, this.options.prefix, version),
     })
     await this.client.send(command)
   }
@@ -192,7 +247,6 @@ export class S3Backend implements StorageBackendAdapter {
    * @param destinationVersion
    */
   async copyObject(
-    bucket: string,
     source: string,
     version: string | undefined,
     destination: string,
@@ -200,9 +254,13 @@ export class S3Backend implements StorageBackendAdapter {
   ): Promise<Pick<ObjectMetadata, 'httpStatusCode'>> {
     try {
       const command = new CopyObjectCommand({
-        Bucket: bucket,
-        CopySource: `${bucket}/${withOptionalVersion(source, version)}`,
-        Key: withOptionalVersion(destination, destinationVersion),
+        Bucket: this.options.bucket,
+        CopySource: `${this.options.bucket}/${withPrefixAndVersion(
+          source,
+          this.options.prefix,
+          version
+        )}`,
+        Key: withPrefixAndVersion(destination, this.options.prefix, destinationVersion),
       })
       const data = await this.client.send(command)
       return {
@@ -218,14 +276,14 @@ export class S3Backend implements StorageBackendAdapter {
    * @param bucket
    * @param prefixes
    */
-  async deleteObjects(bucket: string, prefixes: string[]): Promise<void> {
+  async deleteObjects(prefixes: string[]): Promise<void> {
     try {
       const s3Prefixes = prefixes.map((ele) => {
-        return { Key: ele }
+        return { Key: withPrefixAndVersion(ele, this.options.prefix) }
       })

       const command = new DeleteObjectsCommand({
-        Bucket: bucket,
+        Bucket: this.options.bucket,
         Delete: {
           Objects: s3Prefixes,
         },
@@ -242,15 +300,11 @@ export class S3Backend implements StorageBackendAdapter {
    * @param key
    * @param version
    */
-  async headObject(
-    bucket: string,
-    key: string,
-    version: string | undefined
-  ): Promise<ObjectMetadata> {
+  async headObject(key: string, version: string | undefined): Promise<ObjectMetadata> {
     try {
       const command = new HeadObjectCommand({
-        Bucket: bucket,
-        Key: withOptionalVersion(key, version),
+        Bucket: this.options.bucket,
+        Key: withPrefixAndVersion(key, this.options.prefix, version),
       })
       const data = await this.client.send(command)
       return {
@@ -273,10 +327,10 @@ export class S3Backend implements StorageBackendAdapter {
    * @param key
    * @param version
    */
-  async privateAssetUrl(bucket: string, key: string, version: string | undefined): Promise<string> {
+  async privateAssetUrl(key: string, version: string | undefined): Promise<string> {
     const input: GetObjectCommandInput = {
-      Bucket: bucket,
-      Key: withOptionalVersion(key, version),
+      Bucket: this.options.bucket,
+      Key: withPrefixAndVersion(key, this.options.prefix, version),
     }

     const command = new GetObjectCommand(input)
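One caveat worth knowing about `checkBucketExists`: it only maps HTTP 404 to `false`. AWS answers `HeadBucket` with 403 when the caller lacks permission on an existing bucket, and some S3-compatible stores do the same, so such errors propagate rather than triggering bucket creation. Intended usage with an external credential set (illustrative values only):

```ts
// Illustrative values; keys would come decrypted from bucket_credentials.
const backend = new S3Backend({
  bucket: 'customer-bucket',
  region: 'us-east-1',
  endpoint: 'https://s3.example.com',
  forcePathStyle: true,
  accessKey: 'AKIAEXAMPLE',
  secretKey: 'secret',
})

// HeadBucket first; CreateBucket only on a clean 404.
await backend.createBucketIfDoesntExists('customer-bucket')
```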
diff --git a/src/storage/database/adapter.ts b/src/storage/database/adapter.ts
index 92920751..eedd65b4 100644
--- a/src/storage/database/adapter.ts
+++ b/src/storage/database/adapter.ts
@@ -1,4 +1,4 @@
-import { Bucket, Obj } from '../schemas'
+import { Bucket, Obj, BucketWithCredentials, Credential } from '../schemas'
 import { ObjectMetadata } from '../backend'
 import { TenantConnection } from '../../database/connection'

@@ -17,6 +17,7 @@ export interface FindBucketFilters {
   forUpdate?: boolean
   forShare?: boolean
   dontErrorOnEmpty?: boolean
+  includeCredentials?: boolean
 }

 export interface FindObjectFilters {
@@ -28,7 +29,6 @@ export interface FindObjectFilters {
 }

 export interface TransactionOptions {
-  isolation?: string
   retry?: number
   readOnly?: boolean
 }
@@ -41,6 +41,8 @@ export interface DatabaseOptions {
   parentConnection?: TenantConnection
 }

+type MaybeType<T, Condition extends boolean> = Condition extends true ? T | undefined : T
+
 export interface Database {
   tenantHost: string
   tenantId: string
@@ -60,15 +62,31 @@ export interface Database {
   createBucket(
     data: Pick<
       Bucket,
-      'id' | 'name' | 'public' | 'owner' | 'file_size_limit' | 'allowed_mime_types'
+      | 'id'
+      | 'name'
+      | 'public'
+      | 'owner'
+      | 'file_size_limit'
+      | 'allowed_mime_types'
+      | 'credential_id'
     >
   ): Promise<Pick<Bucket, 'id'>>

-  findBucketById(
+  findBucketById<Filters extends FindBucketFilters = FindBucketFilters>(
     bucketId: string,
     columns: string,
-    filters?: FindBucketFilters
-  ): Promise<Bucket>
+    filters?: Filters
+  ): Promise<
+    Filters['dontErrorOnEmpty'] extends true
+      ? Filters['includeCredentials'] extends true
+        ? BucketWithCredentials | undefined
+        : Bucket | undefined
+      : Filters['includeCredentials'] extends true
+      ? BucketWithCredentials
+      : Bucket
+  >
+
+  listBucketByExternalCredential(credentialId: string, columns: string): Promise<Bucket[]>

   countObjectsInBucket(bucketId: string): Promise<number>

@@ -82,7 +100,7 @@ export interface Database {

   updateBucket(
     bucketId: string,
-    fields: Pick<Bucket, 'public' | 'file_size_limit' | 'allowed_mime_types'>
+    fields: Pick<Bucket, 'public' | 'file_size_limit' | 'allowed_mime_types' | 'credential_id'>
   ): Promise<void>

   upsertObject(
@@ -115,4 +133,8 @@ export interface Database {
   ): Promise<void>

   searchObjects(bucketId: string, prefix: string, options: SearchObjectOption): Promise<Obj[]>
+
+  listCredentials(): Promise<Pick<Credential, 'id' | 'name'>[]>
+  createCredential(credential: Omit<Credential, 'id' | 'created_at'>): Promise<Pick<Credential, 'id'>>
+  deleteCredential(credentialId: string): Promise<Pick<Credential, 'id'>>
 }
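`findBucketById` now encodes its runtime behaviour in the type system: `dontErrorOnEmpty` adds `| undefined` to the result and `includeCredentials` widens the row to `BucketWithCredentials`. One subtlety: object-literal booleans widen to `boolean` during generic inference, so callers may need `as const` (or an explicit type argument) for the literal branches to be selected. In caller terms (`db` is any `Database` implementation):

```ts
// Without literal types, Filters['includeCredentials'] stays boolean and the
// plain-Bucket branch is chosen.
const plain = await db.findBucketById('bucket-id', 'id') // Bucket

// With as const, the conditional branches resolve to the credentialed row.
const filters = { dontErrorOnEmpty: true, includeCredentials: true } as const
const withCreds = await db.findBucketById('bucket-id', 'id,credential_id', filters)
// typeof withCreds: BucketWithCredentials | undefined
```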
diff --git a/src/storage/database/knex.ts b/src/storage/database/knex.ts
index d3104880..587ff11a 100644
--- a/src/storage/database/knex.ts
+++ b/src/storage/database/knex.ts
@@ -1,4 +1,4 @@
-import { Bucket, Obj } from '../schemas'
+import { Bucket, BucketWithCredentials, Credential, Obj } from '../schemas'
 import { RenderableError, StorageBackendError, StorageError } from '../errors'
 import { ObjectMetadata } from '../backend'
 import { Knex } from 'knex'
@@ -13,6 +13,8 @@ import {
 import { DatabaseError } from 'pg'
 import { TenantConnection } from '../../database/connection'
 import { DbQueryPerformance } from '../../monitoring/metrics'
+import { isUuid } from '../limits'
+import { encrypt } from '../../auth'

 /**
  * Database
@@ -41,10 +43,7 @@ export class StorageKnexDB implements Database {

     while (retryLeft > 0) {
       try {
-        const tnx = await this.connection.transaction(
-          transactionOptions?.isolation as Knex.IsolationLevels,
-          this.options.tnx
-        )()
+        const tnx = await this.connection.transaction(this.options.tnx)()

         try {
           await this.connection.setScope(tnx)
@@ -114,17 +113,18 @@ export class StorageKnexDB implements Database {
       | 'owner'
       | 'file_size_limit'
       | 'allowed_mime_types'
-      | 'external_credential_id'
+      | 'credential_id'
     >
   ) {
     const bucketData = {
       id: data.id,
       name: data.name,
-      owner: data.owner,
+      owner: isUuid(data.owner || '') ? data.owner : undefined,
+      owner_id: data.owner,
       public: data.public,
       allowed_mime_types: data.allowed_mime_types,
       file_size_limit: data.file_size_limit,
-      external_credential_id: data.external_credential_id,
+      credential_id: data.credential_id,
     }

     const bucket = await this.runQuery('CreateBucket', async (knex) => {
@@ -140,9 +140,31 @@ export class StorageKnexDB implements Database {
     return bucketData
   }

-  async findBucketById(bucketId: string, columns = 'id', filters?: FindBucketFilters) {
+  async findBucketById<Filters extends FindBucketFilters = FindBucketFilters>(
+    bucketId: string,
+    columns = 'id',
+    filters?: Filters
+  ) {
     const result = await this.runQuery('FindBucketById', async (knex) => {
-      const query = knex.from('buckets').select(columns.split(',')).where('id', bucketId)
+      let fields = columns.split(',')
+
+      if (filters?.includeCredentials) {
+        fields = fields.map((field) => {
+          if (field.startsWith('buckets.')) {
+            return field
+          }
+          return `buckets.${field}`
+        })
+
+        fields.push('bucket_credentials.access_key')
+        fields.push('bucket_credentials.secret_key')
+        fields.push('bucket_credentials.region')
+        fields.push('bucket_credentials.role')
+        fields.push('bucket_credentials.force_path_style')
+        fields.push('bucket_credentials.endpoint')
+      }
+
+      const query = knex.from('buckets').select(fields).where('buckets.id', bucketId)

       if (typeof filters?.isPublic !== 'undefined') {
         query.where('public', filters.isPublic)
@@ -156,7 +178,13 @@ export class StorageKnexDB implements Database {
         query.forShare()
       }

-      return query.first() as Promise<Bucket>
+      if (filters?.includeCredentials) {
+        query.leftJoin('bucket_credentials', 'bucket_credentials.id', 'buckets.credential_id')
+      }
+
+      return query.first() as Promise<
+        Filters['includeCredentials'] extends true ? BucketWithCredentials : Bucket
+      >
     })

     if (!result && !filters?.dontErrorOnEmpty) {
@@ -168,6 +196,15 @@ export class StorageKnexDB implements Database {
     return result
   }

+  async listBucketByExternalCredential(credentialId: string, columns = 'id') {
+    return this.runQuery('FindBucketByExternalCredentialId', async (knex) => {
+      return knex
+        .from('buckets')
+        .select(columns.split(','))
+        .where('credential_id', credentialId) as Promise<Bucket[]>
+    })
+  }
+
   async countObjectsInBucket(bucketId: string) {
     const result = await this.runQuery('CountObjectsInBucket', (knex) => {
       return knex
@@ -211,17 +248,14 @@ export class StorageKnexDB implements Database {

   async updateBucket(
     bucketId: string,
-    fields: Pick<
-      Bucket,
-      'public' | 'file_size_limit' | 'allowed_mime_types' | 'external_credential_id'
-    >
+    fields: Pick<Bucket, 'public' | 'file_size_limit' | 'allowed_mime_types' | 'credential_id'>
   ) {
     const bucket = await this.runQuery('UpdateBucket', (knex) => {
       return knex.from('buckets').where('id', bucketId).update({
         public: fields.public,
         file_size_limit: fields.file_size_limit,
         allowed_mime_types: fields.allowed_mime_types,
-        external_credential_id: fields.external_credential_id,
+        credential_id: fields.credential_id,
       })
     })

@@ -237,7 +271,8 @@ export class StorageKnexDB implements Database {
   async upsertObject(data: Pick<Obj, 'name' | 'owner' | 'bucket_id' | 'metadata' | 'version'>) {
     const objectData = {
       name: data.name,
-      owner: data.owner,
+      owner: isUuid(data.owner || '') ? data.owner : undefined,
+      owner_id: data.owner,
       bucket_id: data.bucket_id,
       metadata: data.metadata,
       version: data.version,
@@ -250,7 +285,8 @@ export class StorageKnexDB implements Database {
         .merge({
           metadata: data.metadata,
           version: data.version,
-          owner: data.owner,
+          owner: isUuid(data.owner || '') ? data.owner : undefined,
+          owner_id: data.owner,
         })
         .returning('*')
     })
@@ -264,15 +300,20 @@ export class StorageKnexDB implements Database {
     data: Pick<Obj, 'name' | 'owner' | 'metadata' | 'version'>
   ) {
     const [object] = await this.runQuery('UpdateObject', (knex) => {
-      return knex.from('objects').where('bucket_id', bucketId).where('name', name).update(
-        {
-          name: data.name,
-          owner: data.owner,
-          metadata: data.metadata,
-          version: data.version,
-        },
-        '*'
-      )
+      return knex
+        .from('objects')
+        .where('bucket_id', bucketId)
+        .where('name', name)
+        .update(
+          {
+            name: data.name,
+            owner: isUuid(data.owner || '') ? data.owner : undefined,
+            owner_id: data.owner,
+            metadata: data.metadata,
+            version: data.version,
+          },
+          '*'
+        )
     })

     if (!object) {
@@ -285,7 +326,8 @@ export class StorageKnexDB implements Database {
   async createObject(data: Pick<Obj, 'name' | 'owner' | 'bucket_id' | 'metadata' | 'version'>) {
     const object = {
       name: data.name,
-      owner: data.owner,
+      owner: isUuid(data.owner || '') ? data.owner : undefined,
+      owner_id: data.owner,
       bucket_id: data.bucket_id,
       metadata: data.metadata,
       version: data.version,
@@ -346,7 +388,8 @@ export class StorageKnexDB implements Database {
         .from('objects')
         .update({
           last_accessed_at: new Date().toISOString(),
-          owner,
+          owner: isUuid(owner || '') ? owner : undefined,
+          owner_id: owner,
         })
         .returning('*')
         .where({ bucket_id: bucketId, name: objectName })
@@ -456,10 +499,49 @@ export class StorageKnexDB implements Database {
     })
   }

+  async listCredentials() {
+    return this.runQuery('ListCredentials', async (knex) => {
+      const result = await knex('bucket_credentials').select('id', 'name')
+      return result as Pick<Credential, 'id' | 'name'>[]
+    })
+  }
+
+  async createCredential(credential: Omit<Credential, 'id' | 'created_at'>) {
+    return this.runQuery('CreateCredential', async (knex) => {
+      const [result] = await knex('bucket_credentials')
+        .insert({
+          name: credential.name,
+          access_key: credential.access_key ? encrypt(credential.access_key) : undefined,
+          secret_key: credential.secret_key ? encrypt(credential.secret_key) : undefined,
+          role: credential.role,
+          endpoint: credential.endpoint,
+          region: credential.region,
+          force_path_style: Boolean(credential.force_path_style),
+        })
+        .returning('id')
+
+      return result
+    })
+  }
+
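Credential secrets are stored encrypted: `encrypt` on write here, `decrypt` wherever a backend or tus store is constructed. The helpers live in `src/auth` and their actual implementation is not part of this diff; a minimal sketch of such a pair, assuming AES-256-GCM with a key derived from an environment variable (both the variable name and the scheme are assumptions), could look like:

```ts
import crypto from 'crypto'

// Assumed: a 32-byte key derived from an ENCRYPTION_KEY env variable.
const key = crypto.createHash('sha256').update(process.env.ENCRYPTION_KEY as string).digest()

export function encrypt(plain: string): string {
  const iv = crypto.randomBytes(12)
  const cipher = crypto.createCipheriv('aes-256-gcm', key, iv)
  const data = Buffer.concat([cipher.update(plain, 'utf8'), cipher.final()])
  // Pack iv + auth tag + ciphertext so decrypt is self-contained.
  return Buffer.concat([iv, cipher.getAuthTag(), data]).toString('base64')
}

export function decrypt(encrypted: string): string {
  const buf = Buffer.from(encrypted, 'base64')
  const iv = buf.subarray(0, 12)
  const tag = buf.subarray(12, 28)
  const data = buf.subarray(28)
  const decipher = crypto.createDecipheriv('aes-256-gcm', key, iv)
  decipher.setAuthTag(tag)
  return Buffer.concat([decipher.update(data), decipher.final()]).toString('utf8')
}
```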
+  async deleteCredential(credentialId: string) {
+    return this.runQuery('DeleteCredential', async (knex) => {
+      const [result] = await knex('bucket_credentials')
+        .where({ id: credentialId })
+        .delete()
+        .returning('id')
+
+      if (!result) {
+        throw new StorageBackendError('Credential not found', 404, 'not_found')
+      }
+
+      return result as Pick<Credential, 'id'>
+    })
+  }
+
   protected async runQuery<T extends (knex: Knex.Transaction) => Promise<any>>(
     queryName: string,
-    fn: T,
-    isolation?: Knex.IsolationLevels
+    fn: T
   ): Promise<Awaited<ReturnType<T>>> {
     const timer = DbQueryPerformance.startTimer({
       name: queryName,
@@ -475,7 +557,7 @@ export class StorageKnexDB implements Database {
       const needsNewTransaction = !tnx || differentScopes

       if (!tnx || needsNewTransaction) {
-        tnx = await this.connection.transaction(isolation, this.options.tnx)()
+        tnx = await this.connection.transaction(this.options.tnx)()
         tnx.on('query-error', (error: DatabaseError) => {
           throw DBError.fromDBError(error)
         })
diff --git a/src/storage/limits.ts b/src/storage/limits.ts
index d5f7829f..7f841598 100644
--- a/src/storage/limits.ts
+++ b/src/storage/limits.ts
@@ -109,3 +109,7 @@ export function parseFileSizeToBytes(valueWithUnit: string) {
     )
   }
 }
+
+export function isUuid(value: string) {
+  return /^[0-9a-f]{8}-[0-9a-f]{4}-[0-5][0-9a-f]{3}-[089ab][0-9a-f]{3}-[0-9a-f]{12}$/i.test(value)
+}
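`isUuid` exists because `owner` stays a uuid column while the new `owner_id` is free-form text; every write path above dual-writes accordingly. In effect:

```ts
isUuid('b54df260-0f29-4bfb-8ddd-8a0456c0a0d1') // true  → written to both owner and owner_id
isUuid('my-service-account')                   // false → owner left null, only owner_id set
```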
diff --git a/src/storage/object.ts b/src/storage/object.ts
index 6dbbcf9f..11b86052 100644
--- a/src/storage/object.ts
+++ b/src/storage/object.ts
@@ -15,10 +15,11 @@ import {
 } from '../queue'
 import { randomUUID } from 'crypto'
 import { StorageBackendError } from './errors'
+import { Bucket } from './schemas'

 export interface UploadObjectOptions {
   objectName: string
-  owner?: string
+  owner: string | undefined
   isUpsert?: boolean
   version?: string
 }
@@ -35,7 +36,7 @@ export class ObjectStorage {
   constructor(
     private readonly backend: StorageBackendAdapter,
     private readonly db: Database,
-    private readonly bucketId: string
+    private readonly bucket: Bucket
   ) {
     this.uploader = new Uploader(backend, db)
   }
@@ -45,7 +46,14 @@ export class ObjectStorage {
    * as superUser bypassing RLS rules
    */
   asSuperUser() {
-    return new ObjectStorage(this.backend, this.db.asSuperUser(), this.bucketId)
+    return new ObjectStorage(this.backend, this.db.asSuperUser(), this.bucket)
+  }
+
+  computeObjectPath(objectName: string) {
+    if (this.bucket.credential_id) {
+      return objectName
+    }
+    return `${this.bucket.id}/${objectName}`
   }

   /**
@@ -56,17 +64,14 @@ export class ObjectStorage {
   async uploadNewObject(request: FastifyRequest, options: UploadObjectOptions) {
     mustBeValidKey(options.objectName, 'The object name contains invalid characters')

-    const path = `${this.bucketId}/${options.objectName}`
-
-    const bucket = await this.db
-      .asSuperUser()
-      .findBucketById(this.bucketId, 'id, file_size_limit, allowed_mime_types')
+    const path = this.computeObjectPath(options.objectName)

     const { metadata, obj } = await this.uploader.upload(request, {
       ...options,
-      bucketId: this.bucketId,
-      fileSizeLimit: bucket.file_size_limit,
-      allowedMimeTypes: bucket.allowed_mime_types,
+      uploadPath: path,
+      bucketId: this.bucket.id,
+      fileSizeLimit: this.bucket.file_size_limit,
+      allowedMimeTypes: this.bucket.allowed_mime_types,
     })

     return { objectMetadata: metadata, path, id: obj.id }
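`computeObjectPath` is the pivot for external buckets: objects in internal buckets keep the `bucketId/objectName` layout inside the shared global bucket, while an external bucket is the storage bucket itself, so the object name alone is the key. Concretely:

```ts
// Internal bucket (no credential_id): key lives under the global bucket.
new ObjectStorage(backend, db, { id: 'avatars' } as Bucket).computeObjectPath('u1/a.png')
// → 'avatars/u1/a.png'

// External bucket (credential_id set): key is relative to the customer's own bucket.
new ObjectStorage(backend, db, { id: 'avatars', credential_id: 'c1' } as Bucket).computeObjectPath(
  'u1/a.png'
)
// → 'u1/a.png'
```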
@@ -75,14 +80,10 @@ export class ObjectStorage {
   public async uploadOverridingObject(request: FastifyRequest, options: UploadObjectOptions) {
     mustBeValidKey(options.objectName, 'The object name contains invalid characters')

-    const path = `${this.bucketId}/${options.objectName}`
-
-    const bucket = await this.db
-      .asSuperUser()
-      .findBucketById(this.bucketId, 'id, file_size_limit, allowed_mime_types')
+    const path = this.computeObjectPath(options.objectName)

     await this.db.testPermission((db) => {
-      return db.updateObject(this.bucketId, options.objectName, {
+      return db.updateObject(this.bucket.id, options.objectName, {
         name: options.objectName,
         owner: options.owner,
         version: '1',
@@ -91,9 +92,10 @@ export class ObjectStorage {

     const { metadata, obj } = await this.uploader.upload(request, {
       ...options,
-      bucketId: this.bucketId,
-      fileSizeLimit: bucket.file_size_limit,
-      allowedMimeTypes: bucket.allowed_mime_types,
+      uploadPath: path,
+      bucketId: this.bucket.id,
+      fileSizeLimit: this.bucket.file_size_limit,
+      allowedMimeTypes: this.bucket.allowed_mime_types,
       isUpsert: true,
     })

@@ -107,11 +109,11 @@ export class ObjectStorage {
    */
   async deleteObject(objectName: string) {
     await this.db.withTransaction(async (db) => {
-      const obj = await db.asSuperUser().findObject(this.bucketId, objectName, 'id,version', {
+      const obj = await db.asSuperUser().findObject(this.bucket.id, objectName, 'id,version', {
         forUpdate: true,
       })

-      const deleted = await db.deleteObject(this.bucketId, objectName)
+      const deleted = await db.deleteObject(this.bucket.id, objectName)

       if (!deleted) {
         throw new StorageBackendError('not_found', 404, 'Object Not Found')
@@ -120,7 +122,7 @@ export class ObjectStorage {
       await ObjectAdminDelete.send({
         tenant: this.db.tenant(),
         name: objectName,
-        bucketId: this.bucketId,
+        bucketId: this.bucket.id,
         version: obj.version,
       })
     })
@@ -128,7 +130,7 @@ export class ObjectStorage {
     await ObjectRemoved.sendWebhook({
       tenant: this.db.tenant(),
       name: objectName,
-      bucketId: this.bucketId,
+      bucketId: this.bucket.id,
     })
   }

@@ -151,7 +153,7 @@ export class ObjectStorage {
     }

     await this.db.withTransaction(async (db) => {
-      const data = await db.deleteObjects(this.bucketId, prefixesSubset, 'name')
+      const data = await db.deleteObjects(this.bucket.id, prefixesSubset, 'name')

       if (data.length > 0) {
         results = results.concat(data)
@@ -159,10 +161,11 @@ export class ObjectStorage {
         // if successfully deleted, delete from s3 too
         // todo: consider moving this to a queue
         const prefixesToDelete = data.reduce((all, { name, version }) => {
-          all.push(withOptionalVersion(`${this.bucketId}/${name}`, version))
+          const path = this.computeObjectPath(name)
+          all.push(withOptionalVersion(path, version))

           if (version) {
-            all.push(withOptionalVersion(`${this.bucketId}/${name}`, version) + '.info')
+            all.push(withOptionalVersion(path, version) + '.info')
           }
           return all
         }, [] as string[])
@@ -174,7 +177,7 @@ export class ObjectStorage {
             ObjectRemoved.sendWebhook({
               tenant: db.tenant(),
               name: object.name,
-              bucketId: this.bucketId,
+              bucketId: this.bucket.id,
             })
           )
         )
@@ -193,12 +196,12 @@ export class ObjectStorage {
   async updateObjectMetadata(objectName: string, metadata: ObjectMetadata) {
     mustBeValidKey(objectName, 'The object name contains invalid characters')

-    const result = await this.db.updateObjectMetadata(this.bucketId, objectName, metadata)
+    const result = await this.db.updateObjectMetadata(this.bucket.id, objectName, metadata)

     await ObjectUpdatedMetadata.sendWebhook({
       tenant: this.db.tenant(),
       name: objectName,
-      bucketId: this.bucketId,
+      bucketId: this.bucket.id,
       metadata,
     })

@@ -211,7 +214,7 @@ export class ObjectStorage {
    * @param owner
    */
   updateObjectOwner(objectName: string, owner?: string) {
-    return this.db.updateObjectOwner(this.bucketId, objectName, owner)
+    return this.db.updateObjectOwner(this.bucket.id, objectName, owner)
   }

   /**
@@ -223,7 +226,7 @@ export class ObjectStorage {
   async findObject(objectName: string, columns = 'id', filters?: FindObjectFilters) {
     mustBeValidKey(objectName, 'The object name contains invalid characters')

-    return this.db.findObject(this.bucketId, objectName, columns, filters)
+    return this.db.findObject(this.bucket.id, objectName, columns, filters)
   }

   /**
@@ -232,7 +235,7 @@ export class ObjectStorage {
    * @param columns
    */
   async findObjects(objectNames: string[], columns = 'id') {
-    return this.db.findObjects(this.bucketId, objectNames, columns)
+    return this.db.findObjects(this.bucket.id, objectNames, columns)
   }

   /**
@@ -252,24 +255,24 @@ export class ObjectStorage {
     }

     const newVersion = randomUUID()
-    const bucketId = this.bucketId
-    const s3SourceKey = `${bucketId}/${sourceKey}`
-    const s3DestinationKey = `${bucketId}/${destinationKey}`
-
-    // We check if the user has permission to copy the object to the destination key
-    const originObject = await this.db.findObject(
-      this.bucketId,
-      sourceKey,
-      'bucket_id,metadata,version'
-    )
-
-    await this.uploader.canUpload({
-      bucketId: this.bucketId,
-      objectName: destinationKey,
-      isUpsert: false,
-    })
+    const s3SourceKey = this.computeObjectPath(sourceKey)
+    const s3DestinationKey = this.computeObjectPath(destinationKey)

     try {
+      // We check if the user has permission to copy the object to the destination key
+      const originObject = await this.db.findObject(
+        this.bucket.id,
+        sourceKey,
+        'bucket_id,metadata,version'
+      )
+
+      await this.uploader.canUpload({
+        bucketId: this.bucket.id,
+        objectName: destinationKey,
+        owner,
+        isUpsert: false,
+      })
+
       const copyResult = await this.backend.copyObject(
         s3SourceKey,
         originObject.version,
@@ -290,7 +293,7 @@ export class ObjectStorage {
       await ObjectCreatedCopyEvent.sendWebhook({
         tenant: this.db.tenant(),
         name: destinationKey,
-        bucketId: this.bucketId,
+        bucketId: this.bucket.id,
         metadata,
       })

@@ -301,7 +304,7 @@ export class ObjectStorage {
     } catch (e) {
       await ObjectAdminDelete.send({
         name: destinationKey,
-        bucketId: this.bucketId,
+        bucketId: this.bucket.id,
         tenant: this.db.tenant(),
         version: newVersion,
       })
@@ -323,13 +326,13 @@ export class ObjectStorage {
     }

     const newVersion = randomUUID()
-    const s3SourceKey = `${this.bucketId}/${sourceObjectName}`
-    const s3DestinationKey = `${this.bucketId}/${destinationObjectName}`
+    const s3SourceKey = this.computeObjectPath(sourceObjectName)
+    const s3DestinationKey = this.computeObjectPath(destinationObjectName)

     await this.db.testPermission((db) => {
       return Promise.all([
-        db.findObject(this.bucketId, sourceObjectName, 'id'),
-        db.updateObject(this.bucketId, sourceObjectName, {
+        db.findObject(this.bucket.id, sourceObjectName, 'id'),
+        db.updateObject(this.bucket.id, sourceObjectName, {
           name: sourceObjectName,
           version: '1',
           owner,
@@ -339,7 +342,7 @@ export class ObjectStorage {
         db.asSuperUser().createObject({
           name: destinationObjectName,
           version: newVersion,
-          bucket_id: this.bucketId,
+          bucket_id: this.bucket.id,
           owner,
         }),
       ])
@@ -347,7 +350,7 @@ export class ObjectStorage {

     const sourceObj = await this.db
       .asSuperUser()
-      .findObject(this.bucketId, sourceObjectName, 'id, version')
+      .findObject(this.bucket.id, sourceObjectName, 'id, version')

     try {
await this.backend.copyObject(s3SourceKey, sourceObj.version, s3DestinationKey, newVersion) @@ -358,33 +361,33 @@ await db.createObject({ name: destinationObjectName, version: newVersion, - bucket_id: this.bucketId, + bucket_id: this.bucket.id, owner: sourceObj.owner, metadata, }) - await db.deleteObject(this.bucketId, sourceObjectName, sourceObj.version) - await ObjectAdminDelete.send({ - name: sourceObjectName, - bucketId: this.bucketId, - tenant: this.db.tenant(), - version: sourceObj.version, - }) + await db.deleteObject(this.bucket.id, sourceObjectName, sourceObj.version) - await Promise.allSettled([ + await Promise.all([ + ObjectAdminDelete.send({ + name: sourceObjectName, + bucketId: this.bucket.id, + tenant: this.db.tenant(), + version: sourceObj.version, + }), ObjectRemovedMove.sendWebhook({ tenant: this.db.tenant(), name: sourceObjectName, - bucketId: this.bucketId, + bucketId: this.bucket.id, }), ObjectCreatedMove.sendWebhook({ tenant: this.db.tenant(), name: destinationObjectName, - bucketId: this.bucketId, + bucketId: this.bucket.id, metadata: metadata, oldObject: { name: sourceObjectName, - bucketId: this.bucketId, + bucketId: this.bucket.id, }, }), ]) @@ -392,7 +395,7 @@ } catch (e) { await ObjectAdminDelete.send({ name: destinationObjectName, - bucketId: this.bucketId, + bucketId: this.bucket.id, tenant: this.db.tenant(), version: newVersion, }) @@ -411,7 +414,7 @@ prefix = `${prefix}/` } - return this.db.searchObjects(this.bucketId, prefix, options) + return this.db.searchObjects(this.bucket.id, prefix, options) } /** @@ -482,7 +485,7 @@ let error = null let signedURL = null if (nameSet.has(path)) { - const urlToSign = `${this.bucketId}/${path}` + const urlToSign = `${this.bucket.id}/${path}` const token = await signJWT({ url: urlToSign }, jwtSecret, expiresIn) signedURL = `/object/sign/${urlToSign}?token=${token}` } else { @@ -517,7 +520,7 @@ // check if user has INSERT permissions await this.db.testPermission((db) => { return db.createObject({ - bucket_id: this.bucketId, + bucket_id: this.bucket.id, name: objectName, owner, metadata: {}, diff --git a/src/storage/renderer/asset.ts b/src/storage/renderer/asset.ts index 92ada546..7a914c66 100644 --- a/src/storage/renderer/asset.ts +++ b/src/storage/renderer/asset.ts @@ -12,7 +12,7 @@ export class AssetRenderer extends Renderer { } getAsset(request: FastifyRequest, options: RenderOptions) { - return this.backend.getObject(options.bucket, options.key, options.version, { + return this.backend.getObject(options.key, options.version, { ifModifiedSince: request.headers['if-modified-since'], ifNoneMatch: request.headers['if-none-match'], range: request.headers.range, diff --git a/src/storage/renderer/head.ts b/src/storage/renderer/head.ts index 2b17f4d0..d2eaafb9 100644 --- a/src/storage/renderer/head.ts +++ b/src/storage/renderer/head.ts @@ -13,7 +13,7 @@ export class HeadRenderer extends Renderer { } async getAsset(request: FastifyRequest, options: RenderOptions): Promise<AssetResponse> { - const metadata = await this.backend.headObject(options.bucket, options.key, options.version) + const metadata = await this.backend.headObject(options.key, options.version) return { metadata,
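Every hand-assembled `${bucketId}/${objectName}` key in the object-storage hunks above now goes through computeObjectPath, which centralizes S3 key construction, and the renderers stop receiving a bucket because the backend instance is already bound to one (see the test setup further down, where createStorageBackend is given a bucket and a tenant prefix). The body of computeObjectPath is not part of these hunks; the following is only a sketch of the apparent intent, assuming the tenant prefix is applied by the backend itself and that a credential-backed bucket maps to a dedicated external S3 bucket:

import { BucketWithCredentials } from './schemas'

// Sketch only: the real method is defined on ObjectStorage outside these hunks.
function computeObjectPath(bucket: BucketWithCredentials, objectName: string): string {
  if (bucket.credential_id) {
    // External bucket: one S3 bucket per storage bucket (assumption),
    // so the key can be the object name alone.
    return objectName
  }
  // Shared backend: keys stay namespaced by bucket id; the tenant prefix is
  // added by the backend constructed with `prefix: tenantId`.
  return `${bucket.id}/${objectName}`
}

Note also the switch from Promise.allSettled to Promise.all around the move notifications above: allSettled never rejects, so a failed ObjectAdminDelete or webhook used to be swallowed silently, whereas Promise.all propagates the first failure to the catch block that enqueues cleanup for the freshly written version.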
diff --git a/src/storage/renderer/image.ts b/src/storage/renderer/image.ts index d09bdcdf..89581547 100644 --- a/src/storage/renderer/image.ts +++ b/src/storage/renderer/image.ts @@ -186,8 +186,8 @@ export class ImageRenderer extends Renderer { */ async getAsset(request: FastifyRequest, options: RenderOptions) { const [privateURL, headObj] = await Promise.all([ - this.backend.privateAssetUrl(options.bucket, options.key, options.version), - this.backend.headObject(options.bucket, options.key, options.version), + this.backend.privateAssetUrl(options.key, options.version), + this.backend.headObject(options.key, options.version), ]) const transformations = ImageRenderer.applyTransformation(this.transformOptions || {}) diff --git a/src/storage/renderer/renderer.ts b/src/storage/renderer/renderer.ts index 03da06a1..318c95d5 100644 --- a/src/storage/renderer/renderer.ts +++ b/src/storage/renderer/renderer.ts @@ -3,7 +3,6 @@ import { ObjectMetadata } from '../backend' import { Readable } from 'stream' export interface RenderOptions { - bucket: string key: string version: string | undefined download?: string diff --git a/src/storage/schemas/bucket.ts b/src/storage/schemas/bucket.ts index d5ddb81d..73295f67 100644 --- a/src/storage/schemas/bucket.ts +++ b/src/storage/schemas/bucket.ts @@ -1,4 +1,5 @@ import { FromSchema } from 'json-schema-to-ts' +import { Credential } from './credentials' export const bucketSchema = { $id: 'bucketSchema', @@ -9,6 +10,7 @@ owner: { type: 'string' }, public: { type: 'boolean' }, file_size_limit: { type: ['integer', 'null'] }, + credential_id: { type: ['string', 'null'] }, allowed_mime_types: { type: ['array', 'null'], items: { type: 'string' } }, created_at: { type: 'string' }, updated_at: { type: 'string' }, @@ -30,3 +32,7 @@ } as const export type Bucket = FromSchema<typeof bucketSchema> +export type BucketWithCredentials = Bucket & + Partial<Omit<Credential, 'id' | 'name'>> & { + credential_id?: string + } diff --git a/src/storage/schemas/credentials.ts b/src/storage/schemas/credentials.ts new file mode 100644 index 00000000..f5023d61 --- /dev/null +++ b/src/storage/schemas/credentials.ts @@ -0,0 +1,21 @@ +import { FromSchema } from 'json-schema-to-ts' + +export const credentialSchema = { + $id: 'credentialSchema', + type: 'object', + properties: { + id: { type: 'string' }, + name: { type: 'string' }, + force_path_style: { type: 'boolean' }, + access_key: { type: 'string' }, + secret_key: { type: 'string' }, + role: { type: 'string' }, + endpoint: { type: 'string' }, + region: { type: 'string', minLength: 1 }, + created_at: { type: 'string' }, + }, + required: ['id', 'name'], + additionalProperties: false, +} as const + +export type Credential = FromSchema<typeof credentialSchema> diff --git a/src/storage/schemas/index.ts b/src/storage/schemas/index.ts index 90c69575..30b08979 100644 --- a/src/storage/schemas/index.ts +++ b/src/storage/schemas/index.ts @@ -1,2 +1,3 @@ export * from './object' export * from './bucket' +export * from './credentials' diff --git a/src/storage/schemas/object.ts b/src/storage/schemas/object.ts index b71846ba..e0ddbf23 100644 --- a/src/storage/schemas/object.ts +++ b/src/storage/schemas/object.ts @@ -8,6 +8,7 @@ export const objectSchema = { name: { type: 'string' }, bucket_id: { type: 'string' }, owner: { type: 'string' }, + owner_id: { type: 'string' }, version: { type: 'string' }, id: { anyOf: [{ type: 'string' }, { type: 'null' }] }, updated_at: { anyOf: [{ type: 'string' }, { type: 'null' }] }, diff --git a/src/storage/storage.ts b/src/storage/storage.ts index 3fcbae78..7dff3d9c 100644 --- a/src/storage/storage.ts +++ b/src/storage/storage.ts @@ -1,12 +1,14 @@ -import { StorageBackendAdapter, withOptionalVersion } from './backend' +import { S3Backend, StorageBackendAdapter,
withOptionalVersion } from './backend' import { Database, FindBucketFilters } from './database' import { StorageBackendError } from './errors' import { AssetRenderer, HeadRenderer, ImageRenderer } from './renderer' import { getFileSizeLimit, mustBeValidBucketName, parseFileSizeToBytes } from './limits' import { getConfig } from '../config' import { ObjectStorage } from './object' +import { Bucket } from './schemas' +import { logger } from '../monitoring' -const { urlLengthLimit, globalS3Bucket } = getConfig() +const { urlLengthLimit } = getConfig() /** * Storage @@ -18,12 +20,16 @@ export class Storage { /** * Access object related functionality on a specific bucket - * @param bucketId + * @param bucket */ - from(bucketId: string) { - mustBeValidBucketName(bucketId, 'The bucketId name contains invalid characters') + from(bucket: Bucket) { + return new ObjectStorage(this.backend, this.db, bucket) + } - return new ObjectStorage(this.backend, this.db, bucketId) + async fromBucketId(bucketId: string) { + mustBeValidBucketName(bucketId, 'The bucketId name contains invalid characters') + const bucket = await this.db.asSuperUser().findBucketById(bucketId, '*') + return this.from(bucket) } /** @@ -80,6 +86,7 @@ export class Storage { > & { fileSizeLimit?: number | string | null allowedMimeTypes?: null | string[] + credentialId?: string } ) { mustBeValidBucketName(data.name, 'Bucket name invalid') @@ -97,8 +104,19 @@ export class Storage { if (data.allowedMimeTypes) { this.validateMimeType(data.allowedMimeTypes) } + bucketData.allowed_mime_types = data.allowedMimeTypes + if (this.backend instanceof S3Backend && data.credentialId) { + const backend = this.backend as S3Backend + bucketData.credential_id = data.credentialId + + return this.db.withTransaction(async (db) => { + await backend.createBucketIfDoesntExists(data.name) + return db.createBucket(bucketData) + }) + } + return this.db.createBucket(bucketData) } @@ -115,6 +133,7 @@ export class Storage { > & { fileSizeLimit?: number | string | null allowedMimeTypes?: null | string[] + credentialId?: string } ) { mustBeValidBucketName(id, 'Bucket name invalid') @@ -134,6 +153,28 @@ export class Storage { } bucketData.allowed_mime_types = data.allowedMimeTypes + if (this.backend instanceof S3Backend && data.credentialId) { + const backend = this.backend as S3Backend + bucketData.credential_id = data.credentialId + + return this.db.withTransaction(async (db) => { + const bucket = await db.findBucketById(id, 'id,name,credential_id', { + forUpdate: true, + }) + + if (!bucket.credential_id) { + throw new StorageBackendError( + 'update_credential_error', + 400, + 'cannot add credentials to an existing bucket' + ) + } + + await backend.createBucketIfDoesntExists(bucket.name) + return db.updateBucket(id, bucketData) + }) + } + return this.db.updateBucket(id, bucketData) } @@ -180,7 +221,8 @@ export class Storage { * @param bucketId */ async emptyBucket(bucketId: string) { - await this.findBucket(bucketId, 'name') + const bucket = await this.findBucket(bucketId, '*') + const objectStore = this.from(bucket) while (true) { const objects = await this.db.listObjects( @@ -201,13 +243,17 @@ export class Storage { if (deleted && deleted.length > 0) { const params = deleted.reduce((all, { name, version }) => { - const fileName = withOptionalVersion(`${this.db.tenantId}/${bucketId}/${name}`, version) + // TODO: fix this + const path = objectStore.computeObjectPath(name) + const fileName = withOptionalVersion(path, version) all.push(fileName) all.push(fileName + 
'.info') return all }, [] as string[]) // delete files from s3 asynchronously - this.backend.deleteObjects(globalS3Bucket, params) + this.backend.deleteObjects(params).catch((err) => { + logger.error(err, 'Error deleting objects from s3') + }) } if (deleted?.length !== objects.length) { @@ -227,6 +273,29 @@ } } + /** + * List credentials + */ + listCredentials() { + return this.db.listCredentials() + } + + /** + * Create credential for external access + * @param credential + */ + createCredential(credential: Parameters<Database['createCredential']>[0]) { + return this.db.createCredential(credential) + } + + /** + * Delete credential + * @param credentialId + */ + deleteCredential(credentialId: string) { + return this.db.deleteCredential(credentialId) + } + validateMimeType(mimeType: string[]) { for (const type of mimeType) { if (type.length > 1000) { diff --git a/src/storage/uploader.ts b/src/storage/uploader.ts index 620ec383..43f7845e 100644 --- a/src/storage/uploader.ts +++ b/src/storage/uploader.ts @@ -1,7 +1,6 @@ import { FastifyRequest } from 'fastify' import { getFileSizeLimit } from './limits' import { ObjectMetadata, StorageBackendAdapter } from './backend' -import { getConfig } from '../config' import { StorageBackendError } from './errors' import { Database } from './database' import { @@ -18,13 +17,12 @@ interface UploaderOptions extends UploadObjectOptions { allowedMimeTypes?: string[] | null } -const { globalS3Bucket } = getConfig() - export interface UploadObjectOptions { bucketId: string objectName: string + uploadPath: string id?: string - owner?: string + owner: string | undefined isUpsert?: boolean isMultipart?: boolean } @@ -67,7 +65,7 @@ * We check RLS policies before proceeding * @param options */ - async prepareUpload(options: UploadObjectOptions) { + async prepareUpload(options: Omit<UploadObjectOptions, 'uploadPath'>) { await this.canUpload(options) FileUploadStarted.inc({ tenant_id: this.db.tenantId, @@ -93,11 +91,9 @@ this.validateMimeType(file.mimeType, options.allowedMimeTypes) } - const path = `${options.bucketId}/${options.objectName}` - const s3Key = `${this.db.tenantId}/${path}` + const s3Key = options.uploadPath const objectMetadata = await this.backend.uploadObject( - globalS3Bucket, s3Key, version, file.body, @@ -137,7 +133,7 @@ objectMetadata, isMultipart, isUpsert, - }: UploadObjectOptions & { + }: Omit<UploadObjectOptions, 'uploadPath'> & { objectMetadata: ObjectMetadata id: string emitEvent?: boolean diff --git a/src/test/bucket.test.ts b/src/test/bucket.test.ts index 998fefd7..93a30446 100644 --- a/src/test/bucket.test.ts +++ b/src/test/bucket.test.ts @@ -256,7 +256,7 @@ describe('testing public bucket functionality', () => { }, }) expect(notModifiedResponse.statusCode).toBe(304) - expect(mockGetObject.mock.calls[1][3]).toMatchObject({ + expect(mockGetObject.mock.calls[1][2]).toMatchObject({ ifModifiedSince: 'Thu, 12 Aug 2021 16:00:00 GMT', ifNoneMatch: 'abc', }) diff --git a/src/test/db/docker-compose.yml b/src/test/db/docker-compose.yml index e86ff10e..0a5f9a13 100644 --- a/src/test/db/docker-compose.yml +++ b/src/test/db/docker-compose.yml @@ -8,6 +8,7 @@ services: - '5432:5432' volumes: - ./src/test/db:/docker-entrypoint-initdb.d/ + shm_size: 1g environment: POSTGRES_DB: postgres POSTGRES_USER: postgres diff --git a/src/test/object.test.ts b/src/test/object.test.ts index 4b54c480..b46751df 100644 --- a/src/test/object.test.ts +++ b/src/test/object.test.ts @@ -69,7 +69,7 @@ describe('testing GET object', () => { }, })
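The index shifts in these test assertions (and the ones that follow) fall straight out of the backend API change: mock.calls[n] is the argument array of the n-th invocation, so dropping the leading bucket parameter from getObject moves the options object from position 3 to position 2. A minimal illustration, assuming a jest environment:

import { jest, expect } from '@jest/globals'

// New shape: getObject(key, version, opts) -- no leading bucket argument.
const mockGetObject = jest.fn((key: string, version: string, opts: object) => undefined)
mockGetObject('cat.jpg', 'v1', { ifNoneMatch: 'abc' })

// mock.calls[0] is ['cat.jpg', 'v1', { ifNoneMatch: 'abc' }]
expect(mockGetObject.mock.calls[0][2]).toMatchObject({ ifNoneMatch: 'abc' })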
expect(response.statusCode).toBe(304) - expect(mockGetObject.mock.calls[0][3]).toMatchObject({ + expect(mockGetObject.mock.calls[0][2]).toMatchObject({ ifModifiedSince: 'Thu, 12 Aug 2021 16:00:00 GMT', ifNoneMatch: 'abc', }) @@ -1143,7 +1143,7 @@ describe('testing deleting multiple objects', () => { prefixes: ['authenticated/delete-multiple3.png', 'authenticated/delete-multiple4.png'], }, }) - expect(response.statusCode).toBe(200) + expect(response.statusCode).toBe(400) expect(S3Backend.prototype.deleteObjects).not.toHaveBeenCalled() }) @@ -1531,9 +1531,9 @@ describe('testing generating signed URLs', () => { paths: [...Array(10001).keys()].map((i) => `authenticated/${i}`), }, }) - expect(response.statusCode).toBe(200) + expect(response.statusCode).toBe(400) const result = JSON.parse(response.body) - expect(result[0].error).toBe('Either the object does not exist or you do not have access to it') + expect(result.error).toBe('Bucket not found') }) test('signing url of a non existent key', async () => { @@ -1548,9 +1548,9 @@ describe('testing generating signed URLs', () => { paths: ['authenticated/notfound.jpg'], }, }) - expect(response.statusCode).toBe(200) + expect(response.statusCode).toBe(400) const result = JSON.parse(response.body) - expect(result[0].error).toBe('Either the object does not exist or you do not have access to it') + expect(result.error).toBe('Bucket not found') }) }) @@ -1593,7 +1593,7 @@ describe('testing retrieving signed URL', () => { }, }) expect(response.statusCode).toBe(304) - expect(mockGetObject.mock.calls[0][3]).toMatchObject({ + expect(mockGetObject.mock.calls[0][2]).toMatchObject({ ifModifiedSince: 'Thu, 12 Aug 2021 16:00:00 GMT', ifNoneMatch: 'abc', }) diff --git a/src/test/render-routes.test.ts b/src/test/render-routes.test.ts index 7826d8f0..c1e90698 100644 --- a/src/test/render-routes.test.ts +++ b/src/test/render-routes.test.ts @@ -9,7 +9,6 @@ import axios from 'axios' import { useMockObject } from './common' dotenv.config({ path: '.env.test' }) -const ENV = process.env const { imgProxyURL } = getConfig() describe('image rendering routes', () => { diff --git a/src/test/rls.test.ts b/src/test/rls.test.ts index cf283edf..22c6a10c 100644 --- a/src/test/rls.test.ts +++ b/src/test/rls.test.ts @@ -66,8 +66,23 @@ const testSpec = yaml.load( fs.readFileSync(path.resolve(__dirname, 'rls_tests.yaml'), 'utf8') ) as RlsTestSpec -const { serviceKey, tenantId, jwtSecret, databaseURL, globalS3Bucket } = getConfig() -const backend = createStorageBackend() +const { + serviceKey, + tenantId, + jwtSecret, + databaseURL, + globalS3Bucket, + globalS3Endpoint, + globalS3ForcePathStyle, + region, +} = getConfig() +const backend = createStorageBackend({ + prefix: tenantId, + bucket: globalS3Bucket, + endpoint: globalS3Endpoint, + region, + forcePathStyle: globalS3ForcePathStyle, +}) const client = backend.client jest.setTimeout(10000) diff --git a/src/test/tus.test.ts b/src/test/tus.test.ts index c02381ee..3fe94774 100644 --- a/src/test/tus.test.ts +++ b/src/test/tus.test.ts @@ -10,7 +10,7 @@ import * as tus from 'tus-js-client' import fs from 'fs' import app from '../app' import { FastifyInstance } from 'fastify' -import { isS3Error, Storage } from '../storage' +import { Storage } from '../storage' import { createStorageBackend } from '../storage/backend' import { CreateBucketCommand, S3Client } from '@aws-sdk/client-s3' import { logger } from '../monitoring' @@ -18,11 +18,18 @@ import { DetailedError } from 'tus-js-client' import { getServiceKeyUser } from '../database/tenant' 
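Both test suites here (rls.test.ts above, tus.test.ts below) now hand createStorageBackend an explicit configuration instead of letting it read global config internally: the returned backend is bound to one S3 bucket and one tenant prefix, which is exactly why getObject, headObject and privateAssetUrl lost their leading bucket parameter earlier in this diff. A sketch of that bound-adapter idea (the constructor shape is inferred from these call sites; the key-joining details are assumptions):

import { GetObjectCommand, S3Client } from '@aws-sdk/client-s3'

interface BackendConfig {
  bucket: string
  prefix?: string
  endpoint?: string
  region: string
  forcePathStyle?: boolean
}

// Sketch: bucket and prefix are fixed at construction time, so read
// operations only need the object key and an optional version.
class BoundS3Backend {
  private client: S3Client

  constructor(private config: BackendConfig) {
    this.client = new S3Client({
      endpoint: config.endpoint,
      region: config.region,
      forcePathStyle: config.forcePathStyle,
    })
  }

  getObject(key: string, version?: string) {
    const prefixed = this.config.prefix ? `${this.config.prefix}/${key}` : key
    const Key = version ? `${prefixed}/${version}` : prefixed
    return this.client.send(new GetObjectCommand({ Bucket: this.config.bucket, Key }))
  }
}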
import { checkBucketExists } from './common' -const { serviceKey, tenantId, globalS3Bucket } = getConfig() +const { serviceKey, tenantId, globalS3Bucket, globalS3Endpoint, globalS3ForcePathStyle, region } = + getConfig() const oneChunkFile = fs.createReadStream(path.resolve(__dirname, 'assets', 'sadcat.jpg')) const localServerAddress = 'http://127.0.0.1:8999' -const backend = createStorageBackend() +const backend = createStorageBackend({ + prefix: tenantId, + bucket: globalS3Bucket, + endpoint: globalS3Endpoint, + region, + forcePathStyle: globalS3ForcePathStyle, +}) const client = backend.client describe('Tus multipart', () => { @@ -112,7 +119,9 @@ describe('Tus multipart', () => { expect(result).toEqual(true) - const dbAsset = await storage.from(bucket.id).findObject(objectName, '*') + const bucketStore = await storage.fromBucketId(bucket.id) + const dbAsset = await bucketStore.findObject(objectName, '*') + expect(dbAsset).toEqual({ bucket_id: bucket.id, created_at: expect.any(Date), @@ -129,6 +138,7 @@ describe('Tus multipart', () => { }, name: objectName, owner: null, + owner_id: null, path_tokens: [objectName], updated_at: expect.any(Date), version: expect.any(String), @@ -180,7 +190,7 @@ describe('Tus multipart', () => { const err = e as DetailedError expect(err.originalResponse.getBody()).toEqual('Bucket not found') - expect(err.originalResponse.getStatus()).toEqual(404) + expect(err.originalResponse.getStatus()).toEqual(400) } })
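The switch from storage.from(bucket.id) to storage.fromBucketId(bucket.id) in this test mirrors the new Storage API: from() now expects the full bucket row, since ObjectStorage needs file_size_limit, allowed_mime_types and credential_id up front to validate uploads and compute object paths, while fromBucketId() resolves the row first with a superuser lookup. Usage in isolation, assuming an initialized storage instance inside an async context:

// Only an id in hand: resolve the bucket row first (one superuser query).
const objectStore = await storage.fromBucketId('my-bucket')

// Row already loaded, e.g. via findBucket: no extra query needed.
const bucket = await storage.findBucket('my-bucket', '*')
const sameStore = storage.from(bucket)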