Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: search objects v2 #626

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .docker/docker-compose-infra.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ services:

tenant_db:
image: postgres:15
shm_size: '1gb'
ports:
- '5432:5432'
healthcheck:
Expand Down
4 changes: 2 additions & 2 deletions migrations/tenant/0025-custom-metadata.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
ALTER TABLE storage.objects ADD COLUMN user_metadata jsonb NULL;
ALTER TABLE storage.s3_multipart_uploads ADD COLUMN user_metadata jsonb NULL;
ALTER TABLE storage.objects ADD COLUMN IF NOT EXISTS user_metadata jsonb NULL;
ALTER TABLE storage.s3_multipart_uploads ADD COLUMN IF NOT EXISTS user_metadata jsonb NULL;
207 changes: 207 additions & 0 deletions migrations/tenant/0026-objects-prefixes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
-- Add level column to objects
ALTER TABLE storage.objects ADD COLUMN IF NOT EXISTS level INT NULL;

--- Index Functions
CREATE OR REPLACE FUNCTION "storage"."get_level"("name" text)
RETURNS int
AS $func$
SELECT array_length(string_to_array("name", '/'), 1);
$func$ LANGUAGE SQL IMMUTABLE STRICT;


-- Function to check if object with prefix exists
CREATE OR REPLACE FUNCTION storage.object_exists_with_prefix(
p_bucket_id TEXT,
p_name TEXT
)
RETURNS BOOLEAN
LANGUAGE plpgsql
STABLE
SECURITY INVOKER
AS $$
BEGIN
RETURN EXISTS (
SELECT 1
FROM storage.objects o
WHERE o.bucket_id = p_bucket_id
AND o.name LIKE p_name || '%'
);
END;
$$;

-- Table
CREATE TABLE IF NOT EXISTS "storage"."prefixes" (
"bucket_id" text,
"name" text COLLATE "C" NOT NULL,
"level" int GENERATED ALWAYS AS ("storage"."get_level"("name")) STORED,
"created_at" timestamptz DEFAULT now(),
"updated_at" timestamptz DEFAULT now(),
CONSTRAINT "prefixes_bucketId_fkey" FOREIGN KEY ("bucket_id") REFERENCES "storage"."buckets"("id"),
PRIMARY KEY ("bucket_id", "level", "name")
);

ALTER TABLE storage.prefixes ENABLE ROW LEVEL SECURITY;

DROP POLICY IF EXISTS "prefixes_allow_select_for_owned_objects" ON "storage"."prefixes";
CREATE POLICY "prefixes_allow_select_for_owned_objects" ON "storage"."prefixes"
FOR SELECT
USING (
(storage.object_exists_with_prefix("bucket_id", "name"))
);

-- Functions
CREATE OR REPLACE FUNCTION "storage"."get_prefix"("name" text)
RETURNS text
AS $func$
SELECT
CASE WHEN strpos("name", '/') > 0 THEN
regexp_replace("name", '[\/]{1}[^\/]+\/?$', '')
ELSE
''
END;
$func$ LANGUAGE SQL IMMUTABLE STRICT;

CREATE OR REPLACE FUNCTION "storage"."get_prefixes"("name" text)
RETURNS text[]
AS $func$
DECLARE
parts text[];
prefixes text[];
prefix text;
BEGIN
-- Split the name into parts by '/'
parts := string_to_array("name", '/');
prefixes := '{}';

-- Construct the prefixes, stopping one level below the last part
FOR i IN 1..array_length(parts, 1) - 1 LOOP
prefix := array_to_string(parts[1:i], '/');
prefixes := array_append(prefixes, prefix);
END LOOP;

RETURN prefixes;
END;
$func$ LANGUAGE plpgsql IMMUTABLE STRICT;

CREATE OR REPLACE FUNCTION "storage"."add_prefixes"(
"_bucket_id" TEXT,
"_name" TEXT
)
RETURNS void
SECURITY DEFINER
AS $func$
DECLARE
prefixes text[];
BEGIN
prefixes := "storage"."get_prefixes"("_name");

IF array_length(prefixes, 1) > 0 THEN
INSERT INTO storage.prefixes (name, bucket_id)
SELECT UNNEST(prefixes) as name, "_bucket_id" ON CONFLICT DO NOTHING;
END IF;
END;
$func$ LANGUAGE plpgsql VOLATILE;

