Skip to content

Commit

Permalink
fix for upsert_item and upsert_items functions (#232)
Browse files Browse the repository at this point in the history
  • Loading branch information
bitner authored Jan 17, 2024
1 parent e44ec16 commit 0aff9ef
Show file tree
Hide file tree
Showing 9 changed files with 4,921 additions and 30 deletions.
361 changes: 361 additions & 0 deletions src/pgstac/migrations/pgstac.0.8.2-unreleased.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,361 @@
SET client_min_messages TO WARNING;
SET SEARCH_PATH to pgstac, public;
RESET ROLE;
DO $$
DECLARE
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_extension WHERE extname='postgis') THEN
CREATE EXTENSION IF NOT EXISTS postgis;
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_extension WHERE extname='btree_gist') THEN
CREATE EXTENSION IF NOT EXISTS btree_gist;
END IF;
END;
$$ LANGUAGE PLPGSQL;

DO $$
BEGIN
CREATE ROLE pgstac_admin;
EXCEPTION WHEN duplicate_object THEN
RAISE NOTICE '%, skipping', SQLERRM USING ERRCODE = SQLSTATE;
END
$$;

DO $$
BEGIN
CREATE ROLE pgstac_read;
EXCEPTION WHEN duplicate_object THEN
RAISE NOTICE '%, skipping', SQLERRM USING ERRCODE = SQLSTATE;
END
$$;

DO $$
BEGIN
CREATE ROLE pgstac_ingest;
EXCEPTION WHEN duplicate_object THEN
RAISE NOTICE '%, skipping', SQLERRM USING ERRCODE = SQLSTATE;
END
$$;


GRANT pgstac_admin TO current_user;

-- Function to make sure pgstac_admin is the owner of items
CREATE OR REPLACE FUNCTION pgstac_admin_owns() RETURNS VOID AS $$
DECLARE
f RECORD;
BEGIN
FOR f IN (
SELECT
concat(
oid::regproc::text,
'(',
coalesce(pg_get_function_identity_arguments(oid),''),
')'
) AS name,
CASE prokind WHEN 'f' THEN 'FUNCTION' WHEN 'p' THEN 'PROCEDURE' WHEN 'a' THEN 'AGGREGATE' END as typ
FROM pg_proc
WHERE
pronamespace=to_regnamespace('pgstac')
AND proowner != to_regrole('pgstac_admin')
AND proname NOT LIKE 'pg_stat%'
)
LOOP
BEGIN
EXECUTE format('ALTER %s %s OWNER TO pgstac_admin;', f.typ, f.name);
EXCEPTION WHEN others THEN
RAISE NOTICE '%, skipping', SQLERRM USING ERRCODE = SQLSTATE;
END;
END LOOP;
FOR f IN (
SELECT
oid::regclass::text as name,
CASE relkind
WHEN 'i' THEN 'INDEX'
WHEN 'I' THEN 'INDEX'
WHEN 'p' THEN 'TABLE'
WHEN 'r' THEN 'TABLE'
WHEN 'v' THEN 'VIEW'
WHEN 'S' THEN 'SEQUENCE'
ELSE NULL
END as typ
FROM pg_class
WHERE relnamespace=to_regnamespace('pgstac') and relowner != to_regrole('pgstac_admin') AND relkind IN ('r','p','v','S') AND relname NOT LIKE 'pg_stat'
)
LOOP
BEGIN
EXECUTE format('ALTER %s %s OWNER TO pgstac_admin;', f.typ, f.name);
EXCEPTION WHEN others THEN
RAISE NOTICE '%, skipping', SQLERRM USING ERRCODE = SQLSTATE;
END;
END LOOP;
RETURN;
END;
$$ LANGUAGE PLPGSQL;
SELECT pgstac_admin_owns();

CREATE SCHEMA IF NOT EXISTS pgstac AUTHORIZATION pgstac_admin;

