From d2ef3f6d4fb7b586f3d9e299fa20b87d2fa0a657 Mon Sep 17 00:00:00 2001 From: Oliver Rice Date: Thu, 9 Jan 2025 10:14:50 -0600 Subject: [PATCH 1/3] add test highlighting the drop_queue issue. raises exception --- nix/tests/expected/pgmq_logical_backups.out | 102 ++++++++++++++++++++ nix/tests/sql/pgmq_logical_backups.sql | 93 ++++++++++++++++++ 2 files changed, 195 insertions(+) create mode 100644 nix/tests/expected/pgmq_logical_backups.out create mode 100644 nix/tests/sql/pgmq_logical_backups.sql diff --git a/nix/tests/expected/pgmq_logical_backups.out b/nix/tests/expected/pgmq_logical_backups.out new file mode 100644 index 000000000..ef4dfe4b3 --- /dev/null +++ b/nix/tests/expected/pgmq_logical_backups.out @@ -0,0 +1,102 @@ +/* + This test confirms that the pgmq after-create supautils hook + that replaces the drop_queue function is inter-operable with + the infra hook that detaches pgmq queues from the pgmq + extension's ownership prior to taking logical backups +*/ +-- Create a queue +select pgmq.create('lb-test'); + create +-------- + +(1 row) + +-- Add a record +select + * +from + pgmq.send( + queue_name:='lb-test', + msg:='{"foo": "bar1"}' + ); + send +------ + 1 +(1 row) + +/* + COPY/PASTE of the on-pause hook that + - detaches ownership of queues from the extension + - updates identity columns to avoid pg_dump segfault +*/ +do $$ +declare + tbl record; + seq_name text; + new_seq_name text; + archive_table_name text; +begin + -- Loop through each table in the pgmq schema starting with 'q_' + -- Rebuild the pkey column's default to avoid pg_dumpall segfaults + for tbl in + select c.relname as table_name + from pg_catalog.pg_attribute a + join pg_catalog.pg_class c on c.oid = a.attrelid + join pg_catalog.pg_namespace n on n.oid = c.relnamespace + where n.nspname = 'pgmq' + and c.relname like 'q_%' + and a.attname = 'msg_id' + and a.attidentity in ('a', 'd') -- 'a' for ALWAYS, 'd' for BY DEFAULT + loop + -- Check if msg_id is an IDENTITY column for idempotency + -- Define sequence names + seq_name := 'pgmq.' || format ('"%s_msg_id_seq"', tbl.table_name); + new_seq_name := 'pgmq.' || format ('"%s_msg_id_seq2"', tbl.table_name); + archive_table_name := regexp_replace(tbl.table_name, '^q_', 'a_'); + -- Execute dynamic SQL to perform the required operations + execute format(' + create sequence %s; + select setval(''%s'', nextval(''%s'')); + alter table %s."%s" alter column msg_id drop identity; + alter table %s."%s" alter column msg_id set default nextval(''%s''); + alter sequence %s rename to %s; + alter sequence %s owner to postgres;', + -- Parameters for format placeholders + new_seq_name, + new_seq_name, seq_name, + 'pgmq', tbl.table_name, + 'pgmq', tbl.table_name, + new_seq_name, + -- alter seq + new_seq_name, format('"%s_msg_id_seq"', tbl.table_name), + -- set owner + seq_name + ); + end loop; + -- No tables should be owned by the extension. + -- We want them to be included in logical backups + for tbl in + select c.relname as table_name + from pg_class c + join pg_depend d + on c.oid = d.objid + join pg_extension e + on d.refobjid = e.oid + where + c.relkind in ('r', 'p', 'u') + and e.extname = 'pgmq' + and (c.relname like 'q_%' or c.relname like 'a_%') + loop + execute format(' + alter extension pgmq drop table pgmq."%s";', + tbl.table_name + ); + end loop; +end $$; +-- Now confirm that pgmq.drop_queue still works +select pgmq.drop_queue('lb-test'); +ERROR: table pgmq."q_lb-test" is not a member of extension "pgmq" +CONTEXT: SQL statement " + ALTER EXTENSION pgmq DROP TABLE pgmq."q_lb-test" + " +PL/pgSQL function pgmq.drop_queue(text,boolean) line 8 at EXECUTE diff --git a/nix/tests/sql/pgmq_logical_backups.sql b/nix/tests/sql/pgmq_logical_backups.sql new file mode 100644 index 000000000..fc3b13615 --- /dev/null +++ b/nix/tests/sql/pgmq_logical_backups.sql @@ -0,0 +1,93 @@ +/* + This test confirms that the pgmq after-create supautils hook + that replaces the drop_queue function is inter-operable with + the infra hook that detaches pgmq queues from the pgmq + extension's ownership prior to taking logical backups +*/ + +-- Create a queue +select pgmq.create('lb-test'); + +-- Add a record +select + * +from + pgmq.send( + queue_name:='lb-test', + msg:='{"foo": "bar1"}' + ); + + +/* + COPY/PASTE of the on-pause hook that + - detaches ownership of queues from the extension + - updates identity columns to avoid pg_dump segfault +*/ + +do $$ +declare + tbl record; + seq_name text; + new_seq_name text; + archive_table_name text; +begin + -- Loop through each table in the pgmq schema starting with 'q_' + -- Rebuild the pkey column's default to avoid pg_dumpall segfaults + for tbl in + select c.relname as table_name + from pg_catalog.pg_attribute a + join pg_catalog.pg_class c on c.oid = a.attrelid + join pg_catalog.pg_namespace n on n.oid = c.relnamespace + where n.nspname = 'pgmq' + and c.relname like 'q_%' + and a.attname = 'msg_id' + and a.attidentity in ('a', 'd') -- 'a' for ALWAYS, 'd' for BY DEFAULT + loop + -- Check if msg_id is an IDENTITY column for idempotency + -- Define sequence names + seq_name := 'pgmq.' || format ('"%s_msg_id_seq"', tbl.table_name); + new_seq_name := 'pgmq.' || format ('"%s_msg_id_seq2"', tbl.table_name); + archive_table_name := regexp_replace(tbl.table_name, '^q_', 'a_'); + -- Execute dynamic SQL to perform the required operations + execute format(' + create sequence %s; + select setval(''%s'', nextval(''%s'')); + alter table %s."%s" alter column msg_id drop identity; + alter table %s."%s" alter column msg_id set default nextval(''%s''); + alter sequence %s rename to %s; + alter sequence %s owner to postgres;', + -- Parameters for format placeholders + new_seq_name, + new_seq_name, seq_name, + 'pgmq', tbl.table_name, + 'pgmq', tbl.table_name, + new_seq_name, + -- alter seq + new_seq_name, format('"%s_msg_id_seq"', tbl.table_name), + -- set owner + seq_name + ); + end loop; + -- No tables should be owned by the extension. + -- We want them to be included in logical backups + for tbl in + select c.relname as table_name + from pg_class c + join pg_depend d + on c.oid = d.objid + join pg_extension e + on d.refobjid = e.oid + where + c.relkind in ('r', 'p', 'u') + and e.extname = 'pgmq' + and (c.relname like 'q_%' or c.relname like 'a_%') + loop + execute format(' + alter extension pgmq drop table pgmq."%s";', + tbl.table_name + ); + end loop; +end $$; + +-- Now confirm that pgmq.drop_queue still works +select pgmq.drop_queue('lb-test'); From 65c4c618088bc681021d126b6e146ad9b8343e0a Mon Sep 17 00:00:00 2001 From: Oliver Rice Date: Thu, 9 Jan 2025 10:41:28 -0600 Subject: [PATCH 2/3] add after create patch for drop_queue --- .../pgmq/after-create.sql | 131 ++++++++++++++++++ 1 file changed, 131 insertions(+) diff --git a/ansible/files/postgresql_extension_custom_scripts/pgmq/after-create.sql b/ansible/files/postgresql_extension_custom_scripts/pgmq/after-create.sql index 8b126d403..276220a3c 100644 --- a/ansible/files/postgresql_extension_custom_scripts/pgmq/after-create.sql +++ b/ansible/files/postgresql_extension_custom_scripts/pgmq/after-create.sql @@ -4,6 +4,137 @@ declare r record; begin set local search_path = ''; + +/* + Override the pgmq.drop_queue to check if relevant tables are owned + by the pgmq extension before attempting to run + `alter extension pgmq drop table ...` + this is necessary becasue, to enable nightly logical backups to include user queues + we automatically detach them from pgmq. + + this update is backwards compatible with version 1.4.4 but should be removed once we're on + physical backups everywhere +*/ +-- Detach and delete the official function +alter extension pgmq drop function pgmq.drop_queue; +drop function pgmq.drop_queue; + +-- Create and reattach the patched function +CREATE FUNCTION pgmq.drop_queue(queue_name TEXT) +RETURNS BOOLEAN AS $func$ +DECLARE + qtable TEXT := pgmq.format_table_name(queue_name, 'q'); + qtable_seq TEXT := qtable || '_msg_id_seq'; + fq_qtable TEXT := 'pgmq.' || qtable; + atable TEXT := pgmq.format_table_name(queue_name, 'a'); + fq_atable TEXT := 'pgmq.' || atable; + partitioned BOOLEAN; +BEGIN + EXECUTE FORMAT( + $QUERY$ + SELECT is_partitioned FROM pgmq.meta WHERE queue_name = %L + $QUERY$, + queue_name + ) INTO partitioned; + + -- NEW CONDITIONAL CHECK + if exists ( + select 1 + from pg_class c + join pg_depend d on c.oid = d.objid + join pg_extension e on d.refobjid = e.oid + where c.relname = qtable and e.extname = 'pgmq' + ) then + + EXECUTE FORMAT( + $QUERY$ + ALTER EXTENSION pgmq DROP TABLE pgmq.%I + $QUERY$, + qtable + ); + + end if; + + -- NEW CONDITIONAL CHECK + if exists ( + select 1 + from pg_class c + join pg_depend d on c.oid = d.objid + join pg_extension e on d.refobjid = e.oid + where c.relname = qtable_seq and e.extname = 'pgmq' + ) then + EXECUTE FORMAT( + $QUERY$ + ALTER EXTENSION pgmq DROP SEQUENCE pgmq.%I + $QUERY$, + qtable_seq + ); + + end if; + + -- NEW CONDITIONAL CHECK + if exists ( + select 1 + from pg_class c + join pg_depend d on c.oid = d.objid + join pg_extension e on d.refobjid = e.oid + where c.relname = atable and e.extname = 'pgmq' + ) then + + EXECUTE FORMAT( + $QUERY$ + ALTER EXTENSION pgmq DROP TABLE pgmq.%I + $QUERY$, + atable + ); + + end if; + + -- NO CHANGES PAST THIS POINT + + EXECUTE FORMAT( + $QUERY$ + DROP TABLE IF EXISTS pgmq.%I + $QUERY$, + qtable + ); + + EXECUTE FORMAT( + $QUERY$ + DROP TABLE IF EXISTS pgmq.%I + $QUERY$, + atable + ); + + IF EXISTS ( + SELECT 1 + FROM information_schema.tables + WHERE table_name = 'meta' and table_schema = 'pgmq' + ) THEN + EXECUTE FORMAT( + $QUERY$ + DELETE FROM pgmq.meta WHERE queue_name = %L + $QUERY$, + queue_name + ); + END IF; + + IF partitioned THEN + EXECUTE FORMAT( + $QUERY$ + DELETE FROM %I.part_config where parent_table in (%L, %L) + $QUERY$, + pgmq._get_pg_partman_schema(), fq_qtable, fq_atable + ); + END IF; + + RETURN TRUE; +END; +$func$ LANGUAGE plpgsql; + +alter extension pgmq add function pgmq.drop_queue; + + update pg_extension set extowner = 'postgres'::regrole where extname = 'pgmq'; for r in (select * from pg_depend where refobjid = extoid) loop if r.classid = 'pg_type'::regclass then From 37ed28122cec56e748426021207d8a600ef7a565 Mon Sep 17 00:00:00 2001 From: Oliver Rice Date: Thu, 9 Jan 2025 10:43:05 -0600 Subject: [PATCH 3/3] remove test because supautils hooks dont run. confirmed it works using start-server --- nix/tests/expected/pgmq_logical_backups.out | 102 -------------------- nix/tests/sql/pgmq_logical_backups.sql | 93 ------------------ 2 files changed, 195 deletions(-) delete mode 100644 nix/tests/expected/pgmq_logical_backups.out delete mode 100644 nix/tests/sql/pgmq_logical_backups.sql diff --git a/nix/tests/expected/pgmq_logical_backups.out b/nix/tests/expected/pgmq_logical_backups.out deleted file mode 100644 index ef4dfe4b3..000000000 --- a/nix/tests/expected/pgmq_logical_backups.out +++ /dev/null @@ -1,102 +0,0 @@ -/* - This test confirms that the pgmq after-create supautils hook - that replaces the drop_queue function is inter-operable with - the infra hook that detaches pgmq queues from the pgmq - extension's ownership prior to taking logical backups -*/ --- Create a queue -select pgmq.create('lb-test'); - create --------- - -(1 row) - --- Add a record -select - * -from - pgmq.send( - queue_name:='lb-test', - msg:='{"foo": "bar1"}' - ); - send ------- - 1 -(1 row) - -/* - COPY/PASTE of the on-pause hook that - - detaches ownership of queues from the extension - - updates identity columns to avoid pg_dump segfault -*/ -do $$ -declare - tbl record; - seq_name text; - new_seq_name text; - archive_table_name text; -begin - -- Loop through each table in the pgmq schema starting with 'q_' - -- Rebuild the pkey column's default to avoid pg_dumpall segfaults - for tbl in - select c.relname as table_name - from pg_catalog.pg_attribute a - join pg_catalog.pg_class c on c.oid = a.attrelid - join pg_catalog.pg_namespace n on n.oid = c.relnamespace - where n.nspname = 'pgmq' - and c.relname like 'q_%' - and a.attname = 'msg_id' - and a.attidentity in ('a', 'd') -- 'a' for ALWAYS, 'd' for BY DEFAULT - loop - -- Check if msg_id is an IDENTITY column for idempotency - -- Define sequence names - seq_name := 'pgmq.' || format ('"%s_msg_id_seq"', tbl.table_name); - new_seq_name := 'pgmq.' || format ('"%s_msg_id_seq2"', tbl.table_name); - archive_table_name := regexp_replace(tbl.table_name, '^q_', 'a_'); - -- Execute dynamic SQL to perform the required operations - execute format(' - create sequence %s; - select setval(''%s'', nextval(''%s'')); - alter table %s."%s" alter column msg_id drop identity; - alter table %s."%s" alter column msg_id set default nextval(''%s''); - alter sequence %s rename to %s; - alter sequence %s owner to postgres;', - -- Parameters for format placeholders - new_seq_name, - new_seq_name, seq_name, - 'pgmq', tbl.table_name, - 'pgmq', tbl.table_name, - new_seq_name, - -- alter seq - new_seq_name, format('"%s_msg_id_seq"', tbl.table_name), - -- set owner - seq_name - ); - end loop; - -- No tables should be owned by the extension. - -- We want them to be included in logical backups - for tbl in - select c.relname as table_name - from pg_class c - join pg_depend d - on c.oid = d.objid - join pg_extension e - on d.refobjid = e.oid - where - c.relkind in ('r', 'p', 'u') - and e.extname = 'pgmq' - and (c.relname like 'q_%' or c.relname like 'a_%') - loop - execute format(' - alter extension pgmq drop table pgmq."%s";', - tbl.table_name - ); - end loop; -end $$; --- Now confirm that pgmq.drop_queue still works -select pgmq.drop_queue('lb-test'); -ERROR: table pgmq."q_lb-test" is not a member of extension "pgmq" -CONTEXT: SQL statement " - ALTER EXTENSION pgmq DROP TABLE pgmq."q_lb-test" - " -PL/pgSQL function pgmq.drop_queue(text,boolean) line 8 at EXECUTE diff --git a/nix/tests/sql/pgmq_logical_backups.sql b/nix/tests/sql/pgmq_logical_backups.sql deleted file mode 100644 index fc3b13615..000000000 --- a/nix/tests/sql/pgmq_logical_backups.sql +++ /dev/null @@ -1,93 +0,0 @@ -/* - This test confirms that the pgmq after-create supautils hook - that replaces the drop_queue function is inter-operable with - the infra hook that detaches pgmq queues from the pgmq - extension's ownership prior to taking logical backups -*/ - --- Create a queue -select pgmq.create('lb-test'); - --- Add a record -select - * -from - pgmq.send( - queue_name:='lb-test', - msg:='{"foo": "bar1"}' - ); - - -/* - COPY/PASTE of the on-pause hook that - - detaches ownership of queues from the extension - - updates identity columns to avoid pg_dump segfault -*/ - -do $$ -declare - tbl record; - seq_name text; - new_seq_name text; - archive_table_name text; -begin - -- Loop through each table in the pgmq schema starting with 'q_' - -- Rebuild the pkey column's default to avoid pg_dumpall segfaults - for tbl in - select c.relname as table_name - from pg_catalog.pg_attribute a - join pg_catalog.pg_class c on c.oid = a.attrelid - join pg_catalog.pg_namespace n on n.oid = c.relnamespace - where n.nspname = 'pgmq' - and c.relname like 'q_%' - and a.attname = 'msg_id' - and a.attidentity in ('a', 'd') -- 'a' for ALWAYS, 'd' for BY DEFAULT - loop - -- Check if msg_id is an IDENTITY column for idempotency - -- Define sequence names - seq_name := 'pgmq.' || format ('"%s_msg_id_seq"', tbl.table_name); - new_seq_name := 'pgmq.' || format ('"%s_msg_id_seq2"', tbl.table_name); - archive_table_name := regexp_replace(tbl.table_name, '^q_', 'a_'); - -- Execute dynamic SQL to perform the required operations - execute format(' - create sequence %s; - select setval(''%s'', nextval(''%s'')); - alter table %s."%s" alter column msg_id drop identity; - alter table %s."%s" alter column msg_id set default nextval(''%s''); - alter sequence %s rename to %s; - alter sequence %s owner to postgres;', - -- Parameters for format placeholders - new_seq_name, - new_seq_name, seq_name, - 'pgmq', tbl.table_name, - 'pgmq', tbl.table_name, - new_seq_name, - -- alter seq - new_seq_name, format('"%s_msg_id_seq"', tbl.table_name), - -- set owner - seq_name - ); - end loop; - -- No tables should be owned by the extension. - -- We want them to be included in logical backups - for tbl in - select c.relname as table_name - from pg_class c - join pg_depend d - on c.oid = d.objid - join pg_extension e - on d.refobjid = e.oid - where - c.relkind in ('r', 'p', 'u') - and e.extname = 'pgmq' - and (c.relname like 'q_%' or c.relname like 'a_%') - loop - execute format(' - alter extension pgmq drop table pgmq."%s";', - tbl.table_name - ); - end loop; -end $$; - --- Now confirm that pgmq.drop_queue still works -select pgmq.drop_queue('lb-test');