CREATE OR REPLACE FUNCTION "storage"."delete_prefix" (
"_bucket_id" TEXT,
"_name" TEXT
) RETURNS boolean
SECURITY DEFINER
AS $func$
BEGIN
-- Check if we can delete the prefix
IF EXISTS(
SELECT FROM "storage"."prefixes"
WHERE "prefixes"."bucket_id" = "_bucket_id"
AND level = "storage"."get_level"("_name") + 1
AND "prefixes"."name" COLLATE "C" LIKE "_name" || '/%'
LIMIT 1
)
OR EXISTS(
SELECT FROM "storage"."objects"
WHERE "objects"."bucket_id" = "_bucket_id"
AND "storage"."get_level"("objects"."name") = "storage"."get_level"("_name") + 1
AND "objects"."name" COLLATE "C" LIKE "_name" || '/%'
LIMIT 1
) THEN
-- There are sub-objects, skip deletion
RETURN false;
ELSE
DELETE FROM "storage"."prefixes"
WHERE "prefixes"."bucket_id" = "_bucket_id"
AND level = "storage"."get_level"("_name")
AND "prefixes"."name" = "_name";
RETURN true;
END IF;
END;
$func$ LANGUAGE plpgsql VOLATILE;

-- Triggers
CREATE OR REPLACE FUNCTION "storage"."prefixes_insert_trigger"()
RETURNS trigger
AS $func$
BEGIN
PERFORM "storage"."add_prefixes"(NEW."bucket_id", NEW."name");
RETURN NEW;
END;
$func$ LANGUAGE plpgsql VOLATILE;

CREATE OR REPLACE FUNCTION "storage"."objects_insert_prefix_trigger"()
RETURNS trigger
AS $func$
BEGIN
PERFORM "storage"."add_prefixes"(NEW."bucket_id", NEW."name");
NEW.level := "storage"."get_level"(NEW."name");

RETURN NEW;
END;
$func$ LANGUAGE plpgsql VOLATILE;

CREATE OR REPLACE FUNCTION "storage"."delete_prefix_hierarchy_trigger"()
RETURNS trigger
AS $func$
DECLARE
prefix text;
BEGIN
prefix := "storage"."get_prefix"(OLD."name");

IF coalesce(prefix, '') != '' THEN
PERFORM "storage"."delete_prefix"(OLD."bucket_id", prefix);
END IF;

RETURN OLD;
END;
$func$ LANGUAGE plpgsql VOLATILE;

-- "storage"."prefixes"
CREATE OR REPLACE TRIGGER "prefixes_delete_hierarchy"
AFTER DELETE ON "storage"."prefixes"
FOR EACH ROW
EXECUTE FUNCTION "storage"."delete_prefix_hierarchy_trigger"();

-- "storage"."objects"
CREATE OR REPLACE TRIGGER "objects_insert_create_prefix"
BEFORE INSERT ON "storage"."objects"
FOR EACH ROW
EXECUTE FUNCTION "storage"."objects_insert_prefix_trigger"();

CREATE OR REPLACE TRIGGER "objects_update_create_prefix"
BEFORE UPDATE ON "storage"."objects"
FOR EACH ROW
WHEN (NEW.name != OLD.name)
EXECUTE FUNCTION "storage"."objects_insert_prefix_trigger"();

CREATE OR REPLACE TRIGGER "objects_delete_delete_prefix"
AFTER DELETE ON "storage"."objects"
FOR EACH ROW
EXECUTE FUNCTION "storage"."delete_prefix_hierarchy_trigger"();

-- Permissions
DO $$
DECLARE
anon_role text = COALESCE(current_setting('storage.anon_role', true), 'anon');
authenticated_role text = COALESCE(current_setting('storage.authenticated_role', true), 'authenticated');
service_role text = COALESCE(current_setting('storage.service_role', true), 'service_role');
BEGIN
EXECUTE 'GRANT ALL ON TABLE storage.prefixes TO ' || service_role || ',' || authenticated_role || ', ' || anon_role;
END$$;
57 changes: 57 additions & 0 deletions migrations/tenant/0027-search-v2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@

CREATE OR REPLACE FUNCTION storage.search_v2 (
prefix text,
bucket_name text,
limits int DEFAULT 100,
levels int default 1,
start_after text DEFAULT ''
) RETURNS TABLE (
key text,
name text,
id uuid,
updated_at timestamptz,
created_at timestamptz,
metadata jsonb
)
SECURITY INVOKER
AS $func$
BEGIN
RETURN query EXECUTE
$sql$
SELECT * FROM (
(
SELECT
split_part(name, '/', $4) AS key,
name || '/' AS name,
NULL::uuid AS id,
NULL::timestamptz AS updated_at,
NULL::timestamptz AS created_at,
NULL::jsonb AS metadata
FROM storage.prefixes
WHERE name COLLATE "C" LIKE $1 || '%'
AND bucket_id = $2
AND level = $4
AND name COLLATE "C" > $5
ORDER BY name COLLATE "C" LIMIT $3
)
UNION ALL
(SELECT split_part(name, '/', $4) AS key,
name,
id,
updated_at,
created_at,
metadata
FROM storage.objects
WHERE name COLLATE "C" LIKE $1 || '%'
AND bucket_id = $2
AND level = $4
AND name COLLATE "C" > $5
ORDER BY name COLLATE "C" LIMIT $3)
) obj
ORDER BY name COLLATE "C" LIMIT $3;
$sql$
USING prefix, bucket_name, limits, levels, start_after;
END;
$func$ LANGUAGE plpgsql STABLE;


