Skip to content

Commit

Permalink
feat: tus with linked buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Jul 5, 2023
2 parents 197cdf0 + 0cf1f70 commit f1a3b29
Show file tree
Hide file tree
Showing 73 changed files with 1,301 additions and 553 deletions.
38 changes: 0 additions & 38 deletions migrations/multitenant/0006-add-tenants-external-credentials.sql

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
alter table storage.objects add column if not exists owner_id text default null;
alter table storage.buckets add column if not exists owner_id text default null;

comment on column storage.objects.owner is 'Field is deprecated, use owner_id instead';
comment on column storage.buckets.owner is 'Field is deprecated, use owner_id instead';

ALTER TABLE storage.buckets
DROP CONSTRAINT IF EXISTS buckets_owner_fkey;
14 changes: 14 additions & 0 deletions migrations/tenant/0018-external-buckets.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE TABLE bucket_credentials (
"id" uuid NOT NULL DEFAULT extensions.uuid_generate_v4(),
name text NOT NULL unique,
access_key text NULL,
secret_key text NULL,
role text null,
region text not null,
endpoint text NULL,
force_path_style boolean NOT NULL default false,
PRIMARY KEY (id)
);

ALTER TABLE storage.buckets ADD COLUMN credential_id uuid DEFAULT NULL;
ALTER TABLE storage.buckets ADD CONSTRAINT fk_bucket_credential FOREIGN KEY (credential_id) REFERENCES bucket_credentials(id);
1 change: 1 addition & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const build = (opts: buildOpts = {}): FastifyInstance => {
app.register(plugins.logTenantId)
app.register(plugins.logRequest({ excludeUrls: ['/status', '/metrics'] }))
app.register(routes.multiPart, { prefix: 'upload/resumable' })
app.register(routes.credentials, { prefix: 'credentials' })
app.register(routes.bucket, { prefix: 'bucket' })
app.register(routes.object, { prefix: 'object' })
app.register(routes.render, { prefix: 'render/image' })
Expand Down
21 changes: 19 additions & 2 deletions src/auth/jwt.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { getJwtSecret as getJwtSecretForTenant } from '../database/tenant'
import { getJwtSecret as getJwtSecretForTenant, getTenantConfig } from '../database/tenant'
import jwt from 'jsonwebtoken'
import crypto from 'crypto'
import { getConfig } from '../config'
import { StorageBackendError } from '../storage'

const { isMultitenant, jwtSecret, jwtAlgorithm } = getConfig()
const { isMultitenant, jwtSecret, jwtAlgorithm, serviceKey } = getConfig()

interface jwtInterface {
sub?: string
Expand All @@ -21,6 +23,21 @@ export type SignedUploadToken = {
exp: number
}

export async function compareServiceKey(tenantId: string, jwt: string) {
if (isMultitenant) {
const { serviceKey } = await getTenantConfig(tenantId)
return crypto.timingSafeEqual(Buffer.from(serviceKey), Buffer.from(jwt))
}

return crypto.timingSafeEqual(Buffer.from(serviceKey), Buffer.from(jwt))
}

export async function mustBeServiceKey(tenantId: string, jwt: string) {
if (!(await compareServiceKey(tenantId, jwt))) {
throw new StorageBackendError('unauthorized', 401, 'Unauthorized')
}
}

/**
* Gets the JWT secret key from the env PGRST_JWT_SECRET when running in single-tenant
* or querying the multi-tenant database by the given tenantId
Expand Down
2 changes: 1 addition & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ export function getConfig(): StorageConfigType {
10
),
databaseConnectionTimeout: parseInt(
getOptionalConfigFromEnv('DATABASE_CONNECTION_TIMEOUT') || '30000',
getOptionalConfigFromEnv('DATABASE_CONNECTION_TIMEOUT') || '3000',
10
),
region: getConfigFromEnv('REGION'),
Expand Down
46 changes: 13 additions & 33 deletions src/database/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import pg from 'pg'
import { Knex, knex } from 'knex'
import { JwtPayload } from 'jsonwebtoken'
import { getConfig } from '../config'
import { logger } from '../monitoring'
import { DbActiveConnection, DbActivePool } from '../monitoring/metrics'

// https://github.com/knex/knex/issues/387#issuecomment-51554522
Expand Down Expand Up @@ -30,6 +29,7 @@ export interface User {
}

export const connections = new Map<string, Knex>()
const searchPath = ['storage', 'public', 'extensions']

export class TenantConnection {
public readonly role: string
Expand Down Expand Up @@ -63,13 +63,13 @@ export class TenantConnection {

knexPool = knex({
client: 'pg',
searchPath: ['public', 'storage', 'extensions'],
searchPath: isExternalPool ? undefined : searchPath,
pool: {
min: 0,
max: isExternalPool ? 1 : options.maxConnections || databaseMaxConnections,
propagateCreateError: false,
acquireTimeoutMillis: databaseConnectionTimeout,
idleTimeoutMillis: isExternalPool ? 100 : undefined,
idleTimeoutMillis: isExternalPool ? 100 : databaseFreePoolAfterInactivity,
reapIntervalMillis: isExternalPool ? 110 : undefined,
},
connection: connectionString,
Expand Down Expand Up @@ -97,38 +97,12 @@ export class TenantConnection {
})

if (!isExternalPool) {
let freePoolIntervalFn: NodeJS.Timeout | undefined

knexPool.client.pool.on('poolDestroySuccess', () => {
if (freePoolIntervalFn) {
clearTimeout(freePoolIntervalFn)
}

if (connections.get(connectionString) === knexPool) {
connections.delete(connectionString)
}
})

knexPool.client.pool.on('stopReaping', () => {
if (freePoolIntervalFn) {
clearTimeout(freePoolIntervalFn)
}

freePoolIntervalFn = setTimeout(async () => {
connections.delete(connectionString)
knexPool?.destroy().catch((e) => {
logger.error(e, 'Error destroying pool')
})
clearTimeout(freePoolIntervalFn)
}, databaseFreePoolAfterInactivity)
})

knexPool.client.pool.on('startReaping', () => {
if (freePoolIntervalFn) {
clearTimeout(freePoolIntervalFn)
freePoolIntervalFn = undefined
}
})
connections.set(connectionString, knexPool)
}

Expand All @@ -141,10 +115,16 @@ export class TenantConnection {
}
}

transaction(isolation?: Knex.IsolationLevels, instance?: Knex) {
return (instance || this.pool).transactionProvider({
isolationLevel: isolation,
})
transaction(instance?: Knex): Knex.TransactionProvider {
return async () => {
const pool = instance || this.pool
const tnx = await pool.transaction()

if (!instance) {
await tnx.raw(`set search_path to ${searchPath.join(', ')}`)
}
return tnx
}
}

asSuperUser() {
Expand Down
1 change: 1 addition & 0 deletions src/database/tenant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { runMigrationsOnTenant } from './migrate'
import { knex } from './multitenant-db'
import { StorageBackendError } from '../storage'
import { JwtPayload } from 'jsonwebtoken'
import { Credential } from '../storage/schemas'

interface TenantConfig {
anonKey: string
Expand Down
6 changes: 5 additions & 1 deletion src/http/error-handler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { FastifyInstance } from 'fastify'
import { isRenderableError } from '../storage'
import { FastifyError } from '@fastify/error'
import { getConfig } from '../config'

const { tusPath } = getConfig()

/**
* The global error handler for all the uncaught exceptions within a request.
Expand All @@ -20,7 +23,8 @@ export const setErrorHandler = (app: FastifyInstance) => {

if (isRenderableError(error)) {
const renderableError = error.render()
return reply.status(renderableError.statusCode === '500' ? 500 : 400).send(renderableError)
const body = request.routerPath.includes(tusPath) ? renderableError.error : renderableError
return reply.status(renderableError.statusCode === '500' ? 500 : 400).send(body)
}

// Fastify errors
Expand Down
41 changes: 41 additions & 0 deletions src/http/plugins/bucket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import fastifyPlugin from 'fastify-plugin'
import { RouteGenericInterface } from 'fastify/types/route'
import { BucketWithCredentials } from '../../storage/schemas'
import { StorageBackendError } from '../../storage'

declare module 'fastify' {
interface FastifyRequest<RouteGeneric extends RouteGenericInterface = RouteGenericInterface> {
bucket: BucketWithCredentials
}

interface FastifyContextConfig {
getParentBucketId?: ((request: FastifyRequest<any>) => string) | false
}
}

export const parentBucket = fastifyPlugin(async (fastify) => {
fastify.decorateRequest('bucket', undefined)
fastify.addHook('preHandler', async (request) => {
if (typeof request.routeConfig.getParentBucketId === 'undefined') {
throw new Error(
`getParentBucketId not defined in route ${request.routerPath} ${request.routerPath} config`
)
}

if (request.routeConfig.getParentBucketId === false) {
return
}

const bucketId = request.routeConfig.getParentBucketId(request)

if (!bucketId) {
throw new StorageBackendError('invalid_bucket', 400, 'bucket name is invalid or not provided')
}

const bucket = await request.db.asSuperUser().findBucketById(bucketId, '*', {
includeCredentials: true,
})

request.bucket = bucket
})
})
32 changes: 23 additions & 9 deletions src/http/plugins/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,24 @@ import { TenantConnection } from '../../database/connection'
import { getServiceKeyUser } from '../../database/tenant'
import { getPostgresConnection } from '../../database'
import { verifyJWT } from '../../auth'
import { Database, StorageKnexDB } from '../../storage/database'

declare module 'fastify' {
interface FastifyRequest {
db: TenantConnection
dbConnection: TenantConnection
db: Database
}
}

export const db = fastifyPlugin(async (fastify) => {
fastify.decorateRequest('db', null)
fastify.decorateRequest('dbConnection', null)

fastify.addHook('preHandler', async (request) => {
const adminUser = await getServiceKeyUser(request.tenantId)
const userPayload = await verifyJWT<{ role?: string }>(request.jwt, adminUser.jwtSecret)

request.db = await getPostgresConnection({
request.dbConnection = await getPostgresConnection({
user: {
payload: userPayload,
jwt: request.jwt,
Expand All @@ -28,11 +32,16 @@ export const db = fastifyPlugin(async (fastify) => {
path: request.url,
method: request.method,
})

request.db = new StorageKnexDB(request.dbConnection, {
tenantId: request.tenantId,
host: request.headers['x-forwarded-host'] as string,
})
})

fastify.addHook('onSend', async (request, reply, payload) => {
if (request.db) {
request.db.dispose().catch((e) => {
request.dbConnection.dispose().catch((e) => {
request.log.error(e, 'Error disposing db connection')
})
}
Expand All @@ -41,13 +50,13 @@ export const db = fastifyPlugin(async (fastify) => {

fastify.addHook('onTimeout', async (request) => {
if (request.db) {
await request.db.dispose()
await request.dbConnection.dispose()
}
})

fastify.addHook('onRequestAbort', async (request) => {
if (request.db) {
await request.db.dispose()
await request.dbConnection.dispose()
}
})
})
Expand All @@ -58,7 +67,7 @@ export const dbSuperUser = fastifyPlugin(async (fastify) => {
fastify.addHook('preHandler', async (request) => {
const adminUser = await getServiceKeyUser(request.tenantId)

request.db = await getPostgresConnection({
request.dbConnection = await getPostgresConnection({
user: adminUser,
superUser: adminUser,
tenantId: request.tenantId,
Expand All @@ -67,11 +76,16 @@ export const dbSuperUser = fastifyPlugin(async (fastify) => {
method: request.method,
headers: request.headers,
})

request.db = new StorageKnexDB(request.dbConnection, {
tenantId: request.tenantId,
host: request.headers['x-forwarded-host'] as string,
})
})

fastify.addHook('onSend', async (request, reply, payload) => {
if (request.db) {
request.db.dispose().catch((e) => {
request.dbConnection.dispose().catch((e) => {
request.log.error(e, 'Error disposing db connection')
})
}
Expand All @@ -81,13 +95,13 @@ export const dbSuperUser = fastifyPlugin(async (fastify) => {

fastify.addHook('onTimeout', async (request) => {
if (request.db) {
await request.db.dispose()
await request.dbConnection.dispose()
}
})

fastify.addHook('onRequestAbort', async (request) => {
if (request.db) {
await request.db.dispose()
await request.dbConnection.dispose()
}
})
})
1 change: 1 addition & 0 deletions src/http/plugins/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ export * from './db'
export * from './storage'
export * from './tenant-id'
export * from './tenant-feature'
export * from './bucket'
export * from './metrics'
Loading

0 comments on commit f1a3b29

Please sign in to comment.