Skip to content

Commit

Permalink
feat: search objects v2
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Feb 5, 2025
1 parent ddc5163 commit ca62e66
Show file tree
Hide file tree
Showing 34 changed files with 1,160 additions and 244 deletions.
1 change: 1 addition & 0 deletions .docker/docker-compose-infra.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ services:

tenant_db:
image: postgres:15
shm_size: '1gb'
ports:
- '5432:5432'
healthcheck:
Expand Down
4 changes: 2 additions & 2 deletions migrations/tenant/0025-custom-metadata.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
ALTER TABLE storage.objects ADD COLUMN user_metadata jsonb NULL;
ALTER TABLE storage.s3_multipart_uploads ADD COLUMN user_metadata jsonb NULL;
ALTER TABLE storage.objects ADD COLUMN IF NOT EXISTS user_metadata jsonb NULL;
ALTER TABLE storage.s3_multipart_uploads ADD COLUMN IF NOT EXISTS user_metadata jsonb NULL;
207 changes: 207 additions & 0 deletions migrations/tenant/0026-objects-prefixes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
-- Add level column to objects
ALTER TABLE storage.objects ADD COLUMN IF NOT EXISTS level INT NULL;

--- Index Functions
CREATE OR REPLACE FUNCTION "storage"."get_level"("name" text)
RETURNS int
AS $func$
SELECT array_length(string_to_array("name", '/'), 1);
$func$ LANGUAGE SQL IMMUTABLE STRICT;


-- Function to check if object with prefix exists
CREATE OR REPLACE FUNCTION storage.object_exists_with_prefix(
p_bucket_id TEXT,
p_name TEXT
)
RETURNS BOOLEAN
LANGUAGE plpgsql
STABLE
SECURITY INVOKER
AS $$
BEGIN
RETURN EXISTS (
SELECT 1
FROM storage.objects o
WHERE o.bucket_id = p_bucket_id
AND o.name LIKE p_name || '%'
);
END;
$$;

-- Table
CREATE TABLE IF NOT EXISTS "storage"."prefixes" (
"bucket_id" text,
"name" text COLLATE "C" NOT NULL,
"level" int GENERATED ALWAYS AS ("storage"."get_level"("name")) STORED,
"created_at" timestamptz DEFAULT now(),
"updated_at" timestamptz DEFAULT now(),
CONSTRAINT "prefixes_bucketId_fkey" FOREIGN KEY ("bucket_id") REFERENCES "storage"."buckets"("id"),
PRIMARY KEY ("bucket_id", "level", "name")
);

ALTER TABLE storage.prefixes ENABLE ROW LEVEL SECURITY;

DROP POLICY IF EXISTS "prefixes_allow_select_for_owned_objects" ON "storage"."prefixes";
CREATE POLICY "prefixes_allow_select_for_owned_objects" ON "storage"."prefixes"
FOR SELECT
USING (
(storage.object_exists_with_prefix("bucket_id", "name"))
);

-- Functions
CREATE OR REPLACE FUNCTION "storage"."get_prefix"("name" text)
RETURNS text
AS $func$
SELECT
CASE WHEN strpos("name", '/') > 0 THEN
regexp_replace("name", '[\/]{1}[^\/]+\/?$', '')
ELSE
''
END;
$func$ LANGUAGE SQL IMMUTABLE STRICT;

CREATE OR REPLACE FUNCTION "storage"."get_prefixes"("name" text)
RETURNS text[]
AS $func$
DECLARE
parts text[];
prefixes text[];
prefix text;
BEGIN
-- Split the name into parts by '/'
parts := string_to_array("name", '/');
prefixes := '{}';

-- Construct the prefixes, stopping one level below the last part
FOR i IN 1..array_length(parts, 1) - 1 LOOP
prefix := array_to_string(parts[1:i], '/');
prefixes := array_append(prefixes, prefix);
END LOOP;

RETURN prefixes;
END;
$func$ LANGUAGE plpgsql IMMUTABLE STRICT;

CREATE OR REPLACE FUNCTION "storage"."add_prefixes"(
"_bucket_id" TEXT,
"_name" TEXT
)
RETURNS void
SECURITY DEFINER
AS $func$
DECLARE
prefixes text[];
BEGIN
prefixes := "storage"."get_prefixes"("_name");

IF array_length(prefixes, 1) > 0 THEN
INSERT INTO storage.prefixes (name, bucket_id)
SELECT UNNEST(prefixes) as name, "_bucket_id" ON CONFLICT DO NOTHING;
END IF;
END;
$func$ LANGUAGE plpgsql VOLATILE;

