diff --git a/README.md b/README.md index d6992d27cf..3458deaf0a 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,10 @@ of available backends and the keys that can be put into the this dictionary are * alloc: only perform subnet allocation (no forwarding of data packets) * ```Type``` (string): ```alloc``` +* vxlan: use in-kernel VXLAN to encapsulate the packets. + * ```Type``` (string): ```vxlan``` + * ```VNI``` (number): VXLAN Identifier (VNI) to be used. Defaults to 1 + ### Example configuration JSON The following configuration illustrates the use of most options. @@ -90,8 +94,9 @@ The following configuration illustrates the use of most options. ``` ### Firewalls -flannel uses UDP port 8285 for sending encapsulated packets. Make sure that your firewall rules allow -this traffic for all hosts participating in the overlay network. +When using ```udp``` backend, flannel uses UDP port 8285 for sending encapsulated packets. +When using ```vxlan``` backend, kernel uses UDP port 8472 for sending encapsulated packets. +Make sure that your firewall rules allow this traffic for all hosts participating in the overlay network. ## Running @@ -115,6 +120,12 @@ MTU that it supports. -v=0: log level for V logs. Set to 1 to see messages related to data path ``` +## Zero-downtime restarts +When running in VXLAN mode, the kernel is providing the data path with flanneld acting as the control plane. As such, flanneld +can be restarted (even to do an upgrade) without disturbing existing flows. However, this needs to be done in few seconds as ARP +entries can start to timeout requiring the flanneld daemon to refresh them. Also, to avoid interruptions during restart, the configuration +must not be changed (e.g. VNI, --iface value). + ## Docker integration Docker daemon accepts ```--bip``` argument to configure the subnet of the docker0 bridge. It also accepts ```--mtu``` to set the MTU diff --git a/backend/alloc/alloc.go b/backend/alloc/alloc.go index cc0978a70c..f09f927eaa 100644 --- a/backend/alloc/alloc.go +++ b/backend/alloc/alloc.go @@ -24,11 +24,11 @@ func New(sm *subnet.SubnetManager) backend.Backend { } func (m *AllocBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (*backend.SubnetDef, error) { - attrs := subnet.BaseAttrs{ + attrs := subnet.LeaseAttrs{ PublicIP: ip.FromIP(extIP), } - sn, err := m.sm.AcquireLease(ip.FromIP(extIP), &attrs, m.stop) + sn, err := m.sm.AcquireLease(&attrs, m.stop) if err != nil { if err == task.ErrCanceled { return nil, err diff --git a/backend/udp/cproxy.go b/backend/udp/cproxy.go index bfa51afabf..062d277fbd 100644 --- a/backend/udp/cproxy.go +++ b/backend/udp/cproxy.go @@ -72,9 +72,8 @@ func removeRoute(ctl *os.File, dst ip.IP4Net) { func stopProxy(ctl *os.File) { cmd := C.command{ - cmd: C.CMD_STOP, + cmd: C.CMD_STOP, } writeCommand(ctl, &cmd) } - diff --git a/backend/udp/udp.go b/backend/udp/udp.go index 77fc75b0da..90f86171f1 100644 --- a/backend/udp/udp.go +++ b/backend/udp/udp.go @@ -58,11 +58,11 @@ func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (* } // Acquire the lease form subnet manager - attrs := subnet.BaseAttrs{ + attrs := subnet.LeaseAttrs{ PublicIP: ip.FromIP(extIP), } - sn, err := m.sm.AcquireLease(attrs.PublicIP, &attrs, m.stop) + sn, err := m.sm.AcquireLease(&attrs, m.stop) if err != nil { if err == task.ErrCanceled { return nil, err @@ -263,13 +263,7 @@ func (m *UdpBackend) processSubnetEvents(batch subnet.EventBatch) { case subnet.SubnetAdded: log.Info("Subnet added: ", evt.Lease.Network) - var attrs subnet.BaseAttrs - if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil { - log.Error("Error decoding subnet lease JSON: ", err) - continue - } - - setRoute(m.ctl, evt.Lease.Network, attrs.PublicIP, m.cfg.Port) + setRoute(m.ctl, evt.Lease.Network, evt.Lease.Attrs.PublicIP, m.cfg.Port) case subnet.SubnetRemoved: log.Info("Subnet removed: ", evt.Lease.Network) diff --git a/backend/vxlan/device.go b/backend/vxlan/device.go new file mode 100644 index 0000000000..2b46d48791 --- /dev/null +++ b/backend/vxlan/device.go @@ -0,0 +1,287 @@ +package vxlan + +import ( + "fmt" + "net" + "os" + "syscall" + "time" + + log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog" + "github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink" + "github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink/nl" + + "github.com/coreos/flannel/pkg/ip" +) + +type vxlanDeviceAttrs struct { + vni uint32 + name string + vtepIndex int + vtepAddr net.IP + vtepPort int +} + +type vxlanDevice struct { + link *netlink.Vxlan +} + +func sysctlSet(path, value string) error { + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + _, err = f.Write([]byte(value)) + return err +} + +func newVXLANDevice(devAttrs *vxlanDeviceAttrs) (*vxlanDevice, error) { + link := &netlink.Vxlan{ + LinkAttrs: netlink.LinkAttrs{ + Name: devAttrs.name, + }, + VxlanId: int(devAttrs.vni), + VtepDevIndex: devAttrs.vtepIndex, + SrcAddr: devAttrs.vtepAddr, + Port: devAttrs.vtepPort, + Learning: true, + Proxy: true, + L2miss: true, + } + + link, err := ensureLink(link) + if err != nil { + return nil, err + } + + // this enables ARP requests being sent to userspace via netlink + sysctlPath := fmt.Sprintf("/proc/sys/net/ipv4/neigh/%s/app_solicit", devAttrs.name) + sysctlSet(sysctlPath, "3") + + return &vxlanDevice{ + link: link, + }, nil +} + +func ensureLink(vxlan *netlink.Vxlan) (*netlink.Vxlan, error) { + err := netlink.LinkAdd(vxlan) + if err != nil { + if err == syscall.EEXIST { + // it's ok if the device already exists as long as config is similar + existing, err := netlink.LinkByName(vxlan.Name) + if err != nil { + return nil, err + } + if incompat := vxlanLinksIncompat(vxlan, existing); incompat != "" { + log.Warningf("%q already exists with incompatable configuration: %v; recreating device", vxlan.Name, incompat) + + // delete existing + if err = netlink.LinkDel(existing); err != nil { + return nil, fmt.Errorf("failed to delete interface: %v", err) + } + + // create new + if err = netlink.LinkAdd(vxlan); err != nil { + return nil, fmt.Errorf("failed to create vxlan interface: %v", err) + } + } else { + return existing.(*netlink.Vxlan), nil + } + } else { + return nil, err + } + } + + ifindex := vxlan.Index + link, err := netlink.LinkByIndex(vxlan.Index) + if err != nil { + return nil, fmt.Errorf("can't locate created vxlan device with index %v", ifindex) + } + var ok bool + if vxlan, ok = link.(*netlink.Vxlan); !ok { + return nil, fmt.Errorf("created vxlan device with index %v is not vxlan", ifindex) + } + + return vxlan, nil +} + +func (dev *vxlanDevice) Configure(ipn ip.IP4Net) error { + setAddr4(dev.link, ipn.ToIPNet()) + + if err := netlink.LinkSetUp(dev.link); err != nil { + return fmt.Errorf("failed to set interface %s to UP state: %s", dev.link.Attrs().Name, err) + } + + // explicitly add a route since there might be a route for a subnet already + // installed by Docker and then it won't get auto added + route := netlink.Route{ + LinkIndex: dev.link.Attrs().Index, + Scope: netlink.SCOPE_UNIVERSE, + Dst: ipn.Network().ToIPNet(), + } + if err := netlink.RouteAdd(&route); err != nil && err != syscall.EEXIST { + return fmt.Errorf("failed to add route (%s -> %s): %v", ipn.Network().String(), dev.link.Attrs().Name, err) + } + + return nil +} + +func (dev *vxlanDevice) Destroy() { + netlink.LinkDel(dev.link) +} + +func (dev *vxlanDevice) MACAddr() net.HardwareAddr { + return dev.link.HardwareAddr +} + +func (dev *vxlanDevice) MTU() int { + return dev.link.MTU +} + +func (dev *vxlanDevice) MonitorMisses(misses chan *netlink.Neigh) { + nlsock, err := nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH) + if err != nil { + log.Error("Failed to subscribe to netlink RTNLGRP_NEIGH messages") + return + } + + for { + msgs, err := nlsock.Recieve() + if err != nil { + log.Errorf("Failed to receive from netlink: %v ", err) + + // wait 1 sec before retrying but honor the cancel channel + time.Sleep(1*time.Second) + continue + } + + for _, msg := range msgs { + dev.processNeighMsg(msg, misses) + } + } +} + +func isNeighResolving(state int) bool { + return (state & (netlink.NUD_INCOMPLETE | netlink.NUD_STALE | netlink.NUD_DELAY | netlink.NUD_PROBE)) != 0 +} + +func (dev *vxlanDevice) processNeighMsg(msg syscall.NetlinkMessage, misses chan *netlink.Neigh) { + neigh, err := netlink.NeighDeserialize(msg.Data) + if err != nil { + log.Error("Failed to deserialize netlink ndmsg: %v", err) + return + } + + if int(neigh.LinkIndex) != dev.link.Index { + return + } + + if msg.Header.Type != syscall.RTM_GETNEIGH && msg.Header.Type != syscall.RTM_NEWNEIGH { + return + } + + if !isNeighResolving(neigh.State) { + // misses come with NUD_STALE bit set + return + } + + misses <- neigh +} + +func (dev *vxlanDevice) AddL2(mac net.HardwareAddr, vtep net.IP) error { + neigh := netlink.Neigh{ + LinkIndex: dev.link.Index, + State: netlink.NUD_REACHABLE, + Family: syscall.AF_BRIDGE, + Flags: netlink.NTF_SELF, + IP: vtep, + HardwareAddr: mac, + } + + log.Infof("calling NeighAdd: %v, %v", vtep, mac) + return netlink.NeighAdd(&neigh) +} + +func (dev *vxlanDevice) DelL2(mac net.HardwareAddr, vtep net.IP) error { + neigh := netlink.Neigh{ + LinkIndex: dev.link.Index, + Family: syscall.AF_BRIDGE, + Flags: netlink.NTF_SELF, + IP: vtep, + HardwareAddr: mac, + } + + log.Infof("calling NeighDel: %v, %v", vtep, mac) + return netlink.NeighDel(&neigh) +} + +func (dev *vxlanDevice) AddL3(ip net.IP, mac net.HardwareAddr) error { + neigh := netlink.Neigh{ + LinkIndex: dev.link.Index, + State: netlink.NUD_REACHABLE, + Type: syscall.RTN_UNICAST, + IP: ip, + HardwareAddr: mac, + } + + log.Infof("calling NeighSet: %v, %v", ip, mac) + return netlink.NeighSet(&neigh) +} + +func vxlanLinksIncompat(l1, l2 netlink.Link) string { + if l1.Type() != l2.Type() { + return fmt.Sprintf("link type: %v vs %v", l1.Type(), l2.Type()) + } + + v1 := l1.(*netlink.Vxlan) + v2 := l2.(*netlink.Vxlan) + + if v1.VxlanId != v2.VxlanId { + return fmt.Sprintf("vni: %v vs %v", v1.VxlanId, v2.VxlanId) + } + + if v1.VtepDevIndex > 0 && v2.VtepDevIndex > 0 && v1.VtepDevIndex != v2.VtepDevIndex { + return fmt.Sprintf("vtep (external) interface: %v vs %v", v1.VtepDevIndex, v2.VtepDevIndex) + } + + if len(v1.SrcAddr) > 0 && len(v2.SrcAddr) > 0 && !v1.SrcAddr.Equal(v2.SrcAddr) { + return fmt.Sprintf("vtep (external) IP: %v vs %v", v1.SrcAddr, v2.SrcAddr) + } + + if len(v1.Group) > 0 && len(v2.Group) > 0 && !v1.Group.Equal(v2.Group) { + return fmt.Sprintf("group address: %v vs %v", v1.Group, v2.Group) + } + + if v1.L2miss != v2.L2miss { + return fmt.Sprintf("l2miss: %v vs %v", v1.L2miss, v2.L2miss) + } + + if v1.Port > 0 && v2.Port > 0 && v1.Port != v2.Port { + return fmt.Sprintf("port: %v vs %v", v1.Port, v2.Port) + } + + return "" +} + +// sets IP4 addr on link removing any existing ones first +func setAddr4(link *netlink.Vxlan, ipn *net.IPNet) error { + addrs, err := netlink.AddrList(link, syscall.AF_INET) + if err != nil { + return err + } + + for _, addr := range addrs { + if err = netlink.AddrDel(link, &addr); err != nil { + return fmt.Errorf("failed to delete IPv4 addr %s from %s", addr.String(), link.Attrs().Name) + } + } + + addr := netlink.Addr{ ipn, "" } + if err = netlink.AddrAdd(link, &addr); err != nil { + return fmt.Errorf("failed to add IP address %s to %s: %s", ipn.String(), link.Attrs().Name, err) + } + + return nil +} diff --git a/backend/vxlan/routes.go b/backend/vxlan/routes.go new file mode 100644 index 0000000000..9091746c36 --- /dev/null +++ b/backend/vxlan/routes.go @@ -0,0 +1,55 @@ +package vxlan + +import ( + "bytes" + "net" + + "github.com/coreos/flannel/pkg/ip" +) + +type route struct { + network ip.IP4Net + vtepIP net.IP + vtepMAC net.HardwareAddr +} + +type routes []route + +func (rts *routes) set(nw ip.IP4Net, vtepIP net.IP, vtepMAC net.HardwareAddr) { + for i, rt := range *rts { + if rt.network.Equal(nw) { + (*rts)[i].vtepIP = vtepIP + (*rts)[i].vtepMAC = vtepMAC + return + } + } + *rts = append(*rts, route{nw, vtepIP, vtepMAC}) +} + +func (rts *routes) remove(nw ip.IP4Net) { + for i, rt := range *rts { + if rt.network.Equal(nw) { + (*rts)[i] = (*rts)[len(*rts)-1] + (*rts) = (*rts)[0 : len(*rts)-1] + return + } + } +} + +func (rts routes) findByNetwork(ipAddr ip.IP4) *route { + for i, rt := range rts { + if rt.network.Contains(ipAddr) { + return &rts[i] + } + } + return nil +} + +func (rts routes) findByVtepMAC(mac net.HardwareAddr) *route { + for i, rt := range rts { + if bytes.Equal(rt.vtepMAC, mac) { + return &rts[i] + } + } + return nil +} diff --git a/backend/vxlan/vxlan.go b/backend/vxlan/vxlan.go new file mode 100644 index 0000000000..b47eea2533 --- /dev/null +++ b/backend/vxlan/vxlan.go @@ -0,0 +1,277 @@ +package vxlan + +import ( + "encoding/json" + "fmt" + "net" + "sync" + + log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog" + "github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink" + + "github.com/coreos/flannel/backend" + "github.com/coreos/flannel/subnet" + "github.com/coreos/flannel/pkg/ip" + "github.com/coreos/flannel/pkg/task" +) + +const ( + defaultVNI = 1 +) + +type VXLANBackend struct { + sm *subnet.SubnetManager + rawCfg json.RawMessage + cfg struct { + Vni int + Port int + } + dev *vxlanDevice + stop chan bool + wg sync.WaitGroup + rts routes +} + +func New(sm *subnet.SubnetManager, config json.RawMessage) backend.Backend { + vb := &VXLANBackend{ + sm: sm, + rawCfg: config, + stop: make(chan bool), + } + vb.cfg.Vni = defaultVNI + + return vb +} + +func newSubnetAttrs(pubIP net.IP, mac net.HardwareAddr) (*subnet.LeaseAttrs, error) { + sa := subnet.LeaseAttrs{ + PublicIP: ip.FromIP(pubIP), + BackendType: "vxlan", + } + + data, err := json.Marshal(vxlanLeaseAttrs{hardwareAddr(mac)}) + if err != nil { + return nil, err + } + + err = sa.BackendData.UnmarshalJSON(data) + if err != nil { + return nil, err + } + + return &sa, nil +} + +func (vb *VXLANBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (*backend.SubnetDef, error) { + // Parse our configuration + if len(vb.rawCfg) > 0 { + if err := json.Unmarshal(vb.rawCfg, &vb.cfg); err != nil { + return nil, fmt.Errorf("error decoding UDP backend config: %v", err) + } + } + + devAttrs := vxlanDeviceAttrs{ + vni: uint32(vb.cfg.Vni), + name: fmt.Sprintf("flannel.%v", vb.cfg.Vni), + vtepIndex: extIface.Index, + vtepAddr: extIP, + vtepPort: vb.cfg.Port, + } + + var err error + vb.dev, err = newVXLANDevice(&devAttrs) + if err != nil { + return nil, err + } + + sa, err := newSubnetAttrs(extIP, vb.dev.MACAddr()) + if err != nil { + return nil, err + } + + sn, err := vb.sm.AcquireLease(sa, vb.stop) + if err != nil { + if err == task.ErrCanceled { + return nil, err + } else { + return nil, fmt.Errorf("failed to acquire lease: %v", err) + } + } + + // vxlan's subnet is that of the whole overlay network (e.g. /16) + // and not that of the individual host (e.g. /24) + vxlanNet := ip.IP4Net{ + IP: sn.IP, + PrefixLen: vb.sm.GetConfig().Network.PrefixLen, + } + if err = vb.dev.Configure(vxlanNet); err != nil { + return nil, err + } + + return &backend.SubnetDef{sn, vb.dev.MTU()}, nil +} + +func (vb *VXLANBackend) Run() { + vb.wg.Add(1) + go func() { + vb.sm.LeaseRenewer(vb.stop) + vb.wg.Done() + }() + + log.Info("Watching for L2/L3 misses") + misses := make(chan *netlink.Neigh, 100) + // Unfortunately MonitorMisses does not take a cancel channel + // as there's no wait to interrupt netlink socket recv + go vb.dev.MonitorMisses(misses) + + log.Info("Watching for new subnet leases") + evts := make(chan subnet.EventBatch) + vb.wg.Add(1) + go func() { + vb.sm.WatchLeases(evts, vb.stop) + vb.wg.Done() + }() + + defer vb.wg.Wait() + + for { + select { + case miss := <-misses: + vb.handleMiss(miss) + + case evtBatch := <-evts: + vb.handleSubnetEvents(evtBatch) + + case <-vb.stop: + return + } + } +} + +func (vb *VXLANBackend) Stop() { + close(vb.stop) +} + +func (vb *VXLANBackend) Name() string { + return "VXLAN" +} + +// So we can make it JSON (un)marshalable +type hardwareAddr net.HardwareAddr + +func (hw hardwareAddr) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf("%q", net.HardwareAddr(hw))), nil +} + +func (hw *hardwareAddr) UnmarshalJSON(b []byte) error { + if len(b) < 2 || b[0] != '"' || b[len(b)-1] != '"' { + return fmt.Errorf("error parsing hardware addr") + } + + b = b[1:len(b)-1] + + mac, err := net.ParseMAC(string(b)) + if err != nil { + return err + } + + *hw = hardwareAddr(mac) + return nil +} + +type vxlanLeaseAttrs struct { + VtepMAC hardwareAddr +} + +func (vb *VXLANBackend) handleSubnetEvents(batch subnet.EventBatch) { + for _, evt := range batch { + switch evt.Type { + case subnet.SubnetAdded: + log.Info("Subnet added: ", evt.Lease.Network) + + if evt.Lease.Attrs.BackendType != "vxlan" { + log.Warningf("Ignoring non-vxlan subnet: type=%v", evt.Lease.Attrs.BackendType) + continue + } + + var attrs vxlanLeaseAttrs + if err := json.Unmarshal(evt.Lease.Attrs.BackendData, &attrs); err != nil { + log.Error("Error decoding subnet lease JSON: ", err) + continue + } + + vb.rts.set(evt.Lease.Network, evt.Lease.Attrs.PublicIP.ToIP(), net.HardwareAddr(attrs.VtepMAC)) + + case subnet.SubnetRemoved: + log.Info("Subnet removed: ", evt.Lease.Network) + + vb.rts.remove(evt.Lease.Network) + + if evt.Lease.Attrs.BackendType != "vxlan" { + log.Warningf("Ignoring non-vxlan subnet: type=%v", evt.Lease.Attrs.BackendType) + continue + } + + var attrs vxlanLeaseAttrs + if err := json.Unmarshal(evt.Lease.Attrs.BackendData, &attrs); err != nil { + log.Error("Error decoding subnet lease JSON: ", err) + continue + } + + if len(attrs.VtepMAC) > 0 { + vb.dev.DelL2(net.HardwareAddr(attrs.VtepMAC), evt.Lease.Attrs.PublicIP.ToIP()) + } + + default: + log.Error("Internal error: unknown event type: ", int(evt.Type)) + } + } +} + +func (vb *VXLANBackend) handleMiss(miss *netlink.Neigh) { + switch { + case len(miss.IP) == 0 && len(miss.HardwareAddr) == 0: + log.Info("Ignoring nil miss") + + case len(miss.IP) == 0: + vb.handleL2Miss(miss) + + case len(miss.HardwareAddr) == 0: + vb.handleL3Miss(miss) + + default: + log.Infof("Ignoring not a miss: %v, %v", miss.HardwareAddr, miss.IP) + } +} + +func (vb *VXLANBackend) handleL2Miss(miss *netlink.Neigh) { + log.Infof("L2 miss: %v", miss.HardwareAddr) + + rt := vb.rts.findByVtepMAC(miss.HardwareAddr) + if rt == nil { + log.Infof("Route for %v not found", miss.HardwareAddr) + return + } + + if err := vb.dev.AddL2(miss.HardwareAddr, rt.vtepIP); err != nil { + log.Errorf("AddL2 failed: %v", err) + } else { + log.Info("AddL2 succeeded") + } +} + +func (vb *VXLANBackend) handleL3Miss(miss *netlink.Neigh) { + log.Infof("L3 miss: %v", miss.IP) + + rt := vb.rts.findByNetwork(ip.FromIP(miss.IP)) + if rt == nil { + log.Infof("Route for %v not found", miss.IP) + return + } + + if err := vb.dev.AddL3(miss.IP, rt.vtepMAC); err != nil { + log.Errorf("AddL3 failed: %v", err) + } else { + log.Info("AddL3 succeeded") + } +} diff --git a/main.go b/main.go index 62b0c596de..5f819233a6 100644 --- a/main.go +++ b/main.go @@ -21,6 +21,7 @@ import ( "github.com/coreos/flannel/subnet" "github.com/coreos/flannel/backend/alloc" "github.com/coreos/flannel/backend/udp" + "github.com/coreos/flannel/backend/vxlan" ) type CmdLineOpts struct { @@ -159,6 +160,8 @@ func newBackend() (backend.Backend, error) { return udp.New(sm, config.Backend), nil case "alloc": return alloc.New(sm), nil + case "vxlan": + return vxlan.New(sm, config.Backend), nil default: return nil, fmt.Errorf("'%v': unknown backend type", bt.Type) } @@ -189,7 +192,6 @@ func run(be backend.Backend, exit chan int) { sn, err := be.Init(iface, ipaddr, opts.ipMasq) if err != nil { - log.Errorf("Could not init %v backend: %v", be.Name(), err) return } diff --git a/subnet/subnet.go b/subnet/subnet.go index aa2e85c209..465952fad3 100644 --- a/subnet/subnet.go +++ b/subnet/subnet.go @@ -38,9 +38,15 @@ var ( subnetRegex *regexp.Regexp = regexp.MustCompile(`(\d+\.\d+.\d+.\d+)-(\d+)`) ) +type LeaseAttrs struct { + PublicIP ip.IP4 + BackendType string `json:",omitempty"` + BackendData json.RawMessage `json:",omitempty"` +} + type SubnetLease struct { Network ip.IP4Net - Data string + Attrs LeaseAttrs } type SubnetManager struct { @@ -66,15 +72,9 @@ func NewSubnetManager(etcdEndpoint []string, prefix string) (*SubnetManager, err return newSubnetManager(esr) } -func (sm *SubnetManager) AcquireLease(extIP ip.IP4, data interface{}, cancel chan bool) (ip.IP4Net, error) { - dataBytes, err := json.Marshal(data) - if err != nil { - return ip.IP4Net{}, err - } - - var sn ip.IP4Net +func (sm *SubnetManager) AcquireLease(attrs *LeaseAttrs, cancel chan bool) (ip.IP4Net, error) { for { - sn, err = sm.acquireLeaseOnce(extIP, string(dataBytes), cancel) + sn, err := sm.acquireLeaseOnce(attrs, cancel) switch { case err == nil: log.Info("Subnet lease acquired: ", sn) @@ -96,53 +96,71 @@ func (sm *SubnetManager) AcquireLease(extIP ip.IP4, data interface{}, cancel cha } } -func (sm *SubnetManager) acquireLeaseOnce(extIP ip.IP4, data string, cancel chan bool) (ip.IP4Net, error) { - for i := 0; i < registerRetries; i++ { - var err error - sm.leases, err = sm.getLeases() - if err != nil { - return ip.IP4Net{}, err +func findLeaseByIP(leases []SubnetLease, pubIP ip.IP4) *SubnetLease { + for _, l := range leases { + if pubIP == l.Attrs.PublicIP { + return &l } + } - // try to reuse a subnet if there's one that matches our IP - for _, l := range sm.leases { - var ba BaseAttrs - err = json.Unmarshal([]byte(l.Data), &ba) - if err != nil { - log.Error("Error parsing subnet lease JSON: ", err) - } else { - if extIP == ba.PublicIP { - resp, err := sm.registry.updateSubnet(l.Network.StringSep(".", "-"), data, subnetTTL) - if err != nil { - return ip.IP4Net{}, err - } - - sm.myLease.Network = l.Network - sm.leaseExp = *resp.Node.Expiration - return l.Network, nil - } - } - } + return nil +} - // no existing match, grab a new one - sn, err := sm.allocateSubnet() +func (sm *SubnetManager) tryAcquireLease(extIP ip.IP4, attrs []byte) (ip.IP4Net, error) { + var err error + sm.leases, err = sm.getLeases() + if err != nil { + return ip.IP4Net{}, err + } + + // try to reuse a subnet if there's one that matches our IP + if l := findLeaseByIP(sm.leases, extIP); l != nil { + resp, err := sm.registry.updateSubnet(l.Network.StringSep(".", "-"), string(attrs), subnetTTL) if err != nil { return ip.IP4Net{}, err } - resp, err := sm.registry.createSubnet(sn.StringSep(".", "-"), data, subnetTTL) - switch { - case err == nil: - sm.myLease.Network = sn - sm.leaseExp = *resp.Node.Expiration - return sn, nil + sm.myLease.Network = l.Network + sm.leaseExp = *resp.Node.Expiration + return l.Network, nil + } - // if etcd returned Key Already Exists, try again. - case err.(*etcd.EtcdError).ErrorCode == etcdKeyAlreadyExists: - break + // no existing match, grab a new one + sn, err := sm.allocateSubnet() + if err != nil { + return ip.IP4Net{}, err + } - default: + resp, err := sm.registry.createSubnet(sn.StringSep(".", "-"), string(attrs), subnetTTL) + switch { + case err == nil: + sm.myLease.Network = sn + sm.leaseExp = *resp.Node.Expiration + return sn, nil + + // if etcd returned Key Already Exists, try again. + case err.(*etcd.EtcdError).ErrorCode == etcdKeyAlreadyExists: + return ip.IP4Net{}, nil + + default: + return ip.IP4Net{}, err + } +} + +func (sm *SubnetManager) acquireLeaseOnce(attrs *LeaseAttrs, cancel chan bool) (ip.IP4Net, error) { + attrBytes, err := json.Marshal(attrs) + if err != nil { + log.Errorf("marshal failed: %#v, %v", attrs, err) + return ip.IP4Net{}, err + } + + for i := 0; i < registerRetries; i++ { + sn, err := sm.tryAcquireLease(attrs.PublicIP, attrBytes) + switch { + case err != nil: return ip.IP4Net{}, err + case sn.IP != 0: + return sn, nil } // before moving on, check for cancel @@ -154,12 +172,6 @@ func (sm *SubnetManager) acquireLeaseOnce(extIP ip.IP4, data string, cancel chan return ip.IP4Net{}, errors.New("Max retries reached trying to acquire a subnet") } -func (sm *SubnetManager) UpdateSubnet(data string) error { - resp, err := sm.registry.updateSubnet(sm.myLease.Network.StringSep(".", "-"), data, subnetTTL) - sm.leaseExp = *resp.Node.Expiration - return err -} - func (sm *SubnetManager) GetConfig() *Config { return sm.config } @@ -205,8 +217,11 @@ func (sm *SubnetManager) getLeases() ([]SubnetLease, error) { for _, node := range resp.Node.Nodes { sn, err := parseSubnetKey(node.Key) if err == nil { - lease := SubnetLease{sn, node.Value} - leases = append(leases, lease) + var attrs LeaseAttrs + if err = json.Unmarshal([]byte(node.Value), &attrs); err == nil { + lease := SubnetLease{sn, attrs} + leases = append(leases, lease) + } } } sm.lastIndex = resp.EtcdIndex @@ -261,39 +276,41 @@ func (sm *SubnetManager) applyLeases(newLeases []SubnetLease) EventBatch { return batch } -func (sm *SubnetManager) applySubnetChange(action string, ipn ip.IP4Net, data string) Event { +func (sm *SubnetManager) applySubnetChange(action string, ipn ip.IP4Net, data string) (Event, error) { switch action { case "delete", "expire": for i, l := range sm.leases { if l.Network.Equal(ipn) { deleteLease(sm.leases, i) - return Event{SubnetRemoved, l} + return Event{SubnetRemoved, l}, nil } } log.Errorf("Removed subnet (%s) was not found", ipn) return Event{ SubnetRemoved, - SubnetLease{ipn, ""}, - } + SubnetLease{ipn, LeaseAttrs{}}, + }, nil default: + var attrs LeaseAttrs + err := json.Unmarshal([]byte(data), &attrs) + if err != nil { + return Event{}, err + } + for i, l := range sm.leases { if l.Network.Equal(ipn) { - sm.leases[i] = SubnetLease{ipn, data} - return Event{SubnetAdded, sm.leases[i]} + sm.leases[i] = SubnetLease{ipn, attrs} + return Event{SubnetAdded, sm.leases[i]}, nil } } - sm.leases = append(sm.leases, SubnetLease{ipn, data}) - return Event{SubnetAdded, sm.leases[len(sm.leases)-1]} + sm.leases = append(sm.leases, SubnetLease{ipn, attrs}) + return Event{SubnetAdded, sm.leases[len(sm.leases)-1]}, nil } } -type BaseAttrs struct { - PublicIP ip.IP4 -} - func (sm *SubnetManager) allocateSubnet() (ip.IP4Net, error) { log.Infof("Picking subnet in range %s ... %s", sm.config.SubnetMin, sm.config.SubnetMax) @@ -369,7 +386,10 @@ func (sm *SubnetManager) parseSubnetWatchResponse(resp *etcd.Response) (batch *E // Don't process our own changes if !sm.myLease.Network.Equal(sn) { - evt := sm.applySubnetChange(resp.Action, sn, resp.Node.Value) + evt, err := sm.applySubnetChange(resp.Action, sn, resp.Node.Value) + if err != nil { + return nil, err + } batch = &EventBatch{evt} } @@ -404,7 +424,14 @@ func (sm *SubnetManager) LeaseRenewer(cancel chan bool) { for { select { case <-time.After(dur): - resp, err := sm.registry.updateSubnet(sm.myLease.Network.StringSep(".", "-"), sm.myLease.Data, subnetTTL) + attrBytes, err := json.Marshal(sm.myLease.Attrs) + if err != nil { + log.Error("Error renewing lease (trying again in 1 min): ", err) + dur = time.Minute + continue + } + + resp, err := sm.registry.updateSubnet(sm.myLease.Network.StringSep(".", "-"), string(attrBytes), subnetTTL) if err != nil { log.Error("Error renewing lease (trying again in 1 min): ", err) dur = time.Minute diff --git a/subnet/subnet_test.go b/subnet/subnet_test.go index 22507d77a8..803fdc7710 100644 --- a/subnet/subnet_test.go +++ b/subnet/subnet_test.go @@ -112,6 +112,7 @@ func (msr *mockSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd case sn = <-msr.addCh: n := etcd.Node{ Key: sn, + Value: `{"PublicIP": "1.1.1.1"}`, ModifiedIndex: msr.index, } msr.subnets.Nodes = append(msr.subnets.Nodes, &n) @@ -152,12 +153,12 @@ func TestAcquireLease(t *testing.T) { } extIP, _ := ip.ParseIP4("1.2.3.4") - data := BaseAttrs{ + attrs := LeaseAttrs{ PublicIP: extIP, } cancel := make(chan bool) - sn, err := sm.AcquireLease(extIP, data, cancel) + sn, err := sm.AcquireLease(&attrs, cancel) if err != nil { t.Fatal("AcquireLease failed: ", err) } @@ -167,7 +168,7 @@ func TestAcquireLease(t *testing.T) { } // Acquire again, should reuse - if sn, err = sm.AcquireLease(extIP, data, cancel); err != nil { + if sn, err = sm.AcquireLease(&attrs, cancel); err != nil { t.Fatal("AcquireLease failed: ", err) } @@ -258,14 +259,14 @@ func TestRenewLease(t *testing.T) { } extIP, _ := ip.ParseIP4("1.2.3.4") - data := BaseAttrs{ + attrs := LeaseAttrs{ PublicIP: extIP, } cancel := make(chan bool) defer close(cancel) - sn, err := sm.AcquireLease(extIP, data, cancel) + sn, err := sm.AcquireLease(&attrs, cancel) if err != nil { t.Fatal("AcquireLease failed: ", err) }