From e7dab0e3df8111dc044c03d5847322bd6abe0889 Mon Sep 17 00:00:00 2001 From: blueblue Date: Wed, 29 Nov 2017 17:59:06 +0800 Subject: [PATCH 1/2] Refactor leader-elect related code --- go/cmd/doorman/doorman_server.go | 4 +- go/server/election/election.go | 137 ---------------------------- go/server/election/etcd.go | 147 +++++++++++++++++++++++++++++++ go/server/election/trivial.go | 49 +++++++++++ 4 files changed, 198 insertions(+), 139 deletions(-) create mode 100644 go/server/election/etcd.go create mode 100644 go/server/election/trivial.go diff --git a/go/cmd/doorman/doorman_server.go b/go/cmd/doorman/doorman_server.go index f346bba..0cd4310 100644 --- a/go/cmd/doorman/doorman_server.go +++ b/go/cmd/doorman/doorman_server.go @@ -65,7 +65,7 @@ var ( keyFile = flag.String("key_file", "", "The TLS key file") etcdEndpoints = flag.String("etcd_endpoints", "", "comma separated list of etcd endpoints") - masterDelay = flag.Duration("master_delay", 10*time.Second, "delay in master elections") + masterLease = flag.Duration("master_lease", 10*time.Second, "lease in master elections") masterElectionLock = flag.String("master_election_lock", "", "etcd path for the master election or empty for no master election") ) @@ -154,7 +154,7 @@ func main() { log.Exit("-etcd_endpoints cannot be empty if -master_election_lock is provided") } - masterElection = election.Etcd(etcdEndpointsSlice, *masterElectionLock, *masterDelay) + masterElection = election.NewEtcdElection(etcdEndpointsSlice, *masterElectionLock, *masterLease) } else { masterElection = election.Trivial() } diff --git a/go/server/election/election.go b/go/server/election/election.go index bdd32f2..5245d1b 100644 --- a/go/server/election/election.go +++ b/go/server/election/election.go @@ -17,11 +17,6 @@ package election import ( - "fmt" - "time" - - "github.com/coreos/etcd/client" - log "github.com/golang/glog" "golang.org/x/net/context" ) @@ -38,135 +33,3 @@ type Election interface { // master is currently unknown. Current() chan string } - -type trivial struct { - isMaster chan bool - current chan string -} - -// Trivial returns a trivial election: the participant immediately -// wins. Use this if you need the election interface, but don't really -// care about the master election (eg. you'll never have more than one -// candidate). -func Trivial() Election { - return &trivial{ - isMaster: make(chan bool, 1), - current: make(chan string, 1), - } -} - -func (e *trivial) String() string { - return "no election, acting as the master" -} - -func (e *trivial) Current() chan string { - return e.current -} - -func (e *trivial) IsMaster() chan bool { - return e.isMaster -} - -func (e *trivial) Run(_ context.Context, id string) error { - e.isMaster <- true - e.current <- id - return nil -} - -type etcd struct { - endpoints []string - delay time.Duration - isMaster chan bool - lock string - current chan string -} - -// Etcd returns an etcd based master election (endpoints are used to -// connect to the etcd cluster). The participants synchronize on lock, -// and the master has delay time to renew its lease. Higher values of -// delay may lead to more stable mastership at the cost of potentially -// longer periods without any master. -func Etcd(endpoints []string, lock string, delay time.Duration) Election { - return &etcd{ - endpoints: endpoints, - isMaster: make(chan bool), - current: make(chan string), - delay: delay, - lock: lock, - } -} - -func (e *etcd) String() string { - return fmt.Sprintf("etcd lock: %v (endpoints: %v)", e.lock, e.endpoints) -} - -func (e *etcd) Current() chan string { - return e.current -} - -func (e *etcd) IsMaster() chan bool { - return e.isMaster -} - -func (e *etcd) Run(ctx context.Context, id string) error { - c, err := client.New(client.Config{Endpoints: e.endpoints}) - if err != nil { - return err - } - log.V(2).Infof("connected to etcd at %v", e.endpoints) - kapi := client.NewKeysAPI(c) - - go func() { - w := kapi.Watcher(e.lock, nil) - log.V(2).Infof("watching %v for master updates", e.lock) - var last string - for { - r, err := w.Next(ctx) - if err != nil { - if !client.IsKeyNotFound(err) { - log.Errorf("Failed receiving new master: %v", err) - } - e.current <- "" - time.Sleep(e.delay) - continue - } - log.V(2).Infof("received master update: %v", r) - if last != r.Node.Value { - last = r.Node.Value - e.current <- r.Node.Value - } - } - }() - - go func() { - for { - log.V(2).Infof("trying to become master at %v", e.lock) - if _, err := kapi.Set(ctx, e.lock, id, &client.SetOptions{ - TTL: e.delay, - PrevExist: client.PrevNoExist, - }); err != nil { - log.V(2).Infof("failed becoming the master, retrying in %v: %v", e.delay, err) - time.Sleep(e.delay) - continue - } - e.isMaster <- true - log.V(2).Info("Became master at %v as %v.", e.lock, id) - for { - time.Sleep(e.delay / 3) - log.V(2).Infof("Renewing mastership lease at %v as %v", e.lock, id) - _, err := kapi.Set(ctx, e.lock, id, &client.SetOptions{ - TTL: e.delay, - PrevExist: client.PrevExist, - PrevValue: id, - }) - - if err != nil { - log.V(2).Info("lost mastership") - e.isMaster <- false - break - } - } - } - }() - return nil -} diff --git a/go/server/election/etcd.go b/go/server/election/etcd.go new file mode 100644 index 0000000..0d0f3a9 --- /dev/null +++ b/go/server/election/etcd.go @@ -0,0 +1,147 @@ +// Copyright 2016 Google, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package election + +import ( + "fmt" + "github.com/coreos/etcd/client" + log "github.com/golang/glog" + "golang.org/x/net/context" + "time" +) + +type etcdElection struct { + endpoints []string + client client.KeysAPI + + lease time.Duration + lock string + isMaster chan bool + current chan string +} + +var _ Election = &etcdElection{} + +// NewEtcdElection returns an etcd based master election (endpoints are used to +// connect to the etcd cluster). The participants synchronize on lock, +// and the master has delay time to renew its lease. Higher values of +// delay may lead to more stable mastership at the cost of potentially +// longer periods without any master. +func NewEtcdElection(endpoints []string, lock string, lease time.Duration) Election { + return &etcdElection{ + endpoints: endpoints, + isMaster: make(chan bool), + current: make(chan string), + lease: lease, + lock: lock, + } +} + +func (e *etcdElection) String() string { + return fmt.Sprintf("etcd lock: %v (endpoints: %v)", e.lock, e.endpoints) +} + +func (e *etcdElection) Current() chan string { + return e.current +} + +func (e *etcdElection) IsMaster() chan bool { + return e.isMaster +} + +func (e *etcdElection) Run(ctx context.Context, id string) error { + err := e.init() + if err != nil { + return err + } + + go e.refreshMastership(ctx) + go e.campaignAndRenew(ctx, id) + return nil +} + +// +func (e *etcdElection) init() error { + c, err := client.New(client.Config{Endpoints: e.endpoints}) + if err != nil { + return err + } + log.V(2).Infof("connected to etcd at %v", e.endpoints) + e.client = client.NewKeysAPI(c) + return nil +} + +// +func (e *etcdElection) refreshMastership(ctx context.Context) { + w := e.client.Watcher(e.lock, nil) + log.V(2).Infof("watching %v for master updates", e.lock) + var last string + for { + r, err := w.Next(ctx) + if err != nil { + if !client.IsKeyNotFound(err) { + log.Errorf("Failed receiving new master: %v", err) + } + e.current <- "" + time.Sleep(e.lease) + continue + } + log.V(2).Infof("received master update: %v", r) + if last != r.Node.Value { + last = r.Node.Value + e.current <- r.Node.Value + } + } +} + +// +func (e *etcdElection) campaignAndRenew(ctx context.Context, id string) { + for { + log.V(2).Infof("trying to become master at %v", e.lock) + if _, err := e.client.Set(ctx, e.lock, id, &client.SetOptions{ + TTL: e.lease, + PrevExist: client.PrevNoExist, + }); err != nil { + log.V(2).Infof("failed becoming the master, retrying in %v: %v", e.lease, err) + time.Sleep(e.lease) + continue + } + e.isMaster <- true + log.V(2).Info("Became master at %v as %v.", e.lock, id) + for { + time.Sleep(e.lease / 5) + log.V(2).Infof("Renewing mastership lease at %v as %v", e.lock, id) + _, err := e.client.Set(ctx, e.lock, id, &client.SetOptions{ + TTL: e.lease, + PrevExist: client.PrevExist, + PrevValue: id, + }) + + for retry := 0; retry < 2; retry++ { + _, err = e.client.Set(ctx, e.lock, id, &client.SetOptions{ + TTL: e.lease, + PrevExist: client.PrevExist, + PrevValue: id, + }) + } + + if err != nil { + log.V(2).Info("lost mastership") + e.isMaster <- false + break + } + } + } +} diff --git a/go/server/election/trivial.go b/go/server/election/trivial.go new file mode 100644 index 0000000..888dc66 --- /dev/null +++ b/go/server/election/trivial.go @@ -0,0 +1,49 @@ +// Copyright 2016 Google, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package election + +type trivial struct { + isMaster chan bool + current chan string +} + +// Trivial returns a trivial election: the participant immediately +// wins. Use this if you need the election interface, but don't really +// care about the master election (eg. you'll never have more than one +// candidate). +func Trivial() Election { + return &trivial{ + isMaster: make(chan bool, 1), + current: make(chan string, 1), + } +} + +func (e *trivial) String() string { + return "no election, acting as the master" +} + +func (e *trivial) Current() chan string { + return e.current +} + +func (e *trivial) IsMaster() chan bool { + return e.isMaster +} + +func (e *trivial) Run(_ context.Context, id string) error { + e.isMaster <- true + e.current <- id + return nil +} From 6465ad2cbcf2021a7cb788aa9c1b4e337f3177e5 Mon Sep 17 00:00:00 2001 From: blueblue Date: Thu, 30 Nov 2017 17:01:25 +0800 Subject: [PATCH 2/2] Refactor doorman server code and repaire client-connect bug --- go/cmd/doorman/doorman_server.go | 229 +---------------------- go/cmd/doorman/option/options.go | 178 ++++++++++++++++++ go/configuration/configuration.go | 9 +- go/connection/connection.go | 45 ++++- go/{flagenv => flagutil}/flagenv.go | 7 +- go/{flagenv => flagutil}/flagenv_test.go | 2 +- go/{cmd => server}/doorman/resourcez.go | 18 +- go/server/doorman/server.go | 127 +++++++++++-- go/server/election/trivial.go | 2 + vendor/github.com/spf13/pflag | 1 + 10 files changed, 358 insertions(+), 260 deletions(-) create mode 100644 go/cmd/doorman/option/options.go rename go/{flagenv => flagutil}/flagenv.go (90%) rename go/{flagenv => flagutil}/flagenv_test.go (97%) rename go/{cmd => server}/doorman/resourcez.go (91%) create mode 160000 vendor/github.com/spf13/pflag diff --git a/go/cmd/doorman/doorman_server.go b/go/cmd/doorman/doorman_server.go index 0cd4310..80591c5 100644 --- a/go/cmd/doorman/doorman_server.go +++ b/go/cmd/doorman/doorman_server.go @@ -15,234 +15,21 @@ package main import ( - "flag" "fmt" - "net" - "net/http" - "os" - "strings" - "time" - - "golang.org/x/net/context" - - log "github.com/golang/glog" - "github.com/prometheus/client_golang/prometheus" - rpc "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - - "github.com/ghodss/yaml" - "github.com/youtube/doorman/go/configuration" - "github.com/youtube/doorman/go/connection" - "github.com/youtube/doorman/go/flagenv" + "github.com/spf13/pflag" + "github.com/youtube/doorman/go/cmd/doorman/option" "github.com/youtube/doorman/go/server/doorman" - "github.com/youtube/doorman/go/server/election" - "github.com/youtube/doorman/go/status" - - pb "github.com/youtube/doorman/proto/doorman" - - _ "expvar" - _ "net/http/pprof" -) - -var ( - port = flag.Int("port", 0, "port to bind to") - // FIXME(ryszard): As of Jan 21, 2016 it's impossible to serve - // both RPC and HTTP traffic on the same port. This should be - // fixed by grpc/grpc-go#75. When that happens, remove - // debugPort. - debugPort = flag.Int("debug_port", 8081, "port to bind for HTTP debug info") - serverRole = flag.String("server_role", "root", "Role of this server in the server tree") - parent = flag.String("parent", "", "Address of the parent server which this server connects to") - hostname = flag.String("hostname", "", "Use this as the hostname (if empty, use whatever the kernel reports") - config = flag.String("config", "", "source to load the config from (text protobufs)") - - rpcDialTimeout = flag.Duration("doorman_rpc_dial_timeout", 5*time.Second, "timeout to use for connecting to the doorman server") - - minimumRefreshInterval = flag.Duration("doorman_minimum_refresh_interval", 5*time.Second, "minimum refresh interval") - - tls = flag.Bool("tls", false, "Connection uses TLS if true, else plain TCP") - certFile = flag.String("cert_file", "", "The TLS cert file") - keyFile = flag.String("key_file", "", "The TLS key file") - - etcdEndpoints = flag.String("etcd_endpoints", "", "comma separated list of etcd endpoints") - masterLease = flag.Duration("master_lease", 10*time.Second, "lease in master elections") - masterElectionLock = flag.String("master_election_lock", "", "etcd path for the master election or empty for no master election") -) - -var ( - statusz = ` -

Mastership

-

-{{if .IsMaster}} - This is the master. -{{else}} -This is not the master. - {{with .CurrentMaster}} - The current master is {{.}} - {{else}} - The current master is unknown. - {{end}} -{{end}} -

-{{with .Election}}{{.}}{{end}} - -

Resources

-{{ with .Resources }} - - - - - - - - - - - - - {{range .}} - - - - - - - - - {{end}} -
IDCapacitySumHasSumWantsClientsLearningAlgorithm
{{.ID}}{{.Capacity}}{{.SumHas}}{{.SumWants}}{{.Count}}{{.InLearningMode}} - {{.Algorithm}}
-{{else}} -No resources in the store. -{{end}} - -

Configuration

-
{{.Config}}
-` + "os" ) -// getServerID returns a unique server id, consisting of a host:port id. -func getServerID(port int) string { - if *hostname != "" { - return fmt.Sprintf("%s:%d", *hostname, port) - } - hn, err := os.Hostname() - - if err != nil { - hn = "unknown.localhost" - } - - return fmt.Sprintf("%s:%d", hn, port) -} - func main() { - flag.Parse() - if err := flagenv.Populate(flag.CommandLine, "DOORMAN"); err != nil { - log.Exit(err) - } - - if *config == "" { - log.Exit("--config cannot be empty") - } - var ( - etcdEndpointsSlice = strings.Split(*etcdEndpoints, ",") - masterElection election.Election - ) - if *masterElectionLock != "" { - - if len(etcdEndpointsSlice) == 1 && etcdEndpointsSlice[0] == "" { - log.Exit("-etcd_endpoints cannot be empty if -master_election_lock is provided") - } - - masterElection = election.NewEtcdElection(etcdEndpointsSlice, *masterElectionLock, *masterLease) - } else { - masterElection = election.Trivial() - } - dm, err := doorman.New(context.Background(), getServerID(*port), *parent, masterElection, - connection.MinimumRefreshInterval(*minimumRefreshInterval), - connection.DialOpts( - rpc.WithTimeout(*rpcDialTimeout))) - if err != nil { - log.Exitf("doorman.NewIntermediate: %v", err) - } - - var opts []rpc.ServerOption - if *tls { - log.Infof("Loading credentials from %v and %v.", *certFile, *keyFile) - creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile) - if err != nil { - log.Exitf("Failed to generate credentials %v", err) - } - opts = []rpc.ServerOption{rpc.Creds(creds)} - } - server := rpc.NewServer(opts...) - - pb.RegisterCapacityServer(server, dm) - - if *config == "" { - log.Exit("-config cannot be empty") - } + config := option.NewServerConfiguration() + config.InitServerFlags(pflag.CommandLine) - var cfg configuration.Source - kind, path := configuration.ParseSource(*config) - switch { - case kind == "file": - cfg = configuration.LocalFile(path) - case kind == "etcd": - if len(etcdEndpointsSlice) == 1 && etcdEndpointsSlice[0] == "" { - log.Exit("-etcd_endpoints cannot be empty if a config source etcd is provided") - } - cfg = configuration.Etcd(path, etcdEndpointsSlice) - default: - panic("unreachable") + if err := doorman.Run(config); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) } - // Try to load the background. If there's a problem with loading - // the server for the first time, the server will keep running, - // but will not serve traffic. - go func() { - for { - data, err := cfg(context.Background()) - if err != nil { - log.Errorf("cannot load config data: %v", err) - continue - } - cfg := new(pb.ResourceRepository) - if err := yaml.Unmarshal(data, cfg); err != nil { - log.Errorf("cannot unmarshal config data: %q", data) - continue - } - - if err := dm.LoadConfig(context.Background(), cfg, map[string]*time.Time{}); err != nil { - log.Errorf("cannot load config: %v", err) - } - } - }() - - status.AddStatusPart("Doorman", statusz, func(context.Context) interface{} { return dm.Status() }) - - // Redirect form / to /debug/status. - http.Handle("/", http.RedirectHandler("/debug/status", http.StatusMovedPermanently)) - AddServer(dm) - - http.Handle("/metrics", prometheus.Handler()) - - go http.ListenAndServe(fmt.Sprintf(":%v", *debugPort), nil) - - // Waits for the server to get its initial configuration. This guarantees that - // the server will never run without a valid configuration. - log.Info("Waiting for the server to be configured...") - dm.WaitUntilConfigured() - - // Runs the server. - log.Info("Server is configured, ready to go!") - - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) - if err != nil { - log.Exit(err) - } - - server.Serve(lis) - } diff --git a/go/cmd/doorman/option/options.go b/go/cmd/doorman/option/options.go new file mode 100644 index 0000000..ce2a36b --- /dev/null +++ b/go/cmd/doorman/option/options.go @@ -0,0 +1,178 @@ +// Copyright 2016 Google, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package option + +import ( + "flag" + "time" + + _ "expvar" + "fmt" + log "github.com/golang/glog" + "github.com/spf13/pflag" + "github.com/youtube/doorman/go/flagutil" + _ "net/http/pprof" + "os" +) + +var ( + Statusz = ` +

Mastership

+

+{{if .IsMaster}} + This is the master. +{{else}} +This is not the master. + {{with .CurrentMaster}} + The current master is {{.}} + {{else}} + The current master is unknown. + {{end}} +{{end}} +

+{{with .Election}}{{.}}{{end}} + +

Resources

+{{ with .Resources }} + + + + + + + + + + + + + {{range .}} + + + + + + + + + {{end}} +
IDCapacitySumHasSumWantsClientsLearningAlgorithm
{{.ID}}{{.Capacity}}{{.SumHas}}{{.SumWants}}{{.Count}}{{.InLearningMode}} + {{.Algorithm}}
+{{else}} +No resources in the store. +{{end}} + +

Configuration

+
{{.Config}}
+` +) + +type ServerConfiguration struct { + Port int + DebugPort int + ServerRole string + + HostName string + Config string + + RpcDialTimeout time.Duration + MinRefreshInterval time.Duration + + *SecureConfig + + MasterLease time.Duration + MasterElectionLock string + + EtcdEndpoints []string + ParentServers []string +} + +type SecureConfig struct { + EnableTls bool + CertFile string + KeyFile string +} + +func NewServerConfiguration() *ServerConfiguration { + return &ServerConfiguration{ + Port: 5101, + DebugPort: 5151, + + ServerRole: "root", + + HostName: os.Hostname(), + Config: "/etc/doorman/config.yml", + + RpcDialTimeout: 5 * time.Second, + MinRefreshInterval: 5 * time.Second, + + SecureConfig: &SecureConfig{ + EnableTls: false, + }, + + MasterLease: 10 * time.Second, + MasterElectionLock: "", + } +} + +func (sc *ServerConfiguration) InitServerFlags(fs *pflag.FlagSet) { + + fs.IntVar(&sc.Port,"port",sc.Port,"port to bind to") + + // FIXME(ryszard): As of Jan 21, 2016 it's impossible to serve + // both RPC and HTTP traffic on the same port. This should be + // fixed by grpc/grpc-go#75. When that happens, remove + // debugPort. + fs.IntVar(&sc.DebugPort,"debug_port",sc.DebugPort,"port to bind for HTTP debug info") + fs.StringVar(&sc.ServerRole,"server_role",sc.ServerRole,"Role of this server in the server tree") + + fs.StringVar(&sc.HostName,"hostname",sc.HostName,"Use this as the hostname (if empty, use whatever the kernel reports") + fs.StringVar(&sc.Config,"config",sc.Config,"source to load the config from (text protobufs)") + + fs.DurationVar(&sc.RpcDialTimeout,"doorman_rpc_dial_timeout",sc.RpcDialTimeout,"timeout to use for connecting to the doorman server") + fs.DurationVar(&sc.MinRefreshInterval,"doorman_minimum_refresh_interval",sc.MinRefreshInterval,"minimum refresh interval") + + fs.BoolVar(&sc.EnableTls,"tls",sc.EnableTls,"Connection uses TLS if true, else plain TCP") + fs.StringVar(&sc.CertFile,"cert_file","","The TLS cert file") + fs.StringVar(&sc.KeyFile,"key_file","","The TLS key file") + + fs.DurationVar(&sc.MasterLease,"master_lease",sc.MasterLease,"lease in master elections") + fs.StringVar(&sc.MasterElectionLock,"master_election_lock","","etcd path for the master election or empty for no master election") + + fs.StringSliceVar(&sc.EtcdEndpoints,"etcd_endpoints",sc.EtcdEndpoints,"comma separated list of etcd endpoints") + + fs.StringSliceVar(&sc.ParentServers,"parent_servers",sc.ParentServers,"Addresses of the parent servers which this server connects to") + + flag.Parse() + if err := flagutil.Populate(fs, "DOORMAN"); err != nil { + log.Exit(err) + } + + +} + +// getServerID returns a unique server id, consisting of a host:port id. +func GetServerID(port int, hostname string) string { + if hostname != "" { + return fmt.Sprintf("%s:%d", hostname, port) + } + hn, err := os.Hostname() + + if err != nil { + hn = "unknown.localhost" + } + + return fmt.Sprintf("%s:%d", hn, port) +} diff --git a/go/configuration/configuration.go b/go/configuration/configuration.go index 0db86da..7b263fa 100644 --- a/go/configuration/configuration.go +++ b/go/configuration/configuration.go @@ -16,9 +16,9 @@ import ( "golang.org/x/net/context" ) -// Source is a source for configuration. Calling it will block until a +// ConfigReader is a source for configuration. Calling it will block until a // new version of the config is available. -type Source func(context.Context) (data []byte, err error) +type ConfigReader func(context.Context) (data []byte, err error) type pair struct { data []byte @@ -28,7 +28,7 @@ type pair struct { // LocalFiles is a configuration stored in a file in the local // filesystem. The file will be reloaded if the process receives a // SIGHUP. -func LocalFile(path string) Source { +func LocalFile(path string) ConfigReader { updates := make(chan pair, 1) c := make(chan os.Signal, 1) @@ -53,7 +53,7 @@ func LocalFile(path string) Source { // Etcd is a configuration stored in etcd. It will be reloaded as soon // as it changes. -func Etcd(path string, endpoints []string) Source { +func Etcd(path string, endpoints []string) ConfigReader { updates := make(chan pair, 1) req := make(chan context.Context) @@ -120,3 +120,4 @@ func ParseSource(text string) (kind string, path string) { return "file", text } + diff --git a/go/connection/connection.go b/go/connection/connection.go index 5eb9008..a3d59fd 100644 --- a/go/connection/connection.go +++ b/go/connection/connection.go @@ -39,7 +39,8 @@ const ( // Connection contains information about connection between the server and the client. type Connection struct { - addr string + addrs []string + pinned int currentMaster string Stub pb.CapacityClient conn *rpc.ClientConn @@ -51,13 +52,14 @@ func (connection *Connection) String() string { } // New creates a new Connection with the given server address. -func New(addr string, options ...Option) (*Connection, error) { +func New(addrs []string, options ...Option) (*Connection, error) { connection := &Connection{ - addr: addr, - Opts: getOptions(options), + addrs: addrs, + pinned: -1, + Opts: getOptions(options), } - if err := connection.connect(addr); err != nil { + if err := connection.tryConnect(); err != nil { return nil, err } @@ -100,6 +102,18 @@ func DialOpts(dialOpts ...rpc.DialOption) Option { } } +// +func (connection *Connection) tryConnect() error { + var err error + len := len(connection.addrs) + next := connection.pinned + 1 + + for i := next; err != nil && i < next+len; i++ { + err = connection.connect(connection.addrs[i%len(connection.addrs)]) + } + return err +} + // connect connects the client to the server at addr. func (connection *Connection) connect(addr string) error { connection.Close() @@ -107,14 +121,29 @@ func (connection *Connection) connect(addr string) error { conn, err := rpc.Dial(addr, connection.Opts.DialOpts...) if err != nil { - log.Errorf("connection failed: %v", err) + log.Errorf("connect addr %v failed, err: %v", addr, err) return err } + connection.updateConnectionInfo(addr, conn) + return nil +} + +// +func (connection *Connection) updateConnectionInfo(addr string, conn *rpc.ClientConn) { connection.conn, connection.Stub = conn, pb.NewCapacityClient(conn) connection.currentMaster = addr - return nil + found := false + for i, address := range connection.addrs { + if address == addr { + found = true + connection.pinned = i + } + } + if !found { + log.Errorf("addr %s is not in predefined addresses %v", addr, connection.addrs) + } } // ExecuteRPC executes an RPC against the current master. @@ -155,7 +184,7 @@ func (connection *Connection) runMasterAware(callback func() (HasMastership, err // If there is no current client connection, connect to the original target. // If that fails, retry. if connection.conn == nil { - if err := connection.connect(connection.addr); err != nil { + if err := connection.tryConnect(); err != nil { // The connection failed. Retry. continue } diff --git a/go/flagenv/flagenv.go b/go/flagutil/flagenv.go similarity index 90% rename from go/flagenv/flagenv.go rename to go/flagutil/flagenv.go index 48f23bc..6f9fe5a 100644 --- a/go/flagenv/flagenv.go +++ b/go/flagutil/flagenv.go @@ -1,5 +1,5 @@ -// Package flagenv allows setting flags from the command line. -package flagenv +// Package flagutil allows setting flags from the command line. +package flagutil import ( "flag" @@ -8,6 +8,7 @@ import ( "strings" log "github.com/golang/glog" + "github.com/spf13/pflag" ) // NOTE(ryszard): This code is heavily inspired by @@ -19,7 +20,7 @@ import ( // prepended with prefix and an underscore. So, if prefix is // "DOORMAN", and the flag's name "foo-bar", the environment variable // DOORMAN_FOO_BAR will be used. -func Populate(set *flag.FlagSet, prefix string) error { +func Populate(set *pflag.FlagSet, prefix string) error { var ( setThroughFlags = make(map[string]bool) knownEnv = make(map[string]bool) diff --git a/go/flagenv/flagenv_test.go b/go/flagutil/flagenv_test.go similarity index 97% rename from go/flagenv/flagenv_test.go rename to go/flagutil/flagenv_test.go index 070afaa..1144fea 100644 --- a/go/flagenv/flagenv_test.go +++ b/go/flagutil/flagenv_test.go @@ -1,4 +1,4 @@ -package flagenv +package flagutil import ( "flag" diff --git a/go/cmd/doorman/resourcez.go b/go/server/doorman/resourcez.go similarity index 91% rename from go/cmd/doorman/resourcez.go rename to go/server/doorman/resourcez.go index 9cbc68d..0f68d8d 100644 --- a/go/cmd/doorman/resourcez.go +++ b/go/server/doorman/resourcez.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package doorman // Implements /resourcez, a page which gives detail information about // outstanding leases. @@ -23,8 +23,6 @@ import ( "sync" log "github.com/golang/glog" - - "github.com/youtube/doorman/go/server/doorman" ) var ( @@ -32,7 +30,7 @@ var ( tmpl *template.Template // A slice of Doorman servers which have registered with us. - servers []*doorman.Server + servers []*Server // A mutex which protects the global data in this module. mu sync.RWMutex @@ -47,11 +45,11 @@ func init() { tmpl = template.Must(template.New("resourcez").Parse(resourcezHTML)) // Makes the slice that holds the servers for which we need to provide information. - servers = make([]*doorman.Server, 0, 5) + servers = make([]*Server, 0, 5) } // AddServer adds a Doorman server to the list of servers that we provide information for. -func AddServer(dm *doorman.Server) { +func AddServer(dm *Server) { mu.Lock() defer mu.Unlock() @@ -138,8 +136,8 @@ const resourcezHTML = ` // engine and which gets inserted into the template. type resourcezData struct { Resource string - ServerStatus []doorman.ServerStatus - ResourceLeaseStatus []doorman.ResourceLeaseStatus + ServerStatus []ServerStatus + ResourceLeaseStatus []ResourceLeaseStatus } // resourcezHandler is the handler function that gets called for a request @@ -152,8 +150,8 @@ func resourcezHandler(w http.ResponseWriter, r *http.Request) { // Creates the data structure for the template. data := resourcezData{ Resource: r.FormValue("resource"), - ServerStatus: make([]doorman.ServerStatus, 0, len(servers)), - ResourceLeaseStatus: make([]doorman.ResourceLeaseStatus, 0, len(servers)), + ServerStatus: make([]ServerStatus, 0, len(servers)), + ResourceLeaseStatus: make([]ResourceLeaseStatus, 0, len(servers)), } // Goes through all the servers and fills in their information in the data object. diff --git a/go/server/doorman/server.go b/go/server/doorman/server.go index 08c17b7..a04e73a 100644 --- a/go/server/doorman/server.go +++ b/go/server/doorman/server.go @@ -22,10 +22,6 @@ package doorman import ( "errors" - "path/filepath" - "sync" - "time" - log "github.com/golang/glog" "github.com/golang/protobuf/proto" "github.com/prometheus/client_golang/prometheus" @@ -35,8 +31,21 @@ import ( "golang.org/x/net/context" rpc "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "path/filepath" + "sync" + "time" + _ "expvar" + "fmt" + "github.com/ghodss/yaml" + "github.com/youtube/doorman/go/cmd/doorman/option" + "github.com/youtube/doorman/go/configuration" + "github.com/youtube/doorman/go/status" pb "github.com/youtube/doorman/proto/doorman" + "net" + "net/http" + _ "net/http/pprof" ) var ( @@ -481,16 +490,101 @@ func (server *Server) triggerElection(ctx context.Context) error { return nil } +func Run(config *option.ServerConfiguration) error { + s, err := NewServer(config, connection.DialOpts(rpc.WithTimeout(config.RpcDialTimeout))) + if err != nil { + return err + } + + var opts []rpc.ServerOption + if config.EnableTls { + log.Infof("Loading credentials from %v and %v.", config.CertFile, config.KeyFile) + creds, err := credentials.NewServerTLSFromFile(config.CertFile, config.KeyFile) + if err != nil { + return fmt.Errorf("Failed to generate credentials,err: %v", err) + } + opts = []rpc.ServerOption{rpc.Creds(creds)} + } + server := rpc.NewServer(opts...) + pb.RegisterCapacityServer(server, s) + + var reader configuration.ConfigReader + kind, path := configuration.ParseSource(config.Config) + switch { + case kind == "file": + reader = configuration.LocalFile(path) + case kind == "etcd": + if len(config.EtcdEndpoints) == 1 && config.EtcdEndpoints[0] == "" { + fmt.Errorf("-etcd_endpoints cannot be empty if a config source etcd is provided") + } + reader = configuration.Etcd(path, config.EtcdEndpoints) + default: + return fmt.Errorf("can't read config from %v", config.Config) + } + + // Try to load the background. If there's a problem with loading + // the server for the first time, the server will keep running, + // but will not serve traffic. + go func() { + for { + data, err := reader(context.Background()) + if err != nil { + log.Errorf("cannot load config data: %v", err) + continue + } + cfg := new(pb.ResourceRepository) + if err := yaml.Unmarshal(data, cfg); err != nil { + log.Errorf("cannot unmarshal config data: %q", data) + continue + } + + if err := s.LoadConfig(context.Background(), cfg, map[string]*time.Time{}); err != nil { + log.Errorf("cannot load config: %v", err) + } + } + }() + + status.AddStatusPart("Doorman", option.Statusz, func(context.Context) interface{} { return s.Status() }) + + // Redirect form / to /debug/status. + http.Handle("/", http.RedirectHandler("/debug/status", http.StatusMovedPermanently)) + AddServer(s) + + http.Handle("/metrics", prometheus.Handler()) + + go http.ListenAndServe(fmt.Sprintf(":%v", config.DebugPort), nil) + + // Waits for the server to get its initial configuration. This guarantees that + // the server will never run without a valid configuration. + log.Info("Waiting for the server to be configured...") + s.WaitUntilConfigured() + + // Runs the server. + log.Info("Server is configured, ready to go!") + + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", config.Port)) + if err != nil { + return err + } + + server.Serve(lis) + return nil +} + // New returns a new unconfigured server. parentAddr is the address of // a parent, pass the empty string to create a root server. This // function should be called only once, as it registers metrics. -func New(ctx context.Context, id string, parentAddr string, leader election.Election, opts ...connection.Option) (*Server, error) { - s, err := NewIntermediate(ctx, id, parentAddr, leader, opts...) +func NewServer(config *option.ServerConfiguration, opts ...connection.Option) (*Server, error) { + s, err := NewIntermediate(context.Background(), config, opts) if err != nil { - return nil, err + return s, fmt.Errorf("failed to create new server, err: %v", err) } - return s, prometheus.Register(s) + err = prometheus.Register(s) + if err != nil { + return s, fmt.Errorf("failed to register to prometheus, err: %v", err) + } + return s, nil } // Describe implements prometheus.Collector. @@ -512,19 +606,19 @@ func (server *Server) Collect(ch chan<- prometheus.Metric) { } // NewIntermediate creates a server connected to the lower level server. -func NewIntermediate(ctx context.Context, id string, addr string, leader election.Election, opts ...connection.Option) (*Server, error) { +func NewIntermediate(ctx context.Context, config *option.ServerConfiguration, opts ...connection.Option) (*Server, error) { var ( conn *connection.Connection updater updater err error ) - isRootServer := addr == "" + isRootServer := (config.ParentServers == nil || len(config.ParentServers) == 0) // Set up some configuration for intermediate server: establish a connection // to a lower-level server (e.g. the root server) and assign the updater function. if !isRootServer { - if conn, err = connection.New(addr, opts...); err != nil { + if conn, err = connection.New(config.ParentServers, opts...); err != nil { return nil, err } @@ -533,9 +627,16 @@ func NewIntermediate(ctx context.Context, id string, addr string, leader electio } } + var masterElection election.Election + if config.MasterElectionLock != "" { + masterElection = election.NewEtcdElection(config.EtcdEndpoints, config.MasterElectionLock, config.MasterLease) + } else { + masterElection = election.Trivial() + } + server := &Server{ - ID: id, - Election: leader, + ID: option.GetServerID(config.Port, config.HostName), + Election: masterElection, isConfigured: make(chan bool), resources: make(map[string]*Resource), becameMasterAt: time.Unix(0, 0), diff --git a/go/server/election/trivial.go b/go/server/election/trivial.go index 888dc66..fb1f8a7 100644 --- a/go/server/election/trivial.go +++ b/go/server/election/trivial.go @@ -14,6 +14,8 @@ package election +import "context" + type trivial struct { isMaster chan bool current chan string diff --git a/vendor/github.com/spf13/pflag b/vendor/github.com/spf13/pflag new file mode 160000 index 0000000..9ff6c69 --- /dev/null +++ b/vendor/github.com/spf13/pflag @@ -0,0 +1 @@ +Subproject commit 9ff6c6923cfffbcd502984b8e0c80539a94968b7