2 changes: 2 additions & 0 deletions migrations/tenant/0028-object-bucket-name-sorting.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- postgres-migrations disable-transaction
CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS idx_name_bucket_unique on storage.objects (name COLLATE "C", bucket_id);
54 changes: 54 additions & 0 deletions migrations/tenant/0029-create-prefixes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
-- postgres-migrations disable-transaction
-- Backfill prefixes table records
-- We run this with 50k batch size to avoid long running transaction
DO $$
DECLARE
batch_size INTEGER := 50000;
total_scanned INTEGER := 0;
row_returned INTEGER := 0;
last_name TEXT COLLATE "C" := NULL;
last_bucket_id TEXT COLLATE "C" := NULL;
BEGIN
LOOP
-- Fetch a batch of objects ordered by name COLLATE "C"
WITH batch as (
SELECT id, bucket_id, name, owner
FROM storage.objects
WHERE (last_name IS NULL OR ((name COLLATE "C", bucket_id) > (last_name, last_bucket_id)))
ORDER BY name COLLATE "C", bucket_id
LIMIT batch_size
),
batch_count as (
SELECT COUNT(*) as count FROM batch
),
cursor as (
SELECT name as last_name, bucket_id as last_bucket FROM batch b
ORDER BY name COLLATE "C" DESC, bucket_id DESC LIMIT 1
),
all_prefixes as (
SELECT UNNEST(storage.get_prefixes(name)) as prefix, bucket_id
FROM batch
),
insert_prefixes as (
INSERT INTO storage.prefixes (bucket_id, name)
SELECT bucket_id, prefix FROM all_prefixes
WHERE coalesce(prefix, '') != ''
ON CONFLICT DO NOTHING
)
SELECT count, cursor.last_name, cursor.last_bucket FROM cursor, batch_count INTO row_returned, last_name, last_bucket_id;

RAISE NOTICE 'Object Row returned: %', row_returned;
RAISE NOTICE 'Last Object: %', last_name;

total_scanned := total_scanned + row_returned;

IF row_returned IS NULL OR row_returned < batch_size THEN
RAISE NOTICE 'Total Object scanned: %', coalesce(total_scanned, 0);
COMMIT;
EXIT;
ELSE
COMMIT;
END IF;
END LOOP;
END;
$$;
50 changes: 50 additions & 0 deletions migrations/tenant/0030-update-object-levels.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
-- postgres-migrations disable-transaction
-- Backfill prefixes table records
-- We run this with 10k batch size to avoid long running transaction
DO $$
DECLARE
batch_size INTEGER := 10000;
total_scanned INTEGER := 0;
row_returned INTEGER := 0;
last_name TEXT COLLATE "C" := NULL;
last_bucket_id TEXT COLLATE "C" := NULL;
BEGIN
LOOP
-- Fetch a batch of objects ordered by name COLLATE "C"
WITH batch as (
SELECT id, bucket_id, name, storage.get_level(name) as level
FROM storage.objects
WHERE level IS NULL AND (last_name IS NULL OR (name COLLATE "C", bucket_id) > (last_name, last_bucket_id))
ORDER BY name COLLATE "C", bucket_id
LIMIT batch_size
),
batch_count as (
SELECT COUNT(*) as count FROM batch
),
cursor as (
SELECT name as last_name, bucket_id as last_bucket FROM batch b
ORDER BY name COLLATE "C" DESC, bucket_id DESC LIMIT 1
),
update_level as (
UPDATE storage.objects o
SET level = b.level
FROM batch b
WHERE o.id = b.id
)
SELECT count, cursor.last_name, cursor.last_bucket FROM cursor, batch_count INTO row_returned, last_name, last_bucket_id;

RAISE NOTICE 'Object Row returned: %', row_returned;
RAISE NOTICE 'Last Object: %', last_name;

total_scanned := total_scanned + row_returned;

IF row_returned IS NULL OR row_returned < batch_size THEN
RAISE NOTICE 'Total Object scanned: %', coalesce(total_scanned, 0);
COMMIT;
EXIT;
ELSE
COMMIT;
END IF;
END LOOP;
END;
$$;
3 changes: 3 additions & 0 deletions migrations/tenant/0031-objects-level-index.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- postgres-migrations disable-transaction
CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "objects_bucket_id_level_idx"
ON "storage"."objects" ("bucket_id", level, "name" COLLATE "C");
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- postgres-migrations disable-transaction
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_objects_lower_name ON storage.objects ((path_tokens[level]), lower(name) text_pattern_ops, bucket_id, level);
Loading
Loading