GRANT ALL ON ALL FUNCTIONS IN SCHEMA pgstac to pgstac_admin;
GRANT ALL ON ALL TABLES IN SCHEMA pgstac to pgstac_admin;
GRANT ALL ON ALL SEQUENCES IN SCHEMA pgstac to pgstac_admin;

ALTER ROLE pgstac_admin SET SEARCH_PATH TO pgstac, public;
ALTER ROLE pgstac_read SET SEARCH_PATH TO pgstac, public;
ALTER ROLE pgstac_ingest SET SEARCH_PATH TO pgstac, public;

GRANT USAGE ON SCHEMA pgstac to pgstac_read;
ALTER DEFAULT PRIVILEGES IN SCHEMA pgstac GRANT SELECT ON TABLES TO pgstac_read;
ALTER DEFAULT PRIVILEGES IN SCHEMA pgstac GRANT USAGE ON TYPES TO pgstac_read;
ALTER DEFAULT PRIVILEGES IN SCHEMA pgstac GRANT ALL ON SEQUENCES TO pgstac_read;

GRANT pgstac_read TO pgstac_ingest;
GRANT ALL ON SCHEMA pgstac TO pgstac_ingest;
ALTER DEFAULT PRIVILEGES IN SCHEMA pgstac GRANT ALL ON TABLES TO pgstac_ingest;
ALTER DEFAULT PRIVILEGES IN SCHEMA pgstac GRANT ALL ON FUNCTIONS TO pgstac_ingest;

ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT SELECT ON TABLES TO pgstac_read;
ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT USAGE ON TYPES TO pgstac_read;
ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT ALL ON SEQUENCES TO pgstac_read;
ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT ALL ON TABLES TO pgstac_ingest;
ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_admin IN SCHEMA pgstac GRANT ALL ON FUNCTIONS TO pgstac_ingest;

ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT SELECT ON TABLES TO pgstac_read;
ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT USAGE ON TYPES TO pgstac_read;
ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT ALL ON SEQUENCES TO pgstac_read;
ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT ALL ON TABLES TO pgstac_ingest;
ALTER DEFAULT PRIVILEGES FOR ROLE pgstac_ingest IN SCHEMA pgstac GRANT ALL ON FUNCTIONS TO pgstac_ingest;

SET SEARCH_PATH TO pgstac, public;
SET ROLE pgstac_admin;

DO $$
BEGIN
DROP FUNCTION IF EXISTS analyze_items;
EXCEPTION WHEN others THEN
RAISE NOTICE '%, skipping', SQLERRM USING ERRCODE = SQLSTATE;
END
$$;
DO $$
BEGIN
DROP FUNCTION IF EXISTS validate_constraints;
EXCEPTION WHEN others THEN
RAISE NOTICE '%, skipping', SQLERRM USING ERRCODE = SQLSTATE;
END
$$;

-- Install these idempotently as migrations do not put them before trying to modify the collections table


CREATE OR REPLACE FUNCTION collection_geom(content jsonb)
RETURNS geometry AS $$
WITH box AS (SELECT content->'extent'->'spatial'->'bbox'->0 as box)
SELECT
st_makeenvelope(
(box->>0)::float,
(box->>1)::float,
(box->>2)::float,
(box->>3)::float,
4326
)
FROM box;
$$ LANGUAGE SQL IMMUTABLE STRICT;

CREATE OR REPLACE FUNCTION collection_datetime(content jsonb)
RETURNS timestamptz AS $$
SELECT
CASE
WHEN
(content->'extent'->'temporal'->'interval'->0->>0) IS NULL
THEN '-infinity'::timestamptz
ELSE
(content->'extent'->'temporal'->'interval'->0->>0)::timestamptz
END
;
$$ LANGUAGE SQL IMMUTABLE STRICT;

CREATE OR REPLACE FUNCTION collection_enddatetime(content jsonb)
RETURNS timestamptz AS $$
SELECT
CASE
WHEN
(content->'extent'->'temporal'->'interval'->0->>1) IS NULL
THEN 'infinity'::timestamptz
ELSE
(content->'extent'->'temporal'->'interval'->0->>1)::timestamptz
END
;
$$ LANGUAGE SQL IMMUTABLE STRICT;
-- BEGIN migra calculated SQL
set check_function_bodies = off;