CREATE OR REPLACE FUNCTION "storage"."delete_prefix" (
"_bucket_id" TEXT,
"_name" TEXT
) RETURNS boolean
SECURITY DEFINER
AS $func$
BEGIN
-- Check if we can delete the prefix
IF EXISTS(
SELECT FROM "storage"."prefixes"
WHERE "prefixes"."bucket_id" = "_bucket_id"
AND level = "storage"."get_level"("_name") + 1
AND "prefixes"."name" COLLATE "C" LIKE "_name" || '/%'
LIMIT 1
)
OR EXISTS(
SELECT FROM "storage"."objects"
WHERE "objects"."bucket_id" = "_bucket_id"
AND "storage"."get_level"("objects"."name") = "storage"."get_level"("_name") + 1
AND "objects"."name" COLLATE "C" LIKE "_name" || '/%'
LIMIT 1
) THEN
-- There are sub-objects, skip deletion
RETURN false;
ELSE
DELETE FROM "storage"."prefixes"
WHERE "prefixes"."bucket_id" = "_bucket_id"
AND level = "storage"."get_level"("_name")
AND "prefixes"."name" = "_name";
RETURN true;
END IF;
END;
$func$ LANGUAGE plpgsql VOLATILE;

-- Triggers
CREATE OR REPLACE FUNCTION "storage"."prefixes_insert_trigger"()
RETURNS trigger
AS $func$
BEGIN
PERFORM "storage"."add_prefixes"(NEW."bucket_id", NEW."name");
RETURN NEW;
END;
$func$ LANGUAGE plpgsql VOLATILE;

CREATE OR REPLACE FUNCTION "storage"."objects_insert_prefix_trigger"()
RETURNS trigger
AS $func$
BEGIN
PERFORM "storage"."add_prefixes"(NEW."bucket_id", NEW."name");
NEW.level := "storage"."get_level"(NEW."name");

RETURN NEW;
END;
$func$ LANGUAGE plpgsql VOLATILE;

CREATE OR REPLACE FUNCTION "storage"."delete_prefix_hierarchy_trigger"()
RETURNS trigger
AS $func$
DECLARE
prefix text;
BEGIN
prefix := "storage"."get_prefix"(OLD."name");

IF coalesce(prefix, '') != '' THEN
PERFORM "storage"."delete_prefix"(OLD."bucket_id", prefix);
END IF;

RETURN OLD;
END;
$func$ LANGUAGE plpgsql VOLATILE;

-- "storage"."prefixes"
CREATE OR REPLACE TRIGGER "prefixes_delete_hierarchy"
AFTER DELETE ON "storage"."prefixes"
FOR EACH ROW
EXECUTE FUNCTION "storage"."delete_prefix_hierarchy_trigger"();

-- "storage"."objects"
CREATE OR REPLACE TRIGGER "objects_insert_create_prefix"
BEFORE INSERT ON "storage"."objects"
FOR EACH ROW
EXECUTE FUNCTION "storage"."objects_insert_prefix_trigger"();

CREATE OR REPLACE TRIGGER "objects_update_create_prefix"
BEFORE UPDATE ON "storage"."objects"
FOR EACH ROW
WHEN (NEW.name != OLD.name)
EXECUTE FUNCTION "storage"."objects_insert_prefix_trigger"();

CREATE OR REPLACE TRIGGER "objects_delete_delete_prefix"
AFTER DELETE ON "storage"."objects"
FOR EACH ROW
EXECUTE FUNCTION "storage"."delete_prefix_hierarchy_trigger"();

-- Permissions
DO $$
DECLARE
anon_role text = COALESCE(current_setting('storage.anon_role', true), 'anon');
authenticated_role text = COALESCE(current_setting('storage.authenticated_role', true), 'authenticated');
service_role text = COALESCE(current_setting('storage.service_role', true), 'service_role');
BEGIN
EXECUTE 'GRANT ALL ON TABLE storage.prefixes TO ' || service_role || ',' || authenticated_role || ', ' || anon_role;
END$$;
57 changes: 57 additions & 0 deletions migrations/tenant/0027-search-v2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@

CREATE OR REPLACE FUNCTION storage.search_v2 (
prefix text,
bucket_name text,
limits int DEFAULT 100,
levels int default 1,
start_after text DEFAULT ''
) RETURNS TABLE (
key text,
name text,
id uuid,
updated_at timestamptz,
created_at timestamptz,
metadata jsonb
)
SECURITY INVOKER
AS $func$
BEGIN
RETURN query EXECUTE
$sql$
SELECT * FROM (
(
SELECT
split_part(name, '/', $4) AS key,
name || '/' AS name,
NULL::uuid AS id,
NULL::timestamptz AS updated_at,
NULL::timestamptz AS created_at,
NULL::jsonb AS metadata
FROM storage.prefixes
WHERE name COLLATE "C" LIKE $1 || '%'
AND bucket_id = $2
AND level = $4
AND name COLLATE "C" > $5
ORDER BY name COLLATE "C" LIMIT $3
)
UNION ALL
(SELECT split_part(name, '/', $4) AS key,
name,
id,
updated_at,
created_at,
metadata
FROM storage.objects
WHERE name COLLATE "C" LIKE $1 || '%'
AND bucket_id = $2
AND level = $4
AND name COLLATE "C" > $5
ORDER BY name COLLATE "C" LIMIT $3)
) obj
ORDER BY name COLLATE "C" LIMIT $3;
$sql$
USING prefix, bucket_name, limits, levels, start_after;
END;
$func$ LANGUAGE plpgsql STABLE;


