diff --git a/api/services/control/control.pb.go b/api/services/control/control.pb.go index 8955e808247f..69d81a17f08b 100644 --- a/api/services/control/control.pb.go +++ b/api/services/control/control.pb.go @@ -627,6 +627,9 @@ type CacheOptionsEntry struct { // Attrs are like mode=(min,max), ref=example.com:5000/foo/bar . // See cache importer/exporter implementations' documentation. Attrs map[string]string `protobuf:"bytes,2,rep,name=Attrs,proto3" json:"Attrs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // ID identifies this exporter. + // ID should be treated by the exporter as opaque. + ID string `protobuf:"bytes,3,opt,name=ID,proto3" json:"ID,omitempty"` } func (x *CacheOptionsEntry) Reset() { @@ -673,6 +676,13 @@ func (x *CacheOptionsEntry) GetAttrs() map[string]string { return nil } +func (x *CacheOptionsEntry) GetID() string { + if x != nil { + return x.ID + } + return "" +} + type SolveResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -2301,13 +2311,14 @@ var file_github_com_moby_buildkit_api_services_control_control_proto_rawDesc = [ 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, - 0xa7, 0x01, 0x0a, 0x11, 0x43, 0x61, 0x63, 0x68, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0xb7, 0x01, 0x0a, 0x11, 0x43, 0x61, 0x63, 0x68, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x44, 0x0a, 0x05, 0x41, 0x74, 0x74, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2e, 0x2e, 0x6d, 0x6f, 0x62, 0x79, 0x2e, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x6b, 0x69, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x61, 0x63, 0x68, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x41, 0x74, - 0x74, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, 0x41, 0x74, 0x74, 0x72, 0x73, 0x1a, + 0x74, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, 0x41, 0x74, 0x74, 0x72, 0x73, 0x12, + 0x0e, 0x0a, 0x02, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x49, 0x44, 0x1a, 0x38, 0x0a, 0x0a, 0x41, 0x74, 0x74, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, diff --git a/api/services/control/control.proto b/api/services/control/control.proto index d34622312e03..318f086f2757 100644 --- a/api/services/control/control.proto +++ b/api/services/control/control.proto @@ -102,6 +102,9 @@ message CacheOptionsEntry { // Attrs are like mode=(min,max), ref=example.com:5000/foo/bar . // See cache importer/exporter implementations' documentation. map Attrs = 2; + // ID identifies this exporter. + // ID should be treated by the exporter as opaque. + string ID = 3; } message SolveResponse { diff --git a/api/services/control/control_vtproto.pb.go b/api/services/control/control_vtproto.pb.go index 755ad23f1a58..5c33671d01ff 100644 --- a/api/services/control/control_vtproto.pb.go +++ b/api/services/control/control_vtproto.pb.go @@ -233,6 +233,7 @@ func (m *CacheOptionsEntry) CloneVT() *CacheOptionsEntry { } r := new(CacheOptionsEntry) r.Type = m.Type + r.ID = m.ID if rhs := m.Attrs; rhs != nil { tmpContainer := make(map[string]string, len(rhs)) for k, v := range rhs { @@ -1187,6 +1188,9 @@ func (this *CacheOptionsEntry) EqualVT(that *CacheOptionsEntry) bool { return false } } + if this.ID != that.ID { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -2635,6 +2639,13 @@ func (m *CacheOptionsEntry) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.ID) > 0 { + i -= len(m.ID) + copy(dAtA[i:], m.ID) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.ID))) + i-- + dAtA[i] = 0x1a + } if len(m.Attrs) > 0 { for k := range m.Attrs { v := m.Attrs[k] @@ -4447,6 +4458,10 @@ func (m *CacheOptionsEntry) SizeVT() (n int) { n += mapEntrySize + 1 + protohelpers.SizeOfVarint(uint64(mapEntrySize)) } } + l = len(m.ID) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } n += len(m.unknownFields) return n } @@ -7033,6 +7048,38 @@ func (m *CacheOptionsEntry) UnmarshalVT(dAtA []byte) error { } m.Attrs[mapkey] = mapvalue iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ID = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/client/solve.go b/client/solve.go index b221e4e4918f..11964004118b 100644 --- a/client/solve.go +++ b/client/solve.go @@ -47,22 +47,21 @@ type SolveOpt struct { CacheImports []CacheOptionsEntry Session []session.Attachable AllowedEntitlements []entitlements.Entitlement - // When the session is custom-initialized, ParseExporterOpts need to be used to correctly - // set up the session for export. + // When the session is custom-initialized, Init can be used to + // set up the session for export automatically. SharedSession *session.Session // TODO: refactor to better session syncing SessionPreInitialized bool // TODO: refactor to better session syncing Internal bool SourcePolicy *spb.Policy Ref string - // internal exporter state + // internal solver state s solverState } type solverState struct { - // storesToUpdate maps exporter ID -> oci store - storesToUpdate map[string]ociStore - cacheOpt *cacheOptions + exporterOpt *exporterOptions + cacheOpt *cacheOptions // Only one of runGateway or def can be set. // runGateway optionally defines the gateway callback runGateway runGatewayCB @@ -70,15 +69,21 @@ type solverState struct { def *llb.Definition } +type exporterOptions struct { + // storesToUpdate maps exporter ID -> oci store + storesToUpdate map[string]ociStore +} + type cacheOptions struct { options controlapi.CacheOptions contentStores map[string]content.Store // key: ID of content store ("local:" + csDir) - storesToUpdate map[string]string // key: path to content store, value: tag + storesToUpdate map[string]ociStore // key: exporter ID frontendAttrs map[string]string } type ociStore struct { path string + tag string } type ExportEntry struct { @@ -86,11 +91,19 @@ type ExportEntry struct { Attrs map[string]string Output filesync.FileOutputFunc // for ExporterOCI and ExporterDocker OutputDir string // for ExporterLocal + + // id identifies the exporter in the configuration. + // Will be assigned automatically and should not be set by the user. + id string } type CacheOptionsEntry struct { Type string Attrs map[string]string + + // id identifies the exporter in the configuration. + // Will be assigned automatically and should not be set by the user. + id string } // Solve calls Solve on the controller. @@ -115,9 +128,32 @@ func (c *Client) Solve(ctx context.Context, def *llb.Definition, opt SolveOpt, s type runGatewayCB func(ref string, s *session.Session, opts map[string]string) error -// ParseExporterOpts configures the specified session with the underlying exporter configuration. +// Init initializes the SolveOpt. +// It parses and initializes the cache exports/imports and output exporters. +func (opt *SolveOpt) Init(ctx context.Context, s *session.Session) error { + opt.initExporterIDs() + if err := opt.parseCacheOptions(ctx); err != nil { + return err + } + return opt.parseExporterOptions(s) +} + +func (opt *SolveOpt) initExporterIDs() { + for i := range opt.Exports { + opt.Exports[i].id = strconv.Itoa(i) + } + for i := range opt.CacheExports { + opt.CacheExports[i].id = strconv.Itoa(i) + } +} + +// parseExporterOptions configures the specified session with the underlying exporter configuration. // It needs to be invoked *after* ParseCacheOpts -func ParseExporterOpts(opt *SolveOpt, s *session.Session) error { +func (opt *SolveOpt) parseExporterOptions(s *session.Session) error { + if opt.s.exporterOpt != nil { + return nil + } + mounts, err := prepareMounts(opt) if err != nil { return err @@ -145,8 +181,9 @@ func ParseExporterOpts(opt *SolveOpt, s *session.Session) error { contentStores[key2] = store } + opt.s.exporterOpt = &exporterOptions{} var syncTargets []filesync.FSSyncTarget - for exID, ex := range opt.Exports { + for _, ex := range opt.Exports { var supportFile bool var supportDir bool switch ex.Type { @@ -171,7 +208,7 @@ func ParseExporterOpts(opt *SolveOpt, s *session.Session) error { if ex.Output == nil { return errors.Errorf("output file writer is required for %s exporter", ex.Type) } - syncTargets = append(syncTargets, filesync.WithFSSync(exID, ex.Output)) + syncTargets = append(syncTargets, filesync.WithFSSync(ex.id, ex.Output)) } if supportDir { if ex.OutputDir == "" { @@ -187,12 +224,12 @@ func ParseExporterOpts(opt *SolveOpt, s *session.Session) error { return err } contentStores["export"] = cs - if opt.s.storesToUpdate == nil { - opt.s.storesToUpdate = make(map[string]ociStore) + if opt.s.exporterOpt.storesToUpdate == nil { + opt.s.exporterOpt.storesToUpdate = make(map[string]ociStore) } - opt.s.storesToUpdate[strconv.Itoa(exID)] = ociStore{path: ex.OutputDir} + opt.s.exporterOpt.storesToUpdate[ex.id] = ociStore{path: ex.OutputDir} default: - syncTargets = append(syncTargets, filesync.WithFSSyncDir(exID, ex.OutputDir)) + syncTargets = append(syncTargets, filesync.WithFSSyncDir(ex.id, ex.OutputDir)) } } } @@ -236,13 +273,15 @@ func (c *Client) solve(ctx context.Context, opt SolveOpt, statusChan chan *Solve } } - err := ParseCacheOptions(ctx, &opt) + opt.initExporterIDs() + + err := opt.parseCacheOptions(ctx) if err != nil { return nil, err } if !opt.SessionPreInitialized { - if err := ParseExporterOpts(&opt, s); err != nil { + if err := opt.parseExporterOptions(s); err != nil { return nil, err } @@ -299,8 +338,7 @@ func (c *Client) solve(ctx context.Context, opt SolveOpt, statusChan chan *Solve exports = append(exports, &controlapi.Exporter{ Type: exp.Type, Attrs: exp.Attrs, - // Keep this in sync with SetupExporters id assignment - ID: strconv.Itoa(i), + ID: exp.id, }) } @@ -330,6 +368,7 @@ func (c *Client) solve(ctx context.Context, opt SolveOpt, statusChan chan *Solve for _, resp := range resp.ExporterResponses { res.ExporterResponses = append(res.ExporterResponses, ExporterResponse{ ID: resp.Metadata.ID, + Type: resp.Metadata.Type, Data: resp.Data, }) } @@ -389,26 +428,27 @@ func (c *Client) solve(ctx context.Context, opt SolveOpt, statusChan chan *Solve if err := eg.Wait(); err != nil { return nil, err } - // Update index.json of exported cache content store - // FIXME(AkihiroSuda): dedupe const definition of cache/remotecache.ExporterResponseManifestDesc = "cache.manifest" - if manifestDescJSON := res.ExporterResponse["cache.manifest"]; manifestDescJSON != "" { - var manifestDesc ocispecs.Descriptor - if err = json.Unmarshal([]byte(manifestDescJSON), &manifestDesc); err != nil { + + for id, store := range opt.s.cacheOpt.storesToUpdate { + // Update index.json of exported cache content store + manifestDesc, err := getCacheManifestDescriptor(id, res) + if err != nil { return nil, err } - for storePath, tag := range opt.s.cacheOpt.storesToUpdate { - idx := ociindex.NewStoreIndex(storePath) - if err := idx.Put(manifestDesc, ociindex.Tag(tag)); err != nil { - return nil, err - } + if manifestDesc == nil { + continue + } + idx := ociindex.NewStoreIndex(store.path) + if err := idx.Put(*manifestDesc, ociindex.Tag(store.tag)); err != nil { + return nil, err } } - if len(opt.s.storesToUpdate) == 0 { + if len(opt.s.exporterOpt.storesToUpdate) == 0 { return res, nil } - for id, store := range opt.s.storesToUpdate { - manifestDesc, err := getManifestDescriptor(id, res) + for id, store := range opt.s.exporterOpt.storesToUpdate { + manifestDesc, err := getImageManifestDescriptor(id, res) if err != nil { return nil, err } @@ -433,25 +473,43 @@ func (c *Client) solve(ctx context.Context, opt SolveOpt, statusChan chan *Solve return res, nil } -func getManifestDescriptor(exporterID string, resp *SolveResponse) (*ocispecs.Descriptor, error) { +func getCacheManifestDescriptor(exporterID string, resp *SolveResponse) (*ocispecs.Descriptor, error) { + const exporterResponseManifestDesc = "cache.manifest" + if resp := resp.cacheExporter(exporterID); resp != nil { + // FIXME(AkihiroSuda): dedupe const definition of cache/remotecache.ExporterResponseManifestDesc = "cache.manifest" + if manifestDescDt := resp.Data[exporterResponseManifestDesc]; manifestDescDt != "" { + return unmarshalManifestDescriptor(manifestDescDt) + } + } + if manifestDescDt := resp.ExporterResponse[exporterResponseManifestDesc]; manifestDescDt != "" { + return unmarshalManifestDescriptor(manifestDescDt) + } + return nil, nil +} + +func getImageManifestDescriptor(exporterID string, resp *SolveResponse) (*ocispecs.Descriptor, error) { if resp := resp.exporter(exporterID); resp != nil { if manifestDescDt := resp.Data[exptypes.ExporterImageDescriptorKey]; manifestDescDt != "" { - return unmarshalManifestDescriptor(manifestDescDt) + return unmarshalEncodedManifestDescriptor(manifestDescDt) } } if manifestDescDt := resp.ExporterResponse[exptypes.ExporterImageDescriptorKey]; manifestDescDt != "" { - return unmarshalManifestDescriptor(manifestDescDt) + return unmarshalEncodedManifestDescriptor(manifestDescDt) } return nil, nil } -func unmarshalManifestDescriptor(manifestDesc string) (*ocispecs.Descriptor, error) { - manifestDescDt, err := base64.StdEncoding.DecodeString(manifestDesc) +func unmarshalEncodedManifestDescriptor(base64Payload string) (*ocispecs.Descriptor, error) { + manifestDescDt, err := base64.StdEncoding.DecodeString(base64Payload) if err != nil { return nil, err } + return unmarshalManifestDescriptor(string(manifestDescDt)) +} + +func unmarshalManifestDescriptor(manifestDescJSON string) (*ocispecs.Descriptor, error) { var desc ocispecs.Descriptor - if err = json.Unmarshal([]byte(manifestDescDt), &desc); err != nil { + if err := json.Unmarshal([]byte(manifestDescJSON), &desc); err != nil { return nil, err } return &desc, nil @@ -502,13 +560,16 @@ func prepareSyncedFiles(def *llb.Definition, localMounts map[string]fsutil.FS) ( return result, nil } -func ParseCacheOptions(ctx context.Context, opt *SolveOpt) error { +func (opt *SolveOpt) parseCacheOptions(ctx context.Context) error { + if opt.s.cacheOpt != nil { + return nil + } var ( cacheExports []*controlapi.CacheOptionsEntry cacheImports []*controlapi.CacheOptionsEntry ) contentStores := make(map[string]content.Store) - storesToUpdate := make(map[string]string) + storesToUpdate := make(map[string]ociStore) frontendAttrs := make(map[string]string) for _, ex := range opt.CacheExports { if ex.Type == "local" { @@ -529,8 +590,7 @@ func ParseCacheOptions(ctx context.Context, opt *SolveOpt) error { if t, ok := ex.Attrs["tag"]; ok { tag = t } - // TODO(AkihiroSuda): support custom index JSON path and tag - storesToUpdate[csDir] = tag + storesToUpdate[ex.id] = ociStore{path: csDir, tag: tag} } if ex.Type == "registry" { regRef := ex.Attrs["ref"] @@ -541,6 +601,7 @@ func ParseCacheOptions(ctx context.Context, opt *SolveOpt) error { cacheExports = append(cacheExports, &controlapi.CacheOptionsEntry{ Type: ex.Type, Attrs: ex.Attrs, + ID: ex.id, }) } for _, im := range opt.CacheImports { diff --git a/control/control.go b/control/control.go index 671aa281580e..986321894113 100644 --- a/control/control.go +++ b/control/control.go @@ -435,7 +435,7 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (* if !ok { return nil, errors.Errorf("unknown cache exporter: %q", e.Type) } - var exp llbsolver.RemoteCacheExporter + exp := llbsolver.RemoteCacheExporter{ID: e.ID} exp.Exporter, err = cacheExporterFunc(ctx, session.NewGroup(req.Session), e.Attrs) if err != nil { return nil, errors.Wrapf(err, "failed to configure %v cache exporter", e.Type) diff --git a/session/filesync/filesync.go b/session/filesync/filesync.go index 9f2c3849368d..103fd0c20183 100644 --- a/session/filesync/filesync.go +++ b/session/filesync/filesync.go @@ -241,7 +241,7 @@ type FSSyncTarget interface { } type fsSyncTarget struct { - id int + id string outdir string f FileOutputFunc } @@ -250,14 +250,14 @@ func (target *fsSyncTarget) target() *fsSyncTarget { return target } -func WithFSSync(id int, f FileOutputFunc) FSSyncTarget { +func WithFSSync(id string, f FileOutputFunc) FSSyncTarget { return &fsSyncTarget{ id: id, f: f, } } -func WithFSSyncDir(id int, outdir string) FSSyncTarget { +func WithFSSyncDir(id, outdir string) FSSyncTarget { return &fsSyncTarget{ id: id, outdir: outdir, @@ -265,8 +265,8 @@ func WithFSSyncDir(id int, outdir string) FSSyncTarget { } func NewFSSyncTarget(targets ...FSSyncTarget) session.Attachable { - fs := make(map[int]FileOutputFunc) - outdirs := make(map[int]string) + fs := make(map[string]FileOutputFunc) + outdirs := make(map[string]string) for _, t := range targets { t := t.target() if t.f != nil { @@ -283,28 +283,26 @@ func NewFSSyncTarget(targets ...FSSyncTarget) session.Attachable { } type fsSyncAttachable struct { - fs map[int]FileOutputFunc - outdirs map[int]string + // maps exporter id -> file output handler + fs map[string]FileOutputFunc + // maps exporter id -> output directory + outdirs map[string]string } func (sp *fsSyncAttachable) Register(server *grpc.Server) { RegisterFileSendServer(server, sp) } -func (sp *fsSyncAttachable) chooser(ctx context.Context) int { +func (sp *fsSyncAttachable) chooser(ctx context.Context) string { md, ok := metadata.FromIncomingContext(ctx) if !ok { - return 0 + return "" } values := md[keyExporterID] if len(values) == 0 { - return 0 + return "" } - id, err := strconv.ParseInt(values[0], 10, 64) - if err != nil { - return 0 - } - return int(id) + return values[0] } func (sp *fsSyncAttachable) DiffCopy(stream FileSend_DiffCopyServer) (err error) { @@ -314,7 +312,7 @@ func (sp *fsSyncAttachable) DiffCopy(stream FileSend_DiffCopyServer) (err error) } f, ok := sp.fs[id] if !ok { - return errors.Errorf("exporter %d not found", id) + return errors.Errorf("exporter %s not found", id) } opts, _ := metadata.FromIncomingContext(stream.Context()) // if no metadata continue with empty object diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index f181d27da858..edae16089959 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -68,6 +68,9 @@ type RemoteCacheExporter struct { remotecache.Exporter solver.CacheExportMode IgnoreError bool + + // ID identifies the exporter + ID string } // ResolveWorkerFunc returns default worker for the temporary default non-distributed use cases @@ -723,11 +726,7 @@ func runCacheExporters(ctx context.Context, exporters []RemoteCacheExporter, j * } resp, err := exp.Finalize(ctx) exporterResponses[i] = &controlapi.ExporterResponse{ - // FIXME(dima): this needs proper disambiguation with IDs similar to output exporters - // as the same type cache exporters can potentially be specified multiple times (e.g. local - // cache exports/imports). - // This, it should be possible to reuse controlapi.ExporterResponse for both - Metadata: &controlapi.ExporterMetadata{ID: exp.Name()}, + Metadata: &controlapi.ExporterMetadata{ID: exp.ID}, Data: resp, } return prepareDone(err)