feat: reconcile orphan objects from admin endpoint #606

Merged · 1 commit · Jan 21, 2025
2 changes: 2 additions & 0 deletions src/admin-app.ts
@@ -4,9 +4,11 @@

const build = (opts: FastifyServerOptions = {}, appInstance?: FastifyInstance): FastifyInstance => {
const app = fastify(opts)
app.register(plugins.signals)
app.register(plugins.adminTenantId)
app.register(plugins.logRequest({ excludeUrls: ['/status', '/metrics', '/health'] }))
app.register(routes.tenants, { prefix: 'tenants' })
app.register(routes.objects, { prefix: 'tenants' })
app.register(routes.migrations, { prefix: 'migrations' })
app.register(routes.s3Credentials, { prefix: 's3' })

@@ -16,8 +18,8 @@
app.get('/metrics', async (req, reply) => {
if (registriesToMerge.length === 0) {
const globalRegistry = appInstance.metrics.client.register
const defaultRegistries = (appInstance.metrics as any).getCustomDefaultMetricsRegistries()
const routeRegistries = (appInstance.metrics as any).getCustomRouteMetricsRegistries()

Check warning (×2) on lines 21–22 in src/admin-app.ts — GitHub Actions / Test / OS ubuntu-20.04 / Node 20: Unexpected any. Specify a different type.

registriesToMerge = Array.from(
new Set([globalRegistry, ...defaultRegistries, ...routeRegistries])
2 changes: 2 additions & 0 deletions src/http/plugins/db.ts
@@ -94,6 +94,7 @@ export const db = fastifyPlugin(

interface DbSuperUserPluginOptions {
disableHostCheck?: boolean
maxConnections?: number
}

export const dbSuperUser = fastifyPlugin<DbSuperUserPluginOptions>(
@@ -113,6 +114,7 @@ export const dbSuperUser = fastifyPlugin<DbSuperUserPluginOptions>(
method: request.method,
headers: request.headers,
disableHostCheck: opts.disableHostCheck,
maxConnections: opts.maxConnections,
operation: () => request.operation?.type,
})
})
1 change: 1 addition & 0 deletions src/http/routes/admin/index.ts
@@ -1,3 +1,4 @@
export { default as migrations } from './migrations'
export { default as tenants } from './tenants'
export { default as s3Credentials } from './s3'
export { default as objects } from './objects'
207 changes: 207 additions & 0 deletions src/http/routes/admin/objects.ts
@@ -0,0 +1,207 @@
import { FastifyInstance, RequestGenericInterface } from 'fastify'
import apiKey from '../../plugins/apikey'
import { dbSuperUser, storage } from '../../plugins'
import { ObjectScanner } from '@storage/scanner/scanner'
import { FastifyReply } from 'fastify/types/reply'

const listOrphanedObjects = {
description: 'List Orphaned Objects',
params: {
type: 'object',
properties: {
tenantId: { type: 'string' },
bucketId: { type: 'string' },
},
required: ['tenantId', 'bucketId'],
},
query: {
type: 'object',
properties: {
before: { type: 'string' },
keepTmpTable: { type: 'boolean' },
},
},
} as const

const syncOrphanedObjects = {
description: 'Sync Orphaned Objects',
params: {
type: 'object',
properties: {
tenantId: { type: 'string' },
bucketId: { type: 'string' },
},
required: ['tenantId', 'bucketId'],
},
body: {
type: 'object',
properties: {
deleteDbKeys: { type: 'boolean' },
deleteS3Keys: { type: 'boolean' },
tmpTable: { type: 'string' },
},
},
} as const

interface ListOrphanObjectsRequest extends RequestGenericInterface {
Params: {
tenantId: string
bucketId: string
}
Querystring: {
before?: string
keepTmpTable?: boolean
}
}

interface SyncOrphanObjectsRequest extends RequestGenericInterface {
Params: {
tenantId: string
bucketId: string
}
Body: {
deleteDbKeys?: boolean
deleteS3Keys?: boolean
before?: string
tmpTable?: string
keepTmpTable?: boolean
}
}

export default async function routes(fastify: FastifyInstance) {
fastify.register(apiKey)
fastify.register(dbSuperUser, {
disableHostCheck: true,
maxConnections: 5,
})
fastify.register(storage)

fastify.get<ListOrphanObjectsRequest>(
'/:tenantId/buckets/:bucketId/orphan-objects',
{
schema: listOrphanedObjects,
},
async (req, reply) => {
const bucket = req.params.bucketId
let before = req.query.before ? new Date(req.query.before as string) : undefined

if (before && isNaN(before.getTime())) {
return reply.status(400).send({
error: 'Invalid date format',
})
}
if (!before) {
before = new Date()
before.setHours(before.getHours() - 1)
}

const scanner = new ObjectScanner(req.storage)
const orphanObjects = scanner.listOrphaned(bucket, {
signal: req.signals.disconnect.signal,
before: before,
keepTmpTable: Boolean(req.query.keepTmpTable),
})

reply.header('Content-Type', 'application/json; charset=utf-8')

// Do not let the connection time out, periodically send
// a ping message to keep the connection alive
const respPing = ping(reply)

try {
for await (const result of orphanObjects) {
if (result.value.length > 0) {
respPing.update()
reply.raw.write(
JSON.stringify({
...result,
event: 'data',
})
)
}
}
} finally {
respPing.clear()
reply.raw.end()
}
}
)

fastify.delete<SyncOrphanObjectsRequest>(
'/:tenantId/buckets/:bucketId/orphan-objects',
{
schema: syncOrphanedObjects,
},
async (req, reply) => {
if (!req.body.deleteDbKeys && !req.body.deleteS3Keys) {
return reply.status(400).send({
error: 'At least one of deleteDbKeys or deleteS3Keys must be set to true',
})
}

const bucket = `${req.params.bucketId}`
let before = req.body.before ? new Date(req.body.before as string) : undefined

if (!before) {
before = new Date()
before.setHours(before.getHours() - 1)
}

const respPing = ping(reply)

try {
const scanner = new ObjectScanner(req.storage)
const result = scanner.deleteOrphans(bucket, {
deleteDbKeys: req.body.deleteDbKeys,
deleteS3Keys: req.body.deleteS3Keys,
signal: req.signals.disconnect.signal,
before,
tmpTable: req.body.tmpTable,
})

for await (const deleted of result) {
respPing.update()
reply.raw.write(
JSON.stringify({
...deleted,
event: 'data',
})
)
}
} finally {
respPing.clear()
reply.raw.end()
}
}
)
}

// Occasionally write a ping message to the response stream
function ping(reply: FastifyReply) {
let lastSend = undefined as Date | undefined
const clearPing = setInterval(() => {
const fiveSecondsAgo = new Date()
fiveSecondsAgo.setSeconds(fiveSecondsAgo.getSeconds() - 5)

if (!lastSend || lastSend < fiveSecondsAgo) {
lastSend = new Date()
reply.raw.write(
JSON.stringify({
event: 'ping',
})
)
}
}, 1000 * 10)

return {
clear: () => clearInterval(clearPing),
update: () => {
lastSend = new Date()
},
}
}
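
For context, a minimal sketch of how the two new admin endpoints could be exercised once this route is mounted (it is registered under the `tenants` prefix in `src/admin-app.ts`). The base URL, port, and the `apikey` header name below are illustrative assumptions, not part of this diff:

```ts
// Hypothetical client for the orphan-object admin endpoints (Node 18+, built-in fetch).
// ADMIN_URL and the `apikey` header name are assumptions, not taken from this PR.
const ADMIN_URL = process.env.ADMIN_URL ?? 'http://localhost:5001'
const headers = { apikey: process.env.ADMIN_API_KEY ?? '' }

// GET streams JSON events: { event: 'ping' } keep-alives and { event: 'data', ... } batches.
async function listOrphans(tenantId: string, bucketId: string): Promise<void> {
  const res = await fetch(
    `${ADMIN_URL}/tenants/${tenantId}/buckets/${bucketId}/orphan-objects?keepTmpTable=true`,
    { headers }
  )
  const decoder = new TextDecoder()
  for await (const chunk of res.body as unknown as AsyncIterable<Uint8Array>) {
    process.stdout.write(decoder.decode(chunk))
  }
}

// DELETE reconciles orphans found by a previous scan; at least one of
// deleteDbKeys / deleteS3Keys must be true, mirroring the route's validation.
async function deleteOrphans(tenantId: string, bucketId: string, tmpTable?: string): Promise<void> {
  const res = await fetch(`${ADMIN_URL}/tenants/${tenantId}/buckets/${bucketId}/orphan-objects`, {
    method: 'DELETE',
    headers: { ...headers, 'content-type': 'application/json' },
    body: JSON.stringify({ deleteDbKeys: true, deleteS3Keys: false, tmpTable }),
  })
  console.log(await res.text())
}
```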
1 change: 1 addition & 0 deletions src/internal/concurrency/index.ts
@@ -1,2 +1,3 @@
export * from './mutex'
export * from './async-abort-controller'
export * from './merge-async-itertor'
44 changes: 44 additions & 0 deletions src/internal/concurrency/merge-async-itertor.ts
@@ -0,0 +1,44 @@
type MergedYield<Gens extends Record<string, AsyncGenerator<any>>> = {
[K in keyof Gens]: Gens[K] extends AsyncGenerator<infer V> ? { type: K; value: V } : never
}[keyof Gens]

export async function* mergeAsyncGenerators<Gens extends Record<string, AsyncGenerator<any>>>(
gens: Gens
): AsyncGenerator<MergedYield<Gens>> {
// Convert the input object into an array of [name, generator] tuples
const entries = Object.entries(gens) as [keyof Gens, Gens[keyof Gens]][]

// Initialize an array to keep track of each generator's state
const iterators = entries.map(([name, gen]) => ({
name,
iterator: gen[Symbol.asyncIterator](),
done: false,
}))

// Continue looping as long as at least one generator is not done
while (iterators.some((it) => !it.done)) {
// Prepare an array of promises to fetch the next value from each generator
const nextPromises = iterators.map((it) =>
it.done ? Promise.resolve({ done: true, value: undefined }) : it.iterator.next()
)

// Await all the next() promises concurrently
const results = await Promise.all(nextPromises)

// Iterate through the results and yield values with their corresponding names
for (let i = 0; i < iterators.length; i++) {
const it = iterators[i]
const result = results[i]

if (!it.done && !result.done) {
// Yield an object containing the generator's name and the yielded value
yield { type: it.name, value: result.value } as MergedYield<Gens>
}

if (!it.done && result.done) {
// Mark the generator as done if it has no more values
it.done = true
}
}
}
}
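
A brief usage sketch of `mergeAsyncGenerators` (the `@internal/concurrency` import path assumes the barrel export added above; the generators themselves are illustrative only):

```ts
import { mergeAsyncGenerators } from '@internal/concurrency'

async function* numbers() {
  yield 1
  yield 2
}

async function* letters() {
  yield 'a'
  yield 'b'
}

// Each yielded item is tagged with its source key, so the discriminated union
// narrows on `type`: { type: 'nums'; value: number } | { type: 'chars'; value: string }.
async function demo() {
  for await (const item of mergeAsyncGenerators({ nums: numbers(), chars: letters() })) {
    if (item.type === 'nums') {
      console.log('number:', item.value)
    } else {
      console.log('letter:', item.value)
    }
  }
}
```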
1 change: 1 addition & 0 deletions src/internal/database/client.ts
@@ -6,6 +6,7 @@ import { ERRORS } from '@internal/errors'
interface ConnectionOptions {
host: string
tenantId: string
maxConnections?: number
headers?: Record<string, string | undefined | string[]>
method?: string
path?: string
6 changes: 4 additions & 2 deletions src/internal/database/connection.ts
@@ -69,7 +69,7 @@ export const searchPath = ['storage', 'public', 'extensions', ...dbSearchPath.sp
export class TenantConnection {
public readonly role: string

constructor(protected readonly pool: Knex, protected readonly options: TenantConnectionOptions) {
constructor(public readonly pool: Knex, protected readonly options: TenantConnectionOptions) {
this.role = options.user.payload.role || 'anon'
}

@@ -101,7 +101,9 @@ export class TenantConnection {
searchPath: isExternalPool ? undefined : searchPath,
pool: {
min: 0,
max: isExternalPool ? 1 : options.maxConnections || databaseMaxConnections,
max: isExternalPool
? options.maxConnections || 1
: options.maxConnections || databaseMaxConnections,
acquireTimeoutMillis: databaseConnectionTimeout,
idleTimeoutMillis: isExternalPool
? options.idleTimeoutMillis || 100
2 changes: 1 addition & 1 deletion src/internal/monitoring/logger.ts
@@ -32,7 +32,7 @@ export const baseLogger = pino({
headers: whitelistHeaders(request.headers),
hostname: request.hostname,
remoteAddress: request.ip,
remotePort: request.socket.remotePort,
remotePort: request.socket?.remotePort,
}
},
},
4 changes: 4 additions & 0 deletions src/internal/queue/event.ts
@@ -88,6 +88,10 @@ export class Event<T extends Omit<BasePayload, '$version'>> {
}

static batchSend<T extends Event<any>[]>(messages: T) {
if (!pgQueueEnable) {
return Promise.all(messages.map((message) => message.send()))
}

return Queue.getInstance().insert(
messages.map((message) => {
const sendOptions = (this.getQueueOptions(message.payload) as PgBoss.JobInsert) || {}
25 changes: 25 additions & 0 deletions src/internal/testing/generators/array.ts
@@ -0,0 +1,25 @@
export async function eachParallel<T>(times: number, fn: (index: number) => Promise<T>) {
const promises = []
for (let i = 0; i < times; i++) {
promises.push(fn(i))
}

return Promise.all(promises)
}

export function pickRandomFromArray<T>(arr: T[]): T {
return arr[Math.floor(Math.random() * arr.length)]
}

export function pickRandomRangeFromArray<T>(arr: T[], range: number): T[] {
if (arr.length <= range) {
return arr
}

const result = new Set<T>()
while (result.size < range) {
result.add(pickRandomFromArray(arr))
}

return Array.from(result)
}
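
A short illustrative use of these test helpers (the import path assumes the repo's `@internal` alias; the fixture shape is hypothetical):

```ts
import { eachParallel, pickRandomRangeFromArray } from '@internal/testing/generators/array'

async function seedFixtures() {
  // Create 10 fixture objects concurrently, then sample 3 distinct ones at random.
  const objects = await eachParallel(10, async (i) => ({ name: `object-${i}` }))
  return pickRandomRangeFromArray(objects, 3)
}
```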