From 4d426fb868277a513c76090e86df376e2670e5e9 Mon Sep 17 00:00:00 2001 From: Shaun Davis Date: Fri, 17 Jan 2025 09:46:10 -0600 Subject: [PATCH] Node name fix (#268) * Ensure implementation is migration friendly * fix * Set go version in dockerfiles * potential fix * Adjusting how Zombie state is evaluated on boot. * Fix unregistration err condition * Not used anymore * cleanup unregistration logic * Fix err check * Move this back to custom * Remove todo --------- Co-authored-by: Ben Iofel --- .github/workflows/push.yml | 8 +-- bin/restart-repmgrd | 9 +++- cmd/pg_unregister/main.go | 49 ++++++++++++++---- go.mod | 6 +-- go.sum | 4 ++ internal/flypg/node.go | 90 +++++++++++++++++++++++++++----- internal/flypg/readonly.go | 4 +- internal/flypg/repmgr.go | 98 ++++++++++++++++++++++++++++------- internal/flypg/repmgr_test.go | 6 ++- internal/flypg/zombie.go | 21 +++----- internal/privnet/sixpn.go | 58 ++++++++++++++++----- pg15/Dockerfile | 3 +- pg15/Dockerfile-timescaledb | 3 +- pg16/Dockerfile | 3 +- pg16/Dockerfile-timescaledb | 3 +- 15 files changed, 281 insertions(+), 84 deletions(-) diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index d13b153f..910fcd2b 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -18,7 +18,7 @@ jobs: - uses: actions/setup-go@v3 with: - go-version: '1.20' + go-version: '1.23' go-version-file: 'go.mod' cache: true @@ -36,7 +36,7 @@ jobs: - uses: actions/setup-go@v3 with: - go-version: '1.20' + go-version: '1.23' go-version-file: 'go.mod' cache: true @@ -52,7 +52,7 @@ jobs: - uses: actions/setup-go@v3 with: - go-version: '1.20' + go-version: '1.23' go-version-file: 'go.mod' cache: true @@ -68,7 +68,7 @@ jobs: - uses: actions/setup-go@v3 with: - go-version: '1.20' + go-version: '1.23' go-version-file: 'go.mod' cache: true diff --git a/bin/restart-repmgrd b/bin/restart-repmgrd index 10ef6bef..63f25f9d 100755 --- a/bin/restart-repmgrd +++ b/bin/restart-repmgrd @@ -1,3 +1,10 @@ #!/bin/bash -kill `cat /tmp/repmgrd.pid` +if [ -f /tmp/repmgrd.pid ]; then + PID=$(cat /tmp/repmgrd.pid) + + # Check if the process is running + if ps -p $PID > /dev/null 2>&1; then + kill $PID + fi +fi diff --git a/cmd/pg_unregister/main.go b/cmd/pg_unregister/main.go index c1ac9517..de7c5448 100644 --- a/cmd/pg_unregister/main.go +++ b/cmd/pg_unregister/main.go @@ -27,9 +27,9 @@ func main() { func processUnregistration(ctx context.Context) error { encodedArg := os.Args[1] - hostnameBytes, err := base64.StdEncoding.DecodeString(encodedArg) + machineBytes, err := base64.StdEncoding.DecodeString(encodedArg) if err != nil { - return fmt.Errorf("failed to decode hostname: %v", err) + return fmt.Errorf("failed to decode machine: %v", err) } node, err := flypg.NewNode() @@ -43,19 +43,50 @@ func processUnregistration(ctx context.Context) error { } defer func() { _ = conn.Close(ctx) }() - member, err := node.RepMgr.MemberByHostname(ctx, conn, string(hostnameBytes)) + machineID := string(machineBytes) + + if len(machineID) != 14 { + return fmt.Errorf("invalid machine id: %s", machineID) + } + + member, err := node.RepMgr.MemberByNodeName(ctx, conn, machineID) if err != nil { - return fmt.Errorf("failed to resolve member: %s", err) + // If no rows are found, the member was either already unregistered or never registered + if err != pgx.ErrNoRows { + return fmt.Errorf("failed to resolve member using %s: %s", machineID, err) + } + } + + // If the member exists unregister it and remove the replication slot + if member != nil { + if err := node.RepMgr.UnregisterMember(*member); err != nil { + return fmt.Errorf("failed to unregister member: %v", err) + } + + slotName := fmt.Sprintf("repmgr_slot_%d", member.ID) + if err := removeReplicationSlot(ctx, conn, slotName); err != nil { + return fmt.Errorf("failed to remove replication slot: %v", err) + } } - if err := node.RepMgr.UnregisterMember(*member); err != nil { - return fmt.Errorf("failed to unregister member: %v", err) + // Redirect logs to /dev/null temporarily so we don't pollute the response data. + devnull, err := os.Open(os.DevNull) + if err != nil { + return fmt.Errorf("failed to open /dev/null: %v", err) } + defer func() { _ = devnull.Close() }() + + // Save the original log output + originalLogOutput := log.Writer() + + // Redirect logs to /dev/null + log.SetOutput(devnull) - slotName := fmt.Sprintf("repmgr_slot_%d", member.ID) - if err := removeReplicationSlot(ctx, conn, slotName); err != nil { - return err + if err := flypg.EvaluateClusterState(ctx, conn, node); err != nil { + return fmt.Errorf("failed to evaluate cluster state: %v", err) } + // Restore the original log output + log.SetOutput(originalLogOutput) return nil } diff --git a/go.mod b/go.mod index b1b1e9fb..d23b3a5d 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/fly-apps/postgres-flex -go 1.20 +go 1.23 require ( github.com/go-chi/chi/v5 v5.0.8 @@ -8,8 +8,10 @@ require ( github.com/hashicorp/consul/api v1.18.0 github.com/jackc/pgconn v1.14.3 github.com/jackc/pgx/v5 v5.5.4 + github.com/olekukonko/tablewriter v0.0.5 github.com/pkg/errors v0.9.1 github.com/pkg/term v1.1.0 + github.com/spf13/cobra v1.8.1 github.com/superfly/fly-checks v0.0.0-20230510154016-d189351293f2 golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3 golang.org/x/sync v0.1.0 @@ -36,8 +38,6 @@ require ( github.com/mattn/go-runewidth v0.0.9 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.4.1 // indirect - github.com/olekukonko/tablewriter v0.0.5 // indirect - github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.5.0 // indirect golang.org/x/crypto v0.20.0 // indirect diff --git a/go.sum b/go.sum index a095412d..8692e6bd 100644 --- a/go.sum +++ b/go.sum @@ -69,6 +69,7 @@ github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXO github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5Wi/+Zz7xoE5ALHsRQlOctkOiHc= +github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUOag= @@ -78,6 +79,7 @@ github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZ github.com/jackc/pgx/v5 v5.5.4 h1:Xp2aQS8uXButQdnCMWNmvx6UysWQQC+u1EoizjguY+8= github.com/jackc/pgx/v5 v5.5.4/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -139,6 +141,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/superfly/fly-checks v0.0.0-20230510154016-d189351293f2 h1:mZNiJSrmbQA/3+Vy8GLL/Q9qdHxPzjcxKv+E14GZLFs= github.com/superfly/fly-checks v0.0.0-20230510154016-d189351293f2/go.mod h1:BbqpB4y6Z/cijQqKWJ3i8LMsAoC29gzX6vsSD3Qq7uw= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -154,6 +157,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= diff --git a/internal/flypg/node.go b/internal/flypg/node.go index 3b099218..fe252327 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -16,10 +16,12 @@ import ( "github.com/fly-apps/postgres-flex/internal/privnet" "github.com/fly-apps/postgres-flex/internal/utils" "github.com/jackc/pgx/v5" + "golang.org/x/exp/slices" ) type Node struct { AppName string + MachineID string PrivateIP string PrimaryRegion string DataDir string @@ -52,6 +54,8 @@ func NewNode() (*Node, error) { node.PrivateIP = ipv6.String() + node.MachineID = os.Getenv("FLY_MACHINE_ID") + node.PrimaryRegion = os.Getenv("PRIMARY_REGION") if node.PrimaryRegion == "" { return nil, fmt.Errorf("PRIMARY_REGION environment variable must be set") @@ -88,7 +92,9 @@ func NewNode() (*Node, error) { UserConfigPath: "/data/repmgr.user.conf", PasswordConfigPath: "/data/.pgpass", DataDir: node.DataDir, + HostName: node.Hostname(), PrivateIP: node.PrivateIP, + MachineID: node.MachineID, Port: 5433, DatabaseName: "repmgr", Credentials: node.ReplCredentials, @@ -182,7 +188,7 @@ func (n *Node) Init(ctx context.Context) error { } } else { log.Println("Provisioning standby") - cloneTarget, err := n.RepMgr.ResolveMemberOverDNS(ctx) + cloneTarget, err := n.RepMgr.ResolvePrimaryOverDNS(ctx) if err != nil { return fmt.Errorf("failed to resolve member over dns: %s", err) } @@ -225,14 +231,6 @@ func (n *Node) Init(ctx context.Context) error { // PostInit are operations that need to be executed against a running Postgres on boot. func (n *Node) PostInit(ctx context.Context) error { - if ZombieLockExists() { - log.Println("[ERROR] Manual intervention required.") - log.Println("[ERROR] If a new primary has been established, consider adding a new replica with `fly machines clone ` and then remove this member.") - log.Println("[ERROR] Sleeping for 5 minutes.") - time.Sleep(5 * time.Minute) - return fmt.Errorf("unrecoverable zombie") - } - // Use the Postgres user on boot, since our internal user may not have been created yet. conn, err := n.NewLocalConnection(ctx, "postgres", n.OperatorCredentials) if err != nil { @@ -265,7 +263,7 @@ func (n *Node) PostInit(ctx context.Context) error { return fmt.Errorf("failed to resolve member role: %s", err) } - // Restart repmgrd in the event the IP changes for an already registered node. + // Restart repmgrd in the event the machine ID changes for an already registered node. // This can happen if the underlying volume is moved to a different node. daemonRestartRequired := n.RepMgr.daemonRestartRequired(member) @@ -291,14 +289,22 @@ func (n *Node) PostInit(ctx context.Context) error { return fmt.Errorf("failed to run zombie diagnosis: %s", err) } - // This should never happen - if primary != n.PrivateIP { + // This should never happen, but check anyways for correctness + if primary != n.Hostname() { return fmt.Errorf("resolved primary '%s' does not match ourself '%s'. this should not happen", primary, - n.PrivateIP, + n.Hostname(), ) } + // Clear the zombie lock if it exists. + if ZombieLockExists() { + log.Println("[INFO] Clearing zombie lock and re-enabling read/write") + if err := RemoveZombieLock(); err != nil { + return fmt.Errorf("failed to remove zombie lock: %s", err) + } + } + // Re-register primary to apply any configuration changes. if err := n.RepMgr.registerPrimary(daemonRestartRequired); err != nil { return fmt.Errorf("failed to re-register existing primary: %s", err) @@ -311,6 +317,10 @@ func (n *Node) PostInit(ctx context.Context) error { } } case StandbyRoleName: + if err := n.migrateNodeNameIfNeeded(ctx, repConn); err != nil { + return fmt.Errorf("failed to migrate node name: %s", err) + } + // Register existing standby to apply any configuration changes. if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil { return fmt.Errorf("failed to register existing standby: %s", err) @@ -399,7 +409,7 @@ func (n *Node) PostInit(ctx context.Context) error { return fmt.Errorf("failed to enable repmgr: %s", err) } - primary, err := n.RepMgr.ResolveMemberOverDNS(ctx) + primary, err := n.RepMgr.ResolvePrimaryOverDNS(ctx) if err != nil { return fmt.Errorf("failed to resolve primary member: %s", err) } @@ -527,3 +537,55 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro return nil } + +// migrate node name from 6pn to machine ID if needed +func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) error { + primary, err := n.RepMgr.PrimaryMember(ctx, repConn) + if err != nil { + return fmt.Errorf("failed to resolve primary member when updating standby: %s", err) + } + + primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.Hostname) + if err != nil { + return fmt.Errorf("failed to establish connection to primary: %s", err) + } + defer func() { _ = primaryConn.Close(ctx) }() + + rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication") + if err != nil { + return fmt.Errorf("failed to query pg_stat_replication: %s", err) + } + defer rows.Close() + + var applicationNames []string + for rows.Next() { + var applicationName string + if err := rows.Scan(&applicationName); err != nil { + return fmt.Errorf("failed to scan application_name: %s", err) + } + applicationNames = append(applicationNames, applicationName) + } + if err := rows.Err(); err != nil { + return fmt.Errorf("failed to iterate over rows: %s", err) + } + + // if we find our 6pn as application_name, we need to regenerate postgresql.auto.conf and reload postgresql + if slices.Contains(applicationNames, n.PrivateIP) { + log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...") + + if err := n.RepMgr.regenReplicationConf(ctx); err != nil { + return fmt.Errorf("failed to clone standby: %s", err) + } + + if err := admin.ReloadPostgresConfig(ctx, repConn); err != nil { + return fmt.Errorf("failed to reload postgresql: %s", err) + } + } + + return nil +} + +// Hostname returns the hostname of the node. +func (n *Node) Hostname() string { + return fmt.Sprintf("%s.vm.%s.internal", n.MachineID, n.AppName) +} diff --git a/internal/flypg/readonly.go b/internal/flypg/readonly.go index a714c254..1c075538 100644 --- a/internal/flypg/readonly.go +++ b/internal/flypg/readonly.go @@ -70,7 +70,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error { for _, member := range members { if member.Role == PrimaryRoleName { - endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, target) + endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, target) resp, err := http.Get(endpoint) if err != nil { log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.Hostname, err) @@ -85,7 +85,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error { } for _, member := range members { - endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, RestartHaproxyEndpoint) + endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, RestartHaproxyEndpoint) resp, err := http.Get(endpoint) if err != nil { log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.Hostname, err) diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index 46ceab8c..b7058a46 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -33,7 +33,9 @@ type RepMgr struct { AppName string PrimaryRegion string Region string + HostName string PrivateIP string + MachineID string DataDir string DatabaseName string Credentials admin.Credential @@ -163,8 +165,8 @@ func (r *RepMgr) setDefaults() error { conf := ConfigMap{ "node_id": nodeID, - "node_name": fmt.Sprintf("'%s'", r.PrivateIP), - "conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", r.PrivateIP, r.Port, r.Credentials.Username, r.DatabaseName), + "node_name": fmt.Sprintf("'%s'", r.MachineID), + "conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", r.HostName, r.Port, r.Credentials.Username, r.DatabaseName), "data_directory": fmt.Sprintf("'%s'", r.DataDir), "failover": "'automatic'", "use_replication_slots": "yes", @@ -276,7 +278,7 @@ func (*RepMgr) restartDaemon() error { } func (r *RepMgr) daemonRestartRequired(m *Member) bool { - return m.Hostname != r.PrivateIP + return m.Name != r.MachineID } func (r *RepMgr) unregisterWitness(id int) error { @@ -301,14 +303,14 @@ func (r *RepMgr) rejoinCluster(hostname string) error { return err } -func (r *RepMgr) clonePrimary(ipStr string) error { +func (r *RepMgr) clonePrimary(hostname string) error { cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir) if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil { return fmt.Errorf("failed to create pg directory: %s", err) } cmdStr = fmt.Sprintf("repmgr -h %s -p %d -d %s -U %s -f %s standby clone -c -F", - ipStr, + hostname, r.Port, r.DatabaseName, r.Credentials.Username, @@ -322,15 +324,30 @@ func (r *RepMgr) clonePrimary(ipStr string) error { return nil } +func (r *RepMgr) regenReplicationConf(ctx context.Context) error { + if _, err := utils.RunCmd(ctx, "postgres", + "repmgr", "--replication-conf-only", + "-h", r.HostName, + "-p", fmt.Sprint(r.Port), + "-d", r.DatabaseName, + "-U", r.Credentials.Username, + "-f", r.ConfigPath, + "standby", "clone", "-F"); err != nil { + return fmt.Errorf("failed to regenerate replication conf: %s", err) + } + return nil +} + type Member struct { ID int + Name string Hostname string Active bool Region string Role string } -func (*RepMgr) Members(ctx context.Context, pg *pgx.Conn) ([]Member, error) { +func (r *RepMgr) Members(ctx context.Context, pg *pgx.Conn) ([]Member, error) { sql := "select node_id, node_name, location, active, type from repmgr.nodes;" rows, err := pg.Query(ctx, sql) if err != nil { @@ -341,10 +358,19 @@ func (*RepMgr) Members(ctx context.Context, pg *pgx.Conn) ([]Member, error) { var members []Member for rows.Next() { var member Member - if err := rows.Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role); err != nil { + if err := rows.Scan(&member.ID, &member.Name, &member.Region, &member.Active, &member.Role); err != nil { return nil, err } + // Assume we are working with a machineID if the name is 14 characters long. + if len(member.Name) == 14 { + member.Hostname = r.machineIDToDNS(member.Name) + } else { + // Member name is the private IP. + member.Hostname = member.Name + member.Name = "" + } + members = append(members, member) } @@ -371,14 +397,23 @@ func (r *RepMgr) Member(ctx context.Context, conn *pgx.Conn) (*Member, error) { return nil, pgx.ErrNoRows } -func (*RepMgr) PrimaryMember(ctx context.Context, pg *pgx.Conn) (*Member, error) { +func (r *RepMgr) PrimaryMember(ctx context.Context, pg *pgx.Conn) (*Member, error) { var member Member sql := "select node_id, node_name, location, active, type from repmgr.nodes where type = 'primary' and active = true;" - err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role) + err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Name, &member.Region, &member.Active, &member.Role) if err != nil { return nil, err } + // Assume we are working with a machineID if the name is 14 characters long. + if len(member.Name) == 14 { + member.Hostname = r.machineIDToDNS(member.Name) + } else { + // Member name is the private IP. + member.Hostname = member.Name + member.Name = "" + } + return &member, nil } @@ -419,38 +454,42 @@ func (*RepMgr) MemberByID(ctx context.Context, pg *pgx.Conn, id int) (*Member, e return &member, nil } -func (*RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname string) (*Member, error) { +func (r *RepMgr) MemberByNodeName(ctx context.Context, pg *pgx.Conn, name string) (*Member, error) { var member Member - sql := fmt.Sprintf("select node_id, node_name, location, active, type from repmgr.nodes where node_name = '%s';", hostname) + sql := fmt.Sprintf("select node_id, node_name, location, active, type from repmgr.nodes where node_name = '%s';", name) - err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Hostname, &member.Region, &member.Active, &member.Role) + err := pg.QueryRow(ctx, sql).Scan(&member.ID, &member.Name, &member.Region, &member.Active, &member.Role) if err != nil { return nil, err } + member.Hostname = r.machineIDToDNS(name) + return &member, nil } -func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) { - ips, err := r.InRegionPeerIPs(ctx) +func (r *RepMgr) ResolvePrimaryOverDNS(ctx context.Context) (*Member, error) { + machineIDs, err := r.InRegionPeerMachines(ctx) if err != nil { return nil, err } var target *Member - for _, ip := range ips { - if ip.String() == r.PrivateIP { + for _, machineID := range machineIDs { + if machineID == r.MachineID { continue } - conn, err := r.NewRemoteConnection(ctx, ip.String()) + hostname := r.machineIDToDNS(machineID) + + conn, err := r.NewRemoteConnection(ctx, hostname) if err != nil { continue } defer func() { _ = conn.Close(ctx) }() - member, err := r.MemberByHostname(ctx, conn, ip.String()) + member, err := r.MemberByNodeName(ctx, conn, machineID) if err != nil { continue } @@ -477,6 +516,21 @@ func (r *RepMgr) InRegionPeerIPs(ctx context.Context) ([]net.IPAddr, error) { return privnet.AllPeers(ctx, targets) } +func (r *RepMgr) InRegionPeerMachines(ctx context.Context) ([]string, error) { + machines, err := privnet.AllMachines(ctx, r.AppName) + if err != nil { + return nil, err + } + + var machineIDs []string + for _, machine := range machines { + if machine.Region == r.PrimaryRegion { + machineIDs = append(machineIDs, machine.Id) + } + } + return machineIDs, nil +} + func (r *RepMgr) HostInRegion(ctx context.Context, hostname string) (bool, error) { ips, err := r.InRegionPeerIPs(ctx) if err != nil { @@ -514,3 +568,11 @@ func (r *RepMgr) UnregisterMember(member Member) error { func (r *RepMgr) eligiblePrimary() bool { return r.Region == r.PrimaryRegion } + +func (r *RepMgr) machineIDToDNS(nodeName string) string { + if len(nodeName) != 14 { + panic("invalid machine id") + } + + return fmt.Sprintf("%s.vm.%s.internal", nodeName, r.AppName) +} diff --git a/internal/flypg/repmgr_test.go b/internal/flypg/repmgr_test.go index 8251c7d4..1bce3b3a 100644 --- a/internal/flypg/repmgr_test.go +++ b/internal/flypg/repmgr_test.go @@ -33,6 +33,7 @@ func TestRepmgrInitialization(t *testing.T) { UserConfigPath: repgmrUserConfigFilePath, PasswordConfigPath: repgmrPasswordConfigFilePath, DataDir: repmgrTestDirectory, + MachineID: "abcdefg1234567", PrivateIP: "127.0.0.1", Credentials: admin.Credential{ Username: "user", @@ -91,8 +92,8 @@ func TestRepmgrInitialization(t *testing.T) { t.Fatal(err) } - if config["node_name"] != "'127.0.0.1'" { - t.Fatalf("expected node_name to be '127.0.0.1', got %v", config["node_name"]) + if config["node_name"] != "'abcdefg1234567'" { + t.Fatalf("expected node_name to be 'abcdefg1234567', got %v", config["node_name"]) } if config["location"] != "'dev'" { @@ -122,6 +123,7 @@ func TestRepmgrNodeIDGeneration(t *testing.T) { DataDir: repmgrTestDirectory, PrivateIP: "127.0.0.1", + MachineID: "abcdefg1234567", Port: 5433, DatabaseName: "repmgr", Credentials: admin.Credential{ diff --git a/internal/flypg/zombie.go b/internal/flypg/zombie.go index 7204846d..d19122cc 100644 --- a/internal/flypg/zombie.go +++ b/internal/flypg/zombie.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "log" - "net" "os" "github.com/fly-apps/postgres-flex/internal/utils" @@ -85,7 +84,7 @@ type DNASample struct { func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASample, error) { sample := &DNASample{ - hostname: node.PrivateIP, + hostname: node.Hostname(), totalMembers: len(standbys) + 1, totalActive: 1, totalInactive: 0, @@ -117,8 +116,8 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp sample.totalActive++ - // Record conflict when primary doesn't match. - if primary.Hostname != node.PrivateIP { + // Record conflict when primary name does not match our machine ID + if primary.Hostname != node.Hostname() && primary.Hostname != node.PrivateIP { sample.totalConflicts++ sample.conflictMap[primary.Hostname]++ } @@ -199,24 +198,19 @@ func handleZombieLock(ctx context.Context, n *Node) error { // If the zombie lock contains a hostname, it means we were able to // resolve the real primary and will attempt to rejoin it. if primaryStr != "" { - ip := net.ParseIP(primaryStr) - if ip == nil { - return fmt.Errorf("zombie.lock file contains an invalid ipv6 address") - } - - conn, err := n.RepMgr.NewRemoteConnection(ctx, ip.String()) + conn, err := n.RepMgr.NewRemoteConnection(ctx, primaryStr) if err != nil { - return fmt.Errorf("failed to establish a connection to our rejoin target %s: %s", ip.String(), err) + return fmt.Errorf("failed to establish a connection to our rejoin target %s: %s", primaryStr, err) } defer func() { _ = conn.Close(ctx) }() primary, err := n.RepMgr.PrimaryMember(ctx, conn) if err != nil { - return fmt.Errorf("failed to confirm primary on recover target %s: %s", ip.String(), err) + return fmt.Errorf("failed to confirm primary on recover target %s: %s", primaryStr, err) } // Confirm that our rejoin target still identifies itself as the primary. - if primary.Hostname != ip.String() { + if primary.Hostname != primaryStr { // Clear the zombie.lock file so we can attempt to re-resolve the correct primary. if err := RemoveZombieLock(); err != nil { return fmt.Errorf("failed to remove zombie lock: %s", err) @@ -250,6 +244,7 @@ func handleZombieLock(ctx context.Context, n *Node) error { // TODO - Provide link to documentation on how to address this log.Println("[WARN] Zombie lock file does not contain a hostname.") log.Println("[WARN] This likely means that we were unable to determine who the real primary is.") + log.Println("[WARN] If a new primary has been established, consider adding a new replica with `fly machines clone ` and then remove this member.") } return nil diff --git a/internal/privnet/sixpn.go b/internal/privnet/sixpn.go index 85b1eaeb..ed9d4005 100644 --- a/internal/privnet/sixpn.go +++ b/internal/privnet/sixpn.go @@ -14,20 +14,7 @@ func AllPeers(ctx context.Context, appName string) ([]net.IPAddr, error) { } func Get6PN(ctx context.Context, hostname string) ([]net.IPAddr, error) { - nameserver := os.Getenv("FLY_NAMESERVER") - if nameserver == "" { - nameserver = "fdaa::3" - } - nameserver = net.JoinHostPort(nameserver, "53") - r := &net.Resolver{ - PreferGo: true, - Dial: func(ctx context.Context, network, address string) (net.Conn, error) { - d := net.Dialer{ - Timeout: 1 * time.Second, - } - return d.DialContext(ctx, "udp6", nameserver) - }, - } + r := getResolver() ips, err := r.LookupIPAddr(ctx, hostname) if err != nil { @@ -54,6 +41,49 @@ func Get6PN(ctx context.Context, hostname string) ([]net.IPAddr, error) { return ips, err } +type Machine struct { + Id string + Region string +} + +func AllMachines(ctx context.Context, appName string) ([]Machine, error) { + r := getResolver() + txts, err := r.LookupTXT(ctx, fmt.Sprintf("vms.%s.internal", appName)) + if err != nil { + return nil, err + } + + machines := make([]Machine, 0) + for _, txt := range txts { + parts := strings.Split(txt, ",") + for _, part := range parts { + parts := strings.Split(part, " ") + if len(parts) != 2 { + return nil, fmt.Errorf("invalid machine DNS TXT format: %s", txt) + } + machines = append(machines, Machine{Id: parts[0], Region: parts[1]}) + } + } + return machines, nil +} + +func getResolver() *net.Resolver { + nameserver := os.Getenv("FLY_NAMESERVER") + if nameserver == "" { + nameserver = "fdaa::3" + } + nameserver = net.JoinHostPort(nameserver, "53") + return &net.Resolver{ + PreferGo: true, + Dial: func(ctx context.Context, network, _ string) (net.Conn, error) { + d := net.Dialer{ + Timeout: 1 * time.Second, + } + return d.DialContext(ctx, "udp6", nameserver) + }, + } +} + func PrivateIPv6() (net.IP, error) { ips, err := net.LookupIP("fly-local-6pn") if err != nil && !strings.HasSuffix(err.Error(), "no such host") && !strings.HasSuffix(err.Error(), "server misbehaving") { diff --git a/pg15/Dockerfile b/pg15/Dockerfile index aaba77dd..9bf7571d 100644 --- a/pg15/Dockerfile +++ b/pg15/Dockerfile @@ -2,7 +2,7 @@ ARG PG_VERSION=15.8 ARG PG_MAJOR_VERSION=15 ARG VERSION=custom -FROM golang:1.20 +FROM golang:1.23 WORKDIR /go/src/github.com/fly-apps/fly-postgres COPY . . @@ -26,6 +26,7 @@ ENV AWS_SHARED_CREDENTIALS_FILE=/data/.aws/credentials ARG VERSION ARG PG_MAJOR_VERSION +ARG PG_VERSION ARG POSTGIS_MAJOR=3 ARG HAPROXY_VERSION=2.8 diff --git a/pg15/Dockerfile-timescaledb b/pg15/Dockerfile-timescaledb index 348d3611..578b9c79 100644 --- a/pg15/Dockerfile-timescaledb +++ b/pg15/Dockerfile-timescaledb @@ -2,7 +2,7 @@ ARG PG_VERSION=15.8 ARG PG_MAJOR_VERSION=15 ARG VERSION=custom -FROM golang:1.20 +FROM golang:1.23 WORKDIR /go/src/github.com/fly-apps/fly-postgres COPY . . @@ -27,6 +27,7 @@ ENV AWS_SHARED_CREDENTIALS_FILE=/data/.aws/credentials ARG VERSION ARG PG_MAJOR_VERSION +ARG PG_VERSION ARG POSTGIS_MAJOR=3 ARG HAPROXY_VERSION=2.8 diff --git a/pg16/Dockerfile b/pg16/Dockerfile index b754f53e..77be11fd 100644 --- a/pg16/Dockerfile +++ b/pg16/Dockerfile @@ -2,7 +2,7 @@ ARG PG_VERSION=16.4 ARG PG_MAJOR_VERSION=16 ARG VERSION=custom -FROM golang:1.20 AS builder +FROM golang:1.23 AS builder WORKDIR /go/src/github.com/fly-apps/fly-postgres COPY . . @@ -23,6 +23,7 @@ FROM ubuntu:24.04 ARG VERSION ARG PG_MAJOR_VERSION +ARG PG_VERSION ARG POSTGIS_MAJOR=3 ARG HAPROXY_VERSION=2.8 ARG REPMGR_VERSION=5.4.1-1build2 diff --git a/pg16/Dockerfile-timescaledb b/pg16/Dockerfile-timescaledb index 07088c05..e72adb63 100644 --- a/pg16/Dockerfile-timescaledb +++ b/pg16/Dockerfile-timescaledb @@ -2,7 +2,7 @@ ARG PG_VERSION=16.4 ARG PG_MAJOR_VERSION=16 ARG VERSION=custom -FROM golang:1.20 AS builder +FROM golang:1.23 AS builder WORKDIR /go/src/github.com/fly-apps/fly-postgres COPY . . @@ -23,6 +23,7 @@ FROM ubuntu:24.04 ARG VERSION ARG PG_MAJOR_VERSION +ARG PG_VERSION ARG POSTGIS_MAJOR=3 ARG HAPROXY_VERSION=2.8 ARG REPMGR_VERSION=5.4.1-1build2