From 80d0086994c415fa01dd98c6e6fc3056824d195d Mon Sep 17 00:00:00 2001 From: Eugene Yakubovich Date: Mon, 15 Sep 2014 10:24:26 -0700 Subject: [PATCH] Add VXLAN backend VXLAN datapath is supported by the kernel thereby reducing the overhead of TUN device and userspace switching. This patch takes advantange of DOVE extensions and does not use multicast for port flooding. Fixes #18 --- README.md | 15 ++- backend/alloc/alloc.go | 4 +- backend/udp/cproxy.go | 3 +- backend/udp/udp.go | 12 +- backend/vxlan/device.go | 287 ++++++++++++++++++++++++++++++++++++++++ backend/vxlan/routes.go | 55 ++++++++ backend/vxlan/vxlan.go | 277 ++++++++++++++++++++++++++++++++++++++ main.go | 4 +- subnet/subnet.go | 163 +++++++++++++---------- subnet/subnet_test.go | 11 +- 10 files changed, 742 insertions(+), 89 deletions(-) create mode 100644 backend/vxlan/device.go create mode 100644 backend/vxlan/routes.go create mode 100644 backend/vxlan/vxlan.go diff --git a/README.md b/README.md index 51e0e33a0f..1fbe283a62 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,10 @@ of available backends and the keys that can be put into the this dictionary are * alloc: only perform subnet allocation (no forwarding of data packets) * ```Type``` (string): ```alloc``` +* vxlan: use in-kernel VXLAN to encapsulate the packets. + * ```Type``` (string): ```vxlan``` + * ```VNI``` (number): VXLAN Identifier (VNI) to be used. Defaults to 1 + ### Example configuration JSON The following configuration illustrates the use of most options. @@ -90,8 +94,9 @@ The following configuration illustrates the use of most options. ``` ### Firewalls -flannel uses UDP port 8285 for sending encapsulated packets. Make sure that your firewall rules allow -this traffic for all hosts participating in the overlay network. +When using ```udp``` backend, flannel uses UDP port 8285 for sending encapsulated packets. +When using ```vxlan``` backend, kernel uses UDP port 8472 for sending encapsulated packets. +Make sure that your firewall rules allow this traffic for all hosts participating in the overlay network. ## Running @@ -115,6 +120,12 @@ MTU that it supports. -v=0: log level for V logs. Set to 1 to see messages related to data path ``` +## Zero-downtime restarts +When running in VXLAN mode, the kernel is providing the data path with flanneld acting as the control plane. As such, flanneld +can be restarted (even to do an upgrade) without disturbing existing flows. However, this needs to be done in few seconds as ARP +entries can start to timeout requiring the flanneld daemon to refresh them. Also, to avoid interruptions during restart, the configuration +must not be changed (e.g. VNI, --iface value). + ## Docker integration Docker daemon accepts ```--bip``` argument to configure the subnet of the docker0 bridge. It also accepts ```--mtu``` to set the MTU diff --git a/backend/alloc/alloc.go b/backend/alloc/alloc.go index cc0978a70c..f09f927eaa 100644 --- a/backend/alloc/alloc.go +++ b/backend/alloc/alloc.go @@ -24,11 +24,11 @@ func New(sm *subnet.SubnetManager) backend.Backend { } func (m *AllocBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (*backend.SubnetDef, error) { - attrs := subnet.BaseAttrs{ + attrs := subnet.LeaseAttrs{ PublicIP: ip.FromIP(extIP), } - sn, err := m.sm.AcquireLease(ip.FromIP(extIP), &attrs, m.stop) + sn, err := m.sm.AcquireLease(&attrs, m.stop) if err != nil { if err == task.ErrCanceled { return nil, err diff --git a/backend/udp/cproxy.go b/backend/udp/cproxy.go index bfa51afabf..062d277fbd 100644 --- a/backend/udp/cproxy.go +++ b/backend/udp/cproxy.go @@ -72,9 +72,8 @@ func removeRoute(ctl *os.File, dst ip.IP4Net) { func stopProxy(ctl *os.File) { cmd := C.command{ - cmd: C.CMD_STOP, + cmd: C.CMD_STOP, } writeCommand(ctl, &cmd) } - diff --git a/backend/udp/udp.go b/backend/udp/udp.go index 77fc75b0da..90f86171f1 100644 --- a/backend/udp/udp.go +++ b/backend/udp/udp.go @@ -58,11 +58,11 @@ func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (* } // Acquire the lease form subnet manager - attrs := subnet.BaseAttrs{ + attrs := subnet.LeaseAttrs{ PublicIP: ip.FromIP(extIP), } - sn, err := m.sm.AcquireLease(attrs.PublicIP, &attrs, m.stop) + sn, err := m.sm.AcquireLease(&attrs, m.stop) if err != nil { if err == task.ErrCanceled { return nil, err @@ -263,13 +263,7 @@ func (m *UdpBackend) processSubnetEvents(batch subnet.EventBatch) { case subnet.SubnetAdded: log.Info("Subnet added: ", evt.Lease.Network) - var attrs subnet.BaseAttrs - if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil { - log.Error("Error decoding subnet lease JSON: ", err) - continue - } - - setRoute(m.ctl, evt.Lease.Network, attrs.PublicIP, m.cfg.Port) + setRoute(m.ctl, evt.Lease.Network, evt.Lease.Attrs.PublicIP, m.cfg.Port) case subnet.SubnetRemoved: log.Info("Subnet removed: ", evt.Lease.Network) diff --git a/backend/vxlan/device.go b/backend/vxlan/device.go new file mode 100644 index 0000000000..2b46d48791 --- /dev/null +++ b/backend/vxlan/device.go @@ -0,0 +1,287 @@ +package vxlan + +import ( + "fmt" + "net" + "os" + "syscall" + "time" + + log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog" + "github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink" + "github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink/nl" + + "github.com/coreos/flannel/pkg/ip" +) + +type vxlanDeviceAttrs struct { + vni uint32 + name string + vtepIndex int + vtepAddr net.IP + vtepPort int +} + +type vxlanDevice struct { + link *netlink.Vxlan +} + +func sysctlSet(path, value string) error { + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + _, err = f.Write([]byte(value)) + return err +} + +func newVXLANDevice(devAttrs *vxlanDeviceAttrs) (*vxlanDevice, error) { + link := &netlink.Vxlan{ + LinkAttrs: netlink.LinkAttrs{ + Name: devAttrs.name, + }, + VxlanId: int(devAttrs.vni), + VtepDevIndex: devAttrs.vtepIndex, + SrcAddr: devAttrs.vtepAddr, + Port: devAttrs.vtepPort, + Learning: true, + Proxy: true, + L2miss: true, + } + + link, err := ensureLink(link) + if err != nil { + return nil, err + } + + // this enables ARP requests being sent to userspace via netlink + sysctlPath := fmt.Sprintf("/proc/sys/net/ipv4/neigh/%s/app_solicit", devAttrs.name) + sysctlSet(sysctlPath, "3") + + return &vxlanDevice{ + link: link, + }, nil +} + +func ensureLink(vxlan *netlink.Vxlan) (*netlink.Vxlan, error) { + err := netlink.LinkAdd(vxlan) + if err != nil { + if err == syscall.EEXIST { + // it's ok if the device already exists as long as config is similar + existing, err := netlink.LinkByName(vxlan.Name) + if err != nil { + return nil, err + } + if incompat := vxlanLinksIncompat(vxlan, existing); incompat != "" { + log.Warningf("%q already exists with incompatable configuration: %v; recreating device", vxlan.Name, incompat) + + // delete existing + if err = netlink.LinkDel(existing); err != nil { + return nil, fmt.Errorf("failed to delete interface: %v", err) + } + + // create new + if err = netlink.LinkAdd(vxlan); err != nil { + return nil, fmt.Errorf("failed to create vxlan interface: %v", err) + } + } else { + return existing.(*netlink.Vxlan), nil + } + } else { + return nil, err + } + } + + ifindex := vxlan.Index + link, err := netlink.LinkByIndex(vxlan.Index) + if err != nil { + return nil, fmt.Errorf("can't locate created vxlan device with index %v", ifindex) + } + var ok bool + if vxlan, ok = link.(*netlink.Vxlan); !ok { + return nil, fmt.Errorf("created vxlan device with index %v is not vxlan", ifindex) + } + + return vxlan, nil +} + +func (dev *vxlanDevice) Configure(ipn ip.IP4Net) error { + setAddr4(dev.link, ipn.ToIPNet()) + + if err := netlink.LinkSetUp(dev.link); err != nil { + return fmt.Errorf("failed to set interface %s to UP state: %s", dev.link.Attrs().Name, err) + } + + // explicitly add a route since there might be a route for a subnet already + // installed by Docker and then it won't get auto added + route := netlink.Route{ + LinkIndex: dev.link.Attrs().Index, + Scope: netlink.SCOPE_UNIVERSE, + Dst: ipn.Network().ToIPNet(), + } + if err := netlink.RouteAdd(&route); err != nil && err != syscall.EEXIST { + return fmt.Errorf("failed to add route (%s -> %s): %v", ipn.Network().String(), dev.link.Attrs().Name, err) + } + + return nil +} + +func (dev *vxlanDevice) Destroy() { + netlink.LinkDel(dev.link) +} + +func (dev *vxlanDevice) MACAddr() net.HardwareAddr { + return dev.link.HardwareAddr +} + +func (dev *vxlanDevice) MTU() int { + return dev.link.MTU +} + +func (dev *vxlanDevice) MonitorMisses(misses chan *netlink.Neigh) { + nlsock, err := nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH) + if err != nil { + log.Error("Failed to subscribe to netlink RTNLGRP_NEIGH messages") + return + } + + for { + msgs, err := nlsock.Recieve() + if err != nil { + log.Errorf("Failed to receive from netlink: %v ", err) + + // wait 1 sec before retrying but honor the cancel channel + time.Sleep(1*time.Second) + continue + } + + for _, msg := range msgs { + dev.processNeighMsg(msg, misses) + } + } +} + +func isNeighResolving(state int) bool { + return (state & (netlink.NUD_INCOMPLETE | netlink.NUD_STALE | netlink.NUD_DELAY | netlink.NUD_PROBE)) != 0 +} + +func (dev *vxlanDevice) processNeighMsg(msg syscall.NetlinkMessage, misses chan *netlink.Neigh) { + neigh, err := netlink.NeighDeserialize(msg.Data) + if err != nil { + log.Error("Failed to deserialize netlink ndmsg: %v", err) + return + } + + if int(neigh.LinkIndex) != dev.link.Index { + return + } + + if msg.Header.Type != syscall.RTM_GETNEIGH && msg.Header.Type != syscall.RTM_NEWNEIGH { + return + } + + if !isNeighResolving(neigh.State) { + // misses come with NUD_STALE bit set + return + } + + misses <- neigh +} + +func (dev *vxlanDevice) AddL2(mac net.HardwareAddr, vtep net.IP) error { + neigh := netlink.Neigh{ + LinkIndex: dev.link.Index, + State: netlink.NUD_REACHABLE, + Family: syscall.AF_BRIDGE, + Flags: netlink.NTF_SELF, + IP: vtep, + HardwareAddr: mac, + } + + log.Infof("calling NeighAdd: %v, %v", vtep, mac) + return netlink.NeighAdd(&neigh) +} + +func (dev *vxlanDevice) DelL2(mac net.HardwareAddr, vtep net.IP) error { + neigh := netlink.Neigh{ + LinkIndex: dev.link.Index, + Family: syscall.AF_BRIDGE, + Flags: netlink.NTF_SELF, + IP: vtep, + HardwareAddr: mac, + } + + log.Infof("calling NeighDel: %v, %v", vtep, mac) + return netlink.NeighDel(&neigh) +} + +func (dev *vxlanDevice) AddL3(ip net.IP, mac net.HardwareAddr) error { + neigh := netlink.Neigh{ + LinkIndex: dev.link.Index, + State: netlink.NUD_REACHABLE, + Type: syscall.RTN_UNICAST, + IP: ip, + HardwareAddr: mac, + } + + log.Infof("calling NeighSet: %v, %v", ip, mac) + return netlink.NeighSet(&neigh) +} + +func vxlanLinksIncompat(l1, l2 netlink.Link) string { + if l1.Type() != l2.Type() { + return fmt.Sprintf("link type: %v vs %v", l1.Type(), l2.Type()) + } + + v1 := l1.(*netlink.Vxlan) + v2 := l2.(*netlink.Vxlan) + + if v1.VxlanId != v2.VxlanId { + return fmt.Sprintf("vni: %v vs %v", v1.VxlanId, v2.VxlanId) + } + + if v1.VtepDevIndex > 0 && v2.VtepDevIndex > 0 && v1.VtepDevIndex != v2.VtepDevIndex { + return fmt.Sprintf("vtep (external) interface: %v vs %v", v1.VtepDevIndex, v2.VtepDevIndex) + } + + if len(v1.SrcAddr) > 0 && len(v2.SrcAddr) > 0 && !v1.SrcAddr.Equal(v2.SrcAddr) { + return fmt.Sprintf("vtep (external) IP: %v vs %v", v1.SrcAddr, v2.SrcAddr) + } + + if len(v1.Group) > 0 && len(v2.Group) > 0 && !v1.Group.Equal(v2.Group) { + return fmt.Sprintf("group address: %v vs %v", v1.Group, v2.Group) + } + + if v1.L2miss != v2.L2miss { + return fmt.Sprintf("l2miss: %v vs %v", v1.L2miss, v2.L2miss) + } + + if v1.Port > 0 && v2.Port > 0 && v1.Port != v2.Port { + return fmt.Sprintf("port: %v vs %v", v1.Port, v2.Port) + } + + return "" +} + +// sets IP4 addr on link removing any existing ones first +func setAddr4(link *netlink.Vxlan, ipn *net.IPNet) error { + addrs, err := netlink.AddrList(link, syscall.AF_INET) + if err != nil { + return err + } + + for _, addr := range addrs { + if err = netlink.AddrDel(link, &addr); err != nil { + return fmt.Errorf("failed to delete IPv4 addr %s from %s", addr.String(), link.Attrs().Name) + } + } + + addr := netlink.Addr{ ipn, "" } + if err = netlink.AddrAdd(link, &addr); err != nil { + return fmt.Errorf("failed to add IP address %s to %s: %s", ipn.String(), link.Attrs().Name, err) + } + + return nil +} diff --git a/backend/vxlan/routes.go b/backend/vxlan/routes.go new file mode 100644 index 0000000000..9091746c36 --- /dev/null +++ b/backend/vxlan/routes.go @@ -0,0 +1,55 @@ +package vxlan + +import ( + "bytes" + "net" + + "github.com/coreos/flannel/pkg/ip" +) + +type route struct { + network ip.IP4Net + vtepIP net.IP + vtepMAC net.HardwareAddr +} + +type routes []route + +func (rts *routes) set(nw ip.IP4Net, vtepIP net.IP, vtepMAC net.HardwareAddr) { + for i, rt := range *rts { + if rt.network.Equal(nw) { + (*rts)[i].vtepIP = vtepIP + (*rts)[i].vtepMAC = vtepMAC + return + } + } + *rts = append(*rts, route{nw, vtepIP, vtepMAC}) +} + +func (rts *routes) remove(nw ip.IP4Net) { + for i, rt := range *rts { + if rt.network.Equal(nw) { + (*rts)[i] = (*rts)[len(*rts)-1] + (*rts) = (*rts)[0 : len(*rts)-1] + return + } + } +} + +func (rts routes) findByNetwork(ipAddr ip.IP4) *route { + for i, rt := range rts { + if rt.network.Contains(ipAddr) { + return &rts[i] + } + } + return nil +} + +func (rts routes) findByVtepMAC(mac net.HardwareAddr) *route { + for i, rt := range rts { + if bytes.Equal(rt.vtepMAC, mac) { + return &rts[i] + } + } + return nil +} diff --git a/backend/vxlan/vxlan.go b/backend/vxlan/vxlan.go new file mode 100644 index 0000000000..b47eea2533 --- /dev/null +++ b/backend/vxlan/vxlan.go @@ -0,0 +1,277 @@ +package vxlan + +import ( + "encoding/json" + "fmt" + "net" + "sync" + + log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog" + "github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink" + + "github.com/coreos/flannel/backend" + "github.com/coreos/flannel/subnet" + "github.com/coreos/flannel/pkg/ip" + "github.com/coreos/flannel/pkg/task" +) + +const ( + defaultVNI = 1 +) + +type VXLANBackend struct { + sm *subnet.SubnetManager + rawCfg json.RawMessage + cfg struct { + Vni int + Port int + } + dev *vxlanDevice + stop chan bool + wg sync.WaitGroup + rts routes +} + +func New(sm *subnet.SubnetManager, config json.RawMessage) backend.Backend { + vb := &VXLANBackend{ + sm: sm, + rawCfg: config, + stop: make(chan bool), + } + vb.cfg.Vni = defaultVNI + + return vb +} + +func newSubnetAttrs(pubIP net.IP, mac net.HardwareAddr) (*subnet.LeaseAttrs, error) { + sa := subnet.LeaseAttrs{ + PublicIP: ip.FromIP(pubIP), + BackendType: "vxlan", + } + + data, err := json.Marshal(vxlanLeaseAttrs{hardwareAddr(mac)}) + if err != nil { + return nil, err + } + + err = sa.BackendData.UnmarshalJSON(data) + if err != nil { + return nil, err + } + + return &sa, nil +} + +func (vb *VXLANBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (*backend.SubnetDef, error) { + // Parse our configuration + if len(vb.rawCfg) > 0 { + if err := json.Unmarshal(vb.rawCfg, &vb.cfg); err != nil { + return nil, fmt.Errorf("error decoding UDP backend config: %v", err) + } + } + + devAttrs := vxlanDeviceAttrs{ + vni: uint32(vb.cfg.Vni), + name: fmt.Sprintf("flannel.%v", vb.cfg.Vni), + vtepIndex: extIface.Index, + vtepAddr: extIP, + vtepPort: vb.cfg.Port, + } + + var err error + vb.dev, err = newVXLANDevice(&devAttrs) + if err != nil { + return nil, err + } + + sa, err := newSubnetAttrs(extIP, vb.dev.MACAddr()) + if err != nil { + return nil, err + } + + sn, err := vb.sm.AcquireLease(sa, vb.stop) + if err != nil { + if err == task.ErrCanceled { + return nil, err + } else { + return nil, fmt.Errorf("failed to acquire lease: %v", err) + } + } + + // vxlan's subnet is that of the whole overlay network (e.g. /16) + // and not that of the individual host (e.g. /24) + vxlanNet := ip.IP4Net{ + IP: sn.IP, + PrefixLen: vb.sm.GetConfig().Network.PrefixLen, + } + if err = vb.dev.Configure(vxlanNet); err != nil { + return nil, err + } + + return &backend.SubnetDef{sn, vb.dev.MTU()}, nil +} + +func (vb *VXLANBackend) Run() { + vb.wg.Add(1) + go func() { + vb.sm.LeaseRenewer(vb.stop) + vb.wg.Done() + }() + + log.Info("Watching for L2/L3 misses") + misses := make(chan *netlink.Neigh, 100) + // Unfortunately MonitorMisses does not take a cancel channel + // as there's no wait to interrupt netlink socket recv + go vb.dev.MonitorMisses(misses) + + log.Info("Watching for new subnet leases") + evts := make(chan subnet.EventBatch) + vb.wg.Add(1) + go func() { + vb.sm.WatchLeases(evts, vb.stop) + vb.wg.Done() + }() + + defer vb.wg.Wait() + + for { + select { + case miss := <-misses: + vb.handleMiss(miss) + + case evtBatch := <-evts: + vb.handleSubnetEvents(evtBatch) + + case <-vb.stop: + return + } + } +} + +func (vb *VXLANBackend) Stop() { + close(vb.stop) +} + +func (vb *VXLANBackend) Name() string { + return "VXLAN" +} + +// So we can make it JSON (un)marshalable +type hardwareAddr net.HardwareAddr + +func (hw hardwareAddr) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf("%q", net.HardwareAddr(hw))), nil +} + +func (hw *hardwareAddr) UnmarshalJSON(b []byte) error { + if len(b) < 2 || b[0] != '"' || b[len(b)-1] != '"' { + return fmt.Errorf("error parsing hardware addr") + } + + b = b[1:len(b)-1] + + mac, err := net.ParseMAC(string(b)) + if err != nil { + return err + } + + *hw = hardwareAddr(mac) + return nil +} + +type vxlanLeaseAttrs struct { + VtepMAC hardwareAddr +} + +func (vb *VXLANBackend) handleSubnetEvents(batch subnet.EventBatch) { + for _, evt := range batch { + switch evt.Type { + case subnet.SubnetAdded: + log.Info("Subnet added: ", evt.Lease.Network) + + if evt.Lease.Attrs.BackendType != "vxlan" { + log.Warningf("Ignoring non-vxlan subnet: type=%v", evt.Lease.Attrs.BackendType) + continue + } + + var attrs vxlanLeaseAttrs + if err := json.Unmarshal(evt.Lease.Attrs.BackendData, &attrs); err != nil { + log.Error("Error decoding subnet lease JSON: ", err) + continue + } + + vb.rts.set(evt.Lease.Network, evt.Lease.Attrs.PublicIP.ToIP(), net.HardwareAddr(attrs.VtepMAC)) + + case subnet.SubnetRemoved: + log.Info("Subnet removed: ", evt.Lease.Network) + + vb.rts.remove(evt.Lease.Network) + + if evt.Lease.Attrs.BackendType != "vxlan" { + log.Warningf("Ignoring non-vxlan subnet: type=%v", evt.Lease.Attrs.BackendType) + continue + } + + var attrs vxlanLeaseAttrs + if err := json.Unmarshal(evt.Lease.Attrs.BackendData, &attrs); err != nil { + log.Error("Error decoding subnet lease JSON: ", err) + continue + } + + if len(attrs.VtepMAC) > 0 { + vb.dev.DelL2(net.HardwareAddr(attrs.VtepMAC), evt.Lease.Attrs.PublicIP.ToIP()) + } + + default: + log.Error("Internal error: unknown event type: ", int(evt.Type)) + } + } +} + +func (vb *VXLANBackend) handleMiss(miss *netlink.Neigh) { + switch { + case len(miss.IP) == 0 && len(miss.HardwareAddr) == 0: + log.Info("Ignoring nil miss") + + case len(miss.IP) == 0: + vb.handleL2Miss(miss) + + case len(miss.HardwareAddr) == 0: + vb.handleL3Miss(miss) + + default: + log.Infof("Ignoring not a miss: %v, %v", miss.HardwareAddr, miss.IP) + } +} + +func (vb *VXLANBackend) handleL2Miss(miss *netlink.Neigh) { + log.Infof("L2 miss: %v", miss.HardwareAddr) + + rt := vb.rts.findByVtepMAC(miss.HardwareAddr) + if rt == nil { + log.Infof("Route for %v not found", miss.HardwareAddr) + return + } + + if err := vb.dev.AddL2(miss.HardwareAddr, rt.vtepIP); err != nil { + log.Errorf("AddL2 failed: %v", err) + } else { + log.Info("AddL2 succeeded") + } +} + +func (vb *VXLANBackend) handleL3Miss(miss *netlink.Neigh) { + log.Infof("L3 miss: %v", miss.IP) + + rt := vb.rts.findByNetwork(ip.FromIP(miss.IP)) + if rt == nil { + log.Infof("Route for %v not found", miss.IP) + return + } + + if err := vb.dev.AddL3(miss.IP, rt.vtepMAC); err != nil { + log.Errorf("AddL3 failed: %v", err) + } else { + log.Info("AddL3 succeeded") + } +} diff --git a/main.go b/main.go index 62b0c596de..5f819233a6 100644 --- a/main.go +++ b/main.go @@ -21,6 +21,7 @@ import ( "github.com/coreos/flannel/subnet" "github.com/coreos/flannel/backend/alloc" "github.com/coreos/flannel/backend/udp" + "github.com/coreos/flannel/backend/vxlan" ) type CmdLineOpts struct { @@ -159,6 +160,8 @@ func newBackend() (backend.Backend, error) { return udp.New(sm, config.Backend), nil case "alloc": return alloc.New(sm), nil + case "vxlan": + return vxlan.New(sm, config.Backend), nil default: return nil, fmt.Errorf("'%v': unknown backend type", bt.Type) } @@ -189,7 +192,6 @@ func run(be backend.Backend, exit chan int) { sn, err := be.Init(iface, ipaddr, opts.ipMasq) if err != nil { - log.Errorf("Could not init %v backend: %v", be.Name(), err) return } diff --git a/subnet/subnet.go b/subnet/subnet.go index aa2e85c209..465952fad3 100644 --- a/subnet/subnet.go +++ b/subnet/subnet.go @@ -38,9 +38,15 @@ var ( subnetRegex *regexp.Regexp = regexp.MustCompile(`(\d+\.\d+.\d+.\d+)-(\d+)`) ) +type LeaseAttrs struct { + PublicIP ip.IP4 + BackendType string `json:",omitempty"` + BackendData json.RawMessage `json:",omitempty"` +} + type SubnetLease struct { Network ip.IP4Net - Data string + Attrs LeaseAttrs } type SubnetManager struct { @@ -66,15 +72,9 @@ func NewSubnetManager(etcdEndpoint []string, prefix string) (*SubnetManager, err return newSubnetManager(esr) } -func (sm *SubnetManager) AcquireLease(extIP ip.IP4, data interface{}, cancel chan bool) (ip.IP4Net, error) { - dataBytes, err := json.Marshal(data) - if err != nil { - return ip.IP4Net{}, err - } - - var sn ip.IP4Net +func (sm *SubnetManager) AcquireLease(attrs *LeaseAttrs, cancel chan bool) (ip.IP4Net, error) { for { - sn, err = sm.acquireLeaseOnce(extIP, string(dataBytes), cancel) + sn, err := sm.acquireLeaseOnce(attrs, cancel) switch { case err == nil: log.Info("Subnet lease acquired: ", sn) @@ -96,53 +96,71 @@ func (sm *SubnetManager) AcquireLease(extIP ip.IP4, data interface{}, cancel cha } } -func (sm *SubnetManager) acquireLeaseOnce(extIP ip.IP4, data string, cancel chan bool) (ip.IP4Net, error) { - for i := 0; i < registerRetries; i++ { - var err error - sm.leases, err = sm.getLeases() - if err != nil { - return ip.IP4Net{}, err +func findLeaseByIP(leases []SubnetLease, pubIP ip.IP4) *SubnetLease { + for _, l := range leases { + if pubIP == l.Attrs.PublicIP { + return &l } + } - // try to reuse a subnet if there's one that matches our IP - for _, l := range sm.leases { - var ba BaseAttrs - err = json.Unmarshal([]byte(l.Data), &ba) - if err != nil { - log.Error("Error parsing subnet lease JSON: ", err) - } else { - if extIP == ba.PublicIP { - resp, err := sm.registry.updateSubnet(l.Network.StringSep(".", "-"), data, subnetTTL) - if err != nil { - return ip.IP4Net{}, err - } - - sm.myLease.Network = l.Network - sm.leaseExp = *resp.Node.Expiration - return l.Network, nil - } - } - } + return nil +} - // no existing match, grab a new one - sn, err := sm.allocateSubnet() +func (sm *SubnetManager) tryAcquireLease(extIP ip.IP4, attrs []byte) (ip.IP4Net, error) { + var err error + sm.leases, err = sm.getLeases() + if err != nil { + return ip.IP4Net{}, err + } + + // try to reuse a subnet if there's one that matches our IP + if l := findLeaseByIP(sm.leases, extIP); l != nil { + resp, err := sm.registry.updateSubnet(l.Network.StringSep(".", "-"), string(attrs), subnetTTL) if err != nil { return ip.IP4Net{}, err } - resp, err := sm.registry.createSubnet(sn.StringSep(".", "-"), data, subnetTTL) - switch { - case err == nil: - sm.myLease.Network = sn - sm.leaseExp = *resp.Node.Expiration - return sn, nil + sm.myLease.Network = l.Network + sm.leaseExp = *resp.Node.Expiration + return l.Network, nil + } - // if etcd returned Key Already Exists, try again. - case err.(*etcd.EtcdError).ErrorCode == etcdKeyAlreadyExists: - break + // no existing match, grab a new one + sn, err := sm.allocateSubnet() + if err != nil { + return ip.IP4Net{}, err + } - default: + resp, err := sm.registry.createSubnet(sn.StringSep(".", "-"), string(attrs), subnetTTL) + switch { + case err == nil: + sm.myLease.Network = sn + sm.leaseExp = *resp.Node.Expiration + return sn, nil + + // if etcd returned Key Already Exists, try again. + case err.(*etcd.EtcdError).ErrorCode == etcdKeyAlreadyExists: + return ip.IP4Net{}, nil + + default: + return ip.IP4Net{}, err + } +} + +func (sm *SubnetManager) acquireLeaseOnce(attrs *LeaseAttrs, cancel chan bool) (ip.IP4Net, error) { + attrBytes, err := json.Marshal(attrs) + if err != nil { + log.Errorf("marshal failed: %#v, %v", attrs, err) + return ip.IP4Net{}, err + } + + for i := 0; i < registerRetries; i++ { + sn, err := sm.tryAcquireLease(attrs.PublicIP, attrBytes) + switch { + case err != nil: return ip.IP4Net{}, err + case sn.IP != 0: + return sn, nil } // before moving on, check for cancel @@ -154,12 +172,6 @@ func (sm *SubnetManager) acquireLeaseOnce(extIP ip.IP4, data string, cancel chan return ip.IP4Net{}, errors.New("Max retries reached trying to acquire a subnet") } -func (sm *SubnetManager) UpdateSubnet(data string) error { - resp, err := sm.registry.updateSubnet(sm.myLease.Network.StringSep(".", "-"), data, subnetTTL) - sm.leaseExp = *resp.Node.Expiration - return err -} - func (sm *SubnetManager) GetConfig() *Config { return sm.config } @@ -205,8 +217,11 @@ func (sm *SubnetManager) getLeases() ([]SubnetLease, error) { for _, node := range resp.Node.Nodes { sn, err := parseSubnetKey(node.Key) if err == nil { - lease := SubnetLease{sn, node.Value} - leases = append(leases, lease) + var attrs LeaseAttrs + if err = json.Unmarshal([]byte(node.Value), &attrs); err == nil { + lease := SubnetLease{sn, attrs} + leases = append(leases, lease) + } } } sm.lastIndex = resp.EtcdIndex @@ -261,39 +276,41 @@ func (sm *SubnetManager) applyLeases(newLeases []SubnetLease) EventBatch { return batch } -func (sm *SubnetManager) applySubnetChange(action string, ipn ip.IP4Net, data string) Event { +func (sm *SubnetManager) applySubnetChange(action string, ipn ip.IP4Net, data string) (Event, error) { switch action { case "delete", "expire": for i, l := range sm.leases { if l.Network.Equal(ipn) { deleteLease(sm.leases, i) - return Event{SubnetRemoved, l} + return Event{SubnetRemoved, l}, nil } } log.Errorf("Removed subnet (%s) was not found", ipn) return Event{ SubnetRemoved, - SubnetLease{ipn, ""}, - } + SubnetLease{ipn, LeaseAttrs{}}, + }, nil default: + var attrs LeaseAttrs + err := json.Unmarshal([]byte(data), &attrs) + if err != nil { + return Event{}, err + } + for i, l := range sm.leases { if l.Network.Equal(ipn) { - sm.leases[i] = SubnetLease{ipn, data} - return Event{SubnetAdded, sm.leases[i]} + sm.leases[i] = SubnetLease{ipn, attrs} + return Event{SubnetAdded, sm.leases[i]}, nil } } - sm.leases = append(sm.leases, SubnetLease{ipn, data}) - return Event{SubnetAdded, sm.leases[len(sm.leases)-1]} + sm.leases = append(sm.leases, SubnetLease{ipn, attrs}) + return Event{SubnetAdded, sm.leases[len(sm.leases)-1]}, nil } } -type BaseAttrs struct { - PublicIP ip.IP4 -} - func (sm *SubnetManager) allocateSubnet() (ip.IP4Net, error) { log.Infof("Picking subnet in range %s ... %s", sm.config.SubnetMin, sm.config.SubnetMax) @@ -369,7 +386,10 @@ func (sm *SubnetManager) parseSubnetWatchResponse(resp *etcd.Response) (batch *E // Don't process our own changes if !sm.myLease.Network.Equal(sn) { - evt := sm.applySubnetChange(resp.Action, sn, resp.Node.Value) + evt, err := sm.applySubnetChange(resp.Action, sn, resp.Node.Value) + if err != nil { + return nil, err + } batch = &EventBatch{evt} } @@ -404,7 +424,14 @@ func (sm *SubnetManager) LeaseRenewer(cancel chan bool) { for { select { case <-time.After(dur): - resp, err := sm.registry.updateSubnet(sm.myLease.Network.StringSep(".", "-"), sm.myLease.Data, subnetTTL) + attrBytes, err := json.Marshal(sm.myLease.Attrs) + if err != nil { + log.Error("Error renewing lease (trying again in 1 min): ", err) + dur = time.Minute + continue + } + + resp, err := sm.registry.updateSubnet(sm.myLease.Network.StringSep(".", "-"), string(attrBytes), subnetTTL) if err != nil { log.Error("Error renewing lease (trying again in 1 min): ", err) dur = time.Minute diff --git a/subnet/subnet_test.go b/subnet/subnet_test.go index 22507d77a8..803fdc7710 100644 --- a/subnet/subnet_test.go +++ b/subnet/subnet_test.go @@ -112,6 +112,7 @@ func (msr *mockSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd case sn = <-msr.addCh: n := etcd.Node{ Key: sn, + Value: `{"PublicIP": "1.1.1.1"}`, ModifiedIndex: msr.index, } msr.subnets.Nodes = append(msr.subnets.Nodes, &n) @@ -152,12 +153,12 @@ func TestAcquireLease(t *testing.T) { } extIP, _ := ip.ParseIP4("1.2.3.4") - data := BaseAttrs{ + attrs := LeaseAttrs{ PublicIP: extIP, } cancel := make(chan bool) - sn, err := sm.AcquireLease(extIP, data, cancel) + sn, err := sm.AcquireLease(&attrs, cancel) if err != nil { t.Fatal("AcquireLease failed: ", err) } @@ -167,7 +168,7 @@ func TestAcquireLease(t *testing.T) { } // Acquire again, should reuse - if sn, err = sm.AcquireLease(extIP, data, cancel); err != nil { + if sn, err = sm.AcquireLease(&attrs, cancel); err != nil { t.Fatal("AcquireLease failed: ", err) } @@ -258,14 +259,14 @@ func TestRenewLease(t *testing.T) { } extIP, _ := ip.ParseIP4("1.2.3.4") - data := BaseAttrs{ + attrs := LeaseAttrs{ PublicIP: extIP, } cancel := make(chan bool) defer close(cancel) - sn, err := sm.AcquireLease(extIP, data, cancel) + sn, err := sm.AcquireLease(&attrs, cancel) if err != nil { t.Fatal("AcquireLease failed: ", err) }