From c775870630046701b9b7f0f888ea7db121c8db4c Mon Sep 17 00:00:00 2001 From: Jun Lin Chen Date: Fri, 25 Nov 2022 23:48:52 -0500 Subject: [PATCH] FIX: k3s support --- client/client.go | 279 ++++++++++++++++++++------------- client/client_test.go | 72 ++++++--- demo/chart/templates/NOTES.txt | 1 + 3 files changed, 221 insertions(+), 131 deletions(-) diff --git a/client/client.go b/client/client.go index f0918b2..39567de 100644 --- a/client/client.go +++ b/client/client.go @@ -38,6 +38,7 @@ import ( "net" "os" "path/filepath" + "regexp" "strings" "sync" "time" @@ -83,20 +84,21 @@ type Client struct { // ----------------------------------------------------------------------------- // Base Image Searching -func escapeSlashes(s string) string { - s = strings.ReplaceAll(s, "\\", "\\\\") - return strings.ReplaceAll(s, "/", "\\/") -} - -func getImageFilter(ref string) string { +// getImageFilter returns a filter string for containerd to find the image +// if completed is set to true, it will only return fully downloaded container images +func getImageFilter(ref string, completed bool) string { + c := "" + if completed { + c = "," + "labels." + util.ContentLabelCompletion + } return fmt.Sprintf( - "name~=/^%s.*/,labels.%s==%s,%s", + "name~=^%s,labels.%s==%s%s", // choose images with the same name (just the tags are different) - escapeSlashes(ref), + regexp.QuoteMeta(ref), // choose images that are pulled by starlight util.ImageLabelPuller, "starlight", // choose completed images - "labels."+util.ContentLabelCompletion, + c, ) } @@ -138,9 +140,9 @@ func (c *Client) FindBaseImage(ctr *containerd.Client, base, ref string) (img co if baseFilter == "" { return nil, fmt.Errorf("invalid image reference: %s, missing tag", ref) } - baseFilter = getImageFilter(baseFilter) + baseFilter = getImageFilter(baseFilter, true) } else { - baseFilter = getImageFilter(base) + baseFilter = getImageFilter(base, true) } img, err = c.findImage(ctr, baseFilter) @@ -462,19 +464,88 @@ func (c *Client) UploadTraces(proxyCfg string, tc *fs.TraceCollection) error { return nil } +type PullFinishedMessage struct { + img *images.Image + base string + err error +} + +func (c *Client) pullImageSync(ctr *containerd.Client, base containerd.Image, + ref, platform, proxyCfg string) (img *images.Image, err error) { + msg := make(chan PullFinishedMessage) + c.PullImage(ctr, base, ref, platform, proxyCfg, &msg) + ret := <-msg + return ret.img, ret.err +} + +func (c *Client) pullImageGrpc(ns, base, ref, proxy string, ret *chan PullFinishedMessage) { + // connect to containerd + ctr, err := containerd.New(c.cfg.Containerd, containerd.WithDefaultNamespace(ns)) + if err != nil { + *ret <- PullFinishedMessage{nil, "", errors.Wrapf(err, "failed to connect to containerd")} + return + } + defer ctr.Close() + + // find base image + var baseImg containerd.Image + baseImg, err = c.FindBaseImage(ctr, base, ref) + if err != nil { + *ret <- PullFinishedMessage{nil, "", errors.Wrapf(err, "failed to identify base image")} + return + } + + // pull image + log.G(c.ctx).WithFields(logrus.Fields{ + "ref": ref, + }).Info("pulling image") + c.PullImage(ctr, baseImg, ref, platforms.DefaultString(), proxy, ret) +} + // PullImage pulls an image from a registry and stores it in the content store // it also stores the manager in memory. // In case there exists another manager in memory, it removes it and re-pull the image -func (c *Client) PullImage(ctr *containerd.Client, base containerd.Image, - ref, platform, proxyCfg string, ready *chan bool) (img containerd.Image, closedReady bool, err error) { +func (c *Client) PullImage( + ctr *containerd.Client, base containerd.Image, + ref, platform, proxyCfg string, ready *chan PullFinishedMessage, +) { + // init vars + is := ctr.ImageService() + closedReady := false + defer func() { + if !closedReady { + close(*ready) + } + }() + localCtx := context.Background() + // check local image - reqFilter := getImageFilter(ref) - img, err = c.findImage(ctr, reqFilter) + reqFilter := getImageFilter(ref, false) + img, err := c.findImage(ctr, reqFilter) if err != nil { - return nil, false, errors.Wrapf(err, "failed to check requested image %s", ref) + *ready <- PullFinishedMessage{nil, "", errors.Wrapf(err, "failed to check requested image %s", ref)} + return } + if img != nil { - return nil, false, fmt.Errorf("requested image %s already exists", ref) + labels := img.Labels() + if _, has := labels[util.ContentLabelCompletion]; has { + meta := img.Metadata() + *ready <- PullFinishedMessage{&meta, "", fmt.Errorf("requested image %s already exists", ref)} + return + } + log.G(c.ctx). + WithField("image", ref). + Info("requested image found but incomplete, remove and re-pull") + + // remove image + if err = is.Delete(localCtx, ref); err != nil { + log.G(c.ctx). + WithField("image", ref). + Info("failed to remove incomplete image") + *ready <- PullFinishedMessage{nil, "", errors.Wrapf(err, "failed to remove unfinished image %s", ref)} + return + } } // connect to proxy @@ -492,7 +563,8 @@ func (c *Client) PullImage(ctr *containerd.Client, base containerd.Image, // pull image body, mSize, cSize, sSize, md, sld, err := p.DeltaImage(baseRef, ref, platform) if err != nil { - return nil, false, errors.Wrapf(err, "failed to pull image %s", ref) + *ready <- PullFinishedMessage{nil, "", errors.Wrapf(err, "failed to pull image %s", ref)} + return } defer func() { if body != nil { @@ -514,9 +586,9 @@ func (c *Client) PullImage(ctr *containerd.Client, base containerd.Image, // 1. check manager in memory, if it exists, remove it and re-pull the image // (This behavior is different from LoadImage() which does not remove the manager in memory) c.managerMapLock.Lock() - release := false defer func() { - if !release { + if _, ok := c.managerMap[md]; !ok { + // something went wrong, the lock has not been released, unlock it c.managerMapLock.Unlock() } }() @@ -528,11 +600,11 @@ func (c *Client) PullImage(ctr *containerd.Client, base containerd.Image, delete(c.managerMap, md) } - var ( - buf *bytes.Buffer - - man, con []byte + // check if the image is in containerd's image pool + var ( + buf *bytes.Buffer + man, con []byte ctrImg images.Image manifest *v1.Manifest imageConfig *v1.Image @@ -545,51 +617,59 @@ func (c *Client) PullImage(ctr *containerd.Client, base containerd.Image, // manifest buf, err = c.readBody(body, mSize) if err != nil { - return nil, false, errors.Wrapf(err, "failed to read manifest") + *ready <- PullFinishedMessage{nil, baseRef, errors.Wrapf(err, "failed to read manifest")} + return } manifest, man, err = c.handleManifest(buf) if err != nil { - return nil, false, errors.Wrapf(err, "failed to handle manifest") + *ready <- PullFinishedMessage{nil, baseRef, errors.Wrapf(err, "failed to handle manifest")} + return } err = c.storeManifest(cs, pcn, md, ref, manifest.Config.Digest.String(), sld, man) if err != nil { - return nil, false, errors.Wrapf(err, "failed to store manifest") + *ready <- PullFinishedMessage{nil, baseRef, errors.Wrapf(err, "failed to store manifest")} + return } // config buf, err = c.readBody(body, cSize) if err != nil { - return nil, false, errors.Wrapf(err, "failed to read config") + *ready <- PullFinishedMessage{nil, baseRef, errors.Wrapf(err, "failed to read config")} + return } imageConfig, con, err = c.handleConfig(buf) if err != nil { - return nil, false, errors.Wrapf(err, "failed to handle config") + *ready <- PullFinishedMessage{nil, baseRef, errors.Wrapf(err, "failed to handle config")} + return } err = c.storeConfig(cs, pcn, ref, manifest.Config.Digest, con) if err != nil { - return nil, false, errors.Wrapf(err, "failed to store config") + *ready <- PullFinishedMessage{nil, baseRef, errors.Wrapf(err, "failed to store config")} + return } // starlight header buf, err = c.readBody(body, sSize) if err != nil { - return nil, false, errors.Wrapf(err, "failed to read starlight header") + *ready <- PullFinishedMessage{nil, baseRef, errors.Wrapf(err, "failed to read starlight header")} + return } star, sta, err := c.handleStarlightHeader(buf) if err != nil { - return nil, false, errors.Wrapf(err, "failed to handle starlight header") + *ready <- PullFinishedMessage{nil, baseRef, errors.Wrapf(err, "failed to handle starlight header")} + return } err = c.storeStarlightHeader(cs, pcn, ref, sld, sta) if err != nil { - return nil, false, errors.Wrapf(err, "failed to store starlight header") + *ready <- PullFinishedMessage{nil, baseRef, errors.Wrapf(err, "failed to store starlight header")} + return } // create image mdd := digest.Digest(md) - is := ctr.ImageService() - ctrImg, err = is.Create(c.ctx, images.Image{ + ctrImg, err = is.Create(localCtx, images.Image{ Name: ref, Target: v1.Descriptor{ MediaType: util.ImageMediaTypeManifestV2, @@ -603,7 +683,14 @@ func (c *Client) PullImage(ctr *containerd.Client, base containerd.Image, CreatedAt: time.Now(), UpdatedAt: time.Now(), }) - log.G(c.ctx).WithField("image", ctrImg.Name).Debugf("created image") + if err != nil { + *ready <- PullFinishedMessage{nil, baseRef, errors.Wrapf(err, "failed to create image %s", ref)} + return + } + + log.G(c.ctx). + WithField("image", ctrImg.Name). + Debugf("created image") // 3. create manager // keep going and download layers @@ -614,7 +701,6 @@ func (c *Client) PullImage(ctr *containerd.Client, base containerd.Image, log.G(c.ctx). WithField("manifest", md). Info("client: added manager") - release = true c.managerMapLock.Unlock() // check optimizer @@ -624,12 +710,14 @@ func (c *Client) PullImage(ctr *containerd.Client, base containerd.Image, log.G(c.ctx). WithError(err). Error("failed to set optimizer on") - return nil, false, errors.Wrapf(err, "failed to enable optimizer") + *ready <- PullFinishedMessage{nil, baseRef, errors.Wrapf(err, "failed to enable optimizer")} + return } } if err = star.PrepareDirectories(c); err != nil { - return nil, false, errors.Wrapf(err, "failed to initialize directories") + *ready <- PullFinishedMessage{nil, baseRef, errors.Wrapf(err, "failed to initialize directories")} + return } // 4. update in-memory layer map @@ -640,7 +728,8 @@ func (c *Client) PullImage(ctr *containerd.Client, base containerd.Image, log.G(c.ctx). WithError(err). Error("failed to create snapshots") - return nil, false, errors.Wrapf(err, "failed to create snapshots") + *ready <- PullFinishedMessage{nil, baseRef, errors.Wrapf(err, "failed to create snapshots")} + return } // ------------------------------------------------------------------------------------ @@ -648,7 +737,8 @@ func (c *Client) PullImage(ctr *containerd.Client, base containerd.Image, // // 5. Send signal // Image is ready (content is still on the way) - close(*ready) + // close(*ready) + *ready <- PullFinishedMessage{&ctrImg, baseRef, nil} closedReady = true // 6. Extract file content @@ -657,7 +747,8 @@ func (c *Client) PullImage(ctr *containerd.Client, base containerd.Image, WithField("m", md). Info("start decompressing content") if err = star.Extract(&body); err != nil { - return nil, true, errors.Wrapf(err, "failed to extract starlight image") + *ready <- PullFinishedMessage{nil, baseRef, errors.Wrapf(err, "failed to extract starlight image")} + return } log.G(c.ctx). WithField("m", md). @@ -669,13 +760,19 @@ func (c *Client) PullImage(ctr *containerd.Client, base containerd.Image, // mark image as completed ctrImg.Labels[util.ContentLabelCompletion] = t.Format(time.RFC3339) - if ctrImg, err = is.Update(c.ctx, ctrImg, "labels."+util.ContentLabelCompletion); err != nil { - return nil, true, errors.Wrapf(err, "failed to mark image as completed") + if ctrImg, err = is.Update(localCtx, ctrImg, "labels."+util.ContentLabelCompletion); err != nil { + log.G(c.ctx). + WithError(err). + Error("failed to mark image as completed") + return } // update garbage collection labels if err = c.updateManifest(ctr, md, chainIds, t); err != nil { - return nil, true, errors.Wrapf(err, "failed to update manifest") + log.G(c.ctx). + WithError(err). + Error("failed to update manifest") + return } return @@ -1017,7 +1114,7 @@ func (s *StarlightDaemonAPIServer) AddProxyProfile(ctx context.Context, req *pb. "protocol": req.Protocol, "address": req.Address, "username": req.Username, - }).Trace("grpc: add proxy profile") + }).Debug("grpc: add proxy profile") s.client.cfg.Proxies[req.ProfileName] = &ProxyConfig{ Protocol: req.Protocol, @@ -1044,84 +1141,44 @@ func (s *StarlightDaemonAPIServer) AddProxyProfile(ctx context.Context, req *pb. func (s *StarlightDaemonAPIServer) PullImage(ctx context.Context, ref *pb.ImageReference) (resp *pb.ImagePullResponse, err error) { log.G(s.client.ctx).WithFields(logrus.Fields{ - "base": ref.Base, - "ref": ref.Reference, - }).Trace("grpc: pull image") - - var ( - ctr *containerd.Client - base containerd.Image - ) + "base": ref.Base, + "ref": ref.Reference, + "socket": s.client.cfg.Containerd, + }).Debug("grpc: pull image") ns := ref.Namespace if ns == "" { ns = s.client.cfg.Namespace } - ctr, err = containerd.New(s.client.cfg.Containerd, containerd.WithDefaultNamespace(ns)) - if err != nil { - return &pb.ImagePullResponse{ - Success: false, - Message: err.Error(), - }, nil - } - defer ctr.Close() - base, err = s.client.FindBaseImage(ctr, ref.Base, ref.Reference) - if err != nil { - return &pb.ImagePullResponse{ - Success: false, - Message: err.Error(), - }, nil - } - - log.G(s.client.ctx).WithFields(logrus.Fields{ - "ref": ref.Reference, - }).Info("pulling image") + ready := make(chan PullFinishedMessage) + go s.client.pullImageGrpc(ns, ref.Base, ref.Reference, ref.ProxyConfig, &ready) + ret := <-ready - ready := make(chan bool) - go func() { - var ( - e error - closed bool - ) - defer func() { - if !closed { - close(ready) - } - }() - _, closed, e = s.client.PullImage( - ctr, - base, ref.Reference, - platforms.DefaultString(), ref.ProxyConfig, - &ready) - if e != nil { - log.G(s.client.ctx).WithError(e).Errorf("failed to pull image") + if ret.err != nil { + if ret.img != nil { + return &pb.ImagePullResponse{ + Success: true, + Message: ret.err.Error(), + BaseImage: ret.base, + }, nil + } else { + return &pb.ImagePullResponse{ + Success: false, + Message: ret.err.Error(), + BaseImage: ret.base, + }, nil } - }() - - <-ready - - baseImage := "" - if base != nil { - baseImage = fmt.Sprintf("%s@%s", base.Name(), base.Target().Digest) - } - - if err != nil { - return &pb.ImagePullResponse{ - Success: false, - Message: err.Error(), - BaseImage: baseImage, - }, nil } - return &pb.ImagePullResponse{Success: true, Message: "ok", BaseImage: baseImage}, nil + return &pb.ImagePullResponse{Success: true, Message: "ok", BaseImage: ret.base}, nil } func (s *StarlightDaemonAPIServer) SetOptimizer(ctx context.Context, req *pb.OptimizeRequest) (*pb.OptimizeResponse, error) { okRes, failRes := make(map[string]string), make(map[string]string) log.G(s.client.ctx).WithFields(logrus.Fields{ "enable": req.Enable, - }).Trace("grpc: set optimizer") + }).Debug("grpc: set optimizer") s.client.optimizerLock.Lock() defer s.client.optimizerLock.Unlock() @@ -1178,7 +1235,7 @@ func (s *StarlightDaemonAPIServer) SetOptimizer(ctx context.Context, req *pb.Opt func (s *StarlightDaemonAPIServer) ReportTraces(ctx context.Context, req *pb.ReportTracesRequest) (*pb.ReportTracesResponse, error) { log.G(s.client.ctx).WithFields(logrus.Fields{ "profile": req.ProxyConfig, - }).Trace("grpc: report") + }).Debug("grpc: report") tc, err := fs.NewTraceCollection(s.client.ctx, s.client.cfg.TracesDir) if err != nil { @@ -1205,7 +1262,7 @@ func (s *StarlightDaemonAPIServer) ReportTraces(ctx context.Context, req *pb.Rep func (s *StarlightDaemonAPIServer) NotifyProxy(ctx context.Context, req *pb.NotifyRequest) (*pb.NotifyResponse, error) { log.G(s.client.ctx).WithFields(logrus.Fields{ "profile": req.ProxyConfig, - }).Trace("grpc: notify") + }).Debug("grpc: notify") reference, err := name.ParseReference(req.Reference) if err != nil { @@ -1232,7 +1289,7 @@ func (s *StarlightDaemonAPIServer) NotifyProxy(ctx context.Context, req *pb.Noti func (s *StarlightDaemonAPIServer) PingTest(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) { log.G(s.client.ctx).WithFields(logrus.Fields{ "profile": req.ProxyConfig, - }).Trace("grpc: ping test") + }).Debug("grpc: ping test") rtt, proto, server, err := s.client.Ping(req.ProxyConfig) if err != nil { diff --git a/client/client_test.go b/client/client_test.go index ce09c0f..3bffbcb 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -22,25 +22,67 @@ import ( "testing" ) -func TestNewClient(t *testing.T) { - cfg, _, _, _ := LoadConfig("") +func TestImageFilter(t *testing.T) { + cfg, p, _, _ := LoadConfig("/sandbox/etc/starlight/starlight-daemon.json") + fmt.Println("config path: ", p) c, err := NewClient(context.Background(), cfg) if err != nil { t.Error(err) return } + fmt.Println("containerd: ", cfg.Containerd) + fmt.Println("namespace: ", cfg.Namespace) + client, err := containerd.New(cfg.Containerd, containerd.WithDefaultNamespace(cfg.Namespace)) + if err != nil { + t.Error(err) + return + } + imgFilterRef := "starlight-registry.default.svc.cluster.local:5000/starlight/redis:6.2.1" + img, err := c.findImage(client, getImageFilter(imgFilterRef, true)) + if err != nil { + t.Error(err) + return + } + if img != nil { + t.Error("image should be nil") + } + + img, err = c.findImage(client, getImageFilter(imgFilterRef, false)) + if err != nil { + t.Error(err) + return + } + if img == nil { + t.Error("image should not be nil") + } + +} +func TestClient_RemoveImage(t *testing.T) { + cfg, p, _, _ := LoadConfig("/sandbox/etc/starlight/starlight-daemon.json") + fmt.Println("config path: ", p) + c, err := NewClient(context.Background(), cfg) + if err != nil { + t.Error(err) + return + } + fmt.Println("containerd: ", cfg.Containerd) + fmt.Println("namespace: ", cfg.Namespace) client, err := containerd.New(cfg.Containerd, containerd.WithDefaultNamespace(cfg.Namespace)) if err != nil { t.Error(err) return } - img, err := c.findImage(client, getImageFilter("harbor.yuri.moe/public/redis:test2")) + imgFilterRef := "starlight-registry.default.svc.cluster.local:5000/starlight/redis:6.2.1" + + is := client.ImageService() + // remove image + err = is.Delete(c.ctx, imgFilterRef) if err != nil { t.Error(err) return } - t.Log(img) + } func TestClient_PullImageNotUpdate(t *testing.T) { @@ -59,12 +101,8 @@ func TestClient_PullImageNotUpdate(t *testing.T) { } operator := snapshotter.NewOperator(c.ctx, c, client.SnapshotService("starlight")) - ready := make(chan bool) - img, _, err := c.PullImage(client, nil, - "harbor.yuri.moe/starlight/redis:6.2.7", - "linux/amd64", - "", - &ready) + img, err := c.pullImageSync(client, nil, + "harbor.yuri.moe/starlight/redis:6.2.7", "linux/amd64", "") if err != nil { t.Error(err) return @@ -161,12 +199,10 @@ func TestClient_PullImageWithUpdate(t *testing.T) { return } - ready := make(chan bool) - img, _, err := c.PullImage(client, base, + img, err := c.pullImageSync(client, base, "harbor.yuri.moe/starlight/redis:7.0.5", "linux/amd64", - "", - &ready) + "") if err != nil { t.Error(err) return @@ -190,12 +226,8 @@ func TestClient_CreateImageService(t *testing.T) { //plt := platforms.Format(platforms.DefaultSpec()) //t.Log("pulling image", "platform", plt) - ready := make(chan bool) - img, _, err := c.PullImage(client, nil, - "starlight/redis:6.2.7", - "linux/amd64", - "", - &ready) + img, err := c.pullImageSync(client, nil, + "starlight/redis:6.2.7", "linux/amd64", "") if err != nil { t.Error(err) return diff --git a/demo/chart/templates/NOTES.txt b/demo/chart/templates/NOTES.txt index 40d92c8..2c373d4 100644 --- a/demo/chart/templates/NOTES.txt +++ b/demo/chart/templates/NOTES.txt @@ -78,6 +78,7 @@ ON THE WORKER NODE: EOF If using k3s, please create `/var/lib/rancher/k3s/agent/etc/containerd/config.toml.tmpl` and add the configuration. + You will need to change `containerd` socket address to `/run/k3s/containerd/containerd.sock` in `/etc/starlight/proxy.json`. 3. and restart the container