2 changes: 2 additions & 0 deletions migrations/tenant/0028-object-bucket-name-sorting.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- postgres-migrations disable-transaction
CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS idx_name_bucket_unique on storage.objects (name COLLATE "C", bucket_id);
54 changes: 54 additions & 0 deletions migrations/tenant/0029-create-prefixes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
-- postgres-migrations disable-transaction
-- Backfill prefixes table records
-- We run this with 50k batch size to avoid long running transaction
DO $$
DECLARE
batch_size INTEGER := 50000;
total_scanned INTEGER := 0;
row_returned INTEGER := 0;
last_name TEXT COLLATE "C" := NULL;
last_bucket_id TEXT COLLATE "C" := NULL;
BEGIN
LOOP
-- Fetch a batch of objects ordered by name COLLATE "C"
WITH batch as (
SELECT id, bucket_id, name, owner
FROM storage.objects
WHERE (last_name IS NULL OR ((name COLLATE "C", bucket_id) > (last_name, last_bucket_id)))
ORDER BY name COLLATE "C", bucket_id
LIMIT batch_size
),
batch_count as (
SELECT COUNT(*) as count FROM batch
),
cursor as (
SELECT name as last_name, bucket_id as last_bucket FROM batch b
ORDER BY name COLLATE "C" DESC, bucket_id DESC LIMIT 1
),
all_prefixes as (
SELECT UNNEST(storage.get_prefixes(name)) as prefix, bucket_id
FROM batch
),
insert_prefixes as (
INSERT INTO storage.prefixes (bucket_id, name)
SELECT bucket_id, prefix FROM all_prefixes
WHERE coalesce(prefix, '') != ''
ON CONFLICT DO NOTHING
)
SELECT count, cursor.last_name, cursor.last_bucket FROM cursor, batch_count INTO row_returned, last_name, last_bucket_id;

RAISE NOTICE 'Object Row returned: %', row_returned;
RAISE NOTICE 'Last Object: %', last_name;

total_scanned := total_scanned + row_returned;

IF row_returned IS NULL OR row_returned < batch_size THEN
RAISE NOTICE 'Total Object scanned: %', coalesce(total_scanned, 0);
COMMIT;
EXIT;
ELSE
COMMIT;
END IF;
END LOOP;
END;
$$;
50 changes: 50 additions & 0 deletions migrations/tenant/0030-update-object-levels.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
-- postgres-migrations disable-transaction
-- Backfill prefixes table records
-- We run this with 10k batch size to avoid long running transaction
DO $$
DECLARE
batch_size INTEGER := 10000;
total_scanned INTEGER := 0;
row_returned INTEGER := 0;
last_name TEXT COLLATE "C" := NULL;
last_bucket_id TEXT COLLATE "C" := NULL;
BEGIN
LOOP
-- Fetch a batch of objects ordered by name COLLATE "C"
WITH batch as (
SELECT id, bucket_id, name, storage.get_level(name) as level
FROM storage.objects
WHERE level IS NULL AND (last_name IS NULL OR (name COLLATE "C", bucket_id) > (last_name, last_bucket_id))
ORDER BY name COLLATE "C", bucket_id
LIMIT batch_size
),
batch_count as (
SELECT COUNT(*) as count FROM batch
),
cursor as (
SELECT name as last_name, bucket_id as last_bucket FROM batch b
ORDER BY name COLLATE "C" DESC, bucket_id DESC LIMIT 1
),
update_level as (
UPDATE storage.objects o
SET level = b.level
FROM batch b
WHERE o.id = b.id
)
SELECT count, cursor.last_name, cursor.last_bucket FROM cursor, batch_count INTO row_returned, last_name, last_bucket_id;

RAISE NOTICE 'Object Row returned: %', row_returned;
RAISE NOTICE 'Last Object: %', last_name;

total_scanned := total_scanned + row_returned;

IF row_returned IS NULL OR row_returned < batch_size THEN
RAISE NOTICE 'Total Object scanned: %', coalesce(total_scanned, 0);
COMMIT;
EXIT;
ELSE
COMMIT;
END IF;
END LOOP;
END;
$$;
3 changes: 3 additions & 0 deletions migrations/tenant/0031-objects-level-index.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- postgres-migrations disable-transaction
CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "objects_bucket_id_level_idx"
ON "storage"."objects" ("bucket_id", level, "name" COLLATE "C");
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- postgres-migrations disable-transaction
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_objects_lower_name ON storage.objects ((path_tokens[level]), lower(name) text_pattern_ops, bucket_id, level);
Loading

0 comments on commit ca62e66

Please sign in to comment.