CREATE OR REPLACE FUNCTION pgstac.items_staging_triggerfunc()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
DECLARE
p record;
_partitions text[];
part text;
ts timestamptz := clock_timestamp();
nrows int;
BEGIN
RAISE NOTICE 'Creating Partitions. %', clock_timestamp() - ts;

FOR part IN WITH t AS (
SELECT
n.content->>'collection' as collection,
stac_daterange(n.content->'properties') as dtr,
partition_trunc
FROM newdata n JOIN collections ON (n.content->>'collection'=collections.id)
), p AS (
SELECT
collection,
COALESCE(date_trunc(partition_trunc::text, lower(dtr)),'-infinity') as d,
tstzrange(min(lower(dtr)),max(lower(dtr)),'[]') as dtrange,
tstzrange(min(upper(dtr)),max(upper(dtr)),'[]') as edtrange
FROM t
GROUP BY 1,2
) SELECT check_partition(collection, dtrange, edtrange) FROM p LOOP
RAISE NOTICE 'Partition %', part;
END LOOP;

RAISE NOTICE 'Creating temp table with data to be added. %', clock_timestamp() - ts;
DROP TABLE IF EXISTS tmpdata;
CREATE TEMP TABLE tmpdata ON COMMIT DROP AS
SELECT
(content_dehydrate(content)).*
FROM newdata;
GET DIAGNOSTICS nrows = ROW_COUNT;
RAISE NOTICE 'Added % rows to tmpdata. %', nrows, clock_timestamp() - ts;

RAISE NOTICE 'Doing the insert. %', clock_timestamp() - ts;
IF TG_TABLE_NAME = 'items_staging' THEN
INSERT INTO items
SELECT * FROM tmpdata;
GET DIAGNOSTICS nrows = ROW_COUNT;
RAISE NOTICE 'Inserted % rows to items. %', nrows, clock_timestamp() - ts;
ELSIF TG_TABLE_NAME = 'items_staging_ignore' THEN
INSERT INTO items
SELECT * FROM tmpdata
ON CONFLICT DO NOTHING;
GET DIAGNOSTICS nrows = ROW_COUNT;
RAISE NOTICE 'Inserted % rows to items. %', nrows, clock_timestamp() - ts;
ELSIF TG_TABLE_NAME = 'items_staging_upsert' THEN
DELETE FROM items i USING tmpdata s
WHERE
i.id = s.id
AND i.collection = s.collection
AND i IS DISTINCT FROM s
;
GET DIAGNOSTICS nrows = ROW_COUNT;
RAISE NOTICE 'Deleted % rows from items. %', nrows, clock_timestamp() - ts;
INSERT INTO items AS t
SELECT * FROM tmpdata
ON CONFLICT DO NOTHING;
GET DIAGNOSTICS nrows = ROW_COUNT;
RAISE NOTICE 'Inserted % rows to items. %', nrows, clock_timestamp() - ts;
END IF;

RAISE NOTICE 'Deleting data from staging table. %', clock_timestamp() - ts;
DELETE FROM items_staging;
RAISE NOTICE 'Done. %', clock_timestamp() - ts;

RETURN NULL;

END;
$function$
;


-- END migra calculated SQL
DO $$
BEGIN
INSERT INTO queryables (name, definition, property_wrapper, property_index_type) VALUES
('id', '{"title": "Item ID","description": "Item identifier","$ref": "https://schemas.stacspec.org/v1.0.0/item-spec/json-schema/item.json#/definitions/core/allOf/2/properties/id"}', null, null);
EXCEPTION WHEN unique_violation THEN
RAISE NOTICE '%', SQLERRM USING ERRCODE = SQLSTATE;
END
$$;

