Skip to content

Commit

Permalink
cluster restart with simultaneous change of primary (major)
Browse files Browse the repository at this point in the history
* introduce "AIS_PRIMARY_EP" environment variable
  - either 'host[:port]' endpoint or http(s) URL
* remove "AIS_IS_SECONDARY" and "AIS_PRIMARY_ID" - not supported anymore
* rewrite part of the proxy's bootstrapping sequence
* RULES to determine that _this_ proxy is primary:
  - environment "AIS_PRIMARY_EP" and "AIS_IS_PRIMARY" take precedence
    (unconditionally and in that exact sequence);
  - next, loaded Smap
    (but it can be overridden by newer versions from other nodes);
  - finally, if none of the above applies, take into account cluster config
    (its "proxy" section).

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Apr 4, 2024
1 parent 9690d80 commit 91a5ea0
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 117 deletions.
33 changes: 33 additions & 0 deletions ais/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"errors"
"flag"
"fmt"
"net/url"
"os"
"runtime"
"strings"

"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/api/env"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/atomic"
"github.com/NVIDIA/aistore/cmn/cos"
Expand All @@ -35,6 +37,7 @@ type (
rg *rungroup
version string // major.minor.build (see cmd/aisnode)
buildTime string // YYYY-MM-DD HH:MM:SS-TZ
envPriURL string // env "AIS_PRIMARY_EP"
stopping atomic.Bool // true when exiting
resilver struct {
reason string // Reason why resilver needs to be run.
Expand Down Expand Up @@ -204,8 +207,38 @@ func initDaemon(version, buildTime string) cos.Runner {
// K8s
k8s.Init()

// declared xactions, as per xact/api.go
xreg.Init()

// primary 'host[:port]' endpoint or URL from the environment
if daemon.envPriURL = os.Getenv(env.AIS.PrimaryEP); daemon.envPriURL != "" {
scheme := "http"
if config.Net.HTTP.UseHTTPS {
scheme = "https"
}
if strings.Contains(daemon.envPriURL, "://") {
u, err := url.Parse(daemon.envPriURL)
if err != nil {
cos.ExitLogf("invalid environment %s=%s: %v", env.AIS.PrimaryEP, daemon.envPriURL, err)
}
if u.Path != "" && u.Path != "/" {
cos.ExitLogf("invalid environment %s=%s (not expecting path %q)",
env.AIS.PrimaryEP, daemon.envPriURL, u.Path)
}
// reassemble and compare
ustr := scheme + "://" + u.Hostname()
if port := u.Port(); port != "" {
ustr += ":" + port
}
if ustr != daemon.envPriURL {
nlog.Warningln("environment-set primary URL mismatch:", daemon.envPriURL, "vs", ustr)
daemon.envPriURL = ustr
}
} else {
daemon.envPriURL = scheme + "://" + daemon.envPriURL
}
}

// fork (proxy | target)
co := newConfigOwner(config)
if daemon.cli.role == apc.Proxy {
Expand Down
168 changes: 75 additions & 93 deletions ais/earlystart.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ const (
type (
// bmds associates a *meta.Snode with the *bucketMD attributed to it.
bmds map[*meta.Snode]*bucketMD
// smaps associates a *meta.Snode with the *smapX attributed to it.
smaps map[*meta.Snode]*smapX

// prim is the result of the proxy's preliminary primary-role decision
// (returned by determineRole). Each boolean names the source that designated
// _this_ proxy as primary; bootstrap starts the proxy as primary when any of
// them is set, and otherwise joins as non-primary passing `url` along.
//
// sourced from: (env, config, smap)
prim struct {
// primary's URL to join when this proxy is not the primary: either the
// env-set endpoint or the primary's public URL reported by other nodes
url string
isSmap bool // <-- loaded Smap (may be reset when other nodes report a newer Smap version)
isCfg bool // <-- config.proxy.primary_url (matches one of this proxy's URLs)
isEP bool // <-- env AIS_PRIMARY_EP (matches one of this proxy's URLs)
isIs bool // <-- env AIS_IS_PRIMARY (parsed as boolean)
}
)

// Background:
Expand All @@ -39,55 +48,33 @@ type (
// - Only the primary (leader) proxy distributes Smap updates to all other clustered nodes
// - Bootstrap sequence includes /steps/ intended to resolve all the usual conflicts that may arise.
func (p *proxy) bootstrap() {
var (
config = cmn.GCO.Get()
pid, primaryURL string
cii *cifl.Info
primary bool
)

// 1: load a local copy and try to utilize it for discovery
smap, reliable := p.loadSmap()
if !reliable {
smap = nil
nlog.Infoln(p.String() + ": starting without Smap")
} else {
nlog.Infoln(p.String()+": loaded", smap.StringEx())
}

// 2. make the preliminary/primary decision
pid, primary = p.determineRole(smap, config)
// 2. make preliminary _primary_ decision
config := cmn.GCO.Get()
prim := p.determineRole(smap, config)

// 3. primary?
if primary {
if pid != "" { // takes precedence over everything else
cos.Assert(pid == p.SID())
} else if reliable {
var cnt int // confirmation count
// double-check
if cii, cnt = p.bcastHealth(smap, true /*checkAll*/); cii != nil && cii.Smap.Version > smap.version() {
if cii.Smap.Primary.ID != p.SID() || cnt < maxVerConfirmations {
nlog.Warningf("%s: cannot assume the primary role: local %s < v%d(%s, cnt=%d)",
p.si, smap, cii.Smap.Version, cii.Smap.Primary.ID, cnt)
primary = false
primaryURL = cii.Smap.Primary.PubURL
} else {
nlog.Warningf("%s: proceeding as primary even though local %s < v%d(%s, cnt=%d)",
p.si, smap, cii.Smap.Version, cii.Smap.Primary.ID, cnt)
}
}
// 3: start as primary
if prim.isSmap || prim.isCfg || prim.isEP || prim.isIs {
var snow string
if prim.isSmap {
snow = " _for now_"
}
}

// 4.1: start as primary
if primary {
nlog.Infoln(p.String() + ": assuming primary role for now, starting up")
go p.primaryStartup(smap, config, daemon.cli.primary.ntargets)
nlog.Infof("%s: assuming primary role%s %+v", p, snow, prim)
go p.primaryStartup(smap, config, daemon.cli.primary.ntargets, prim)
return
}

// 4.2: otherwise, join as non-primary
// 4: otherwise, join as non-primary
nlog.Infoln(p.String() + ": starting up as non-primary")
err := p.secondaryStartup(smap, primaryURL)
err := p.secondaryStartup(smap, prim.url)
if err != nil {
if reliable {
svm := p.uncoverMeta(smap)
Expand All @@ -102,68 +89,55 @@ func (p *proxy) bootstrap() {
}
}

// - make the *primary* decision taking into account both environment and
// loaded Smap, if exists
// - handle AIS_PRIMARY_ID (TODO: target)
// - see also "change of mind"
func (p *proxy) determineRole(loadedSmap *smapX, config *cmn.Config) (pid string, primary bool) {
tag := "no Smap, "
if loadedSmap != nil {
loadedSmap.Pmap[p.SID()] = p.si
tag = loadedSmap.StringEx() + ", "
}
// parse env
envP := struct {
pid string
primary bool
secondary bool
}{
pid: os.Getenv(env.AIS.PrimaryID),
primary: cos.IsParseBool(os.Getenv(env.AIS.IsPrimary)),
secondary: cos.IsParseBool(os.Getenv(env.AIS.IsSecondary)),
}

if envP.primary && envP.secondary {
cos.ExitLogf("%s: both %s and %s cannot be true", p, env.AIS.IsPrimary, env.AIS.IsSecondary)
}
if envP.pid != "" && envP.primary && p.SID() != envP.pid {
cos.ExitLogf("%s: invalid combination of %s=true & %s=%s", p, env.AIS.IsPrimary, env.AIS.PrimaryID, envP.pid)
}
nlog.Infof("%s: %sprimary-env=%+v", p, tag, envP)

if loadedSmap != nil && envP.pid != "" {
primary := loadedSmap.GetProxy(envP.pid)
if primary == nil {
nlog.Errorf(
"%s: ignoring %s=%s - not found in the loaded %s",
p.si, env.AIS.IsPrimary, envP.pid, loadedSmap,
)
envP.pid = ""
} else if loadedSmap.Primary.ID() != envP.pid {
nlog.Warningf(
"%s: new %s=%s, previous %s",
p.si, env.AIS.PrimaryID, envP.pid, loadedSmap.Primary,
)
loadedSmap.Primary = primary
}
}

// NOTE: environment always takes precedence
// make the *primary* decision taking into account both the environment and loaded Smap, if exists
// cases 1 through 4 below (and see also "change of mind")
func (p *proxy) determineRole(smap *smapX /*loaded*/, config *cmn.Config) (prim prim) {
prim.isIs = cos.IsParseBool(os.Getenv(env.AIS.IsPrimary))
switch {
case envP.pid != "":
primary = envP.pid == p.SID()
pid = envP.pid
case envP.primary:
primary = true
case loadedSmap != nil && !envP.secondary:
primary = loadedSmap.isPrimary(p.si)
case daemon.envPriURL != "":
// 1. user overrides the local Smap (if one exists) via the env-set primary URL
prim.isEP = daemon.envPriURL == p.si.URL(cmn.NetIntraControl) || daemon.envPriURL == p.si.URL(cmn.NetPublic)
if !prim.isEP {
prim.isEP = p.si.HasURL(daemon.envPriURL)
}
if prim.isIs && !prim.isEP {
nlog.Warningf("%s: invalid combination of '%s=true' vs '%s=%s'", p, env.AIS.IsPrimary,
env.AIS.PrimaryEP, daemon.envPriURL)
nlog.Warningln("proceeding as non-primary...")
}
if prim.isEP {
daemon.envPriURL = ""
} else {
prim.url = daemon.envPriURL
}
case prim.isIs:
// 2. TODO: needed for tests, consider removing
case smap != nil:
// 3. regular case: relying on local copy of Smap (double-checking its version though)
prim.isSmap = smap.isPrimary(p.si)
if prim.isSmap {
cii, cnt := p.bcastHealth(smap, true /*checkAll*/)
if cii != nil && cii.Smap.Version > smap.version() {
if cii.Smap.Primary.ID != p.SID() || cnt < maxVerConfirmations {
nlog.Warningf("%s: cannot assume the primary role: local %s < v%d(%s, cnt=%d)",
p.si, smap, cii.Smap.Version, cii.Smap.Primary.ID, cnt)
prim.isSmap = false
prim.url = cii.Smap.Primary.PubURL
} else {
nlog.Warningf("%s: proceeding as primary even though local %s < v%d(%s, cnt=%d)",
p.si, smap, cii.Smap.Version, cii.Smap.Primary.ID, cnt)
}
}
}
default:
primary = config.Proxy.PrimaryURL == p.si.URL(cmn.NetIntraControl) ||
// 4. initial deployment
prim.isCfg = config.Proxy.PrimaryURL == p.si.URL(cmn.NetIntraControl) ||
config.Proxy.PrimaryURL == p.si.URL(cmn.NetPublic)
if !primary {
primary = p.si.HasURL(config.Proxy.PrimaryURL)
if !prim.isCfg {
prim.isCfg = p.si.HasURL(config.Proxy.PrimaryURL)
}
}

return
}

Expand Down Expand Up @@ -207,7 +181,7 @@ func (p *proxy) secondaryStartup(smap *smapX, primaryURLs ...string) error {
// Proxy/gateway that is, potentially, the leader of the cluster.
// It waits a configured time for other nodes to join,
// discovers cluster-wide metadata, and resolves remaining conflicts.
func (p *proxy) primaryStartup(loadedSmap *smapX, config *cmn.Config, ntargets int) {
func (p *proxy) primaryStartup(loadedSmap *smapX, config *cmn.Config, ntargets int, prim prim) {
var (
smap = newSmap()
uuid, created string
Expand Down Expand Up @@ -272,6 +246,14 @@ func (p *proxy) primaryStartup(loadedSmap *smapX, config *cmn.Config, ntargets i
p.reg.mu.RUnlock()

uuid, created = smap.UUID, smap.CreationTime

// NOTE: all except prim.isSmap
if smap.Primary.ID() != p.SID() && (prim.isCfg || prim.isEP || prim.isIs) {
nlog.Warningf("%s: primary change from %s to self (based on %+v)", p, smap.Primary.StringEx(), prim)
smap.Primary = si
smap.Version += 50
}

p.owner.smap.put(smap)
p.owner.smap.mu.Unlock()

Expand Down
12 changes: 8 additions & 4 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,11 @@ func (h *htrun) loadSmap() (smap *smapX, reliable bool) {
return
}

// NOTE: not enforcing Snode's immutability - in particular, IPs that may change upon restart
// in certain environments
//
// NOTE: not enforcing Snode's immutability - in particular, IPs that may change upon restart in K8s
//
if _, err := smap.IsDupNet(h.si); err != nil {
nlog.Errorln(err)
return
nlog.Warningln(err, "- proceeding with the loaded", smap.String(), "anyway...")
}
reliable = true
return
Expand Down Expand Up @@ -1762,6 +1762,10 @@ func (h *htrun) join(query url.Values, htext htext, contactURLs ...string) (res
)
debug.Assert(pubValid && intraValid)

// env goes first
if daemon.envPriURL != "" {
candidates = _addCan(daemon.envPriURL, selfPublicURL.Host, selfIntraURL.Host, candidates)
}
primaryURL, psi := h.getPrimaryURLAndSI(nil, config)
candidates = _addCan(primaryURL, selfPublicURL.Host, selfIntraURL.Host, candidates)
if psi != nil {
Expand Down
14 changes: 9 additions & 5 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,12 @@ func (p *proxy) Run() error {
}
p.regNetHandlers(networkHandlers)

nlog.Infof("%s: [%s net] listening on: %s", p, cmn.NetPublic, p.si.PubNet.URL)
nlog.Infoln(cmn.NetPublic+":", "\t\t", p.si.PubNet.URL)
if p.si.PubNet.URL != p.si.ControlNet.URL {
nlog.Infof("%s: [%s net] listening on: %s", p, cmn.NetIntraControl, p.si.ControlNet.URL)
nlog.Infoln(cmn.NetIntraControl+":", "\t", p.si.ControlNet.URL)
}
if p.si.PubNet.URL != p.si.DataNet.URL {
nlog.Infof("%s: [%s net] listening on: %s", p, cmn.NetIntraData, p.si.DataNet.URL)
nlog.Infoln(cmn.NetIntraData+":", "\t", p.si.DataNet.URL)
}

dsort.Pinit(p, config)
Expand Down Expand Up @@ -993,7 +993,11 @@ func (p *proxy) healthHandler(w http.ResponseWriter, r *http.Request) {
}
smap := p.owner.smap.get()
if err := smap.validate(); err != nil {
p.writeErr(w, r, err, http.StatusServiceUnavailable)
if !p.ClusterStarted() {
w.WriteHeader(http.StatusServiceUnavailable)
} else {
p.writeErr(w, r, err, http.StatusServiceUnavailable)
}
return
}

Expand Down Expand Up @@ -3109,7 +3113,7 @@ func (p *proxy) receiveRMD(newRMD *rebMD, msg *aisMsg, caller string) (err error

// Register `nl` for rebalance/resilver
smap := p.owner.smap.get()
if smap.IsIC(p.si) && smap.CountActiveTs() > 0 {
if smap.IsIC(p.si) && smap.CountActiveTs() > 0 && (smap.IsPrimary(p.si) || p.ClusterStarted()) {
nl := xact.NewXactNL(xact.RebID2S(newRMD.Version), apc.ActRebalance, &smap.Smap, nil)
nl.SetOwner(equalIC)
err := p.notifs.add(nl)
Expand Down
16 changes: 7 additions & 9 deletions api/env/ais.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ package env

var (
AIS = struct {
Endpoint string
IsPrimary string
IsSecondary string
PrimaryID string
UseHTTPS string
Endpoint string
IsPrimary string
PrimaryEP string
UseHTTPS string
// TLS: client side
Certificate string
CertKey string
Expand All @@ -34,10 +33,9 @@ var (
K8sNamespace string
}{
// the way to designate primary when cluster's starting up
Endpoint: "AIS_ENDPOINT",
IsPrimary: "AIS_IS_PRIMARY",
IsSecondary: "AIS_IS_SECONDARY",
PrimaryID: "AIS_PRIMARY_ID",
Endpoint: "AIS_ENDPOINT",
IsPrimary: "AIS_IS_PRIMARY",
PrimaryEP: "AIS_PRIMARY_EP",

// false: HTTP transport, with all the TLS config (below) ignored
// true: HTTPS/TLS
Expand Down
5 changes: 5 additions & 0 deletions cmd/cli/cli/daeclu.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/NVIDIA/aistore/cmd/cli/teb"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/core/meta"
"github.com/urfave/cli"
)
Expand Down Expand Up @@ -104,6 +105,10 @@ outer:
}
}
}
if avail == 0 {
debug.Assert(num == 0)
return 0, ""
}

pctUsed := used * 100 / (used + avail)
if pctUsed > 60 {
Expand Down
Loading

0 comments on commit 91a5ea0

Please sign in to comment.