fix: filesystem mounting and restart
mc256 committed Nov 12, 2022
1 parent 9acf9be commit 5222c4f
Showing 14 changed files with 301 additions and 103 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -126,7 +126,7 @@ sudo ctr container create --snapshotter=starlight \
--mount type=bind,src=/tmp/test-redis-data,dst=/data,options=rbind:rw \
--env-file ./demo/config/all.env \
--net-host \
harbor.yuri.moe/starlight/redis:6.2.7 \
harbor.yuri.moe/x/redis:6.2.7 \
instance3 && \
sudo ctr task start instance3
```
173 changes: 138 additions & 35 deletions client/client.go
@@ -26,6 +26,7 @@ import (
"github.com/mc256/starlight/client/snapshotter"
"github.com/mc256/starlight/proxy"
"github.com/mc256/starlight/util"
"github.com/mc256/starlight/util/common"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
@@ -70,9 +71,14 @@ type Client struct {
operator *snapshotter.Operator
plugin *snapshotter.Plugin

// layer
layerMapLock sync.Mutex
layerMap map[string]*mountPoint

// manager cache
managerMapLock sync.Mutex
managerMap map[string]*Manager

// Optimizer
optimizerLock sync.Mutex
defaultOptimizer bool
@@ -185,7 +191,6 @@ func (c *Client) storeManifest(cfgName, d, ref, cfgd, sld string, man []byte) (e
pd := digest.Digest(d)

// create content store

err = content.WriteBlob(
c.ctx, c.cs, pd.Hex(), bytes.NewReader(man),
v1.Descriptor{Size: int64(len(man)), Digest: pd},
@@ -202,7 +207,7 @@ func (c *Client) storeManifest(cfgName, d, ref, cfgd, sld string, man []byte) (e
return nil
}

func (c *Client) updateManifest(d string) (err error) {
func (c *Client) updateManifest(d string, chainIds []digest.Digest) (err error) {
pd := digest.Digest(d)
cs := c.client.ContentStore()

@@ -214,12 +219,19 @@ func (c *Client) updateManifest(d string) (err error) {
}

info.Labels[util.ContentLabelCompletion] = time.Now().Format(time.RFC3339)

// garbage collection tags, more info:
// https://github.com/containerd/containerd/blob/83f44ddab5b17da74c5bd97dad7b2c5fa32871de/docs/garbage-collection.md
for idx, id := range chainIds {
info.Labels[fmt.Sprintf("%s/%d", util.ContentLabelSnapshotGC, idx)] = id.String()
}

info, err = cs.Update(c.ctx, info)

if err != nil {
return errors.Wrapf(err, "failed to mark manifest as completed")
}
log.G(c.ctx).WithField("digest", info.Digest).Debug("download completed")
log.G(c.ctx).WithField("digest", info.Digest).Info("download completed")
return nil
}

@@ -346,6 +358,9 @@ func (c *Client) UploadTraces(proxyCfg string, tc *fs.TraceCollection) error {
return nil
}

// PullImage pulls an image from a registry and stores it in the content store;
// it also keeps the image's Manager in memory.
// If a manager for this image already exists in memory, it is removed and the image is re-pulled.
func (c *Client) PullImage(base containerd.Image, ref, platform, proxyCfg string, ready *chan bool) (img containerd.Image, err error) {
// check local image
reqFilter := getImageFilter(ref)
@@ -384,6 +399,21 @@ func (c *Client) PullImage(base containerd.Image, ref, platform, proxyCfg string
WithField("sl_digest", sld).
Infof("pulling image %s", ref)

c.managerMapLock.Lock()
release := false
defer func() {
if !release {
c.managerMapLock.Unlock()
}
}()
if _, ok := c.managerMap[md]; ok {
log.G(c.ctx).
WithField("manifest", mSize).
WithField("sl_digest", sld).
Warn("found in cache. remove and re-pull")
delete(c.managerMap, md)
}

var (
buf *bytes.Buffer

@@ -458,8 +488,16 @@ func (c *Client) PullImage(base containerd.Image, ref, platform, proxyCfg string
log.G(c.ctx).WithField("image", ctrImg.Name).Debugf("created image")

// keep going and download layers
star.Init(c.cfg, false, manifest, imageConfig, mdd)
star.Init(c.ctx, c.cfg, false, manifest, imageConfig, mdd)

// create manager
c.managerMap[md] = star
log.G(c.ctx).WithField("manifest", md).Debugf("client: added manager")
release = true
c.managerMapLock.Unlock()

// check optimizer
// we should set optimizer before creating the filesystems
if c.defaultOptimizer {
if err = star.SetOptimizerOn(c.defaultOptimizeGroup); err != nil {
return nil, errors.Wrapf(err, "failed to enable optimizer")
@@ -470,7 +508,10 @@ func (c *Client) PullImage(base containerd.Image, ref, platform, proxyCfg string
return nil, errors.Wrapf(err, "failed to initialize directories")
}

if err = star.CreateSnapshots(c); err != nil {
// Create snapshots.
// The managerMapLock must be released before calling CreateSnapshots.
var chainIds []digest.Digest
if chainIds, err = star.CreateSnapshots(c); err != nil {
return nil, errors.Wrapf(err, "failed to create snapshots")
}

@@ -483,13 +524,18 @@ func (c *Client) PullImage(base containerd.Image, ref, platform, proxyCfg string
}

// mark as completed
if err = c.updateManifest(md); err != nil {
// update garbage collection labels
if err = c.updateManifest(md, chainIds); err != nil {
return nil, errors.Wrapf(err, "failed to update manifest")
}

return
}

// LoadImage loads the image manifest from the content store into memory;
// if it is already in memory, the cached Manager is returned directly.
//
// This method should not call any snapshotter methods, to avoid recursive locking.
func (c *Client) LoadImage(manifest digest.Digest) (manager *Manager, err error) {

var (
@@ -500,14 +546,24 @@ func (c *Client) LoadImage(manifest digest.Digest) (manager *Manager, err error)
star Manager
)

// manager cache
c.managerMapLock.Lock()
defer c.managerMapLock.Unlock()

if m, ok := c.managerMap[manifest.String()]; ok {
return m, nil
}

// no cache, load from store
cs := c.client.ContentStore()
ii, err = cs.Info(c.ctx, manifest)
if err != nil {
return nil, errors.Wrapf(err, "failed to get manifest info")
}

if len(ii.Labels[util.ContentLabelCompletion]) == 0 {
return nil, errors.New("image is incomplete, remove and pull again")
log.G(c.ctx).WithField("d", manifest.String()).Warn("using incomplete image")
//return nil, errors.Errorf("incomplete image, remove and repull")
}

starlight := digest.Digest(ii.Labels[fmt.Sprintf("%s.starlight", util.ContentLabelContainerdGC)])
@@ -533,19 +589,59 @@ func (c *Client) LoadImage(manifest digest.Digest) (manager *Manager, err error)
return nil, err
}

star.Init(c.cfg, true, man, cfg, manifest)
star.Init(c.ctx, c.cfg, true, man, cfg, manifest)

// save to cache
c.managerMap[manifest.String()] = &star
log.G(c.ctx).WithField("manifest", manifest.String()).Debugf("client: added manager")

// update layerMap
c.layerMapLock.Lock()
defer c.layerMapLock.Unlock()

for idx, serial := range star.stackSerialMap {
layer := star.layers[serial]
if mp, has := c.layerMap[layer.Hash]; has {
if mp.manager == nil {
mp.manager = &star
mp.stack = int64(idx)
}
} else {
c.layerMap[layer.Hash] = &mountPoint{
fs: nil,
manager: &star,
stack: int64(idx),
snapshots: make(map[string]*snapshots.Info),
}
}
}

return &star, nil
}

func (c *Client) Close() {
// containerd client
_ = c.client.Close()

// snapshotter server
if c.snServer != nil {
c.snServer.Stop()
}
// CLI server
if c.cliServer != nil {
c.cliServer.Stop()
}

// filesystems
c.managerMapLock.Lock()
defer c.managerMapLock.Unlock()
for _, mg := range c.managerMap {
log.G(c.ctx).
WithField("manager", mg.manifestDigest.String()).
Debugf("client: closing manager")
mg.Teardown()
}

os.Exit(1)
}

@@ -585,60 +681,61 @@ func (c *Client) getStarlightFS(ssId string) string {
return filepath.Join(c.GetFilesystemPath(ssId), "slfs")
}

func (c *Client) PrepareManager(manifest digest.Digest) (err error) {
_, err = c.LoadImage(manifest)
return
}

// Mount returns the mountpoint for the given snapshot
// - ld: uncompressed layer digest
// - ssId: snapshot id
func (c *Client) Mount(md, ld, ssId string, sn *snapshots.Info) (mnt string, err error) {
func (c *Client) Mount(ld digest.Digest, ssId string, sn *snapshots.Info) (mnt string, err error) {
c.layerMapLock.Lock()
defer c.layerMapLock.Unlock()

if mp, has := c.layerMap[ld]; has {
if mp, has := c.layerMap[ld.String()]; has {
// fs != nil, fs has already created
if mp.fs != nil {
mp.snapshots[sn.Name] = sn
log.G(c.ctx).
WithField("s", ssId).
WithField("m", mp.manager.manifestDigest.String()).
Debugf("mount: found fs")
return mp.fs.GetMountPoint(), nil
}
// manager != nil but fs == nil
// manager has been created but not yet mounted
if mp.manager != nil {
// create mounting point
mnt = filepath.Join(c.GetMountingPoint(ssId), "slfs")
mp.fs, err = mp.manager.NewStarlightFS(mnt, mp.stack, &fusefs.Options{}, false)
if err != nil {
return "", errors.Wrapf(err, "failed to mount filesystem")
}

// create filesystem and serve
go mp.fs.Serve()
mp.snapshots[sn.Name] = sn
log.G(c.ctx).
WithField("s", ssId).
WithField("m", mp.manager.manifestDigest.String()).
Debugf("mount: found manager")
return mnt, nil
}
}

// create a new filesystem
var man *Manager
man, err = c.LoadImage(digest.Digest(md))
if err != nil {
return "", errors.Wrapf(err, "failed to load image manager")
}
log.G(c.ctx).
WithField("s", ssId).
Warn("mount: no manager found")

// mount manager
for idx, layer := range man.Destination.Layers {
c.layerMap[layer.Hash] = &mountPoint{
fs: nil,
manager: man,
stack: int64(idx),
snapshots: map[string]*snapshots.Info{sn.Name: sn},
}
}
return "", common.ErrNoManager
} else {
// if the layer is not in the map at all,
// it is neither being downloaded nor already extracted

mnt = filepath.Join(c.GetMountingPoint(ssId), "slfs")
current := c.layerMap[ld]
current.fs, err = current.manager.NewStarlightFS(mnt, current.stack, &fusefs.Options{}, false)
if err != nil {
return "", errors.Wrapf(err, "failed to mount filesystem")
return "", errors.New("layer incomplete or not found. remove and pull again")
}
go current.fs.Serve()
current.snapshots[sn.Name] = sn
return mnt, nil

}

func (c *Client) Unmount(cd, sn string) error {
@@ -667,6 +764,11 @@ func (c *Client) Unmount(cd, sn string) error {
return nil
}

log.G(c.ctx).
WithField("d", cd).
WithField("mnt", layer.fs.GetMountPoint()).
Debug("fs: unmount")

if err := layer.fs.Teardown(); err != nil {
return err
}
@@ -995,7 +1097,8 @@ func NewClient(ctx context.Context, cfg *Configuration) (c *Client, err error) {
cfg: cfg,
client: nil,

layerMap: make(map[string]*mountPoint),
layerMap: make(map[string]*mountPoint),
managerMap: make(map[string]*Manager),
}

// containerd client
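The client.go diff above replaces the string-keyed Mount(md, ld, ssId, ...) with a digest-keyed Mount plus a new PrepareManager entry point: instead of lazily loading the image inside Mount, the client now returns common.ErrNoManager and expects the caller to load the manager first. The sketch below is a hypothetical caller-side flow (for example in the snapshotter plugin); the helper name, the retry-once logic, and the assumption that ErrNoManager is a sentinel error usable with errors.Is are illustrative, not part of this commit.

```go
package example

import (
	"errors"

	"github.com/containerd/containerd/snapshots"
	"github.com/mc256/starlight/client"
	"github.com/mc256/starlight/util/common"
	"github.com/opencontainers/go-digest"
)

// mountLayer is a hypothetical helper showing the new calling convention:
// Mount resolves an uncompressed layer digest to a FUSE mountpoint, and
// common.ErrNoManager signals that the image's Manager is not in memory
// (for example after a daemon restart) and must be loaded via PrepareManager.
func mountLayer(c *client.Client, manifest, layer digest.Digest, ssId string, sn *snapshots.Info) (string, error) {
	mnt, err := c.Mount(layer, ssId, sn)
	if err == nil {
		return mnt, nil
	}
	if !errors.Is(err, common.ErrNoManager) {
		return "", err
	}
	// Reload the Manager from the content store; LoadImage (called by
	// PrepareManager) also repopulates the layerMap entries it owns.
	if err := c.PrepareManager(manifest); err != nil {
		return "", err
	}
	return c.Mount(layer, ssId, sn)
}
```

Recovering from ErrNoManager this way is what the commit title points at: after a restart the in-memory caches start out empty, so the mount path has to be able to reload managers on demand rather than assuming they are already present.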
1 change: 1 addition & 0 deletions client/fs/fs.go
@@ -174,6 +174,7 @@ func (n *StarlightFsNode) Open(ctx context.Context, flags uint32) (fs.FileHandle

log.G(ctx).WithFields(logrus.Fields{
"f": name,
"_s": n.instance.stack,
"_r": r,
}).Trace("open")

1 change: 1 addition & 0 deletions client/fs/instance.go
@@ -34,6 +34,7 @@ func (fi *Instance) GetMountPoint() string { return fi.mountPoint }
func (fi *Instance) GetServer() *fuse.Server { return fi.server }

// Teardown unmounts the filesystem and closes the logging file if one is being written.
// Should you need this function, please consider using Manager.Teardown instead.
func (fi *Instance) Teardown() error {
return fi.GetServer().Unmount()
}
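The instance.go change only adds a doc comment, but it encodes a preference: Instance.Teardown unmounts a single FUSE server, while Manager.Teardown, which the new Client.Close path calls for every cached manager, is the intended way to shut down an image's filesystems. A minimal sketch of that preference, assuming the exported LoadImage and Manager.Teardown shown elsewhere in this diff; the helper name is illustrative:

```go
package example

import (
	"github.com/mc256/starlight/client"
	"github.com/opencontainers/go-digest"
)

// shutdownImage is a hypothetical helper: rather than tearing down individual
// fs.Instance mounts, it looks up the image's Manager (LoadImage returns the
// cached one if present) and lets the Manager unmount the filesystems it owns,
// mirroring what Client.Close does for all cached managers.
func shutdownImage(c *client.Client, manifest digest.Digest) error {
	mgr, err := c.LoadImage(manifest)
	if err != nil {
		return err
	}
	mgr.Teardown()
	return nil
}
```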