From c6cd12c2518315235b2f879118d547924e0b453f Mon Sep 17 00:00:00 2001
From: Shaun Davis
Date: Thu, 1 Aug 2024 17:43:00 -0500
Subject: [PATCH 1/3] Remove replication slot on member unregistration (#252)

---
 cmd/pg_unregister/main.go | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/cmd/pg_unregister/main.go b/cmd/pg_unregister/main.go
index 1f4c1ac1..93b31649 100644
--- a/cmd/pg_unregister/main.go
+++ b/cmd/pg_unregister/main.go
@@ -7,6 +7,7 @@ import (
 	"os"
 
 	"github.com/fly-apps/postgres-flex/internal/flypg"
+	"github.com/fly-apps/postgres-flex/internal/flypg/admin"
 	"github.com/fly-apps/postgres-flex/internal/utils"
 )
 
@@ -48,5 +49,20 @@ func processUnregistration(ctx context.Context) error {
 		return fmt.Errorf("failed to unregister member: %v", err)
 	}
 
+	slots, err := admin.ListReplicationSlots(ctx, conn)
+	if err != nil {
+		return fmt.Errorf("failed to list replication slots: %v", err)
+	}
+
+	targetSlot := fmt.Sprintf("repmgr_slot_%d", member.ID)
+	for _, slot := range slots {
+		if slot.Name == targetSlot {
+			if err := admin.DropReplicationSlot(ctx, conn, targetSlot); err != nil {
+				return fmt.Errorf("failed to drop replication slot: %v", err)
+			}
+			break
+		}
+	}
+
 	return nil
 }

