From bd6d8fe1cef609d08c6cf449b60d958795913291 Mon Sep 17 00:00:00 2001 From: Renzo Toma Date: Fri, 2 Dec 2016 22:02:36 +0100 Subject: [PATCH] Added endpoint labelling feature. Change-Id: I7c18a9223eef740fdfe6bb6463d1717398a2351c --- .gitignore | 1 + README.md | 59 +++++ driver/network_driver.go | 229 ++++++++++++++++-- .../libnetwork_env_var_test.go | 94 +++++++ .../libnetwork_suite_test.go | 16 ++ tests/default_environment/libnetwork_test.go | 22 ++ 6 files changed, 400 insertions(+), 21 deletions(-) create mode 100644 tests/custom_wep_labelling/libnetwork_env_var_test.go create mode 100644 tests/custom_wep_labelling/libnetwork_suite_test.go diff --git a/.gitignore b/.gitignore index e58a490..3b25e0d 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ cover calicoctl *.tar *.created +.go-pkg-cache/ diff --git a/README.md b/README.md index 4917164..5cd7a80 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,19 @@ Running the plugin in a container requires a few specific options - `-v /run/docker/plugins:/run/docker/plugins` allows the docker daemon to discover the plugin - `-v /var/run/docker.sock:/var/run/docker.sock` allows the plugin to query the docker daemon +## How to Test It + +### On Linux + +`make test` is all you need. + +### On OSX/Windows + +On OSX/Windows you can't run Docker natively. To allow the Makefile to write the built libnetwork-plugin to your host's filesystem and to allow the test to access the Docker daemon via the unix socket, the user id and group id of the docker user are needed. For boot2docker the user id is 1000 and group id 100. + +Run the containerized tests like this: `LOCAL_USER_ID=1000 LOCAL_GROUP_ID=100 make test-containerized` + + ## Known limitations The following is a list of known limitations when using the Calico libnetwork driver: @@ -33,13 +46,59 @@ driver: ## Configuring To change the prefix used for the interface in containers that Docker runs, set the `CALICO_LIBNETWORK_IFPREFIX` environment variable. 
+ * The default value is "cali" +To enable debug logging set the `CALICO_DEBUG` environment variable. + +The plugin creates a Calico profile resource for the Docker network used (e.g. `docker run --net ...`). This is enabled by default. It can be disabled by setting the environment variable `CALICO_LIBNETWORK_CREATE_PROFILES=false`. + +The plugin can copy Docker container labels to the corresponding Calico workloadendpoint. This feature is disabled by default. It can be enabled by setting the environment variable `CALICO_LIBNETWORK_LABEL_ENDPOINTS=true`. + +## Workloadendpoint labelling +If you want to use Calico policies you need labels on the Calico workloadendpoint. The plugin can set labels by copying a subset of the Docker container labels. + +To enable this feature you need to set the environment variable `CALICO_LIBNETWORK_LABEL_ENDPOINTS=true`. + +Only container labels starting with `org.projectcalico.label.` are used. This prefix is removed and the remaining key is used as the label key in the workloadendpoint. + +Example: `docker run --label org.projectcalico.label.foo=bar --net ...` will create a workloadendpoint with label `foo=bar`. Of course you can use multiple `--label org.projectcalico.label.KEY=VALUE` options. + + +*NOTE:* the labels are added to the workloadendpoint using an update, because the container information is not available at the moment the workloadendpoint resource is created. + ## Troubleshooting ### Logging Logs are sent to STDOUT. If using Docker these can be viewed with the `docker logs` command. +### Monitoring + +Check the plugin health by executing API calls. 
+ +NetworkDriver: + +``` +# echo -e "GET /NetworkDriver.GetCapabilities HTTP/1.0\r\n\r\n" | nc -U /run/docker/plugins/calico.sock +HTTP/1.0 200 OK +Content-Type: application/vnd.docker.plugins.v1.1+json +Date: Thu, 08 Dec 2016 10:00:41 GMT +Content-Length: 19 + +{"Scope":"global"} +``` + +IpamDriver: + +``` +# echo -e "GET /IpamDriver.GetCapabilities HTTP/1.0\r\n\r\n" | nc -U /run/docker/plugins/calico-ipam.sock +HTTP/1.0 200 OK +Content-Type: application/vnd.docker.plugins.v1.1+json +Date: Thu, 08 Dec 2016 10:02:51 GMT +Content-Length: 29 + +{"RequiresMACAddress":false} +``` [![Analytics](https://calico-ga-beacon.appspot.com/UA-52125893-3/libnetwork-plugin/README.md?pixel)](https://github.com/igrigorik/ga-beacon) diff --git a/driver/network_driver.go b/driver/network_driver.go index 95f47e2..c3b7abc 100644 --- a/driver/network_driver.go +++ b/driver/network_driver.go @@ -3,6 +3,9 @@ package driver import ( "context" "net" + "os" + "strings" + "time" log "github.com/Sirupsen/logrus" "github.com/pkg/errors" @@ -20,6 +23,11 @@ import ( osutils "github.com/projectcalico/libnetwork-plugin/utils/os" ) +const DOCKER_LABEL_PREFIX = "org.projectcalico.label." +const LABEL_POLL_TIMEOUT_ENVKEY = "CALICO_LIBNETWORK_LABEL_POLL_TIMEOUT" +const CREATE_PROFILES_ENVKEY = "CALICO_LIBNETWORK_CREATE_PROFILES" +const LABEL_ENDPOINTS_ENVKEY = "CALICO_LIBNETWORK_LABEL_ENDPOINTS" + // NetworkDriver is the Calico network driver representation. // Must be used with Calico IPAM and supports IPv4 only. 
type NetworkDriver struct { @@ -31,10 +39,15 @@ type NetworkDriver struct { ifPrefix string DummyIPV4Nexthop string + + labelPollTimeout time.Duration + + createProfiles bool + labelEndpoints bool } func NewNetworkDriver(client *datastoreClient.Client) network.Driver { - return NetworkDriver{ + driver := NetworkDriver{ client: client, // The MAC address of the interface in the container is arbitrary, for @@ -49,7 +62,22 @@ func NewNetworkDriver(client *datastoreClient.Client) network.Driver { ifPrefix: IFPrefix, DummyIPV4Nexthop: "169.254.1.1", + + // default: enabled, disable by setting env key to false (case insensitive) + createProfiles: !strings.EqualFold(os.Getenv(CREATE_PROFILES_ENVKEY), "false"), + + // default: disabled, enable by setting env key to true (case insensitive) + labelEndpoints: strings.EqualFold(os.Getenv(LABEL_ENDPOINTS_ENVKEY), "true"), + } + + if !driver.createProfiles { + log.Info("Feature disabled: no Calico profiles will be created per network") + } + if driver.labelEndpoints { + log.Info("Feature enabled: Calico workloadendpoints will be labelled with Docker labels") + driver.labelPollTimeout = getLabelPollTimeout() } + return driver } func (d NetworkDriver) GetCapabilities() (*network.CapabilitiesResponse, error) { @@ -167,26 +195,28 @@ func (d NetworkDriver) CreateEndpoint(request *network.CreateEndpointRequest) (* return nil, err } - // Now that we know the network name, set it on the endpoint. - endpoint.Spec.Profiles = append(endpoint.Spec.Profiles, networkData.Name) - - // If a profile for the network name doesn't exist then it needs to be created. - // We always attempt to create the profile and rely on the datastore to reject - // the request if the profile already exists. 
- profile := &api.Profile{ - Metadata: api.ProfileMetadata{ - Name: networkData.Name, - Tags: []string{networkData.Name}, - }, - Spec: api.ProfileSpec{ - EgressRules: []api.Rule{{Action: "allow"}}, - IngressRules: []api.Rule{{Action: "allow", Source: api.EntityRule{Tag: networkData.Name}}}, - }, - } - if _, err := d.client.Profiles().Create(profile); err != nil { - if _, ok := err.(libcalicoErrors.ErrorResourceAlreadyExists); !ok { - log.Errorln(err) - return nil, err + if d.createProfiles { + // Now that we know the network name, set it on the endpoint. + endpoint.Spec.Profiles = append(endpoint.Spec.Profiles, networkData.Name) + + // If a profile for the network name doesn't exist then it needs to be created. + // We always attempt to create the profile and rely on the datastore to reject + // the request if the profile already exists. + profile := &api.Profile{ + Metadata: api.ProfileMetadata{ + Name: networkData.Name, + Tags: []string{networkData.Name}, + }, + Spec: api.ProfileSpec{ + EgressRules: []api.Rule{{Action: "allow"}}, + IngressRules: []api.Rule{{Action: "allow", Source: api.EntityRule{Tag: networkData.Name}}}, + }, + } + if _, err := d.client.Profiles().Create(profile); err != nil { + if _, ok := err.(libcalicoErrors.ErrorResourceAlreadyExists); !ok { + log.Errorln(err) + return nil, err + } } } @@ -199,6 +229,11 @@ func (d NetworkDriver) CreateEndpoint(request *network.CreateEndpointRequest) (* } log.Debugf("Workload created, data: %+v\n", endpoint) + + if d.labelEndpoints { + go d.populateWorkloadEndpointWithLabels(request.NetworkID, endpoint) + } + var endpointInterface network.EndpointInterface if !userProvidedMac { endpointInterface = network.EndpointInterface{ @@ -318,3 +353,155 @@ func (d NetworkDriver) ProgramExternalConnectivity(*network.ProgramExternalConne func (d NetworkDriver) RevokeExternalConnectivity(*network.RevokeExternalConnectivityRequest) error { return nil } + +// Try to get the container's labels and update the WorkloadEndpoint 
with them +// Since we do not get container info in the libnetwork API methods we need to +// get them ourselves. +// +// This is how: +// - first we try to get a list of containers attached to the custom network +// - if there is a container with our endpointID, we try to inspect that container +// - any labels for that container prefixed by our 'magic' prefix are added to +// our WorkloadEndpoint resource +// +// Above may take 1 or more retries, because Docker has to update the +// container list in the NetworkInspect and make the Container available +// for inspecting. +func (d NetworkDriver) populateWorkloadEndpointWithLabels(networkID string, endpoint *api.WorkloadEndpoint) { + endpointID := endpoint.Metadata.Name + + retrySleep := time.Duration(100 * time.Millisecond) + + start := time.Now() + deadline := start.Add(d.labelPollTimeout) + + dockerCli, err := dockerClient.NewEnvClient() + if err != nil { + err = errors.Wrap(err, "Error while attempting to instantiate docker client from env") + log.Errorln(err) + return + } + defer dockerCli.Close() + +RETRY_NETWORK_INSPECT: + if time.Now().After(deadline) { + log.Errorf("Getting labels for workloadEndpoint timed out in network inspect loop. 
Took %s", time.Since(start)) + return + } + + // inspect our custom network + networkData, err := dockerCli.NetworkInspect(context.Background(), networkID) + if err != nil { + err = errors.Wrapf(err, "Error inspecting network %s - retrying (T=%s)", networkID, time.Since(start)) + log.Warningln(err) + // was unable to inspect network, let's retry + time.Sleep(retrySleep) + goto RETRY_NETWORK_INSPECT + } + logutils.JSONMessage("NetworkInspect response", networkData) + + // try to find the container for which we created an endpoint + containerID := "" + for id, containerInNetwork := range networkData.Containers { + if containerInNetwork.EndpointID == endpointID { + // skip funky identified containers - observed with dind 1.13.0-rc3, gone in -rc5 + // { + // "Containers": { + // "ep-736ccfa7cd61ced93b67f7465ddb79633ea6d1f718a8ca7d9d19226f5d3521b0": { + // "Name": "run1466946597", + // "EndpointID": "736ccfa7cd61ced93b67f7465ddb79633ea6d1f718a8ca7d9d19226f5d3521b0", + // ... + // } + // } + // } + if strings.HasPrefix(id, "ep-") { + log.Debugf("Skipping container entry with matching endpointID, but illegal id: %s", id) + } else { + containerID = id + log.Debugf("Container %s found in NetworkInspect result (T=%s)", containerID, time.Since(start)) + break + } + } + } + + if containerID == "" { + // cause: Docker has not yet processed the libnetwork CreateEndpoint response. + log.Warnf("Container not found in NetworkInspect result - retrying (T=%s)", time.Since(start)) + // let's retry + time.Sleep(retrySleep) + goto RETRY_NETWORK_INSPECT + } + +RETRY_CONTAINER_INSPECT: + if time.Now().After(deadline) { + log.Errorf("Getting labels for workloadEndpoint timed out in container inspect loop. 
Took %s", time.Since(start)) + return + } + + containerInfo, err := dockerCli.ContainerInspect(context.Background(), containerID) + if err != nil { + err = errors.Wrapf(err, "Error inspecting container %s for labels - retrying (T=%s)", containerID, time.Since(start)) + log.Warningln(err) + // was unable to inspect container, let's retry + time.Sleep(retrySleep) + goto RETRY_CONTAINER_INSPECT + } + + log.Debugf("Container inspected, processing labels now (T=%s)", time.Since(start)) + + // make sure we have a labels map in the workloadEndpoint + if endpoint.Metadata.Labels == nil { + endpoint.Metadata.Labels = map[string]string{} + } + + labelsFound := 0 + for label, labelValue := range containerInfo.Config.Labels { + if !strings.HasPrefix(label, DOCKER_LABEL_PREFIX) { + continue + } + labelsFound++ + labelClean := strings.TrimPrefix(label, DOCKER_LABEL_PREFIX) + endpoint.Metadata.Labels[labelClean] = labelValue + log.Debugf("Found label for WorkloadEndpoint: %s=%s", labelClean, labelValue) + } + + if labelsFound == 0 { + log.Debugf("No labels found for container of endpoint %s (T=%s)", endpointID, time.Since(start)) + return + } + + // lets update the workloadEndpoint + _, err = d.client.WorkloadEndpoints().Update(endpoint) + if err != nil { + err = errors.Wrapf(err, "Unable to update WorkloadEndpoint with labels (T=%s)", time.Since(start)) + log.Errorln(err) + return + } + + log.Infof("WorkloadEndpoint %s updated with labels: %v (T=%s)", + endpointID, endpoint.Metadata.Labels, time.Since(start)) + +} + +// Returns the label poll timeout. Default is returned unless an environment +// key is set to a valid time.Duration. +func getLabelPollTimeout() time.Duration { + // 5 seconds should be more than enough for this plugin to get the + // container labels. 
More info in func populateWorkloadEndpointWithLabels + defaultTimeout := time.Duration(5 * time.Second) + + timeoutVal := os.Getenv(LABEL_POLL_TIMEOUT_ENVKEY) + if timeoutVal == "" { + return defaultTimeout + } + + labelPollTimeout, err := time.ParseDuration(timeoutVal) + if err != nil { + err = errors.Wrapf(err, "Label poll timeout specified via env key %s is invalid, using default %s", + LABEL_POLL_TIMEOUT_ENVKEY, defaultTimeout) + log.Warningln(err) + return defaultTimeout + } + log.Infof("Using custom label poll timeout: %s", labelPollTimeout) + return labelPollTimeout +} diff --git a/tests/custom_wep_labelling/libnetwork_env_var_test.go b/tests/custom_wep_labelling/libnetwork_env_var_test.go new file mode 100644 index 0000000..202fc4b --- /dev/null +++ b/tests/custom_wep_labelling/libnetwork_env_var_test.go @@ -0,0 +1,94 @@ +package custom_wep_labelling + +import ( + "fmt" + "math/rand" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + mathutils "github.com/projectcalico/libnetwork-plugin/utils/math" + . "github.com/projectcalico/libnetwork-plugin/utils/test" +) + +var _ = Describe("Running plugin with custom ENV", func() { + Describe("docker run", func() { + It("creates a container on a network with WEP labelling enabled", func() { + RunPlugin("-e CALICO_LIBNETWORK_LABEL_ENDPOINTS=true") + + // Since running the plugin starts etcd, the pool needs to be created after. 
+ CreatePool("192.169.1.0/24") + + name := fmt.Sprintf("run%d", rand.Uint32()) + DockerString(fmt.Sprintf("docker network create %s -d calico --ipam-driver calico-ipam", name)) + + // Create a container that will just sit in the background + DockerString(fmt.Sprintf("docker run --net %s -tid --label not=expected --label org.projectcalico.label.foo=bar --label org.projectcalico.label.baz=quux --name %s busybox", name, name)) + + // Gather information for assertions + docker_endpoint := GetDockerEndpoint(name, name) + ip := docker_endpoint.IPAddress + mac := docker_endpoint.MacAddress + endpoint_id := docker_endpoint.EndpointID + interface_name := "cali" + endpoint_id[:mathutils.MinInt(11, len(endpoint_id))] + + // Sleep to allow the plugin to query the started container and update the WEP + // Alternative: query etcd until we hit jackpot or timeout + time.Sleep(time.Second) + + // Check that the endpoint is created in etcd + etcd_endpoint := GetEtcdString(fmt.Sprintf("/calico/v1/host/test/workload/libnetwork/libnetwork/endpoint/%s", endpoint_id)) + Expect(etcd_endpoint).Should(MatchJSON(fmt.Sprintf( + `{"state":"active","name":"%s","mac":"%s","profile_ids":["%s"],"ipv4_nets":["%s/32"],"ipv6_nets":[],"labels":{"baz":"quux","foo": "bar"}}`, + interface_name, mac, name, ip))) + + // Check profile + tags := GetEtcdString(fmt.Sprintf("/calico/v1/policy/profile/%s/tags", name)) + labels := GetEtcdString(fmt.Sprintf("/calico/v1/policy/profile/%s/labels", name)) + rules := GetEtcdString(fmt.Sprintf("/calico/v1/policy/profile/%s/rules", name)) + Expect(tags).Should(MatchJSON(fmt.Sprintf(`["%s"]`, name))) + Expect(labels).Should(MatchJSON("{}")) + Expect(rules).Should(MatchJSON(fmt.Sprintf(`{"inbound_rules": [{"action": "allow","src_tag": "%s"}],"outbound_rules":[{"action": "allow"}]}`, name))) + + // Delete container + DockerString(fmt.Sprintf("docker rm -f %s", name)) + }) + }) + + Describe("docker run", func() { + It("creates a container on a network with WEP labelling 
enabled and profile creation disabled", func() { + // Run the plugin with WEP labelling enabled and profile creation disabled + RunPlugin("-e CALICO_LIBNETWORK_LABEL_ENDPOINTS=true -e CALICO_LIBNETWORK_CREATE_PROFILES=false") + + // Since running the plugin starts etcd, the pool needs to be created after. + CreatePool("192.169.2.0/24") + + name := fmt.Sprintf("run%d", rand.Uint32()) + DockerString(fmt.Sprintf("docker network create %s -d calico --ipam-driver calico-ipam", name)) + + // Create a container that will just sit in the background + DockerString(fmt.Sprintf("docker run --net %s -tid --label not=expected --label org.projectcalico.label.foo=bar --label org.projectcalico.label.baz=quux --name %s busybox", name, name)) + + // Gather information for assertions + docker_endpoint := GetDockerEndpoint(name, name) + ip := docker_endpoint.IPAddress + mac := docker_endpoint.MacAddress + endpoint_id := docker_endpoint.EndpointID + interface_name := "cali" + endpoint_id[:mathutils.MinInt(11, len(endpoint_id))] + + // Sleep to allow the plugin to query the started container and update the WEP + // Alternative: query etcd until we hit jackpot or timeout + time.Sleep(time.Second) + + // Check that the endpoint is created in etcd + etcd_endpoint := GetEtcdString(fmt.Sprintf("/calico/v1/host/test/workload/libnetwork/libnetwork/endpoint/%s", endpoint_id)) + Expect(etcd_endpoint).Should(MatchJSON(fmt.Sprintf( + `{"state":"active","name":"%s","mac":"%s","profile_ids":null,"ipv4_nets":["%s/32"],"ipv6_nets":[],"labels":{"baz":"quux","foo": "bar"}}`, + interface_name, mac, ip))) + + // Delete container + DockerString(fmt.Sprintf("docker rm -f %s", name)) + }) + }) + +}) diff --git a/tests/custom_wep_labelling/libnetwork_suite_test.go b/tests/custom_wep_labelling/libnetwork_suite_test.go new file mode 100644 index 0000000..4c9a9d4 --- /dev/null +++ b/tests/custom_wep_labelling/libnetwork_suite_test.go @@ -0,0 +1,16 @@ +package custom_wep_labelling + +// Tests in this suite are for when the plugin has been 
configured to label +// workload endpoints + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" +) + +func TestLibnetwork(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Custom WEP labelling Libnetwork Suite") +} diff --git a/tests/default_environment/libnetwork_test.go b/tests/default_environment/libnetwork_test.go index cd03d3a..f4f244e 100644 --- a/tests/default_environment/libnetwork_test.go +++ b/tests/default_environment/libnetwork_test.go @@ -262,7 +262,29 @@ var _ = Describe("Libnetwork Tests", func() { DockerString(fmt.Sprintf("docker network rm %s", name_subnet)) }) + It("creates a container with labels, but do not expect those in endpoint", func() { + // Create a container that will just sit in the background + DockerString(fmt.Sprintf("docker run --net %s -tid --label org.projectcalico.label.foo=bar --label org.projectcalico.label.baz=quux --name %s busybox", name, name)) + + // Gather information for assertions + docker_endpoint := GetDockerEndpoint(name, name) + ip := docker_endpoint.IPAddress + mac := docker_endpoint.MacAddress + endpoint_id := docker_endpoint.EndpointID + interface_name := "cali" + endpoint_id[:mathutils.MinInt(11, len(endpoint_id))] + + // Check that the endpoint is created in etcd + etcd_endpoint := GetEtcdString(fmt.Sprintf("/calico/v1/host/test/workload/libnetwork/libnetwork/endpoint/%s", endpoint_id)) + Expect(etcd_endpoint).Should(MatchJSON(fmt.Sprintf( + `{"state":"active","name":"%s","mac":"%s","profile_ids":["%s"],"ipv4_nets":["%s/32"],"ipv6_nets":[]}`, + interface_name, mac, name, ip))) + + // Delete container + DockerString(fmt.Sprintf("docker rm -f %s", name)) + }) + }) + //docker stop/rm - stop and rm are the same as far as the plugin is concerned // TODO - check that the endpoint is removed from etcd and that the veth is removed })