Skip to content

Commit

Permalink
Node name fix (#268)
Browse files Browse the repository at this point in the history
* Ensure implementation is migration friendly

* fix

* Set go version in dockerfiles

* potential fix

* Adjusting how Zombie state is evaluated on boot.

* Fix unregistration err condition

* Not used anymore

* cleanup unregistration logic

* Fix err check

* Move this back to custom

* Remove todo

---------

Co-authored-by: Ben Iofel <[email protected]>
  • Loading branch information
davissp14 and Ben Iofel authored Jan 17, 2025
1 parent 31c3a48 commit 4d426fb
Show file tree
Hide file tree
Showing 15 changed files with 281 additions and 84 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.20'
go-version: '1.23'
go-version-file: 'go.mod'
cache: true

Expand All @@ -36,7 +36,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.20'
go-version: '1.23'
go-version-file: 'go.mod'
cache: true

Expand All @@ -52,7 +52,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.20'
go-version: '1.23'
go-version-file: 'go.mod'
cache: true

Expand All @@ -68,7 +68,7 @@ jobs:

- uses: actions/setup-go@v3
with:
go-version: '1.20'
go-version: '1.23'
go-version-file: 'go.mod'
cache: true

Expand Down
9 changes: 8 additions & 1 deletion bin/restart-repmgrd
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
#!/bin/bash

kill `cat /tmp/repmgrd.pid`
if [ -f /tmp/repmgrd.pid ]; then
PID=$(cat /tmp/repmgrd.pid)

# Check if the process is running
if ps -p $PID > /dev/null 2>&1; then
kill $PID
fi
fi
49 changes: 40 additions & 9 deletions cmd/pg_unregister/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func main() {

func processUnregistration(ctx context.Context) error {
encodedArg := os.Args[1]
hostnameBytes, err := base64.StdEncoding.DecodeString(encodedArg)
machineBytes, err := base64.StdEncoding.DecodeString(encodedArg)
if err != nil {
return fmt.Errorf("failed to decode hostname: %v", err)
return fmt.Errorf("failed to decode machine: %v", err)
}

node, err := flypg.NewNode()
Expand All @@ -43,19 +43,50 @@ func processUnregistration(ctx context.Context) error {
}
defer func() { _ = conn.Close(ctx) }()

member, err := node.RepMgr.MemberByHostname(ctx, conn, string(hostnameBytes))
machineID := string(machineBytes)

if len(machineID) != 14 {
return fmt.Errorf("invalid machine id: %s", machineID)
}

member, err := node.RepMgr.MemberByNodeName(ctx, conn, machineID)
if err != nil {
return fmt.Errorf("failed to resolve member: %s", err)
// If no rows are found, the member was either already unregistered or never registered
if err != pgx.ErrNoRows {
return fmt.Errorf("failed to resolve member using %s: %s", machineID, err)
}
}

// If the member exists unregister it and remove the replication slot
if member != nil {
if err := node.RepMgr.UnregisterMember(*member); err != nil {
return fmt.Errorf("failed to unregister member: %v", err)
}

slotName := fmt.Sprintf("repmgr_slot_%d", member.ID)
if err := removeReplicationSlot(ctx, conn, slotName); err != nil {
return fmt.Errorf("failed to remove replication slot: %v", err)
}
}

if err := node.RepMgr.UnregisterMember(*member); err != nil {
return fmt.Errorf("failed to unregister member: %v", err)
// Redirect logs to /dev/null temporarily so we don't pollute the response data.
devnull, err := os.Open(os.DevNull)
if err != nil {
return fmt.Errorf("failed to open /dev/null: %v", err)
}
defer func() { _ = devnull.Close() }()

// Save the original log output
originalLogOutput := log.Writer()

// Redirect logs to /dev/null
log.SetOutput(devnull)

slotName := fmt.Sprintf("repmgr_slot_%d", member.ID)
if err := removeReplicationSlot(ctx, conn, slotName); err != nil {
return err
if err := flypg.EvaluateClusterState(ctx, conn, node); err != nil {
return fmt.Errorf("failed to evaluate cluster state: %v", err)
}
// Restore the original log output
log.SetOutput(originalLogOutput)

return nil
}
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
module github.com/fly-apps/postgres-flex

go 1.20
go 1.23

require (
github.com/go-chi/chi/v5 v5.0.8
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/hashicorp/consul/api v1.18.0
github.com/jackc/pgconn v1.14.3
github.com/jackc/pgx/v5 v5.5.4
github.com/olekukonko/tablewriter v0.0.5
github.com/pkg/errors v0.9.1
github.com/pkg/term v1.1.0
github.com/spf13/cobra v1.8.1
github.com/superfly/fly-checks v0.0.0-20230510154016-d189351293f2
golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3
golang.org/x/sync v0.1.0
Expand All @@ -36,8 +38,6 @@ require (
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/crypto v0.20.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXO
github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5Wi/+Zz7xoE5ALHsRQlOctkOiHc=
github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUOag=
Expand All @@ -78,6 +79,7 @@ github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZ
github.com/jackc/pgx/v5 v5.5.4 h1:Xp2aQS8uXButQdnCMWNmvx6UysWQQC+u1EoizjguY+8=
github.com/jackc/pgx/v5 v5.5.4/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down Expand Up @@ -139,6 +141,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/superfly/fly-checks v0.0.0-20230510154016-d189351293f2 h1:mZNiJSrmbQA/3+Vy8GLL/Q9qdHxPzjcxKv+E14GZLFs=
github.com/superfly/fly-checks v0.0.0-20230510154016-d189351293f2/go.mod h1:BbqpB4y6Z/cijQqKWJ3i8LMsAoC29gzX6vsSD3Qq7uw=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand All @@ -154,6 +157,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8=
golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
Expand Down
90 changes: 76 additions & 14 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"github.com/fly-apps/postgres-flex/internal/privnet"
"github.com/fly-apps/postgres-flex/internal/utils"
"github.com/jackc/pgx/v5"
"golang.org/x/exp/slices"
)

type Node struct {
AppName string
MachineID string
PrivateIP string
PrimaryRegion string
DataDir string
Expand Down Expand Up @@ -52,6 +54,8 @@ func NewNode() (*Node, error) {

node.PrivateIP = ipv6.String()

node.MachineID = os.Getenv("FLY_MACHINE_ID")

node.PrimaryRegion = os.Getenv("PRIMARY_REGION")
if node.PrimaryRegion == "" {
return nil, fmt.Errorf("PRIMARY_REGION environment variable must be set")
Expand Down Expand Up @@ -88,7 +92,9 @@ func NewNode() (*Node, error) {
UserConfigPath: "/data/repmgr.user.conf",
PasswordConfigPath: "/data/.pgpass",
DataDir: node.DataDir,
HostName: node.Hostname(),
PrivateIP: node.PrivateIP,
MachineID: node.MachineID,
Port: 5433,
DatabaseName: "repmgr",
Credentials: node.ReplCredentials,
Expand Down Expand Up @@ -182,7 +188,7 @@ func (n *Node) Init(ctx context.Context) error {
}
} else {
log.Println("Provisioning standby")
cloneTarget, err := n.RepMgr.ResolveMemberOverDNS(ctx)
cloneTarget, err := n.RepMgr.ResolvePrimaryOverDNS(ctx)
if err != nil {
return fmt.Errorf("failed to resolve member over dns: %s", err)
}
Expand Down Expand Up @@ -225,14 +231,6 @@ func (n *Node) Init(ctx context.Context) error {

// PostInit are operations that need to be executed against a running Postgres on boot.
func (n *Node) PostInit(ctx context.Context) error {
if ZombieLockExists() {
log.Println("[ERROR] Manual intervention required.")
log.Println("[ERROR] If a new primary has been established, consider adding a new replica with `fly machines clone <primary-machine-id>` and then remove this member.")
log.Println("[ERROR] Sleeping for 5 minutes.")
time.Sleep(5 * time.Minute)
return fmt.Errorf("unrecoverable zombie")
}

// Use the Postgres user on boot, since our internal user may not have been created yet.
conn, err := n.NewLocalConnection(ctx, "postgres", n.OperatorCredentials)
if err != nil {
Expand Down Expand Up @@ -265,7 +263,7 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to resolve member role: %s", err)
}

// Restart repmgrd in the event the IP changes for an already registered node.
// Restart repmgrd in the event the machine ID changes for an already registered node.
// This can happen if the underlying volume is moved to a different node.
daemonRestartRequired := n.RepMgr.daemonRestartRequired(member)

Expand All @@ -291,14 +289,22 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to run zombie diagnosis: %s", err)
}

// This should never happen
if primary != n.PrivateIP {
// This should never happen, but check anyways for correctness
if primary != n.Hostname() {
return fmt.Errorf("resolved primary '%s' does not match ourself '%s'. this should not happen",
primary,
n.PrivateIP,
n.Hostname(),
)
}

// Clear the zombie lock if it exists.
if ZombieLockExists() {
log.Println("[INFO] Clearing zombie lock and re-enabling read/write")
if err := RemoveZombieLock(); err != nil {
return fmt.Errorf("failed to remove zombie lock: %s", err)
}
}

// Re-register primary to apply any configuration changes.
if err := n.RepMgr.registerPrimary(daemonRestartRequired); err != nil {
return fmt.Errorf("failed to re-register existing primary: %s", err)
Expand All @@ -311,6 +317,10 @@ func (n *Node) PostInit(ctx context.Context) error {
}
}
case StandbyRoleName:
if err := n.migrateNodeNameIfNeeded(ctx, repConn); err != nil {
return fmt.Errorf("failed to migrate node name: %s", err)
}

// Register existing standby to apply any configuration changes.
if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil {
return fmt.Errorf("failed to register existing standby: %s", err)
Expand Down Expand Up @@ -399,7 +409,7 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to enable repmgr: %s", err)
}

primary, err := n.RepMgr.ResolveMemberOverDNS(ctx)
primary, err := n.RepMgr.ResolvePrimaryOverDNS(ctx)
if err != nil {
return fmt.Errorf("failed to resolve primary member: %s", err)
}
Expand Down Expand Up @@ -527,3 +537,55 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro

return nil
}

// migrateNodeNameIfNeeded checks whether the primary still sees this standby
// under its private IPv6 (6pn) address and, if so, switches it over to the
// machine-ID based name.
//
// It resolves the primary through repmgr using repConn, opens a separate
// connection to the primary and reads every application_name from
// pg_stat_replication. When one of them equals n.PrivateIP, the replication
// configuration is regenerated and the Postgres configuration is reloaded over
// repConn. When no entry matches, nothing is changed.
//
// A non-nil error is returned if any lookup, query, regeneration or reload
// step fails.
func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) error {
	primary, err := n.RepMgr.PrimaryMember(ctx, repConn)
	if err != nil {
		return fmt.Errorf("failed to resolve primary member when updating standby: %s", err)
	}

	primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.Hostname)
	if err != nil {
		return fmt.Errorf("failed to establish connection to primary: %s", err)
	}
	defer func() { _ = primaryConn.Close(ctx) }()

	rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication")
	if err != nil {
		return fmt.Errorf("failed to query pg_stat_replication: %s", err)
	}
	defer rows.Close()

	var applicationNames []string
	for rows.Next() {
		var applicationName string
		if err := rows.Scan(&applicationName); err != nil {
			return fmt.Errorf("failed to scan application_name: %s", err)
		}
		applicationNames = append(applicationNames, applicationName)
	}
	// rows.Next returning false can also mean the iteration failed; check explicitly.
	if err := rows.Err(); err != nil {
		return fmt.Errorf("failed to iterate over rows: %s", err)
	}

	// Nothing to migrate unless the primary still lists our 6pn address as application_name.
	if !slices.Contains(applicationNames, n.PrivateIP) {
		return nil
	}

	log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...")

	// Regenerate the replication settings, then reload so Postgres picks them up.
	if err := n.RepMgr.regenReplicationConf(ctx); err != nil {
		// Name the step that actually failed; no clone happens on this path.
		return fmt.Errorf("failed to regenerate replication conf: %s", err)
	}

	if err := admin.ReloadPostgresConfig(ctx, repConn); err != nil {
		return fmt.Errorf("failed to reload postgresql: %s", err)
	}

	return nil
}

// Hostname builds the node's internal hostname from its machine ID and app
// name, in the form "<MachineID>.vm.<AppName>.internal".
func (n *Node) Hostname() string {
	return n.MachineID + ".vm." + n.AppName + ".internal"
}
4 changes: 2 additions & 2 deletions internal/flypg/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {

for _, member := range members {
if member.Role == PrimaryRoleName {
endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, target)
endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, target)
resp, err := http.Get(endpoint)
if err != nil {
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.Hostname, err)
Expand All @@ -85,7 +85,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {
}

for _, member := range members {
endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, RestartHaproxyEndpoint)
endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, RestartHaproxyEndpoint)
resp, err := http.Get(endpoint)
if err != nil {
log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.Hostname, err)
Expand Down
Loading

0 comments on commit 4d426fb

Please sign in to comment.