DO $$
BEGIN
INSERT INTO queryables (name, definition, property_wrapper, property_index_type) VALUES
('geometry', '{"title": "Item Geometry","description": "Item Geometry","$ref": "https://geojson.org/schema/Feature.json"}', null, null);
EXCEPTION WHEN unique_violation THEN
RAISE NOTICE '%', SQLERRM USING ERRCODE = SQLSTATE;
END
$$;

DO $$
BEGIN
INSERT INTO queryables (name, definition, property_wrapper, property_index_type) VALUES
('datetime','{"description": "Datetime","type": "string","title": "Acquired","format": "date-time","pattern": "(\\+00:00|Z)$"}', null, null);
EXCEPTION WHEN unique_violation THEN
RAISE NOTICE '%', SQLERRM USING ERRCODE = SQLSTATE;
END
$$;

DELETE FROM queryables a USING queryables b
WHERE a.name = b.name AND a.collection_ids IS NOT DISTINCT FROM b.collection_ids AND a.id > b.id;


INSERT INTO pgstac_settings (name, value) VALUES
('context', 'off'),
('context_estimated_count', '100000'),
('context_estimated_cost', '100000'),
('context_stats_ttl', '1 day'),
('default_filter_lang', 'cql2-json'),
('additional_properties', 'true'),
('use_queue', 'false'),
('queue_timeout', '10 minutes'),
('update_collection_extent', 'false'),
('format_cache', 'false'),
('readonly', 'false')
ON CONFLICT DO NOTHING
;

ALTER FUNCTION to_text COST 5000;
ALTER FUNCTION to_float COST 5000;
ALTER FUNCTION to_int COST 5000;
ALTER FUNCTION to_tstz COST 5000;
ALTER FUNCTION to_text_array COST 5000;

ALTER FUNCTION update_partition_stats SECURITY DEFINER;
ALTER FUNCTION partition_after_triggerfunc SECURITY DEFINER;
ALTER FUNCTION drop_table_constraints SECURITY DEFINER;
ALTER FUNCTION create_table_constraints SECURITY DEFINER;
ALTER FUNCTION check_partition SECURITY DEFINER;
ALTER FUNCTION repartition SECURITY DEFINER;
ALTER FUNCTION where_stats SECURITY DEFINER;
ALTER FUNCTION search_query SECURITY DEFINER;
ALTER FUNCTION format_item SECURITY DEFINER;
ALTER FUNCTION maintain_index SECURITY DEFINER;

GRANT USAGE ON SCHEMA pgstac to pgstac_read;
GRANT ALL ON SCHEMA pgstac to pgstac_ingest;
GRANT ALL ON SCHEMA pgstac to pgstac_admin;

-- pgstac_read role limited to using function apis
GRANT EXECUTE ON FUNCTION search TO pgstac_read;
GRANT EXECUTE ON FUNCTION search_query TO pgstac_read;
GRANT EXECUTE ON FUNCTION item_by_id TO pgstac_read;
GRANT EXECUTE ON FUNCTION get_item TO pgstac_read;
GRANT SELECT ON ALL TABLES IN SCHEMA pgstac TO pgstac_read;


GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA pgstac to pgstac_ingest;
GRANT ALL ON ALL TABLES IN SCHEMA pgstac to pgstac_ingest;
GRANT USAGE ON ALL SEQUENCES IN SCHEMA pgstac to pgstac_ingest;

REVOKE ALL PRIVILEGES ON PROCEDURE run_queued_queries FROM public;
GRANT ALL ON PROCEDURE run_queued_queries TO pgstac_admin;

REVOKE ALL PRIVILEGES ON FUNCTION run_queued_queries_intransaction FROM public;
GRANT ALL ON FUNCTION run_queued_queries_intransaction TO pgstac_admin;

RESET ROLE;

SET ROLE pgstac_ingest;
SELECT update_partition_stats_q(partition) FROM partitions_view;
SELECT set_version('unreleased');
Loading

0 comments on commit 0aff9ef

Please sign in to comment.