diff --git a/README.md b/README.md
index f82afee..377e3f3 100644
--- a/README.md
+++ b/README.md
@@ -126,7 +126,7 @@ sudo ctr container create --snapshotter=starlight \
     --mount type=bind,src=/tmp/test-redis-data,dst=/data,options=rbind:rw \
     --env-file ./demo/config/all.env \
     --net-host \
-    harbor.yuri.moe/starlight/redis:6.2.7 \
+    harbor.yuri.moe/x/redis:6.2.7 \
     instance3 && \
 sudo ctr task start instance3
 ```
diff --git a/client/client.go b/client/client.go
index 7414b94..fb1388d 100644
--- a/client/client.go
+++ b/client/client.go
@@ -26,6 +26,7 @@ import (
 	"github.com/mc256/starlight/client/snapshotter"
 	"github.com/mc256/starlight/proxy"
 	"github.com/mc256/starlight/util"
+	"github.com/mc256/starlight/util/common"
 	"github.com/opencontainers/go-digest"
 	v1 "github.com/opencontainers/image-spec/specs-go/v1"
 	"github.com/pkg/errors"
@@ -70,9 +71,14 @@ type Client struct {
 	operator *snapshotter.Operator
 	plugin   *snapshotter.Plugin
 
+	// layer
 	layerMapLock sync.Mutex
 	layerMap     map[string]*mountPoint
 
+	// manager cache
+	managerMapLock sync.Mutex
+	managerMap     map[string]*Manager
+
 	// Optimizer
 	optimizerLock    sync.Mutex
 	defaultOptimizer bool
@@ -185,7 +191,6 @@ func (c *Client) storeManifest(cfgName, d, ref, cfgd, sld string, man []byte) (e
 	pd := digest.Digest(d)
 
 	// create content store
-
 	err = content.WriteBlob(
 		c.ctx, c.cs, pd.Hex(), bytes.NewReader(man),
 		v1.Descriptor{Size: int64(len(man)), Digest: pd},
@@ -202,7 +207,7 @@ func (c *Client) storeManifest(cfgName, d, ref, cfgd, sld string, man []byte) (e
 	return nil
 }
 
-func (c *Client) updateManifest(d string) (err error) {
+func (c *Client) updateManifest(d string, chainIds []digest.Digest) (err error) {
 	pd := digest.Digest(d)
 	cs := c.client.ContentStore()
 
@@ -214,12 +219,19 @@ func (c *Client) updateManifest(d string) (err error) {
 	}
 
 	info.Labels[util.ContentLabelCompletion] = time.Now().Format(time.RFC3339)
+
+	// garbage collection tags, more info:
+	// https://github.com/containerd/containerd/blob/83f44ddab5b17da74c5bd97dad7b2c5fa32871de/docs/garbage-collection.md
+	for idx, id := range chainIds {
+		info.Labels[fmt.Sprintf("%s/%d", util.ContentLabelSnapshotGC, idx)] = id.String()
+	}
+
 	info, err = cs.Update(c.ctx, info)
 	if err != nil {
 		return errors.Wrapf(err, "failed to mark manifest as completed")
 	}
 
-	log.G(c.ctx).WithField("digest", info.Digest).Debug("download completed")
+	log.G(c.ctx).WithField("digest", info.Digest).Info("download completed")
 	return nil
 }
 
@@ -346,6 +358,9 @@ func (c *Client) UploadTraces(proxyCfg string, tc *fs.TraceCollection) error {
 	return nil
 }
 
+// PullImage pulls an image from a registry and stores it in the content store;
+// it also caches the image's manager in memory.
+// If another manager for this image is already in memory, it removes it and re-pulls the image.
 func (c *Client) PullImage(base containerd.Image, ref, platform, proxyCfg string, ready *chan bool) (img containerd.Image, err error) {
 	// check local image
 	reqFilter := getImageFilter(ref)
@@ -384,6 +399,21 @@ func (c *Client) PullImage(base containerd.Image, ref, platform, proxyCfg string
 		WithField("sl_digest", sld).
 		Infof("pulling image %s", ref)
 
+	c.managerMapLock.Lock()
+	release := false
+	defer func() {
+		if !release {
+			c.managerMapLock.Unlock()
+		}
+	}()
+	if _, ok := c.managerMap[md]; ok {
+		log.G(c.ctx).
+			WithField("manifest", mSize).
+			WithField("sl_digest", sld).
+			Warn("found in cache. remove and re-pull")
+		delete(c.managerMap, md)
+	}
+
 	var (
 		buf *bytes.Buffer
@@ -458,8 +488,16 @@ func (c *Client) PullImage(base containerd.Image, ref, platform, proxyCfg string
 	log.G(c.ctx).WithField("image", ctrImg.Name).Debugf("created image")
 
 	// keep going and download layers
-	star.Init(c.cfg, false, manifest, imageConfig, mdd)
+	star.Init(c.ctx, c.cfg, false, manifest, imageConfig, mdd)
+	// create manager
+	c.managerMap[md] = star
+	log.G(c.ctx).WithField("manifest", md).Debugf("client: added manager")
+	release = true
+	c.managerMapLock.Unlock()
+
+	// check optimizer
+	// we should set the optimizer before creating the filesystems
 	if c.defaultOptimizer {
 		if err = star.SetOptimizerOn(c.defaultOptimizeGroup); err != nil {
 			return nil, errors.Wrapf(err, "failed to enable optimizer")
@@ -470,7 +508,10 @@ func (c *Client) PullImage(base containerd.Image, ref, platform, proxyCfg string
 		return nil, errors.Wrapf(err, "failed to initialize directories")
 	}
 
-	if err = star.CreateSnapshots(c); err != nil {
+	// Create Snapshots
+	// the managerMapLock must be released before calling CreateSnapshots
+	var chainIds []digest.Digest
+	if chainIds, err = star.CreateSnapshots(c); err != nil {
 		return nil, errors.Wrapf(err, "failed to create snapshots")
 	}
 
@@ -483,13 +524,18 @@ func (c *Client) PullImage(base containerd.Image, ref, platform, proxyCfg string
 	}
 
 	// mark as completed
-	if err = c.updateManifest(md); err != nil {
+	// update garbage collection labels
+	if err = c.updateManifest(md, chainIds); err != nil {
 		return nil, errors.Wrapf(err, "failed to update manifest")
 	}
 
 	return
 }
 
+// LoadImage loads an image manifest from the content store into memory;
+// if it is already in memory, it returns the cached manager directly.
+//
+// This method must not call any snapshotter methods, to avoid recursive locking.
 func (c *Client) LoadImage(manifest digest.Digest) (manager *Manager, err error) {
 	var (
@@ -500,6 +546,15 @@ func (c *Client) LoadImage(manifest digest.Digest) (manager *Manager, err error)
 		star Manager
 	)
 
+	// manager cache
+	c.managerMapLock.Lock()
+	defer c.managerMapLock.Unlock()
+
+	if m, ok := c.managerMap[manifest.String()]; ok {
+		return m, nil
+	}
+
+	// no cache, load from store
 	cs := c.client.ContentStore()
 	ii, err = cs.Info(c.ctx, manifest)
 	if err != nil {
@@ -507,7 +562,8 @@ func (c *Client) LoadImage(manifest digest.Digest) (manager *Manager, err error)
 	}
 
 	if len(ii.Labels[util.ContentLabelCompletion]) == 0 {
-		return nil, errors.New("image is incomplete, remove and pull again")
+		log.G(c.ctx).WithField("d", manifest.String()).Warn("using incomplete image")
+		//return nil, errors.Errorf("incomplete image, remove and repull")
 	}
 
 	starlight := digest.Digest(ii.Labels[fmt.Sprintf("%s.starlight", util.ContentLabelContainerdGC)])
@@ -533,19 +589,59 @@ func (c *Client) LoadImage(manifest digest.Digest) (manager *Manager, err error)
 		return nil, err
 	}
 
-	star.Init(c.cfg, true, man, cfg, manifest)
+	star.Init(c.ctx, c.cfg, true, man, cfg, manifest)
+
+	// save to cache
+	c.managerMap[manifest.String()] = &star
+	log.G(c.ctx).WithField("manifest", manifest.String()).Debugf("client: added manager")
+
+	// update layerMap
+	c.layerMapLock.Lock()
+	defer c.layerMapLock.Unlock()
+
+	for idx, serial := range star.stackSerialMap {
+		layer := star.layers[serial]
+		if mp, has := c.layerMap[layer.Hash]; has {
+			if mp.manager == nil {
+				mp.manager = &star
+				mp.stack = int64(idx)
+			}
+		} else {
+			c.layerMap[layer.Hash] = &mountPoint{
+				fs:        nil,
+				manager:   &star,
+				stack:     int64(idx),
+				snapshots: make(map[string]*snapshots.Info),
+			}
+		}
+	}
 
 	return &star, nil
 }
 
 func (c *Client) Close() {
+	// containerd client
 	_ = c.client.Close()
+
+	// snapshotter server
 	if c.snServer != nil {
 		c.snServer.Stop()
 	}
+	// CLI server
 	if c.cliServer != nil {
 		c.cliServer.Stop()
 	}
+
+	// filesystems
+	c.managerMapLock.Lock()
+	defer c.managerMapLock.Unlock()
+	for _, mg := range c.managerMap {
+		log.G(c.ctx).
+			WithField("manager", mg.manifestDigest.String()).
+			Debugf("client: closing manager")
+		mg.Teardown()
+	}
+	os.Exit(1)
 }
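The completion and gc.ref labels read and written above are containerd's garbage-collection back-references: each `containerd.io/gc.ref.snapshot.<snapshotter>/<key>` label on the manifest blob keeps the referenced snapshot (keyed here by chain ID) alive as long as the blob exists. A hedged sketch against containerd's public client API; the socket path and digests are placeholders:

```go
package main

import (
	"context"
	"fmt"

	"github.com/containerd/containerd"
	"github.com/opencontainers/go-digest"
)

// markComplete labels a manifest blob so containerd's garbage collector
// keeps the snapshots derived from it.
func markComplete(ctx context.Context, client *containerd.Client, md digest.Digest, chainIds []digest.Digest) error {
	cs := client.ContentStore()
	info, err := cs.Info(ctx, md)
	if err != nil {
		return err
	}
	if info.Labels == nil {
		info.Labels = map[string]string{}
	}
	for idx, id := range chainIds {
		// label key family is containerd's; "starlight" names the snapshotter
		info.Labels[fmt.Sprintf("containerd.io/gc.ref.snapshot.starlight/%d", idx)] = id.String()
	}
	_, err = cs.Update(ctx, info)
	return err
}

func main() {
	client, err := containerd.New("/run/containerd/containerd.sock")
	if err != nil {
		panic(err)
	}
	defer client.Close()
	_ = markComplete(context.Background(), client, digest.Digest("sha256:..."), nil)
}
```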
+ Debugf("mount: found fs") return mp.fs.GetMountPoint(), nil } // manager != nil but fs == nil // manager has been created but not yet mounted if mp.manager != nil { + // create mounting point mnt = filepath.Join(c.GetMountingPoint(ssId), "slfs") mp.fs, err = mp.manager.NewStarlightFS(mnt, mp.stack, &fusefs.Options{}, false) if err != nil { return "", errors.Wrapf(err, "failed to mount filesystem") } + + // create filesystem and serve go mp.fs.Serve() mp.snapshots[sn.Name] = sn + log.G(c.ctx). + WithField("s", ssId). + WithField("m", mp.manager.manifestDigest.String()). + Debugf("mount: found manager") return mnt, nil } - } - // create a new filesystem - var man *Manager - man, err = c.LoadImage(digest.Digest(md)) - if err != nil { - return "", errors.Wrapf(err, "failed to load image manager") - } + log.G(c.ctx). + WithField("s", ssId). + Warn("mount: no manager found") - // mount manager - for idx, layer := range man.Destination.Layers { - c.layerMap[layer.Hash] = &mountPoint{ - fs: nil, - manager: man, - stack: int64(idx), - snapshots: map[string]*snapshots.Info{sn.Name: sn}, - } - } + return "", common.ErrNoManager + } else { + // if it does not exist, + // it means the layer is neither in the process of downloading nor extracted - mnt = filepath.Join(c.GetMountingPoint(ssId), "slfs") - current := c.layerMap[ld] - current.fs, err = current.manager.NewStarlightFS(mnt, current.stack, &fusefs.Options{}, false) - if err != nil { - return "", errors.Wrapf(err, "failed to mount filesystem") + return "", errors.New("layer incomplete or not found. remove and pull again") } - go current.fs.Serve() - current.snapshots[sn.Name] = sn - return mnt, nil + } func (c *Client) Unmount(cd, sn string) error { @@ -667,6 +764,11 @@ func (c *Client) Unmount(cd, sn string) error { return nil } + log.G(c.ctx). + WithField("d", cd). + WithField("mnt", layer.fs.GetMountPoint()). + Debug("fs: unmount") + if err := layer.fs.Teardown(); err != nil { return err } @@ -995,7 +1097,8 @@ func NewClient(ctx context.Context, cfg *Configuration) (c *Client, err error) { cfg: cfg, client: nil, - layerMap: make(map[string]*mountPoint), + layerMap: make(map[string]*mountPoint), + managerMap: make(map[string]*Manager), } // containerd client diff --git a/client/fs/fs.go b/client/fs/fs.go index 9edd6aa..42be8cd 100644 --- a/client/fs/fs.go +++ b/client/fs/fs.go @@ -174,6 +174,7 @@ func (n *StarlightFsNode) Open(ctx context.Context, flags uint32) (fs.FileHandle log.G(ctx).WithFields(logrus.Fields{ "f": name, + "_s": n.instance.stack, "_r": r, }).Trace("open") diff --git a/client/fs/instance.go b/client/fs/instance.go index 6161a46..b219d0b 100644 --- a/client/fs/instance.go +++ b/client/fs/instance.go @@ -34,6 +34,7 @@ func (fi *Instance) GetMountPoint() string { return fi.mountPoint } func (fi *Instance) GetServer() *fuse.Server { return fi.server } // Teardown unmounts the file system and close the logging file if there is one writing +// should you need this function, please consider using Manager.Teardown instead. func (fi *Instance) Teardown() error { return fi.GetServer().Unmount() } diff --git a/client/fs/tracer.go b/client/fs/tracer.go index 751251f..13df069 100644 --- a/client/fs/tracer.go +++ b/client/fs/tracer.go @@ -70,6 +70,8 @@ func (bo ByAccessTimeOptimized) Swap(i, j int) { } type Tracer struct { + ctx context.Context + // label could be the name of the application or the workload. 
diff --git a/client/fs/tracer.go b/client/fs/tracer.go
index 751251f..13df069 100644
--- a/client/fs/tracer.go
+++ b/client/fs/tracer.go
@@ -70,6 +70,8 @@ func (bo ByAccessTimeOptimized) Swap(i, j int) {
 }
 
 type Tracer struct {
+	ctx context.Context
+
 	// label could be the name of the application or the workload.
 	// Different workload might have
 	OptimizeGroup string `json:"group"`
@@ -104,10 +106,16 @@ func (t *Tracer) Close() error {
 	b, _ := json.Marshal(t)
 	_, _ = t.fh.Write(b)
 
+	log.G(t.ctx).WithFields(logrus.Fields{
+		"optimizeGroup": t.OptimizeGroup,
+		"digest":        t.Image,
+		"logPath":       t.logPath,
+	}).Info("tracer: stopped")
+
 	return t.fh.Close()
 }
 
-func NewTracer(optimizeGroup, digest, outputDir string) (*Tracer, error) {
+func NewTracer(ctx context.Context, optimizeGroup, digest, outputDir string) (*Tracer, error) {
 	err := os.MkdirAll(outputDir, 0775)
 	if err != nil {
 		return nil, err
@@ -119,7 +127,14 @@ func NewTracer(optimizeGroup, digest, outputDir string) (*Tracer, error) {
 		return nil, err
 	}
 
+	log.G(ctx).WithFields(logrus.Fields{
+		"optimizeGroup": optimizeGroup,
+		"digest":        digest,
+		"logPath":       logPath,
+	}).Info("tracer: started")
+
 	return &Tracer{
+		ctx:           ctx,
 		OptimizeGroup: optimizeGroup,
 		StartTime:     time.Now(),
 		Image:         digest,
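The tracer records file-access order for the optimizer and serializes itself as JSON when closed. A stripped-down sketch of that lifecycle, with the fields reduced to the ones visible in this hunk; NewTracer and Close mirror the shape above but omit the starlight-specific state:

```go
package main

import (
	"encoding/json"
	"os"
	"path"
	"time"
)

// Tracer accumulates trace metadata in memory and writes itself out on Close.
type Tracer struct {
	OptimizeGroup string    `json:"group"`
	StartTime     time.Time `json:"start"`
	fh            *os.File  // unexported, so excluded from the JSON output
}

func NewTracer(group, outputDir string) (*Tracer, error) {
	if err := os.MkdirAll(outputDir, 0775); err != nil {
		return nil, err
	}
	fh, err := os.Create(path.Join(outputDir, group+".json"))
	if err != nil {
		return nil, err
	}
	return &Tracer{OptimizeGroup: group, StartTime: time.Now(), fh: fh}, nil
}

// Close flushes the collected trace as JSON and releases the log file.
func (t *Tracer) Close() error {
	b, _ := json.Marshal(t)
	if _, err := t.fh.Write(b); err != nil {
		return err
	}
	return t.fh.Close()
}

func main() {
	t, err := NewTracer("default", "/tmp/traces")
	if err != nil {
		panic(err)
	}
	defer t.Close()
}
```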
diff --git a/client/manager.go b/client/manager.go
index b007066..901ca4a 100644
--- a/client/manager.go
+++ b/client/manager.go
@@ -8,8 +8,10 @@ import (
 	"bytes"
 	"compress/gzip"
+	"context"
 	"encoding/json"
 	"fmt"
+	"github.com/containerd/containerd/log"
 	"github.com/containerd/containerd/snapshots"
 	fusefs "github.com/hanwen/go-fuse/v2/fs"
 	"github.com/mc256/starlight/client/fs"
@@ -30,10 +32,11 @@
 // Manager should be unmarshalled from a json file and then Populate() should be called to populate other fields
 type Manager struct {
 	receive.DeltaBundle
+	ctx context.Context
 
 	// non-exported fields
-	compressLayerDigest []digest.Digest
-	diffDigest          []digest.Digest
+	// compressLayerDigest []digest.Digest
+	// diffDigest          []digest.Digest
 
 	cfg *Configuration
 
@@ -196,10 +199,13 @@ func (m *Manager) PrepareDirectories(c *Client) error {
 	return nil
 }
 
-func (m *Manager) CreateSnapshots(c *Client) (err error) {
+// CreateSnapshots creates snapshots for the image's chain ids and returns them
+// the caller must release managerMapLock before calling CreateSnapshots
+func (m *Manager) CreateSnapshots(c *Client) (chainIds []digest.Digest, err error) {
 	diffs := m.imageConfig.RootFS.DiffIDs
-	chainIds := identity.ChainIDs(diffs)
+	chainIds = identity.ChainIDs(diffs)
 	prev := ""
+
 	for idx, chain := range chainIds {
 		d := m.layers[m.stackSerialMap[idx]].Hash
 
@@ -223,21 +229,25 @@ func (m *Manager) CreateSnapshots(c *Client) (err error) {
 			chain.String(), prev, m.manifestDigest.String(), d, int64(idx),
 		)
 		if err != nil {
-			return errors.Wrapf(err, "failed prepare new image snapshots %s", chain.String())
+			return nil, errors.Wrapf(err, "failed to prepare new image snapshots %s", chain.String())
 		}
 		prev = chain.String()
 	}
 
-	return nil
+	return chainIds, nil
 }
 
 // Init populates the manager with the necessary information and data structures.
 // Use json.Unmarshal to unmarshal the json file from data storage into a Manager struct.
-// - ready: if set to false, we will then use Extract() to get the content of the file
-// - cfg: configuration of the client
-// - image, manifest, imageConfig: information about the image (maybe we don't need this)
-func (m *Manager) Init(cfg *Configuration, ready bool,
+//
+// - ready: if set to false, we will then use Extract() to get the content of the file
+// - cfg: configuration of the client
+// - image, manifest, imageConfig: information about the image (maybe we don't need this)
+//
+// Init does not change any outside state, only the manager itself
+func (m *Manager) Init(ctx context.Context, cfg *Configuration, ready bool,
 	manifest *v1.Manifest, imageConfig *v1.Image, manifestDigest digest.Digest) {
 	// init variables
+	m.ctx = ctx
 	m.cfg = cfg
 	m.stackSerialMap = make([]int64, 0, len(m.Destination.Layers))
 	m.layers = make(map[int64]*receive.ImageLayer)
@@ -321,13 +331,15 @@ func (m *Manager) Init(cfg *Configuration, ready bool,
 
 func (m *Manager) SetOptimizerOn(optimizeGroup string) (err error) {
 	if m.tracer == nil {
-		m.tracer, err = fs.NewTracer(optimizeGroup, m.manifestDigest.String(), m.cfg.TracesDir)
+		log.G(m.ctx).Debug("manager: start tracer")
+		m.tracer, err = fs.NewTracer(m.ctx, optimizeGroup, m.manifestDigest.String(), m.cfg.TracesDir)
 	}
 	return
 }
 
 func (m *Manager) SetOptimizerOff() (err error) {
 	if m.tracer != nil {
+		log.G(m.ctx).Debug("manager: stop tracer")
 		err = m.tracer.Close()
 		m.tracer = nil
 	}
@@ -336,9 +348,11 @@ func (m *Manager) SetOptimizerOff() (err error) {
 
 func (m *Manager) Teardown() {
 	if m.tracer != nil {
+		log.G(m.ctx).Debug("manager: stop tracer")
 		_ = m.tracer.Close()
 	}
 	for _, v := range m.fs {
+		log.G(m.ctx).WithField("mnt", v.GetMountPoint()).Debug("manager: unmounting filesystem")
 		_ = v.Teardown()
 	}
 }
diff --git a/client/manager_test.go b/client/manager_test.go
index fb0e4ea..f3598a4 100644
--- a/client/manager_test.go
+++ b/client/manager_test.go
@@ -22,6 +22,18 @@ import (
 	"time"
 )
 
+var (
+	ctx = context.Background()
+)
+
+func TestMain(m *testing.M) {
+	// setup code here
+	ctx = context.Background()
+	code := m.Run()
+	// teardown code here
+	os.Exit(code)
+}
+
 func TestManager_Extract(t *testing.T) {
 	cfg, _, _, _ := LoadConfig("/root/daemon.json")
 
@@ -51,7 +63,7 @@ func TestManager_Extract(t *testing.T) {
 
 	// keep going and download layers
 	md := digest.Digest("sha256:50a0f37293a4d0880a49e0c41dd71e1d556d06d8fa6c8716afc467b1c7c52965")
-	m.Init(cfg, true, nil, nil, md)
+	m.Init(ctx, cfg, true, nil, nil, md)
 
 	err = m.Extract(&rc)
 	if err != nil {
@@ -86,7 +98,7 @@ func TestManager_Init(t *testing.T) {
 	*/
 	// keep going and download layers
 	md := digest.Digest("sha256:50a0f37293a4d0880a49e0c41dd71e1d556d06d8fa6c8716afc467b1c7c52965")
-	m.Init(cfg, true, nil, nil, md)
+	m.Init(ctx, cfg, true, nil, nil, md)
 
 }
 
@@ -109,7 +121,7 @@ func TestManager_NewStarlightFS(t *testing.T) {
 
 	// keep going and download layers
 	md := digest.Digest("sha256:50a0f37293a4d0880a49e0c41dd71e1d556d06d8fa6c8716afc467b1c7c52965")
-	m.Init(cfg, true, nil, nil, md)
+	m.Init(ctx, cfg, true, nil, nil, md)
 	opt := fusefs.Options{}
 	stack := int64(2)
 	f, err := m.NewStarlightFS("/opt/test", stack, &opt, true)
@@ -144,7 +156,7 @@ func TestManager_NewStarlightFSMultiple(t *testing.T) {
 
 	// keep going and download layers
 	md := digest.Digest("sha256:50a0f37293a4d0880a49e0c41dd71e1d556d06d8fa6c8716afc467b1c7c52965")
-	m.Init(cfg, true, nil, nil, md)
+	m.Init(ctx, cfg, true, nil, nil, md)
 	opt := fusefs.Options{}
 
 	fss := make([]*fs.Instance, 0)
@@ -191,7 +203,7 @@ func TestManager_NewStarlightFSMultiple2(t *testing.T) {
 
 	// keep going and download layers
 	md := digest.Digest("sha256:50a0f37293a4d0880a49e0c41dd71e1d556d06d8fa6c8716afc467b1c7c52965")
-	m.Init(cfg, true, nil, nil, md)
+	m.Init(ctx, cfg, true, nil, nil, md)
 	opt := fusefs.Options{}
 
 	fss := make([]*fs.Instance, 0)
@@ -238,8 +250,8 @@ func TestManager_NewStarlightFSMultiple3(t *testing.T) {
 
 	// keep going and download layers
 	md := digest.Digest("sha256:50a0f37293a4d0880a49e0c41dd71e1d556d06d8fa6c8716afc467b1c7c52965")
-	m.Init(cfg, true, nil, nil, md)
-	err = m.SetOptimizerOn("default", "sha256:50a0f37293a4d0880a49e0c41dd71e1d556d06d8fa6c8716afc467b1c7c52965")
+	m.Init(ctx, cfg, true, nil, nil, md)
+	err = m.SetOptimizerOn("default")
 	if err != nil {
 		t.Error(err)
 		return
@@ -326,8 +338,8 @@ func TestManager_NewStarlightSnapshotterTest(t *testing.T) {
 
 	// keep going and download layers
 	md := digest.Digest("sha256:50a0f37293a4d0880a49e0c41dd71e1d556d06d8fa6c8716afc467b1c7c52965")
-	m.Init(cfg, false, manifest, configFile, md)
-	_ = m.CreateSnapshots(cc)
+	m.Init(ctx, cfg, false, manifest, configFile, md)
+	_, _ = m.CreateSnapshots(cc)
 }
diff --git a/client/snapshotter/operator.go b/client/snapshotter/operator.go
index 3a18331..d78e866 100644
--- a/client/snapshotter/operator.go
+++ b/client/snapshotter/operator.go
@@ -96,7 +96,7 @@ func (op *Operator) ScanExistingFilesystems() {
 					h := fmt.Sprintf("sha256:%s%s%s%s",
 						d1.Name(), d2.Name(), d3.Name(), d4.Name(),
 					)
-					completeFile := filepath.Join(d, "complete.json")
+					completeFile := filepath.Join(d, "completed.json")
 					if _, err = os.Stat(completeFile); err != nil {
 						_ = os.RemoveAll(filepath.Join(op.client.GetFilesystemRoot(),
 							"layers", d1.Name(), d2.Name(), d3.Name(), d4.Name(),
diff --git a/client/snapshotter/plugin.go b/client/snapshotter/plugin.go
index 3b98ae7..b34045d 100644
--- a/client/snapshotter/plugin.go
+++ b/client/snapshotter/plugin.go
@@ -14,6 +14,9 @@ import (
 	"github.com/containerd/containerd/snapshots/storage"
 	"github.com/containerd/continuity/fs"
 	"github.com/mc256/starlight/util"
+	"github.com/mc256/starlight/util/common"
+	"github.com/opencontainers/go-digest"
+	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	"os"
 	"path/filepath"
@@ -31,11 +34,17 @@ type PluginClient interface {
 	// getUpper, getWork, getStarlightFS functions instead of using this function directly
 	GetMountingPoint(ssId string) string
 
+	// PrepareManager informs the client to load the specified manager into memory.
+	// It requires that the manifest, the Starlight metadata, and the image config are present
+	// in containerd's content storage; if they are not, the client should return an error.
+	// The plugin should then try the next referenced manager (manifest digest).
+	PrepareManager(manifest digest.Digest) (err error)
+
 	// Unmount starlightfs
 	Unmount(compressDigest, key string) error
 
 	// Mount starlightfs
-	Mount(md, ld, ssId string, sn *snapshots.Info) (mnt string, err error)
+	Mount(layerDigest digest.Digest, snapshotId string, sn *snapshots.Info) (mnt string, err error)
 }
 
 type Plugin struct {
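The PrepareManager contract above implies a fallback loop on the plugin side: mount a layer, and on a missing-manager error prepare the next referenced manifest and retry. An illustrative, compilable sketch (the interface and names are simplified stand-ins for PluginClient, not the PR's exact API):

```go
package snapshotter

import (
	"errors"
	"fmt"
)

var ErrNoManager = errors.New("no manager found")

// PluginClient is a reduced stand-in for the interface above.
type PluginClient interface {
	PrepareManager(manifest string) error
	Mount(layerDigest string) (string, error)
}

// mountWithFallback mounts a starlight layer, preparing referenced
// managers one by one until the layer can be served.
func mountWithFallback(c PluginClient, layer string, candidates []string) (string, error) {
	for {
		mnt, err := c.Mount(layer)
		if err == nil {
			return mnt, nil
		}
		if !errors.Is(err, ErrNoManager) {
			return "", err // a real failure, not just a missing manager
		}
		if len(candidates) == 0 {
			return "", fmt.Errorf("no manager for layer %s", layer)
		}
		// consume one candidate per attempt so the loop always terminates
		next := candidates[0]
		candidates = candidates[1:]
		if err := c.PrepareManager(next); err != nil {
			continue // manifest unavailable; retry with the next candidate
		}
	}
}
```

Design note: this sketch advances through the candidate list on every failed attempt, which guarantees termination; the PR's inner loop in mounts() advances mdsidx only when PrepareManager fails, so it relies on a successfully prepared manager actually covering the layer.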
@@ -93,7 +102,7 @@ func (s *Plugin) Stat(ctx context.Context, key string) (snapshots.Info, error) {
 
 	log.G(s.ctx).WithFields(logrus.Fields{
 		"name": info.Name,
-	}).Info("stat")
+	}).Debug("stat")
 
 	return info, nil
 }
@@ -116,7 +125,7 @@ func (s *Plugin) Update(ctx context.Context, info snapshots.Info, fieldpaths ...
 	log.G(s.ctx).WithFields(logrus.Fields{
 		"name":  info.Name,
 		"usage": fieldpaths,
-	}).Info("updated")
+	}).Debug("updated")
 
 	return info, nil
 }
@@ -142,7 +151,7 @@ func (s *Plugin) Usage(ctx context.Context, key string) (snapshots.Usage, error)
 	log.G(s.ctx).WithFields(logrus.Fields{
 		"key":   key,
 		"usage": usage,
-	}).Info("usage")
+	}).Debug("usage")
 
 	return usage, nil
 }
@@ -167,7 +176,7 @@ func (s *Plugin) Mounts(ctx context.Context, key string) ([]mount.Mount, error)
 	log.G(ctx).WithFields(logrus.Fields{
 		"key": key,
 		"mnt": mnt,
-	}).Info("mount")
+	}).Debug("mount")
 
 	return mnt, nil
 }
@@ -181,6 +190,12 @@ func (s *Plugin) View(ctx context.Context, key, parent string, opts ...snapshots
 
 func (s *Plugin) newSnapshot(ctx context.Context, key, parent string, readonly bool, opts ...snapshots.Opt) ([]mount.Mount, error) {
+	log.G(s.ctx).WithFields(logrus.Fields{
+		"key":       key,
+		"parent":    parent,
+		"_readonly": readonly,
+	}).Debug("prepare")
+
 	c, t, err := s.ms.TransactionContext(ctx, true)
 	if err != nil {
 		return nil, err
 	}
@@ -228,6 +243,7 @@ func (s *Plugin) newSnapshot(ctx context.Context, key, parent string, readonly b
 	// mount snapshot
 	mnt, err := s.mounts(c, ss.ID, &inf)
 	if err != nil {
+		log.G(s.ctx).WithError(err).Error("mount failed")
 		return nil, err
 	}
 
@@ -237,13 +253,12 @@ func (s *Plugin) newSnapshot(ctx context.Context, key, parent string, readonly b
 	}
 
 	log.G(s.ctx).WithFields(logrus.Fields{
-		"key":       key,
-		"parent":    parent,
-		"readonly":  readonly,
-		"id":        ss.ID,
-		"starlight": usingSL,
-		"mnt":       mnt,
-	}).Info("prepared")
+		"key":        key,
+		"parent":     parent,
+		"_readonly":  readonly,
+		"id":         ss.ID,
+		"_starlight": usingSL,
+	}).Debug("prepared")
 
 	return mnt, nil
 }
@@ -310,7 +325,7 @@ func (s *Plugin) Commit(ctx context.Context, name, key string, opts ...snapshots
 		"name":  name,
 		"key":   key,
 		"usage": usage,
-	}).Info("committed")
+	}).Debug("committed")
 
 	return t.Commit()
 }
@@ -368,11 +383,10 @@ func (s *Plugin) Remove(ctx context.Context, key string) (err error) {
 
 	// log
 	log.G(s.ctx).WithFields(logrus.Fields{
-		"key":       key,
-		"id":        info.Name,
-		"parents":   info.Parent,
-		"starlight": usingSL,
-	}).Info("remove")
+		"id":         info.Name,
+		"parents":    info.Parent,
+		"_starlight": usingSL,
+	}).Debug("remove")
 
 	return nil
 }
@@ -433,8 +447,10 @@ func (si SnapshotItem) IsStarlightFS() bool {
 	return ok && und != ""
 }
 
-func (si SnapshotItem) GetStarlightFeature() (md, und string, stack int64) {
-	return si.inf.Labels[util.SnapshotLabelRefImage], si.inf.Labels[util.SnapshotLabelRefUncompressed], stack
+// GetStarlightFeature returns the manifest digest, the uncompressed layer digest, and the stack number
+func (si SnapshotItem) GetStarlightFeature() (md, und digest.Digest, stack int64) {
+	return digest.Digest(si.inf.Labels[util.SnapshotLabelRefImage]),
+		digest.Digest(si.inf.Labels[util.SnapshotLabelRefUncompressed]), stack
 }
 
 func (s *Plugin) getFsStack(ctx context.Context, cur *snapshots.Info) (pSi []*SnapshotItem, err error) {
@@ -455,7 +471,9 @@ func (s *Plugin) getFsStack(ctx context.Context, cur *snapshots.Info) (pSi []*Sn
 }
 
 func (s *Plugin) mounts(ctx context.Context, ssId string, inf *snapshots.Info) (mnt []mount.Mount, err error) {
-	stack, err := s.getFsStack(ctx, inf) // from upper to lower
+	stack, err := s.getFsStack(ctx, inf)
+	// from upper to lower, not including the current layer
+
 	if err != nil {
 		return nil, err
 	}
@@ -469,7 +487,10 @@ func (s *Plugin) mounts(ctx context.Context, ssId string, inf *snapshots.Info) (
 		var m string
 		if current.IsStarlightFS() {
 			md, und, _ := current.GetStarlightFeature()
-			m, err = s.client.Mount(md, und, ssId, inf)
+			if err = s.client.PrepareManager(md); err != nil {
+				return nil, err
+			}
+			m, err = s.client.Mount(und, ssId, inf)
 			if err != nil {
 				return nil, err
 			}
@@ -485,13 +506,55 @@ func (s *Plugin) mounts(ctx context.Context, ssId string, inf *snapshots.Info) (
 		}
 	}
 
+	// Looking for the manifest digest.
+	// The manifest digest should be determined by the top layer of the lower dirs.
+	// It is rare that an image's upper layers reuse another image's layers, but we cannot rule out this case, so:
+	// 1. all the referenced manifests will be checked and loaded.
+	// 2. if a referenced manifest does not exist in the content store, the labels will be updated.
+	mdsm := map[string]bool{}
+	mdsl := make([]digest.Digest, 0)
+	for _, si := range stack {
+		md, _, _ := si.GetStarlightFeature()
+		if _, has := mdsm[md.String()]; !has {
+			mdsm[md.String()] = true
+			mdsl = append(mdsl, md)
+		}
+	}
+	mdsidx := 0
+
+	log.G(s.ctx).
+		WithField("managers", mdsl).
+		Debug("mount: prepare manager")
+
 	lower := make([]string, 0)
 	for i := len(stack) - 1; i >= 0; i-- {
 		var m string
 		if stack[i].IsStarlightFS() {
 			// starlight layer
-			md, und, _ := stack[i].GetStarlightFeature()
-			m, err = s.client.Mount(md, und, stack[i].ssId, &stack[i].inf)
+			_, und, _ := stack[i].GetStarlightFeature()
+			for {
+				m, err = s.client.Mount(und, stack[i].ssId, &stack[i].inf)
+				if err == nil {
+					break
+				} else if err == common.ErrNoManager {
+					// if there is no manager for the layer,
+					// prepare the next available manager
+					for {
+						if mdsidx >= len(mdsl) {
+							return nil, errors.Wrapf(err, "no manager for %s", und)
+						}
+						err = s.client.PrepareManager(mdsl[mdsidx])
+						if err == nil {
+							break
+						}
+						mdsidx += 1
+					}
+				} else {
+					return nil, err
+				}
+			}
+
 			if err != nil {
 				return nil, err
 			}
@@ -508,18 +571,6 @@ func (s *Plugin) mounts(ctx context.Context, ssId string, inf *snapshots.Info) (
 			fmt.Sprintf("workdir=%s", s.getWork(ssId)),
 			fmt.Sprintf("upperdir=%s", s.getUpper(ssId)),
 		)
-	} else {
-		var m string
-		if current.IsStarlightFS() {
-			md, und, _ := current.GetStarlightFeature()
-			m, err = s.client.Mount(und, md, ssId, inf)
-			if err != nil {
-				return nil, err
-			}
-		} else {
-			m = s.getUpper(ssId)
-		}
-		lower = append(lower, m)
 	}
 	options = append(options, fmt.Sprintf("lowerdir=%s", strings.Join(lower, ":")))
diff --git a/cmd/ctr-starlight/pull/pull.go b/cmd/ctr-starlight/pull/pull.go
index b5bbd1f..324f961 100644
--- a/cmd/ctr-starlight/pull/pull.go
+++ b/cmd/ctr-starlight/pull/pull.go
@@ -17,7 +17,7 @@ import (
 )
 
 func pullImage(client pb.DaemonClient, ref *pb.ImageReference, quiet bool) {
-	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*30)
 	defer cancel()
 	resp, err := client.PullImage(ctx, ref)
 	if err != nil {
diff --git a/cmd/starlight-daemon/main.go b/cmd/starlight-daemon/main.go
index 094f546..818151f 100644
--- a/cmd/starlight-daemon/main.go
+++ b/cmd/starlight-daemon/main.go
@@ -231,8 +231,8 @@ func DefaultAction(context *cli.Context, cfg *client.Configuration) (err error)
 	signal.Notify(si, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)
 	go func() {
 		<-si
-		close(wait)
 		slc.Close()
+		close(wait)
 	}()
 	<-wait
 	return nil
diff --git a/util/common/errors.go b/util/common/errors.go
index dfbc298..3e2fde2 100644
--- a/util/common/errors.go
+++ b/util/common/errors.go
@@ -25,20 +25,18 @@ import (
 )
 
 var (
-	ErrImageMediaType           = errors.New("unknown image type")
-	ErrImagePlatform            = errors.New("found manifest but platform does not match")
+	ErrNotImplemented           = errors.New("this feature has not yet been implemented")
 	ErrLayerNotFound            = errors.New("cannot find layer")
 	ErrMountingPointNotFound    = errors.New("cannot find mounting point")
 	ErrNotConsolidated          = errors.New("delta image has not yet been consolidated")
 	ErrAlreadyConsolidated      = errors.New("delta image has been consolidated already")
 	ErrHashCollision            = errors.New("found two files have the same hash but different size")
-	ErrNotImplemented           = errors.New("this feature has not yet been implemented")
 	ErrMergedImageNotFound      = errors.New("the requested image has not been merged")
 	ErrWrongImageFormat         = errors.New("please use this format :")
 	ErrOrphanNode               = errors.New("an entry node has no parent")
 	ErrNoRoPath                 = errors.New("entry does not have path to RO layer")
 	ErrImageNotFound            = errors.New("cannot find image")
-	ErrUnknownManifest          = errors.New("starlight proxy cannot find the image manifest")
+	ErrNoManager                = errors.New("no manager found")
 	ErrUnknownSnapshotParameter = errors.New("snapshots should follow a standard format")
 	ErrTocUnknown               = errors.New("please prefetch the delta image")
 )
diff --git a/util/config.go b/util/config.go
index 6787f01..c19b7ba 100644
--- a/util/config.go
+++ b/util/config.go
@@ -48,6 +48,7 @@ const (
 	ContentLabelStarlightMediaType = "mediaType.starlight.mc256.dev"
 	// ContentLabelContainerdGC prevents containerd from removing the content
 	ContentLabelContainerdGC = "containerd.io/gc.ref.content"
+	ContentLabelSnapshotGC   = "containerd.io/gc.ref.snapshot.starlight"
 	ContentLabelCompletion   = "complete.starlight.mc256.dev"
 
 	// ---------------------------------------------------------------------------------
diff --git a/util/receive/image.go b/util/receive/image.go
index 2df6663..54ad1e1 100644
--- a/util/receive/image.go
+++ b/util/receive/image.go
@@ -18,9 +18,11 @@ import (
 )
 
 type ImageLayer struct {
-	Size   int64  `json:"s"`
-	Serial int64  `json:"f"`
-	Hash   string `json:"h"`
+	Size   int64 `json:"s"`
+	Serial int64 `json:"f"`
+
+	// Hash is the digest of the compressed layer
+	Hash string `json:"h"`
 
 	// path to the local storage
 	Local string