From 0e58cbcecfc1740cc2425ab200b478627b18102c Mon Sep 17 00:00:00 2001
From: Shaun Davis
Date: Tue, 6 Aug 2024 20:51:30 -0500
Subject: [PATCH 2/3] Fix slot drop race (#253)

* Address race condition during unregistration process

* bug fix
---
Review notes (ignored by git am/apply):
* GetReplicationSlot binds the slot name as query parameter $1 instead of
  formatting it into the SQL text.
* New error returns wrap with %w, and pgx.ErrNoRows is matched with errors.Is
  (adds the "errors" import to cmd/pg_unregister/main.go).
* The "index" blob ids below come from the original commit and no longer
  describe the post-image after these edits.

 cmd/pg_unregister/main.go     | 53 ++++++++++++++++++++++++++++-------
 internal/flypg/admin/admin.go | 16 ++++++++++-
 2 files changed, 57 insertions(+), 12 deletions(-)

diff --git a/cmd/pg_unregister/main.go b/cmd/pg_unregister/main.go
index 93b31649..c1ac9517 100644
--- a/cmd/pg_unregister/main.go
+++ b/cmd/pg_unregister/main.go
@@ -4,11 +4,15 @@ import (
 	"context"
 	"encoding/base64"
+	"errors"
 	"fmt"
+	"log"
 	"os"
+	"time"
 
 	"github.com/fly-apps/postgres-flex/internal/flypg"
 	"github.com/fly-apps/postgres-flex/internal/flypg/admin"
 	"github.com/fly-apps/postgres-flex/internal/utils"
+	"github.com/jackc/pgx/v5"
 )
 
 func main() {
@@ -49,20 +53,47 @@ func processUnregistration(ctx context.Context) error {
 		return fmt.Errorf("failed to unregister member: %v", err)
 	}
 
-	slots, err := admin.ListReplicationSlots(ctx, conn)
-	if err != nil {
-		return fmt.Errorf("failed to list replication slots: %v", err)
+	slotName := fmt.Sprintf("repmgr_slot_%d", member.ID)
+	if err := removeReplicationSlot(ctx, conn, slotName); err != nil {
+		return err
 	}
 
-	targetSlot := fmt.Sprintf("repmgr_slot_%d", member.ID)
-	for _, slot := range slots {
-		if slot.Name == targetSlot {
-			if err := admin.DropReplicationSlot(ctx, conn, targetSlot); err != nil {
-				return fmt.Errorf("failed to drop replication slot: %v", err)
+	return nil
+}
+
+// removeReplicationSlot drops the named replication slot, checking it once per
+// second and waiting while the slot is still active. It returns nil if the
+// slot does not exist, and an error if ctx is done, the lookup or drop fails,
+// or the slot has not been dropped within 10 seconds.
+func removeReplicationSlot(ctx context.Context, conn *pgx.Conn, slotName string) error {
+	ticker := time.NewTicker(1 * time.Second)
+	timeout := time.After(10 * time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-timeout:
+			return fmt.Errorf("timed out trying to drop replication slot")
+		case <-ticker.C:
+			slot, err := admin.GetReplicationSlot(ctx, conn, slotName)
+			if err != nil {
+				if errors.Is(err, pgx.ErrNoRows) {
+					return nil
+				}
+				return fmt.Errorf("failed to get replication slot %s: %w", slotName, err)
+			}
+
+			if slot.Active {
+				log.Printf("Slot %s is still active, waiting...", slotName)
+				continue
 			}
-			break
+
+			if err := admin.DropReplicationSlot(ctx, conn, slotName); err != nil {
+				return fmt.Errorf("failed to drop replication slot %s: %w", slotName, err)
+			}
+
+			return nil
 		}
 	}
-
-	return nil
 }
diff --git a/internal/flypg/admin/admin.go b/internal/flypg/admin/admin.go
index 2f1925b1..985dd483 100644
--- a/internal/flypg/admin/admin.go
+++ b/internal/flypg/admin/admin.go
@@ -130,6 +130,21 @@ type ReplicationSlot struct {
 	RetainedWalInBytes int
 }
 
+// GetReplicationSlot returns the replication slot with the given name. The
+// error from row.Scan is returned unwrapped, so callers can detect a missing
+// slot with errors.Is(err, pgx.ErrNoRows).
+func GetReplicationSlot(ctx context.Context, pg *pgx.Conn, slotName string) (*ReplicationSlot, error) {
+	sql := "SELECT slot_name, active, wal_status, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_wal FROM pg_replication_slots WHERE slot_name = $1;"
+	row := pg.QueryRow(ctx, sql, slotName)
+
+	var slot ReplicationSlot
+	if err := row.Scan(&slot.Name, &slot.Active, &slot.WalStatus, &slot.RetainedWalInBytes); err != nil {
+		return nil, err
+	}
+
+	return &slot, nil
+}
+
 func ListReplicationSlots(ctx context.Context, pg *pgx.Conn) ([]ReplicationSlot, error) {
 	sql := "SELECT slot_name, active, wal_status, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_wal FROM pg_replication_slots;"
 	rows, err := pg.Query(ctx, sql)
@@ -167,7 +182,6 @@ func ListReplicationSlots(ctx context.Context, pg *pgx.Conn) ([]ReplicationSlot,
 
 func DropReplicationSlot(ctx context.Context, pg *pgx.Conn, name string) error {
 	sql := fmt.Sprintf("SELECT pg_drop_replication_slot('%s');", name)
-
 	_, err := pg.Exec(ctx, sql)
 	if err != nil {
 		return err

From 7a8ac4d575f2aab1cb9b33488a8b83a42e3f73d4 Mon Sep 17 00:00:00 2001
From: Shaun Davis
Date: Wed, 7 Aug 2024 16:45:33 -0500
Subject: [PATCH 3/3] Ensure pg_rewind is included in path (#254)

---
Review notes (ignored by git am/apply):
* Context-line indentation and blank-line placement in the Dockerfile hunks
  were reconstructed from a whitespace-collapsed copy of this series; apply
  with `git am --ignore-whitespace` if the context does not match.
* NOTE(review): `LABEL fly.pg-version=${PG_VERSION}` references PG_VERSION, but
  the stage's visible ARG list only declares PG_MAJOR_VERSION - confirm the
  intended variable.
* The "index" blob ids below come from the original commit; the only content
  change is the wording of the pg_rewind comment.

 pg16/Dockerfile             | 17 ++++++++++-------
 pg16/Dockerfile-timescaledb | 16 +++++++++-------
 2 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/pg16/Dockerfile b/pg16/Dockerfile
index d0d869e3..523fe59d 100644
--- a/pg16/Dockerfile
+++ b/pg16/Dockerfile
@@ -21,15 +21,19 @@ COPY ./bin/* /fly/bin/
 
 FROM ubuntu:24.04
 
-ENV PGDATA=/data/postgresql
-ENV PGPASSFILE=/data/.pgpass
-ENV AWS_SHARED_CREDENTIALS_FILE=/data/.aws/credentials
 ARG VERSION
 ARG PG_MAJOR_VERSION
 ARG POSTGIS_MAJOR=3
 ARG HAPROXY_VERSION=2.8
 ARG REPMGR_VERSION=5.4.1-1build2
 
+ENV PGDATA=/data/postgresql
+ENV PGPASSFILE=/data/.pgpass
+ENV AWS_SHARED_CREDENTIALS_FILE=/data/.aws/credentials
+ENV PG_MAJOR_VERSION=${PG_MAJOR_VERSION}
+ENV PATH="/usr/lib/postgresql/${PG_MAJOR_VERSION}/bin:$PATH"
+
+
 LABEL fly.app_role=postgres_cluster
 LABEL fly.version=${VERSION}
 LABEL fly.pg-version=${PG_VERSION}
@@ -63,16 +67,15 @@ RUN apt-get update && apt-get install --no-install-recommends -y \
     haproxy=$HAPROXY_VERSION.\* \
     && apt autoremove -y && apt clean
 
-
-# Add PostgreSQL bin directory to PATH
-ENV PATH="/usr/lib/postgresql/${PG_MAJOR_VERSION}/bin:$PATH"
-
 # Copy Go binaries from the builder stage
 COPY --from=builder /fly/bin/* /usr/local/bin
 
 # Copy Postgres exporter
 COPY --from=wrouesnel/postgres_exporter:latest /postgres_exporter /usr/local/bin/
 
+# Symlink pg_rewind into /usr/bin.
+RUN ln -s /usr/lib/postgresql/${PG_MAJOR_VERSION}/bin/pg_rewind /usr/bin/pg_rewind
+
 ADD /config/* /fly/
 RUN mkdir -p /run/haproxy/
 RUN usermod -d /data postgres
diff --git a/pg16/Dockerfile-timescaledb b/pg16/Dockerfile-timescaledb
index 62da26b5..700ab7b5 100644
--- a/pg16/Dockerfile-timescaledb
+++ b/pg16/Dockerfile-timescaledb
@@ -21,15 +21,18 @@ COPY ./bin/* /fly/bin/
 
 FROM ubuntu:24.04
 
-ENV PGDATA=/data/postgresql
-ENV PGPASSFILE=/data/.pgpass
-ENV AWS_SHARED_CREDENTIALS_FILE=/data/.aws/credentials
 ARG VERSION
 ARG PG_MAJOR_VERSION
 ARG POSTGIS_MAJOR=3
 ARG HAPROXY_VERSION=2.8
 ARG REPMGR_VERSION=5.4.1-1build2
 
+ENV PGDATA=/data/postgresql
+ENV PGPASSFILE=/data/.pgpass
+ENV AWS_SHARED_CREDENTIALS_FILE=/data/.aws/credentials
+ENV PG_MAJOR_VERSION=${PG_MAJOR_VERSION}
+ENV PATH="/usr/lib/postgresql/${PG_MAJOR_VERSION}/bin:$PATH"
+
 LABEL fly.app_role=postgres_cluster
 LABEL fly.version=${VERSION}
 LABEL fly.pg-version=${PG_VERSION}
@@ -69,16 +72,15 @@ RUN apt-get update && apt-get install --no-install-recommends -y \
     haproxy=$HAPROXY_VERSION.\* \
     && apt autoremove -y && apt clean
 
-
-# Add PostgreSQL bin directory to PATH
-ENV PATH="/usr/lib/postgresql/${PG_MAJOR_VERSION}/bin:$PATH"
-
 # Copy Go binaries from the builder stage
 COPY --from=builder /fly/bin/* /usr/local/bin
 
 # Copy Postgres exporter
 COPY --from=wrouesnel/postgres_exporter:latest /postgres_exporter /usr/local/bin/
 
+# Symlink pg_rewind into /usr/bin.
+RUN ln -s /usr/lib/postgresql/${PG_MAJOR_VERSION}/bin/pg_rewind /usr/bin/pg_rewind
+
 ADD /config/* /fly/
 RUN mkdir -p /run/haproxy/
 RUN usermod -d /data postgres