diff --git a/cmd/buildkitd/main.go b/cmd/buildkitd/main.go index 1dc60acb9b0f..523c605104f2 100644 --- a/cmd/buildkitd/main.go +++ b/cmd/buildkitd/main.go @@ -850,6 +850,7 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err LeaseManager: w.LeaseManager(), ContentStore: w.ContentStore(), HistoryConfig: cfg.History, + GarbageCollect: w.GarbageCollect, }) } diff --git a/control/control.go b/control/control.go index a007e660e42d..f3a10b5b0993 100644 --- a/control/control.go +++ b/control/control.go @@ -70,6 +70,7 @@ type Opt struct { LeaseManager *leaseutil.Manager ContentStore *containerdsnapshot.Store HistoryConfig *config.HistoryConfig + GarbageCollect func(context.Context) error } type Controller struct { // TODO: ControlService @@ -89,10 +90,11 @@ func NewController(opt Opt) (*Controller, error) { gatewayForwarder := controlgateway.NewGatewayForwarder() hq, err := llbsolver.NewHistoryQueue(llbsolver.HistoryQueueOpt{ - DB: opt.HistoryDB, - LeaseManager: opt.LeaseManager, - ContentStore: opt.ContentStore, - CleanConfig: opt.HistoryConfig, + DB: opt.HistoryDB, + LeaseManager: opt.LeaseManager, + ContentStore: opt.ContentStore, + CleanConfig: opt.HistoryConfig, + GarbageCollect: opt.GarbageCollect, }) if err != nil { return nil, errors.Wrap(err, "failed to create history queue") diff --git a/solver/llbsolver/history.go b/solver/llbsolver/history.go index e9b62bbdd02a..f342952e80ba 100644 --- a/solver/llbsolver/history.go +++ b/solver/llbsolver/history.go @@ -42,10 +42,11 @@ const ( ) type HistoryQueueOpt struct { - DB db.Transactor - LeaseManager *leaseutil.Manager - ContentStore *containerdsnapshot.Store - CleanConfig *config.HistoryConfig + DB db.Transactor + LeaseManager *leaseutil.Manager + ContentStore *containerdsnapshot.Store + CleanConfig *config.HistoryConfig + GarbageCollect func(context.Context) error } type HistoryQueue struct { @@ -323,7 +324,7 @@ func (h *HistoryQueue) gc() error { now := time.Now() for _, r := range records[h.opt.CleanConfig.MaxEntries:] { if now.Add(-h.opt.CleanConfig.MaxAge.Duration).After(r.CompletedAt.AsTime()) { - if err := h.delete(r.Ref, false); err != nil { + if _, err := h.delete(r.Ref); err != nil { return err } } @@ -365,7 +366,7 @@ func (h *HistoryQueue) clearOrphans() error { for _, r := range records { bklog.G(ctx).Warnf("deleting build record %s due to missing blobs", r.Ref) - if err := h.delete(r.Ref, false); err != nil { + if _, err := h.delete(r.Ref); err != nil { return err } } @@ -373,10 +374,10 @@ func (h *HistoryQueue) clearOrphans() error { return nil } -func (h *HistoryQueue) delete(ref string, sync bool) error { +func (h *HistoryQueue) delete(ref string) (bool, error) { if _, ok := h.refs[ref]; ok { h.deleted[ref] = struct{}{} - return nil + return false, nil } delete(h.deleted, ref) h.ps.Send(&controlapi.BuildHistoryEvent{ @@ -389,19 +390,15 @@ func (h *HistoryQueue) delete(ref string, sync bool) error { return errors.Wrapf(os.ErrNotExist, "failed to retrieve bucket %s", recordsBucket) } err1 := b.Delete([]byte(ref)) - var opts []leases.DeleteOpt - if sync { - opts = append(opts, leases.SynchronousDelete) - } - err2 := h.hLeaseManager.Delete(context.TODO(), leases.Lease{ID: h.leaseID(ref)}, opts...) + err2 := h.hLeaseManager.Delete(context.TODO(), leases.Lease{ID: h.leaseID(ref)}) if err1 != nil { return err1 } return err2 }); err != nil { - return err + return false, err } - return nil + return true, nil } func (h *HistoryQueue) init() error { @@ -683,7 +680,14 @@ func (h *HistoryQueue) Delete(ctx context.Context, ref string) error { h.mu.Lock() defer h.mu.Unlock() - return h.delete(ref, true) + v, err := h.delete(ref) + if err != nil { + return err + } + if v { + return h.opt.GarbageCollect(ctx) + } + return nil } func (h *HistoryQueue) OpenBlobWriter(ctx context.Context, mt string) (_ *Writer, err error) { @@ -909,7 +913,7 @@ func (h *HistoryQueue) Listen(ctx context.Context, req *controlapi.BuildHistoryR if _, ok := h.deleted[req.Ref]; ok { if h.refs[req.Ref] == 0 { delete(h.refs, req.Ref) - h.delete(req.Ref, false) + h.delete(req.Ref) } } h.mu.Unlock() diff --git a/worker/base/worker.go b/worker/base/worker.go index 0a1743929a91..3f3a77c8be71 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -211,6 +211,14 @@ func NewWorker(ctx context.Context, opt WorkerOpt) (*Worker, error) { }, nil } +func (w *Worker) GarbageCollect(ctx context.Context) error { + if w.WorkerOpt.GarbageCollect == nil { + return nil + } + _, err := w.WorkerOpt.GarbageCollect(ctx) + return err +} + func (w *Worker) Close() error { var rerr error if err := w.MetadataStore.Close(); err != nil { diff --git a/worker/worker.go b/worker/worker.go index e5d6ef564c5d..ca5292e9eced 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -40,6 +40,7 @@ type Worker interface { Executor() executor.Executor CacheManager() cache.Manager LeaseManager() *leaseutil.Manager + GarbageCollect(context.Context) error } type Infos interface {