From ca62e66dbec748d1a3230eb2d93026bd2d700c02 Mon Sep 17 00:00:00 2001 From: fenos Date: Fri, 24 Jan 2025 15:39:03 +0100 Subject: [PATCH] feat: search objects v2 --- .docker/docker-compose-infra.yml | 1 + migrations/tenant/0025-custom-metadata.sql | 4 +- migrations/tenant/0026-objects-prefixes.sql | 207 ++++++++++++ migrations/tenant/0027-search-v2.sql | 57 ++++ .../0028-object-bucket-name-sorting.sql | 2 + migrations/tenant/0029-create-prefixes.sql | 54 ++++ .../tenant/0030-update-object-levels.sql | 50 +++ .../tenant/0031-objects-level-index.sql | 3 + ...2-backward-compatible-index-on-objects.sql | 2 + ...-backward-compatible-index-on-prefixes.sql | 2 + .../0034-optimize-search-function-v1.sql | 77 +++++ .../0035-add-insert-trigger-prefixes.sql | 8 + package.json | 1 + src/http/plugins/db.ts | 57 +--- src/http/routes/admin/migrations.ts | 43 ++- src/http/routes/admin/s3.ts | 6 +- src/http/routes/admin/tenants.ts | 56 +++- src/http/routes/object/index.ts | 2 + src/http/routes/object/listObjectsV2.ts | 72 +++++ src/http/routes/operations.ts | 1 + src/internal/database/migrations/migrate.ts | 295 ++++++++++++++---- .../database/migrations/progressive.ts | 14 +- src/internal/database/migrations/types.ts | 62 ++-- src/internal/monitoring/logflare.ts | 3 +- src/internal/queue/event.ts | 12 +- src/scripts/migrations-types.ts | 14 +- src/storage/database/knex.ts | 29 +- src/storage/events/index.ts | 1 + src/storage/events/reset-migrations.ts | 69 ++++ src/storage/events/run-migrations.ts | 13 +- src/storage/events/workers.ts | 9 +- src/storage/object.ts | 83 ++++- src/storage/protocols/s3/s3-handler.ts | 91 ++---- src/test/tenant.test.ts | 4 +- 34 files changed, 1160 insertions(+), 244 deletions(-) create mode 100644 migrations/tenant/0026-objects-prefixes.sql create mode 100644 migrations/tenant/0027-search-v2.sql create mode 100644 migrations/tenant/0028-object-bucket-name-sorting.sql create mode 100644 migrations/tenant/0029-create-prefixes.sql create mode 100644 migrations/tenant/0030-update-object-levels.sql create mode 100644 migrations/tenant/0031-objects-level-index.sql create mode 100644 migrations/tenant/0032-backward-compatible-index-on-objects.sql create mode 100644 migrations/tenant/0033-backward-compatible-index-on-prefixes.sql create mode 100644 migrations/tenant/0034-optimize-search-function-v1.sql create mode 100644 migrations/tenant/0035-add-insert-trigger-prefixes.sql create mode 100644 src/http/routes/object/listObjectsV2.ts create mode 100644 src/storage/events/reset-migrations.ts diff --git a/.docker/docker-compose-infra.yml b/.docker/docker-compose-infra.yml index 6b5e3853..5af3ba5a 100644 --- a/.docker/docker-compose-infra.yml +++ b/.docker/docker-compose-infra.yml @@ -5,6 +5,7 @@ services: tenant_db: image: postgres:15 + shm_size: '1gb' ports: - '5432:5432' healthcheck: diff --git a/migrations/tenant/0025-custom-metadata.sql b/migrations/tenant/0025-custom-metadata.sql index b18d92f6..a379a5f8 100644 --- a/migrations/tenant/0025-custom-metadata.sql +++ b/migrations/tenant/0025-custom-metadata.sql @@ -1,2 +1,2 @@ -ALTER TABLE storage.objects ADD COLUMN user_metadata jsonb NULL; -ALTER TABLE storage.s3_multipart_uploads ADD COLUMN user_metadata jsonb NULL; \ No newline at end of file +ALTER TABLE storage.objects ADD COLUMN IF NOT EXISTS user_metadata jsonb NULL; +ALTER TABLE storage.s3_multipart_uploads ADD COLUMN IF NOT EXISTS user_metadata jsonb NULL; \ No newline at end of file diff --git a/migrations/tenant/0026-objects-prefixes.sql 
b/migrations/tenant/0026-objects-prefixes.sql new file mode 100644 index 00000000..2034cb05 --- /dev/null +++ b/migrations/tenant/0026-objects-prefixes.sql @@ -0,0 +1,207 @@ +-- Add level column to objects +ALTER TABLE storage.objects ADD COLUMN IF NOT EXISTS level INT NULL; + +--- Index Functions +CREATE OR REPLACE FUNCTION "storage"."get_level"("name" text) + RETURNS int +AS $func$ +SELECT array_length(string_to_array("name", '/'), 1); +$func$ LANGUAGE SQL IMMUTABLE STRICT; + + +-- Function to check if object with prefix exists +CREATE OR REPLACE FUNCTION storage.object_exists_with_prefix( + p_bucket_id TEXT, + p_name TEXT +) + RETURNS BOOLEAN + LANGUAGE plpgsql + STABLE + SECURITY INVOKER +AS $$ +BEGIN + RETURN EXISTS ( + SELECT 1 + FROM storage.objects o + WHERE o.bucket_id = p_bucket_id + AND o.name LIKE p_name || '%' + ); +END; +$$; + +-- Table +CREATE TABLE IF NOT EXISTS "storage"."prefixes" ( + "bucket_id" text, + "name" text COLLATE "C" NOT NULL, + "level" int GENERATED ALWAYS AS ("storage"."get_level"("name")) STORED, + "created_at" timestamptz DEFAULT now(), + "updated_at" timestamptz DEFAULT now(), + CONSTRAINT "prefixes_bucketId_fkey" FOREIGN KEY ("bucket_id") REFERENCES "storage"."buckets"("id"), + PRIMARY KEY ("bucket_id", "level", "name") +); + +ALTER TABLE storage.prefixes ENABLE ROW LEVEL SECURITY; + +DROP POLICY IF EXISTS "prefixes_allow_select_for_owned_objects" ON "storage"."prefixes"; +CREATE POLICY "prefixes_allow_select_for_owned_objects" ON "storage"."prefixes" + FOR SELECT + USING ( + (storage.object_exists_with_prefix("bucket_id", "name")) + ); + +-- Functions +CREATE OR REPLACE FUNCTION "storage"."get_prefix"("name" text) + RETURNS text +AS $func$ +SELECT + CASE WHEN strpos("name", '/') > 0 THEN + regexp_replace("name", '[\/]{1}[^\/]+\/?$', '') + ELSE + '' + END; +$func$ LANGUAGE SQL IMMUTABLE STRICT; + +CREATE OR REPLACE FUNCTION "storage"."get_prefixes"("name" text) + RETURNS text[] +AS $func$ +DECLARE + parts text[]; + prefixes text[]; + prefix text; +BEGIN + -- Split the name into parts by '/' + parts := string_to_array("name", '/'); + prefixes := '{}'; + + -- Construct the prefixes, stopping one level below the last part + FOR i IN 1..array_length(parts, 1) - 1 LOOP + prefix := array_to_string(parts[1:i], '/'); + prefixes := array_append(prefixes, prefix); + END LOOP; + + RETURN prefixes; +END; +$func$ LANGUAGE plpgsql IMMUTABLE STRICT; + +CREATE OR REPLACE FUNCTION "storage"."add_prefixes"( + "_bucket_id" TEXT, + "_name" TEXT +) +RETURNS void +SECURITY DEFINER +AS $func$ +DECLARE + prefixes text[]; +BEGIN + prefixes := "storage"."get_prefixes"("_name"); + + IF array_length(prefixes, 1) > 0 THEN + INSERT INTO storage.prefixes (name, bucket_id) + SELECT UNNEST(prefixes) as name, "_bucket_id" ON CONFLICT DO NOTHING; + END IF; +END; +$func$ LANGUAGE plpgsql VOLATILE; + +CREATE OR REPLACE FUNCTION "storage"."delete_prefix" ( + "_bucket_id" TEXT, + "_name" TEXT +) RETURNS boolean +SECURITY DEFINER +AS $func$ +BEGIN + -- Check if we can delete the prefix + IF EXISTS( + SELECT FROM "storage"."prefixes" + WHERE "prefixes"."bucket_id" = "_bucket_id" + AND level = "storage"."get_level"("_name") + 1 + AND "prefixes"."name" COLLATE "C" LIKE "_name" || '/%' + LIMIT 1 + ) + OR EXISTS( + SELECT FROM "storage"."objects" + WHERE "objects"."bucket_id" = "_bucket_id" + AND "storage"."get_level"("objects"."name") = "storage"."get_level"("_name") + 1 + AND "objects"."name" COLLATE "C" LIKE "_name" || '/%' + LIMIT 1 + ) THEN + -- There are sub-objects, skip deletion + RETURN 
false; + ELSE + DELETE FROM "storage"."prefixes" + WHERE "prefixes"."bucket_id" = "_bucket_id" + AND level = "storage"."get_level"("_name") + AND "prefixes"."name" = "_name"; + RETURN true; + END IF; +END; +$func$ LANGUAGE plpgsql VOLATILE; + +-- Triggers +CREATE OR REPLACE FUNCTION "storage"."prefixes_insert_trigger"() + RETURNS trigger +AS $func$ +BEGIN + PERFORM "storage"."add_prefixes"(NEW."bucket_id", NEW."name"); + RETURN NEW; +END; +$func$ LANGUAGE plpgsql VOLATILE; + +CREATE OR REPLACE FUNCTION "storage"."objects_insert_prefix_trigger"() + RETURNS trigger +AS $func$ +BEGIN + PERFORM "storage"."add_prefixes"(NEW."bucket_id", NEW."name"); + NEW.level := "storage"."get_level"(NEW."name"); + + RETURN NEW; +END; +$func$ LANGUAGE plpgsql VOLATILE; + +CREATE OR REPLACE FUNCTION "storage"."delete_prefix_hierarchy_trigger"() + RETURNS trigger +AS $func$ +DECLARE + prefix text; +BEGIN + prefix := "storage"."get_prefix"(OLD."name"); + + IF coalesce(prefix, '') != '' THEN + PERFORM "storage"."delete_prefix"(OLD."bucket_id", prefix); + END IF; + + RETURN OLD; +END; +$func$ LANGUAGE plpgsql VOLATILE; + +-- "storage"."prefixes" +CREATE OR REPLACE TRIGGER "prefixes_delete_hierarchy" + AFTER DELETE ON "storage"."prefixes" + FOR EACH ROW +EXECUTE FUNCTION "storage"."delete_prefix_hierarchy_trigger"(); + +-- "storage"."objects" +CREATE OR REPLACE TRIGGER "objects_insert_create_prefix" + BEFORE INSERT ON "storage"."objects" + FOR EACH ROW +EXECUTE FUNCTION "storage"."objects_insert_prefix_trigger"(); + +CREATE OR REPLACE TRIGGER "objects_update_create_prefix" + BEFORE UPDATE ON "storage"."objects" + FOR EACH ROW + WHEN (NEW.name != OLD.name) +EXECUTE FUNCTION "storage"."objects_insert_prefix_trigger"(); + +CREATE OR REPLACE TRIGGER "objects_delete_delete_prefix" + AFTER DELETE ON "storage"."objects" + FOR EACH ROW +EXECUTE FUNCTION "storage"."delete_prefix_hierarchy_trigger"(); + +-- Permissions +DO $$ + DECLARE + anon_role text = COALESCE(current_setting('storage.anon_role', true), 'anon'); + authenticated_role text = COALESCE(current_setting('storage.authenticated_role', true), 'authenticated'); + service_role text = COALESCE(current_setting('storage.service_role', true), 'service_role'); + BEGIN + EXECUTE 'GRANT ALL ON TABLE storage.prefixes TO ' || service_role || ',' || authenticated_role || ', ' || anon_role; +END$$; diff --git a/migrations/tenant/0027-search-v2.sql b/migrations/tenant/0027-search-v2.sql new file mode 100644 index 00000000..7e9f06bf --- /dev/null +++ b/migrations/tenant/0027-search-v2.sql @@ -0,0 +1,57 @@ + +CREATE OR REPLACE FUNCTION storage.search_v2 ( + prefix text, + bucket_name text, + limits int DEFAULT 100, + levels int default 1, + start_after text DEFAULT '' +) RETURNS TABLE ( + key text, + name text, + id uuid, + updated_at timestamptz, + created_at timestamptz, + metadata jsonb +) +SECURITY INVOKER +AS $func$ +BEGIN + RETURN query EXECUTE + $sql$ + SELECT * FROM ( + ( + SELECT + split_part(name, '/', $4) AS key, + name || '/' AS name, + NULL::uuid AS id, + NULL::timestamptz AS updated_at, + NULL::timestamptz AS created_at, + NULL::jsonb AS metadata + FROM storage.prefixes + WHERE name COLLATE "C" LIKE $1 || '%' + AND bucket_id = $2 + AND level = $4 + AND name COLLATE "C" > $5 + ORDER BY name COLLATE "C" LIMIT $3 + ) + UNION ALL + (SELECT split_part(name, '/', $4) AS key, + name, + id, + updated_at, + created_at, + metadata + FROM storage.objects + WHERE name COLLATE "C" LIKE $1 || '%' + AND bucket_id = $2 + AND level = $4 + AND name COLLATE "C" > $5 + ORDER BY name 
COLLATE "C" LIMIT $3) + ) obj + ORDER BY name COLLATE "C" LIMIT $3; + $sql$ + USING prefix, bucket_name, limits, levels, start_after; +END; +$func$ LANGUAGE plpgsql STABLE; + + diff --git a/migrations/tenant/0028-object-bucket-name-sorting.sql b/migrations/tenant/0028-object-bucket-name-sorting.sql new file mode 100644 index 00000000..aa32b2b4 --- /dev/null +++ b/migrations/tenant/0028-object-bucket-name-sorting.sql @@ -0,0 +1,2 @@ +-- postgres-migrations disable-transaction +CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS idx_name_bucket_unique on storage.objects (name COLLATE "C", bucket_id); \ No newline at end of file diff --git a/migrations/tenant/0029-create-prefixes.sql b/migrations/tenant/0029-create-prefixes.sql new file mode 100644 index 00000000..fa7b0290 --- /dev/null +++ b/migrations/tenant/0029-create-prefixes.sql @@ -0,0 +1,54 @@ +-- postgres-migrations disable-transaction +-- Backfill prefixes table records +-- We run this with 50k batch size to avoid long running transaction +DO $$ + DECLARE + batch_size INTEGER := 50000; + total_scanned INTEGER := 0; + row_returned INTEGER := 0; + last_name TEXT COLLATE "C" := NULL; + last_bucket_id TEXT COLLATE "C" := NULL; + BEGIN + LOOP + -- Fetch a batch of objects ordered by name COLLATE "C" + WITH batch as ( + SELECT id, bucket_id, name, owner + FROM storage.objects + WHERE (last_name IS NULL OR ((name COLLATE "C", bucket_id) > (last_name, last_bucket_id))) + ORDER BY name COLLATE "C", bucket_id + LIMIT batch_size + ), + batch_count as ( + SELECT COUNT(*) as count FROM batch + ), + cursor as ( + SELECT name as last_name, bucket_id as last_bucket FROM batch b + ORDER BY name COLLATE "C" DESC, bucket_id DESC LIMIT 1 + ), + all_prefixes as ( + SELECT UNNEST(storage.get_prefixes(name)) as prefix, bucket_id + FROM batch + ), + insert_prefixes as ( + INSERT INTO storage.prefixes (bucket_id, name) + SELECT bucket_id, prefix FROM all_prefixes + WHERE coalesce(prefix, '') != '' + ON CONFLICT DO NOTHING + ) + SELECT count, cursor.last_name, cursor.last_bucket FROM cursor, batch_count INTO row_returned, last_name, last_bucket_id; + + RAISE NOTICE 'Object Row returned: %', row_returned; + RAISE NOTICE 'Last Object: %', last_name; + + total_scanned := total_scanned + row_returned; + + IF row_returned IS NULL OR row_returned < batch_size THEN + RAISE NOTICE 'Total Object scanned: %', coalesce(total_scanned, 0); + COMMIT; + EXIT; + ELSE + COMMIT; + END IF; + END LOOP; +END; +$$; \ No newline at end of file diff --git a/migrations/tenant/0030-update-object-levels.sql b/migrations/tenant/0030-update-object-levels.sql new file mode 100644 index 00000000..fa89c7f4 --- /dev/null +++ b/migrations/tenant/0030-update-object-levels.sql @@ -0,0 +1,50 @@ +-- postgres-migrations disable-transaction +-- Backfill prefixes table records +-- We run this with 10k batch size to avoid long running transaction +DO $$ + DECLARE + batch_size INTEGER := 10000; + total_scanned INTEGER := 0; + row_returned INTEGER := 0; + last_name TEXT COLLATE "C" := NULL; + last_bucket_id TEXT COLLATE "C" := NULL; + BEGIN + LOOP + -- Fetch a batch of objects ordered by name COLLATE "C" + WITH batch as ( + SELECT id, bucket_id, name, storage.get_level(name) as level + FROM storage.objects + WHERE level IS NULL AND (last_name IS NULL OR (name COLLATE "C", bucket_id) > (last_name, last_bucket_id)) + ORDER BY name COLLATE "C", bucket_id + LIMIT batch_size + ), + batch_count as ( + SELECT COUNT(*) as count FROM batch + ), + cursor as ( + SELECT name as last_name, bucket_id as last_bucket FROM 
batch b + ORDER BY name COLLATE "C" DESC, bucket_id DESC LIMIT 1 + ), + update_level as ( + UPDATE storage.objects o + SET level = b.level + FROM batch b + WHERE o.id = b.id + ) + SELECT count, cursor.last_name, cursor.last_bucket FROM cursor, batch_count INTO row_returned, last_name, last_bucket_id; + + RAISE NOTICE 'Object Row returned: %', row_returned; + RAISE NOTICE 'Last Object: %', last_name; + + total_scanned := total_scanned + row_returned; + + IF row_returned IS NULL OR row_returned < batch_size THEN + RAISE NOTICE 'Total Object scanned: %', coalesce(total_scanned, 0); + COMMIT; + EXIT; + ELSE + COMMIT; + END IF; + END LOOP; + END; +$$; \ No newline at end of file diff --git a/migrations/tenant/0031-objects-level-index.sql b/migrations/tenant/0031-objects-level-index.sql new file mode 100644 index 00000000..8650e58f --- /dev/null +++ b/migrations/tenant/0031-objects-level-index.sql @@ -0,0 +1,3 @@ +-- postgres-migrations disable-transaction +CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "objects_bucket_id_level_idx" + ON "storage"."objects" ("bucket_id", level, "name" COLLATE "C"); diff --git a/migrations/tenant/0032-backward-compatible-index-on-objects.sql b/migrations/tenant/0032-backward-compatible-index-on-objects.sql new file mode 100644 index 00000000..00989a16 --- /dev/null +++ b/migrations/tenant/0032-backward-compatible-index-on-objects.sql @@ -0,0 +1,2 @@ +-- postgres-migrations disable-transaction +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_objects_lower_name ON storage.objects ((path_tokens[level]), lower(name) text_pattern_ops, bucket_id, level); diff --git a/migrations/tenant/0033-backward-compatible-index-on-prefixes.sql b/migrations/tenant/0033-backward-compatible-index-on-prefixes.sql new file mode 100644 index 00000000..90fa2f65 --- /dev/null +++ b/migrations/tenant/0033-backward-compatible-index-on-prefixes.sql @@ -0,0 +1,2 @@ +-- postgres-migrations disable-transaction +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_prefixes_lower_name ON storage.prefixes ("bucket_id", "level", "name", lower(name) text_pattern_ops); diff --git a/migrations/tenant/0034-optimize-search-function-v1.sql b/migrations/tenant/0034-optimize-search-function-v1.sql new file mode 100644 index 00000000..6b1aac05 --- /dev/null +++ b/migrations/tenant/0034-optimize-search-function-v1.sql @@ -0,0 +1,77 @@ +create or replace function storage.search ( + prefix text, + bucketname text, + limits int default 100, + levels int default 1, + offsets int default 0, + search text default '', + sortcolumn text default 'name', + sortorder text default 'asc' +) returns table ( + name text, + id uuid, + updated_at timestamptz, + created_at timestamptz, + last_accessed_at timestamptz, + metadata jsonb +) +as $$ +declare + v_order_by text; + v_sort_order text; +begin + case + when sortcolumn = 'name' then + v_order_by = 'name'; + when sortcolumn = 'updated_at' then + v_order_by = 'updated_at'; + when sortcolumn = 'created_at' then + v_order_by = 'created_at'; + when sortcolumn = 'last_accessed_at' then + v_order_by = 'last_accessed_at'; + else + v_order_by = 'name'; + end case; + + case + when sortorder = 'asc' then + v_sort_order = 'asc'; + when sortorder = 'desc' then + v_sort_order = 'desc'; + else + v_sort_order = 'asc'; + end case; + + v_order_by = v_order_by || ' ' || v_sort_order; + + return query execute + 'with folders as ( + select name + from storage.prefixes + where lower(prefixes.name) like lower($2 || $3) || ''%'' + and bucket_id = $4 + and level = $1 + order by name ' || v_sort_order || ' + ) 
+ (select name, + null as id, + null as updated_at, + null as created_at, + null as last_accessed_at, + null as metadata from folders) + union all + (select path_tokens[level] as "name", + id, + updated_at, + created_at, + last_accessed_at, + metadata + from storage.objects + where lower(objects.name) like lower($2 || $3) || ''%'' + and bucket_id = $4 + and level = $1 + order by ' || v_order_by || ') + limit $5 + offset $6' using levels, prefix, search, bucketname, limits, offsets; +end; +$$ language plpgsql stable; \ No newline at end of file diff --git a/migrations/tenant/0035-add-insert-trigger-prefixes.sql b/migrations/tenant/0035-add-insert-trigger-prefixes.sql new file mode 100644 index 00000000..5f573eff --- /dev/null +++ b/migrations/tenant/0035-add-insert-trigger-prefixes.sql @@ -0,0 +1,8 @@ + +-- This trigger is used to create the hierarchy of prefixes +-- When writing directly in the prefixes table +CREATE OR REPLACE TRIGGER "prefixes_create_hierarchy" + BEFORE INSERT ON "storage"."prefixes" + FOR EACH ROW + WHEN (pg_trigger_depth() < 1) +EXECUTE FUNCTION "storage"."prefixes_insert_trigger"(); \ No newline at end of file diff --git a/package.json b/package.json index 6a5326c4..1bce9253 100644 --- a/package.json +++ b/package.json @@ -10,6 +10,7 @@ "format": "prettier -c --write src/**", "lint": "prettier -v && prettier -c src/**", "migration:run": "tsx src/scripts/migrate-call.ts", + "migrations:types": "tsx src/scripts/migrations-types.ts", "docs:export": "tsx ./src/scripts/export-docs.ts", "test:dummy-data": "tsx -r dotenv/config ./src/test/db/import-dummy-data.ts", "test": "npm run infra:restart && npm run test:dummy-data && jest --runInBand --forceExit", diff --git a/src/http/plugins/db.ts b/src/http/plugins/db.ts index 9fc3074d..1130fc6e 100644 --- a/src/http/plugins/db.ts +++ b/src/http/plugins/db.ts @@ -3,7 +3,6 @@ import { getConfig, MultitenantMigrationStrategy } from '../../config' import { getServiceKeyUser, getTenantConfig, - TenantMigrationStatus, TenantConnection, getPostgresConnection, } from '@internal/database' @@ -13,8 +12,7 @@ import { createMutexByKey } from '@internal/concurrency' import { areMigrationsUpToDate, DBMigration, - hasMissingSyncMigration, - lastMigrationName, + lastLocalMigrationName, progressiveMigrations, runMigrationsOnTenant, updateTenantMigrationsState, @@ -169,7 +167,7 @@ export const migrations = fastifyPlugin( return } - req.latestMigration = await lastMigrationName() + req.latestMigration = await lastLocalMigrationName() }) if (dbMigrationStrategy === MultitenantMigrationStrategy.ON_REQUEST) { @@ -203,7 +201,6 @@ export const migrations = fastifyPlugin( } if (dbMigrationStrategy === MultitenantMigrationStrategy.PROGRESSIVE) { - const migrationsMutex = createMutexByKey() fastify.addHook('preHandler', async (request) => { if (!isMultitenant) { return @@ -217,55 +214,7 @@ export const migrations = fastifyPlugin( return } - const needsToRunMigrationsNow = await hasMissingSyncMigration(request.tenantId) - - // if the tenant is not marked as stale, add it to the progressive migrations queue - if ( - !needsToRunMigrationsNow && - tenant.migrationStatus !== TenantMigrationStatus.FAILED_STALE - ) { - progressiveMigrations.addTenant(request.tenantId) - return - } - - // if the tenant is marked as stale or there are pending SYNC migrations, - // try running the migrations - await migrationsMutex(request.tenantId, async () => { - if (tenant.syncMigrationsDone || migrationsUpToDate) { - return - } - - await runMigrationsOnTenant(tenant.databaseUrl, 
request.tenantId, needsToRunMigrationsNow) - .then(async () => { - await updateTenantMigrationsState(request.tenantId) - tenant.syncMigrationsDone = true - }) - .catch((e) => { - logSchema.error( - fastify.log, - `[Migrations] Error running migrations ${request.tenantId} `, - { - type: 'migrations', - error: e, - metadata: JSON.stringify({ - strategy: 'progressive', - }), - } - ) - }) - }).catch((e) => { - logSchema.error( - fastify.log, - `[Migrations] Error running migrations ${request.tenantId} `, - { - type: 'migrations', - error: e, - metadata: JSON.stringify({ - strategy: 'progressive', - }), - } - ) - }) + progressiveMigrations.addTenant(request.tenantId) }) } }, diff --git a/src/http/routes/admin/migrations.ts b/src/http/routes/admin/migrations.ts index ac495ff6..aa106b35 100644 --- a/src/http/routes/admin/migrations.ts +++ b/src/http/routes/admin/migrations.ts @@ -4,7 +4,11 @@ import { multitenantKnex } from '@internal/database' import { RunMigrationsOnTenants } from '@storage/events' import apiKey from '../../plugins/apikey' import { getConfig } from '../../../config' -import { runMigrationsOnAllTenants } from '@internal/database/migrations' +import { + DBMigration, + resetMigrationsOnTenants, + runMigrationsOnAllTenants, +} from '@internal/database/migrations' const { pgQueueEnable } = getConfig() @@ -15,13 +19,40 @@ export default async function routes(fastify: FastifyInstance) { if (!pgQueueEnable) { return reply.status(400).send({ message: 'Queue is not enabled' }) } - const abortController = new AbortController() - req.raw.on('error', () => { - abortController.abort() - }) + await runMigrationsOnAllTenants(req.signals.disconnect.signal) + + return reply.send({ message: 'Migrations scheduled' }) + }) + + fastify.post('/reset/fleet', async (req, reply) => { + if (!pgQueueEnable) { + return reply.status(400).send({ message: 'Queue is not enabled' }) + } + + const { untilMigration, markCompletedTillMigration } = req.body as any + + if ( + typeof untilMigration !== 'string' || + !DBMigration[untilMigration as keyof typeof DBMigration] + ) { + return reply.status(400).send({ message: 'Invalid migration' }) + } - await runMigrationsOnAllTenants(abortController.signal) + if ( + typeof markCompletedTillMigration === 'string' && + !DBMigration[untilMigration as keyof typeof DBMigration] + ) { + return reply.status(400).send({ message: 'Invalid migration' }) + } + + await resetMigrationsOnTenants({ + till: untilMigration as keyof typeof DBMigration, + markCompletedTillMigration: markCompletedTillMigration + ? 
markCompletedTillMigration + : undefined, + signal: req.signals.disconnect.signal, + }) return reply.send({ message: 'Migrations scheduled' }) }) diff --git a/src/http/routes/admin/s3.ts b/src/http/routes/admin/s3.ts index 6e78f7c8..d7e04213 100644 --- a/src/http/routes/admin/s3.ts +++ b/src/http/routes/admin/s3.ts @@ -1,10 +1,6 @@ import { FastifyInstance, RequestGenericInterface } from 'fastify' import apiKey from '../../plugins/apikey' -import { - createS3Credentials, - deleteS3Credential, - listS3Credentials, -} from '../../../internal/database' +import { createS3Credentials, deleteS3Credential, listS3Credentials } from '@internal/database' import { FromSchema } from 'json-schema-to-ts' const createCredentialsSchema = { diff --git a/src/http/routes/admin/tenants.ts b/src/http/routes/admin/tenants.ts index 1f6ceda6..132715b2 100644 --- a/src/http/routes/admin/tenants.ts +++ b/src/http/routes/admin/tenants.ts @@ -2,11 +2,18 @@ import { FastifyInstance, RequestGenericInterface } from 'fastify' import { FromSchema } from 'json-schema-to-ts' import apiKey from '../../plugins/apikey' import { decrypt, encrypt } from '@internal/auth' -import { deleteTenantConfig, TenantMigrationStatus, multitenantKnex } from '@internal/database' +import { + deleteTenantConfig, + TenantMigrationStatus, + multitenantKnex, + getTenantConfig, +} from '@internal/database' import { dbSuperUser, storage } from '../../plugins' import { - lastMigrationName, + DBMigration, + lastLocalMigrationName, progressiveMigrations, + resetMigration, runMigrationsOnTenant, } from '@internal/database/migrations' @@ -226,7 +233,7 @@ export default async function routes(fastify: FastifyInstance) { await multitenantKnex('tenants') .where('id', tenantId) .update({ - migrations_version: await lastMigrationName(), + migrations_version: await lastLocalMigrationName(), migrations_status: TenantMigrationStatus.COMPLETED, }) } catch (e) { @@ -286,7 +293,7 @@ export default async function routes(fastify: FastifyInstance) { await multitenantKnex('tenants') .where('id', tenantId) .update({ - migrations_version: await lastMigrationName(), + migrations_version: await lastLocalMigrationName(), migrations_status: TenantMigrationStatus.COMPLETED, }) } catch (e) { @@ -363,7 +370,7 @@ export default async function routes(fastify: FastifyInstance) { await multitenantKnex('tenants') .where('id', tenantId) .update({ - migrations_version: await lastMigrationName(), + migrations_version: await lastLocalMigrationName(), migrations_status: TenantMigrationStatus.COMPLETED, }) } catch (e) { @@ -394,7 +401,7 @@ export default async function routes(fastify: FastifyInstance) { } reply.send({ - isLatest: (await lastMigrationName()) === migrationsInfo?.migrations_version, + isLatest: (await lastLocalMigrationName()) === migrationsInfo?.migrations_version, migrationsVersion: migrationsInfo?.migrations_version, migrationsStatus: migrationsInfo?.migrations_status, }) @@ -423,6 +430,7 @@ export default async function routes(fastify: FastifyInstance) { migrated: true, }) } catch (e) { + req.executionError = e as Error reply.status(400).send({ migrated: false, error: JSON.stringify(e), @@ -430,6 +438,42 @@ export default async function routes(fastify: FastifyInstance) { } }) + fastify.post('/:tenantId/migrations/reset', async (req, reply) => { + const { untilMigration, markCompletedTillMigration } = req.body as any + + const { databaseUrl } = await getTenantConfig(req.params.tenantId) + + if ( + typeof untilMigration !== 'string' || + !DBMigration[untilMigration as keyof 
typeof DBMigration] + ) { + return reply.status(400).send({ message: 'Invalid migration' }) + } + + if ( + typeof markCompletedTillMigration === 'string' && + !DBMigration[untilMigration as keyof typeof DBMigration] + ) { + return reply.status(400).send({ message: 'Invalid migration' }) + } + + try { + await resetMigration({ + tenantId: req.params.tenantId, + databaseUrl, + untilMigration: untilMigration as keyof typeof DBMigration, + markCompletedTillMigration: markCompletedTillMigration + ? (markCompletedTillMigration as keyof typeof DBMigration) + : undefined, + }) + + return reply.send({ message: 'Migrations reset' }) + } catch (e) { + req.executionError = e as Error + return reply.status(400).send({ message: 'Failed to reset migration' }) + } + }) + fastify.register(async (fastify) => { fastify.register(dbSuperUser, { disableHostCheck: true, diff --git a/src/http/routes/object/index.ts b/src/http/routes/object/index.ts index a390974a..8ab6d75b 100644 --- a/src/http/routes/object/index.ts +++ b/src/http/routes/object/index.ts @@ -10,6 +10,7 @@ import getSignedObject from './getSignedObject' import getSignedURL from './getSignedURL' import getSignedURLs from './getSignedURLs' import listObjects from './listObjects' +import listObjectsV2 from './listObjectsV2' import moveObject from './moveObject' import updateObject from './updateObject' import { @@ -33,6 +34,7 @@ export default async function routes(fastify: FastifyInstance) { fastify.register(getSignedURLs) fastify.register(moveObject) fastify.register(updateObject) + fastify.register(listObjectsV2) fastify.register(listObjects) fastify.register(getObjectInfoAuth) fastify.register(copyObject) diff --git a/src/http/routes/object/listObjectsV2.ts b/src/http/routes/object/listObjectsV2.ts new file mode 100644 index 00000000..5daabdf5 --- /dev/null +++ b/src/http/routes/object/listObjectsV2.ts @@ -0,0 +1,72 @@ +import { FastifyInstance } from 'fastify' +import { FromSchema } from 'json-schema-to-ts' +import { AuthenticatedRequest } from '../../types' +import { ROUTE_OPERATIONS } from '../operations' +import { getConfig } from '../../../config' +import { getTenantConfig } from '@internal/database' +import { DBMigration } from '@internal/database/migrations' + +const { isMultitenant } = getConfig() + +const searchRequestParamsSchema = { + type: 'object', + properties: { + bucketName: { type: 'string' }, + }, + required: ['bucketName'], +} as const + +const searchRequestBodySchema = { + type: 'object', + properties: { + prefix: { type: 'string', examples: ['folder/subfolder'] }, + limit: { type: 'integer', minimum: 1, examples: [10] }, + cursor: { type: 'string' }, + with_delimiter: { type: 'boolean' }, + }, +} as const +interface searchRequestInterface extends AuthenticatedRequest { + Body: FromSchema + Params: FromSchema +} +// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types +export default async function routes(fastify: FastifyInstance) { + const summary = 'Search for objects under a prefix' + + fastify.post( + '/list-v2/:bucketName', + { + schema: { + body: searchRequestBodySchema, + params: searchRequestParamsSchema, + summary, + tags: ['object'], + }, + config: { + operation: { type: ROUTE_OPERATIONS.LIST_OBJECTS_V2 }, + }, + }, + async (request, response) => { + if (isMultitenant) { + const { migrationVersion } = await getTenantConfig(request.tenantId) + if (migrationVersion && DBMigration[migrationVersion] < DBMigration['create-prefixes']) { + return response.status(400).send({ + message: 'This feature is 
not available for your tenant', + }) + } + } + + const { bucketName } = request.params + const { limit, with_delimiter, cursor, prefix } = request.body + + const results = await request.storage.from(bucketName).listObjectsV2({ + prefix, + delimiter: with_delimiter ? '/' : undefined, + maxKeys: limit, + cursor, + }) + + return response.status(200).send(results) + } + ) +} diff --git a/src/http/routes/operations.ts b/src/http/routes/operations.ts index 8fc06d11..b7445a84 100644 --- a/src/http/routes/operations.ts +++ b/src/http/routes/operations.ts @@ -21,6 +21,7 @@ export const ROUTE_OPERATIONS = { SIGN_OBJECT_URL: 'storage.object.sign', SIGN_OBJECT_URLS: 'storage.object.sign_many', LIST_OBJECTS: 'storage.object.list', + LIST_OBJECTS_V2: 'storage.object.list_v2', MOVE_OBJECT: 'storage.object.move', UPDATE_OBJECT: 'storage.object.upload_update', UPLOAD_SIGN_OBJECT: 'storage.object.upload_signed', diff --git a/src/internal/database/migrations/migrate.ts b/src/internal/database/migrations/migrate.ts index 32a188f6..eaaa1c1f 100644 --- a/src/internal/database/migrations/migrate.ts +++ b/src/internal/database/migrations/migrate.ts @@ -10,7 +10,7 @@ import { searchPath } from '../connection' import { getTenantConfig, TenantMigrationStatus } from '../tenant' import { multitenantKnex } from '../multitenant-db' import { ProgressiveMigrations } from './progressive' -import { RunMigrationsOnTenants } from '@storage/events' +import { RunMigrationsOnTenants, ResetMigrationsOnTenant } from '@storage/events' import { ERRORS } from '@internal/errors' import { DBMigration } from './types' @@ -76,7 +76,7 @@ export function startAsyncMigrations(signal: AbortSignal) { } } -export async function lastMigrationName() { +export async function lastLocalMigrationName() { const migrations = await loadMigrationFilesCached('./migrations/tenant') return migrations[migrations.length - 1].name as keyof typeof DBMigration } @@ -92,7 +92,7 @@ export async function* listTenantsToMigrate(signal: AbortSignal) { break } - const migrationVersion = await lastMigrationName() + const migrationVersion = await lastLocalMigrationName() const data = await multitenantKnex .table<{ id: string; cursor_id: number }>('tenants') @@ -122,6 +122,38 @@ export async function* listTenantsToMigrate(signal: AbortSignal) { } } +export async function* listTenantsToResetMigrations( + migration: keyof typeof DBMigration, + signal: AbortSignal +) { + let lastCursor = 0 + + while (true) { + if (signal.aborted) { + break + } + + const afterMigrations = Object.keys(DBMigration).filter((migrationName) => { + return DBMigration[migrationName as keyof typeof DBMigration] > DBMigration[migration] + }) + + const data = await multitenantKnex + .table<{ id: string; cursor_id: number }>('tenants') + .select('id', 'cursor_id') + .where('cursor_id', '>', lastCursor) + .whereIn('migrations_version', afterMigrations) + .orderBy('cursor_id', 'asc') + .limit(200) + + if (data.length === 0) { + break + } + + lastCursor = data[data.length - 1].cursor_id + yield data.map((tenant) => tenant.id) + } +} + /** * Update tenant migration version and status * @param tenantId @@ -129,9 +161,9 @@ export async function* listTenantsToMigrate(signal: AbortSignal) { */ export async function updateTenantMigrationsState( tenantId: string, - options?: { state: TenantMigrationStatus } + options?: { migration?: keyof typeof DBMigration; state: TenantMigrationStatus } ) { - const migrationVersion = await lastMigrationName() + const migrationVersion = options?.migration || (await 
lastLocalMigrationName()) const state = options?.state || TenantMigrationStatus.COMPLETED return multitenantKnex .table('tenants') @@ -152,7 +184,7 @@ export async function updateTenantMigrationsState( * @param tenantId */ export async function areMigrationsUpToDate(tenantId: string) { - const latestMigrationVersion = await lastMigrationName() + const latestMigrationVersion = await lastLocalMigrationName() const tenant = await getTenantConfig(tenantId) return ( @@ -161,25 +193,60 @@ export async function areMigrationsUpToDate(tenantId: string) { ) } -export async function hasMissingSyncMigration(tenantId: string) { - const { migrationVersion, migrationStatus } = await getTenantConfig(tenantId) - const migrations = await loadMigrationFilesCached('./migrations/tenant') +export async function obtainLockOnMultitenantDB(fn: () => Promise) { + try { + const result = await multitenantKnex.raw(`SELECT pg_try_advisory_lock(?);`, [ + '-8575985245963000605', + ]) + const lockAcquired = result.rows.shift()?.pg_try_advisory_lock || false - if (!migrationStatus) { - return migrations.some((m) => { - return m.contents.includes('---SYNC---') + if (!lockAcquired) { + return + } + + logSchema.info(logger, '[Migrations] Instance acquired the lock', { + type: 'migrations', }) + + return await fn() + } finally { + try { + await multitenantKnex.raw(`SELECT pg_advisory_unlock(?);`, ['-8575985245963000605']) + } catch (e) {} } +} + +export async function resetMigrationsOnTenants(options: { + till: keyof typeof DBMigration + markCompletedTillMigration?: keyof typeof DBMigration + signal: AbortSignal +}) { + await obtainLockOnMultitenantDB(async () => { + logSchema.info(logger, '[Migrations] Listing all tenants', { + type: 'migrations', + }) - const indexLastMigration = migrations.findIndex((m) => m.name === migrationVersion) + const tenants = listTenantsToResetMigrations(options.till, options.signal) - if (indexLastMigration === -1) { - return true - } + for await (const tenantBatch of tenants) { + await ResetMigrationsOnTenant.batchSend( + tenantBatch.map((tenant) => { + return new ResetMigrationsOnTenant({ + tenantId: tenant, + untilMigration: options.till, + markCompletedTillMigration: options.markCompletedTillMigration, + tenant: { + host: '', + ref: tenant, + }, + }) + }) + ) + } - const migrationAfterLast = migrations.slice(indexLastMigration + 1) - return migrationAfterLast.some((m) => { - return m.contents.includes('---SYNC---') + logSchema.info(logger, '[Migrations] reset migrations jobs scheduled', { + type: 'migrations', + }) }) } @@ -191,20 +258,10 @@ export async function runMigrationsOnAllTenants(signal: AbortSignal) { if (!pgQueueEnable) { return } - const result = await multitenantKnex.raw(`SELECT pg_try_advisory_lock(?);`, [ - '-8575985245963000605', - ]) - const lockAcquired = result.rows.shift()?.pg_try_advisory_lock || false - - if (!lockAcquired) { - return - } - - logSchema.info(logger, '[Migrations] Instance acquired the lock', { - type: 'migrations', - }) - - try { + await obtainLockOnMultitenantDB(async () => { + logSchema.info(logger, '[Migrations] Listing all tenants', { + type: 'migrations', + }) const tenants = listTenantsToMigrate(signal) for await (const tenantBatch of tenants) { await RunMigrationsOnTenants.batchSend( @@ -223,11 +280,7 @@ export async function runMigrationsOnAllTenants(signal: AbortSignal) { logSchema.info(logger, '[Migrations] Async migrations jobs completed', { type: 'migrations', }) - } finally { - try { - await multitenantKnex.raw(`SELECT 
pg_advisory_unlock(?);`, ['-8575985245963000605']) - } catch (e) {} - } + }) } /** @@ -275,29 +328,131 @@ export async function runMigrationsOnTenant( }) } +export async function resetMigration(options: { + tenantId: string + untilMigration: keyof typeof DBMigration + markCompletedTillMigration?: keyof typeof DBMigration + databaseUrl: string | undefined +}): Promise { + const dbConfig: ClientConfig = { + connectionString: options.databaseUrl, + connectionTimeoutMillis: 60_000, + options: `-c search_path=${searchPath}`, + } + + if (databaseSSLRootCert) { + dbConfig.ssl = { ca: databaseSSLRootCert } + } + + const client = await connect(dbConfig) + + try { + const queryWithAdvisory = withAdvisoryLock(false, async (pgClient) => { + await pgClient.query(`BEGIN`) + + try { + await client.query(`SET search_path TO ${searchPath.join(',')}`) + + const migrationsRowsResult = await pgClient.query(`SELECT * from migrations`) + const currentTenantMigrations = migrationsRowsResult.rows as { id: number; name: string }[] + + if (!currentTenantMigrations.length) { + return false + } + + const currentLastMigration = currentTenantMigrations[currentTenantMigrations.length - 1] + const localMigration = DBMigration[options.untilMigration] + + // This tenant migration is already at the desired migration + if (currentLastMigration.id === localMigration) { + return false + } + + // This tenant migration is behind of the desired migration + if (currentLastMigration.id < localMigration) { + return false + } + + // This tenant migration is ahead the desired migration + await pgClient.query(SQL`DELETE FROM migrations WHERE id > ${localMigration}`) + + // latest run migration + let latestRunMigration = options.untilMigration + + // If we want to prevent the migrations to run in the future + // we need to update the tenant migration state + if (options.markCompletedTillMigration) { + const markCompletedTillMigration = DBMigration[options.markCompletedTillMigration] + + const aheadMigrations = Object.keys(DBMigration).filter((migrationName) => { + return ( + DBMigration[migrationName as keyof typeof DBMigration] > + DBMigration[options.untilMigration] && + DBMigration[migrationName as keyof typeof DBMigration] <= markCompletedTillMigration + ) + }) + + if (aheadMigrations.length) { + const localFileMigrations = await loadMigrationFilesCached('./migrations/tenant') + + const query = SQL`INSERT INTO ` + .append('migrations') + .append('(id, name, hash, executed_at) VALUES ') + + aheadMigrations.forEach((migrationName, index) => { + const migration = localFileMigrations.find( + (m) => m.id === DBMigration[migrationName as keyof typeof DBMigration] + ) + + if (!migration) { + throw Error(`Migration ${migrationName} not found`) + } + + query.append(SQL`(${migration.id}, ${migration.name}, ${migration.hash}, NOW())`) + if (index !== aheadMigrations.length - 1) { + query.append(',') + } + }) + + await pgClient.query(query) + + latestRunMigration = options.markCompletedTillMigration + } + } + + await updateTenantMigrationsState(options.tenantId, { + migration: latestRunMigration, + state: TenantMigrationStatus.COMPLETED, + }) + + await pgClient.query(`COMMIT`) + + return true + } catch (e) { + await pgClient.query(`ROLLBACK`) + throw e + } + }) + + return await queryWithAdvisory(client) + } finally { + await client.end() + } +} + /** - * Connect and migrate the database + * Connect to the database * @param options */ -async function connectAndMigrate(options: { - databaseUrl: string | undefined - migrationsDirectory: string 
+async function connect(options: { + connectionString?: string | undefined ssl?: ClientConfig['ssl'] - shouldCreateStorageSchema?: boolean tenantId?: string - waitForLock?: boolean }) { - const { - shouldCreateStorageSchema, - migrationsDirectory, - ssl, - tenantId, - databaseUrl, - waitForLock, - } = options + const { ssl, tenantId, connectionString } = options const dbConfig: ClientConfig = { - connectionString: databaseUrl, + connectionString: connectionString, connectionTimeoutMillis: 60_000, options: `-c search_path=${searchPath}`, ssl, @@ -311,8 +466,36 @@ async function connectAndMigrate(options: { project: tenantId, }) }) + await client.connect() + return client +} + +/** + * Connect and migrate the database + * @param options + */ +async function connectAndMigrate(options: { + databaseUrl: string | undefined + migrationsDirectory: string + ssl?: ClientConfig['ssl'] + shouldCreateStorageSchema?: boolean + tenantId?: string + waitForLock?: boolean +}) { + const { shouldCreateStorageSchema, migrationsDirectory, ssl, databaseUrl, waitForLock } = options + + const dbConfig: ClientConfig = { + connectionString: databaseUrl, + connectionTimeoutMillis: 60_000, + options: `-c search_path=${searchPath}`, + statement_timeout: 1000 * 60 * 60 * 3, // 3 hours + ssl, + } + + const client = await connect(dbConfig) + try { - await client.connect() + await client.query(`SET statement_timeout TO '3h'`) await migrate({ client }, migrationsDirectory, Boolean(waitForLock), shouldCreateStorageSchema) } finally { await client.end() @@ -492,7 +675,7 @@ function withAdvisoryLock( if (waitForLock) { await new Promise((res) => setTimeout(res, 20 * tries)) } else { - return [] as unknown as Promise + throw ERRORS.LockTimeout() } } @@ -547,6 +730,8 @@ async function refreshMigrationHash( throw e } } + + return invalidHashes } /** diff --git a/src/internal/database/migrations/progressive.ts b/src/internal/database/migrations/progressive.ts index d6b11dfc..89b06f34 100644 --- a/src/internal/database/migrations/progressive.ts +++ b/src/internal/database/migrations/progressive.ts @@ -101,16 +101,20 @@ export class ProgressiveMigrations { const tenantConfig = await getTenantConfig(tenant) const migrationsUpToDate = await areMigrationsUpToDate(tenant) - if ( - migrationsUpToDate || - tenantConfig.syncMigrationsDone || - tenantConfig.migrationStatus === TenantMigrationStatus.FAILED_STALE - ) { + if (migrationsUpToDate || tenantConfig.syncMigrationsDone) { return } + const scheduleAt = new Date() + scheduleAt.setMinutes(scheduleAt.getMinutes() + 5) + const scheduleForLater = + tenantConfig.migrationStatus === TenantMigrationStatus.FAILED_STALE + ? 
scheduleAt + : undefined + return new RunMigrationsOnTenants({ tenantId: tenant, + scheduleAt: scheduleForLater, tenant: { host: '', ref: tenant, diff --git a/src/internal/database/migrations/types.ts b/src/internal/database/migrations/types.ts index b283fc2d..0378a962 100644 --- a/src/internal/database/migrations/types.ts +++ b/src/internal/database/migrations/types.ts @@ -1,27 +1,37 @@ export const DBMigration = { - initialmigration: 0, - 'search-files-search-function': 1, - 'storage-schema': 2, - 'pathtoken-column': 3, - 'add-migrations-rls': 4, - 'add-size-functions': 5, - 'change-column-name-in-get-size': 6, - 'add-rls-to-buckets': 7, - 'add-public-to-buckets': 8, - 'fix-search-function': 9, - 'add-trigger-to-auto-update-updated_at-column': 10, - 'add-automatic-avif-detection-flag': 11, - 'add-bucket-custom-limits': 12, - 'use-bytes-for-max-size': 13, - 'add-can-insert-object-function': 14, - 'add-version': 15, - 'drop-owner-foreign-key': 16, - add_owner_id_column_deprecate_owner: 17, - 'alter-default-value-objects-id': 18, - 'list-objects-with-delimiter': 19, - 's3-multipart-uploads': 20, - 's3-multipart-uploads-big-ints': 21, - 'optimize-search-function': 22, - 'operation-function': 23, - 'custom-metadata': 24, -} as const + initialmigration: 1, + 'search-files-search-function': 2, + 'storage-schema': 3, + 'pathtoken-column': 4, + 'add-migrations-rls': 5, + 'add-size-functions': 6, + 'change-column-name-in-get-size': 7, + 'add-rls-to-buckets': 8, + 'add-public-to-buckets': 9, + 'fix-search-function': 10, + 'add-trigger-to-auto-update-updated_at-column': 11, + 'add-automatic-avif-detection-flag': 12, + 'add-bucket-custom-limits': 13, + 'use-bytes-for-max-size': 14, + 'add-can-insert-object-function': 15, + 'add-version': 16, + 'drop-owner-foreign-key': 17, + add_owner_id_column_deprecate_owner: 18, + 'alter-default-value-objects-id': 19, + 'list-objects-with-delimiter': 20, + 's3-multipart-uploads': 21, + 's3-multipart-uploads-big-ints': 22, + 'optimize-search-function': 23, + 'operation-function': 24, + 'custom-metadata': 25, + 'objects-prefixes': 26, + 'search-v2': 27, + 'object-bucket-name-sorting': 28, + 'create-prefixes': 29, + 'update-object-levels': 30, + 'objects-level-index': 31, + 'backward-compatible-index-on-objects': 32, + 'backward-compatible-index-on-prefixes': 33, + 'optimize-search-function-v1': 34, + 'add-insert-trigger-prefixes': 35, +} diff --git a/src/internal/monitoring/logflare.ts b/src/internal/monitoring/logflare.ts index 9ca0f331..0029e760 100644 --- a/src/internal/monitoring/logflare.ts +++ b/src/internal/monitoring/logflare.ts @@ -10,6 +10,7 @@ export default function () { const logflareApiKey = process.env.LOGFLARE_API_KEY const logflareSourceToken = process.env.LOGFLARE_SOURCE_TOKEN + const batchSizeEnv = process.env.LOGFLARE_BATCH_SIZE if (!logflareApiKey) { throw new Error('must provide a logflare api key') @@ -22,7 +23,7 @@ export default function () { return createLogFlareWriteStream({ apiKey: logflareApiKey, sourceToken: logflareSourceToken, - size: 100, + size: batchSizeEnv ? 
parseInt(batchSizeEnv, 10) : 100, onError: (err: Error) => { console.error(`[Logflare][Error] ${err.message} - ${err.stack}`) }, diff --git a/src/internal/queue/event.ts b/src/internal/queue/event.ts index df5cd76c..8c6b59fb 100644 --- a/src/internal/queue/event.ts +++ b/src/internal/queue/event.ts @@ -8,6 +8,7 @@ import { getTenantConfig } from '@internal/database' export interface BasePayload { $version?: string singletonKey?: string + scheduleAt?: Date reqId?: string tenant: { ref: string @@ -98,6 +99,11 @@ export class Event> { if (!message.payload.$version) { ;(message.payload as (typeof message)['payload']).$version = this.version } + + if (message.payload.scheduleAt) { + sendOptions.startAfter = new Date(message.payload.scheduleAt) + } + return { ...sendOptions, name: this.getQueueName(), @@ -164,7 +170,11 @@ export class Event> { } const timer = QueueJobSchedulingTime.startTimer() - const sendOptions = constructor.getQueueOptions(this.payload) + const sendOptions = constructor.getQueueOptions(this.payload) || {} + + if (this.payload.scheduleAt) { + sendOptions.startAfter = new Date(this.payload.scheduleAt) + } try { const res = await Queue.getInstance().send({ diff --git a/src/scripts/migrations-types.ts b/src/scripts/migrations-types.ts index 8ba0fef3..708e36a6 100644 --- a/src/scripts/migrations-types.ts +++ b/src/scripts/migrations-types.ts @@ -7,13 +7,15 @@ function main() { const files = glob.sync(migrationsPath).sort() const migrations = files.map((file, index) => { + const fileName = file + .split(path.sep) + .pop() + ?.replace(/[0-9]+-/, '') + .replace('.sql', '') + return { - file: file - .split(path.sep) - .pop() - ?.replace(/[0-9]+-/, '') - .replace('.sql', ''), - index, + file: fileName, + index: index + 1, } }) diff --git a/src/storage/database/knex.ts b/src/storage/database/knex.ts index a8609a10..93c354a7 100644 --- a/src/storage/database/knex.ts +++ b/src/storage/database/knex.ts @@ -17,10 +17,13 @@ import { SearchObjectOption, } from './adapter' import { DatabaseError } from 'pg' -import { TenantConnection } from '@internal/database' +import { getTenantConfig, TenantConnection } from '@internal/database' import { DbQueryPerformance } from '@internal/monitoring/metrics' import { isUuid } from '../limits' import { DBMigration } from '@internal/database/migrations' +import { getConfig } from '../../config' + +const { isMultitenant } = getConfig() /** * Database @@ -232,7 +235,7 @@ export class StorageKnexDB implements Database { query.orderBy(knex.raw('name COLLATE "C"')) if (options?.prefix) { - query.where('name', 'ilike', `${options.prefix}%`) + query.where('name', 'like', `${options.prefix}%`) } if (options?.nextToken) { @@ -242,6 +245,28 @@ export class StorageKnexDB implements Database { return query } + let useNewSearchVersion2 = true + + if (isMultitenant) { + const { migrationVersion } = await getTenantConfig(this.tenantId) + if (migrationVersion) { + useNewSearchVersion2 = DBMigration[migrationVersion] >= DBMigration['create-prefixes'] + } + } + + if (useNewSearchVersion2 && options?.delimiter === '/') { + const levels = !options?.prefix ? 
1 : options.prefix.split('/').length + const query = await knex.raw('select * from storage.search_v2(?,?,?,?,?)', [ + options?.prefix || '', + bucketId, + options?.maxKeys || 1000, + levels, + options?.startAfter || '', + ]) + + return query.rows + } + const query = await knex.raw( 'select * from storage.list_objects_with_delimiter(?,?,?,?,?,?)', [ diff --git a/src/storage/events/index.ts b/src/storage/events/index.ts index 8fc58aed..3f79ae31 100644 --- a/src/storage/events/index.ts +++ b/src/storage/events/index.ts @@ -6,4 +6,5 @@ export * from './object-removed' export * from './object-admin-delete' export * from './backup-object' export * from './run-migrations' +export * from './reset-migrations' export * from './workers' diff --git a/src/storage/events/reset-migrations.ts b/src/storage/events/reset-migrations.ts new file mode 100644 index 00000000..c3f33e74 --- /dev/null +++ b/src/storage/events/reset-migrations.ts @@ -0,0 +1,69 @@ +import { BaseEvent } from './base-event' +import { getTenantConfig } from '@internal/database' +import { JobWithMetadata, SendOptions, WorkOptions } from 'pg-boss' +import { BasePayload } from '@internal/queue' +import { DBMigration, resetMigration } from '@internal/database/migrations' +import { RunMigrationsOnTenants } from './run-migrations' +import { logger, logSchema } from '@internal/monitoring' + +interface ResetMigrationsPayload extends BasePayload { + tenantId: string + untilMigration: keyof typeof DBMigration + markCompletedTillMigration?: keyof typeof DBMigration +} + +export class ResetMigrationsOnTenant extends BaseEvent { + static queueName = 'tenants-migrations-reset' + + static getWorkerOptions(): WorkOptions { + return { + enforceSingletonQueueActiveLimit: true, + teamSize: 200, + teamConcurrency: 10, + includeMetadata: true, + } + } + + static getQueueOptions(payload: ResetMigrationsPayload): SendOptions { + return { + expireInHours: 2, + singletonKey: payload.tenantId, + useSingletonQueue: true, + retryLimit: 3, + retryDelay: 5, + priority: 10, + } + } + + static async handle(job: JobWithMetadata) { + const tenantId = job.data.tenant.ref + const tenant = await getTenantConfig(tenantId) + + logSchema.info(logger, `[Migrations] resetting migrations for ${tenantId}`, { + type: 'migrations', + project: tenantId, + }) + + const reset = await resetMigration({ + tenantId: tenantId, + markCompletedTillMigration: job.data.markCompletedTillMigration, + untilMigration: job.data.untilMigration, + databaseUrl: tenant.databaseUrl, + }) + + if (reset) { + await RunMigrationsOnTenants.send({ + tenantId: tenantId, + tenant: { + ref: tenantId, + }, + singletonKey: tenantId, + }) + } + + logSchema.info(logger, `[Migrations] reset successful for ${tenantId}`, { + type: 'migrations', + project: tenantId, + }) + } +} diff --git a/src/storage/events/run-migrations.ts b/src/storage/events/run-migrations.ts index 631f2e20..97448a36 100644 --- a/src/storage/events/run-migrations.ts +++ b/src/storage/events/run-migrations.ts @@ -8,6 +8,7 @@ import { runMigrationsOnTenant, updateTenantMigrationsState, } from '@internal/database/migrations' +import { ErrorCode, StorageBackendError } from '@internal/errors' interface RunMigrationsPayload extends BasePayload { tenantId: string @@ -26,7 +27,9 @@ export class RunMigrationsOnTenants extends BaseEvent { static getQueueOptions(payload: RunMigrationsPayload): SendOptions { return { + expireInHours: 2, singletonKey: payload.tenantId, + useSingletonQueue: true, retryLimit: 3, retryDelay: 5, priority: 10, @@ -48,7 +51,7 @@ 
export class RunMigrationsOnTenants extends BaseEvent { type: 'migrations', project: tenantId, }) - await runMigrationsOnTenant(tenant.databaseUrl, tenantId) + await runMigrationsOnTenant(tenant.databaseUrl, tenantId, false) await updateTenantMigrationsState(tenantId) logSchema.info(logger, `[Migrations] completed for tenant ${tenantId}`, { @@ -56,6 +59,14 @@ export class RunMigrationsOnTenants extends BaseEvent { project: tenantId, }) } catch (e) { + if (e instanceof StorageBackendError && e.code === ErrorCode.LockTimeout) { + logSchema.info(logger, `[Migrations] lock timeout for tenant ${tenantId}`, { + type: 'migrations', + project: tenantId, + }) + return + } + logSchema.error(logger, `[Migrations] failed for tenant ${tenantId}`, { type: 'migrations', error: e, diff --git a/src/storage/events/workers.ts b/src/storage/events/workers.ts index a0e9d927..9e3577f1 100644 --- a/src/storage/events/workers.ts +++ b/src/storage/events/workers.ts @@ -1,9 +1,16 @@ import { Queue } from '@internal/queue' -import { ObjectAdminDelete, Webhook, RunMigrationsOnTenants, BackupObjectEvent } from './index' +import { + ObjectAdminDelete, + Webhook, + RunMigrationsOnTenants, + BackupObjectEvent, + ResetMigrationsOnTenant, +} from './index' export function registerWorkers() { Queue.register(Webhook) Queue.register(ObjectAdminDelete) Queue.register(RunMigrationsOnTenants) Queue.register(BackupObjectEvent) + Queue.register(ResetMigrationsOnTenant) } diff --git a/src/storage/object.ts b/src/storage/object.ts index 2776ae9f..7c4b784d 100644 --- a/src/storage/object.ts +++ b/src/storage/object.ts @@ -17,6 +17,7 @@ import { ObjectUpdatedMetadata, } from './events' import { FastifyRequest } from 'fastify/types/request' +import { Obj } from '@storage/schemas' const { requestUrlLengthLimit, storageS3Bucket } = getConfig() @@ -539,11 +540,75 @@ export class ObjectStorage { async listObjectsV2(options?: { prefix?: string delimiter?: string - nextToken?: string + cursor?: string startAfter?: string maxKeys?: number + encodingType?: 'url' }) { - return this.db.listObjectsV2(this.bucketId, options) + const limit = Math.min(options?.maxKeys || 1000, 1000) + const prefix = options?.prefix || '' + const delimiter = options?.delimiter + + const cursor = options?.cursor ? decodeContinuationToken(options?.cursor) : undefined + const searchResult = await this.db.listObjectsV2(this.bucketId, { + prefix: options?.prefix, + delimiter: options?.delimiter, + maxKeys: limit + 1, + nextToken: cursor, + startAfter: cursor || options?.startAfter, + }) + + let results = searchResult + let prevPrefix = '' + + if (delimiter) { + const delimitedResults: Obj[] = [] + for (const object of searchResult) { + let idx = object.name.replace(prefix, '').indexOf(delimiter) + + if (idx >= 0) { + idx = prefix.length + idx + delimiter.length + const currPrefix = object.name.substring(0, idx) + if (currPrefix === prevPrefix) { + continue + } + prevPrefix = currPrefix + delimitedResults.push({ + id: null, + name: options?.encodingType === 'url' ? encodeURIComponent(currPrefix) : currPrefix, + bucket_id: object.bucket_id, + }) + continue + } + + delimitedResults.push({ + ...object, + name: options?.encodingType === 'url' ? 
encodeURIComponent(object.name) : object.name, + }) + } + results = delimitedResults + } + + let isTruncated = false + + if (results.length > limit) { + results = results.slice(0, limit) + isTruncated = true + } + + const folders = results.filter((obj) => obj.id === null) + const objects = results.filter((obj) => obj.id !== null) + + const nextContinuationToken = isTruncated + ? encodeContinuationToken(results[results.length - 1].name) + : undefined + + return { + hasNext: isTruncated, + nextCursor: nextContinuationToken, + folders: folders, + objects: objects, + } } /** @@ -695,3 +760,17 @@ export class ObjectStorage { return payload } } + +function encodeContinuationToken(name: string) { + return Buffer.from(`l:${name}`).toString('base64') +} + +function decodeContinuationToken(token: string) { + const decoded = Buffer.from(token, 'base64').toString().split(':') + + if (decoded.length === 0) { + throw new Error('Invalid continuation token') + } + + return decoded[1] +} diff --git a/src/storage/protocols/s3/s3-handler.ts b/src/storage/protocols/s3/s3-handler.ts index baa6a3ed..7e4dba5e 100644 --- a/src/storage/protocols/s3/s3-handler.ts +++ b/src/storage/protocols/s3/s3-handler.ts @@ -207,78 +207,31 @@ export class S3ProtocolHandler { const limit = Math.min(maxKeys || 1000, 1000) - const objects = await this.storage.from(bucket).listObjectsV2({ + const results = await this.storage.from(bucket).listObjectsV2({ prefix, delimiter: delimiter, - maxKeys: limit + 1, - nextToken: continuationToken ? decodeContinuationToken(continuationToken) : undefined, - startAfter, + maxKeys: limit, + cursor: continuationToken, + startAfter: startAfter, + encodingType: command.EncodingType, }) - let results = objects - let prevPrefix = '' - - if (delimiter) { - const delimitedResults: Obj[] = [] - for (const object of objects) { - let idx = object.name.replace(prefix, '').indexOf(delimiter) - - if (idx >= 0) { - idx = prefix.length + idx + delimiter.length - const currPrefix = object.name.substring(0, idx) - if (currPrefix === prevPrefix) { - continue - } - prevPrefix = currPrefix - delimitedResults.push({ - id: null, - name: command.EncodingType === 'url' ? encodeURIComponent(currPrefix) : currPrefix, - bucket_id: bucket, - owner: '', - metadata: null, - created_at: '', - updated_at: '', - version: '', - }) - continue - } - - delimitedResults.push(object) + const commonPrefixes = results.folders.map((object) => { + return { + Prefix: object.name, } - results = delimitedResults - } - - let isTruncated = false - - if (results.length > limit) { - results = results.slice(0, limit) - isTruncated = true - } - - const commonPrefixes = results - .filter((e) => e.id === null) - .map((object) => { - return { - Prefix: object.name, - } - }) + }) const contents = - results - .filter((o) => o.id) - .map((o) => ({ - Key: command.EncodingType === 'url' ? encodeURIComponent(o.name) : o.name, - LastModified: (o.updated_at ? new Date(o.updated_at).toISOString() : undefined) as - | Date - | undefined, - ETag: o.metadata?.eTag as string, - Size: o.metadata?.size as number, - StorageClass: 'STANDARD' as const, - })) || [] - - const nextContinuationToken = isTruncated - ? encodeContinuationToken(results[results.length - 1].name) - : undefined + results.objects.map((o) => ({ + Key: o.name, + LastModified: (o.updated_at ? 
new Date(o.updated_at).toISOString() : undefined) as + | Date + | undefined, + ETag: o.metadata?.eTag as string, + Size: o.metadata?.size as number, + StorageClass: 'STANDARD' as const, + })) || [] const response: { ListBucketResult: ListObjectsV2Output } = { ListBucketResult: { @@ -286,17 +239,17 @@ export class S3ProtocolHandler { Prefix: prefix, ContinuationToken: continuationToken, Contents: contents, - IsTruncated: isTruncated, + IsTruncated: results.hasNext, MaxKeys: limit, Delimiter: delimiter, EncodingType: encodingType, - KeyCount: results.length, + KeyCount: results.folders.length + results.objects.length, CommonPrefixes: commonPrefixes, }, } - if (nextContinuationToken) { - response.ListBucketResult.NextContinuationToken = nextContinuationToken + if (results.nextCursor) { + response.ListBucketResult.NextContinuationToken = results.nextCursor } return { diff --git a/src/test/tenant.test.ts b/src/test/tenant.test.ts index d53bc0d1..f816e611 100644 --- a/src/test/tenant.test.ts +++ b/src/test/tenant.test.ts @@ -16,7 +16,7 @@ const payload = { serviceKey: 'd', jwks: { keys: [] }, migrationStatus: 'COMPLETED', - migrationVersion: 'custom-metadata', + migrationVersion: 'add-insert-trigger-prefixes', tracingMode: 'basic', features: { imageTransformation: { @@ -40,7 +40,7 @@ const payload2 = { serviceKey: 'h', jwks: null, migrationStatus: 'COMPLETED', - migrationVersion: 'custom-metadata', + migrationVersion: 'add-insert-trigger-prefixes', tracingMode: 'basic', features: { imageTransformation: {