diff --git a/cmd/helm-s3/push.go b/cmd/helm-s3/push.go index 7aee90a7..dc30dd88 100644 --- a/cmd/helm-s3/push.go +++ b/cmd/helm-s3/push.go @@ -18,6 +18,7 @@ const pushDesc = `This command uploads a chart to the repository. 'helm s3 push' takes two arguments: - PATH - path to the chart file, - REPO - target repository. +Use the --tags flag to set S3 object tags on the uploaded chart. ` const pushExample = ` helm s3 push ./epicservice-0.5.1.tgz my-repo - uploads chart file 'epicservice-0.5.1.tgz' from the current directory to the repository with name 'my-repo'.` @@ -38,6 +39,8 @@ func newPushCommand(opts *options) *cobra.Command { force: false, ignoreIfExists: false, relative: false, + skipReindex: false, + tags: "", } cmd := &cobra.Command{ @@ -69,6 +72,8 @@ func newPushCommand(opts *options) *cobra.Command { flags.BoolVar(&act.force, "force", act.force, "Replace the chart if it already exists. This can cause the repository to lose existing chart; use it with care.") flags.BoolVar(&act.ignoreIfExists, "ignore-if-exists", act.ignoreIfExists, "If the chart already exists, exit normally and do not trigger an error.") flags.BoolVar(&act.relative, "relative", act.relative, "Use relative chart URL in the index instead of absolute.") + flags.BoolVar(&act.skipReindex, "skip-reindex", act.skipReindex, "Skip reindex after pushing the chart.") + flags.StringVar(&act.tags, "tags", act.tags, "S3 object tags.") // We don't use cobra's feature // @@ -99,6 +104,8 @@ type pushAction struct { force bool ignoreIfExists bool relative bool + skipReindex bool + tags string } func (act *pushAction) run(ctx context.Context) error { @@ -186,7 +193,7 @@ func (act *pushAction) run(ctx context.Context) error { if err != nil { return err } - if _, err := storage.PutChart(ctx, repoEntry.URL()+"/"+fname, fchart, string(chartMetaJSON), act.acl, hash, act.contentType); err != nil { + if _, err := storage.PutChart(ctx, repoEntry.URL()+"/"+fname, fchart, string(chartMetaJSON), act.acl, hash, act.contentType, act.tags); err != nil { return
errors.WithMessage(err, "upload chart to s3") } } @@ -201,40 +208,42 @@ func (act *pushAction) run(ctx context.Context) error { if err != nil { return errors.WithMessage(err, "fetch current repo index") } + if !act.skipReindex { + idx := helmutil.NewIndex() + if err := idx.UnmarshalBinary(b); err != nil { + return errors.WithMessage(err, "load index from downloaded file") + } - idx := helmutil.NewIndex() - if err := idx.UnmarshalBinary(b); err != nil { - return errors.WithMessage(err, "load index from downloaded file") - } - - baseURL := repoEntry.URL() - if act.relative { - baseURL = "" - } - - filename := escapeIfRelative(fname, act.relative) + baseURL := repoEntry.URL() + if act.relative { + baseURL = "" + } - if err := idx.AddOrReplace(chart.Metadata().Value(), filename, baseURL, hash); err != nil { - return errors.WithMessage(err, "add/replace chart in the index") - } - idx.SortEntries() - idx.UpdateGeneratedTime() + filename := escapeIfRelative(fname, act.relative) - idxReader, err := idx.Reader() - if err != nil { - return errors.WithMessage(err, "get index reader") - } + if err := idx.AddOrReplace(chart.Metadata().Value(), filename, baseURL, hash); err != nil { + return errors.WithMessage(err, "add/replace chart in the index") + } + idx.SortEntries() + idx.UpdateGeneratedTime() - if !act.dryRun { - if err := storage.PutIndex(ctx, repoEntry.URL(), act.acl, idxReader); err != nil { - return errors.WithMessage(err, "upload index to s3") + idxReader, err := idx.Reader() + if err != nil { + return errors.WithMessage(err, "get index reader") } - if err := idx.WriteFile(repoEntry.CacheFile(), helmutil.DefaultIndexFilePerm); err != nil { - return errors.WithMessage(err, "update local index") + if !act.dryRun { + if err := storage.PutIndex(ctx, repoEntry.URL(), act.acl, idxReader); err != nil { + return errors.WithMessage(err, "upload index to s3") + } + + if err := idx.WriteFile(repoEntry.CacheFile(), helmutil.DefaultIndexFilePerm); err != nil { + return 
errors.WithMessage(err, "update local index") + } } + } else { + act.printer.Printf("[DEBUG] Skipping reindex because skipReindex is set to %t.\n", act.skipReindex) } - act.printer.Printf("Successfully uploaded the chart to the repository.\n") return nil } diff --git a/cmd/helm-s3/reindex.go b/cmd/helm-s3/reindex.go index 99571d72..73c0425a 100644 --- a/cmd/helm-s3/reindex.go +++ b/cmd/helm-s3/reindex.go @@ -1,8 +1,13 @@ package main import ( + "bufio" + "bytes" "context" - "fmt" + "io" + "os" + "sync" + "time" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -10,6 +15,9 @@ import ( "github.com/hypnoglow/helm-s3/internal/awss3" "github.com/hypnoglow/helm-s3/internal/awsutil" "github.com/hypnoglow/helm-s3/internal/helmutil" + log "github.com/sirupsen/logrus" + "helm.sh/helm/v3/pkg/chart" + "helm.sh/helm/v3/pkg/repo" ) const reindexDesc = `This command performs a reindex of the repository. @@ -19,6 +27,7 @@ const reindexDesc = `This command performs a reindex of the repository. ` const reindexExample = ` helm s3 reindex my-repo - performs a reindex of the repository with name 'my-repo'.` +const batchSize = 1000 func newReindexCommand(opts *options) *cobra.Command { act := &reindexAction{ @@ -27,6 +36,7 @@ func newReindexCommand(opts *options) *cobra.Command { verbose: false, repoName: "", relative: false, + dryRun: false, } cmd := &cobra.Command{ @@ -50,6 +60,7 @@ func newReindexCommand(opts *options) *cobra.Command { flags := cmd.Flags() flags.BoolVar(&act.relative, "relative", act.relative, "Use relative chart URLs in the index instead of absolute.") + flags.BoolVar(&act.dryRun, "dry-run", act.dryRun, "Simulate reindex, don't push it to the dest repo.") return cmd } @@ -69,9 +80,15 @@ type reindexAction struct { // flags relative bool + + dryRun bool } func (act *reindexAction) run(ctx context.Context) error { + start := time.Now() + log.Infof("Starting reindex at %s", start) + act.printer.Printf("[DEBUG] Starting reindex.\n") + repoEntry, err :=
helmutil.LookupRepoEntry(act.repoName) if err != nil { return err @@ -83,52 +100,97 @@ func (act *reindexAction) run(ctx context.Context) error { } storage := awss3.New(sess) - items, errs := storage.Traverse(ctx, repoEntry.URL()) + items, _ := storage.Traverse(ctx, repoEntry.URL()) - builtIndex := make(chan helmutil.Index, 1) - go func() { - idx := helmutil.NewIndex() - for item := range items { - baseURL := repoEntry.URL() - if act.relative { - baseURL = "" - } + // Use a buffered channel to handle the concurrent indexing + builtIndex := make(chan *repo.IndexFile, len(items)/batchSize+1) + var wg sync.WaitGroup - if act.verbose { - act.printer.Printf("[DEBUG] Adding %s to index.\n", item.Filename) - } + act.printer.Printf("[DEBUG] Creating split indexes.\n") + + // Process items in batches of 1000 + log.Info("Processing items in batches of 1000") + log.Info("Total items: ", len(items)) + for i := 0; i < len(items); i += batchSize { + end := i + batchSize + if end > len(items) { + end = len(items) + } - filename := escapeIfRelative(item.Filename, act.relative) + wg.Add(1) + go func(batch []awss3.ChartInfo) { + defer wg.Done() + idx := repo.NewIndexFile() - if err := idx.Add(item.Meta.Value(), filename, baseURL, item.Hash); err != nil { - act.printer.PrintErrf("[ERROR] failed to add chart to the index: %s", err) + for _, item := range batch { + baseURL := repoEntry.URL() + if act.relative { + baseURL = "" + } + + if act.verbose { + act.printer.Printf("[DEBUG] Adding %s to index.\n", item.Filename) + } + + filename := escapeIfRelative(item.Filename, act.relative) + + if err := idx.MustAdd(item.Meta.Value().(*chart.Metadata), filename, baseURL, item.Hash); err != nil { + act.printer.PrintErrf("[ERROR] failed to add chart to the index: %s", err) + } } - } - idx.SortEntries() - idx.UpdateGeneratedTime() - builtIndex <- idx - }() + builtIndex <- idx + }(items[i:end]) + } - for err = range errs { - return fmt.Errorf("traverse the chart repository: %v", err) + 
log.Info("Waiting for all goroutines to finish") + // Wait for all goroutines to finish + wg.Wait() + close(builtIndex) + + log.Info("Processing indexes") + // Merge the individual index files into a single index file + finalIndex := repo.NewIndexFile() + for idx := range builtIndex { + finalIndex.Merge(idx) + } + + finalIndex.SortEntries() + + if err := finalIndex.WriteFile(repoEntry.CacheFile(), helmutil.DefaultIndexFilePerm); err != nil { + return errors.WithMessage(err, "update local index") } + log.Infof("Index file written to %s", repoEntry.CacheFile()) - idx := <-builtIndex + file, err := os.Open(repoEntry.CacheFile()) + if err != nil { + return errors.Wrap(err, "open index file") + } + defer file.Close() - r, err := idx.Reader() + // Get the file size + stat, err := file.Stat() if err != nil { - return errors.Wrap(err, "get index reader") + return errors.Wrap(err, "get file stats") } - if err := storage.PutIndex(ctx, repoEntry.URL(), act.acl, r); err != nil { - return errors.Wrap(err, "upload index to the repository") + // Read the file into a byte slice + ra := make([]byte, stat.Size()) + if _, err := io.ReadFull(bufio.NewReader(file), ra); err != nil { + return errors.Wrap(err, "read index file") } - if err := idx.WriteFile(repoEntry.CacheFile(), helmutil.DefaultIndexFilePerm); err != nil { - return errors.WithMessage(err, "update local index") + r := bytes.NewReader(ra) + + if !act.dryRun { + if err := storage.PutIndex(ctx, repoEntry.URL(), act.acl, r); err != nil { + return errors.Wrap(err, "upload index to the repository") + } + } else { + act.printer.Printf("[DEBUG] Dry run, not pushing index to the repository.\n") } act.printer.Printf("Repository %s was successfully reindexed.\n", act.repoName) + log.Infof("Reindex done in %s", time.Since(start)) return nil } diff --git a/internal/awss3/storage.go b/internal/awss3/storage.go index 93bce29a..f16ef51b 100644 --- a/internal/awss3/storage.go +++ b/internal/awss3/storage.go @@ -8,13 +8,17 @@
import ( "net/url" "os" "strings" + "sync" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/ghodss/yaml" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" "github.com/hypnoglow/helm-s3/internal/helmutil" ) @@ -26,6 +30,7 @@ const ( // s3MetadataSoftLimitBytes is application-specific soft limit // for the number of bytes in S3 object metadata. s3MetadataSoftLimitBytes = 1900 + localBase = "/tmp/" ) var ( @@ -56,132 +61,176 @@ type Storage struct { } // Traverse traverses all charts in the repository. -func (s *Storage) Traverse(ctx context.Context, repoURI string) (<-chan ChartInfo, <-chan error) { - charts := make(chan ChartInfo, 1) - errs := make(chan error, 1) +func (s *Storage) Traverse(ctx context.Context, repoURI string) ([]ChartInfo, <-chan error) { + charts := make(chan ChartInfo) + errs := make(chan error) + var result []ChartInfo go s.traverse(ctx, repoURI, charts, errs) - return charts, errs + // Collect the results and handle errors + log.Info("collecting results") + for { + select { + case chart, ok := <-charts: + if !ok { + charts = nil + } else { + result = append(result, chart) + } + } + + // Exit the loop when both channels are closed + if charts == nil { + break + } + } + + log.Info("collected results") + + return result, errs } // traverse traverses all charts in the repository. // It writes an info item about every chart to items, and errors to errs. // It always closes both channels when returns. 
func (s *Storage) traverse(ctx context.Context, repoURI string, items chan<- ChartInfo, errs chan<- error) { + log.Info("traversing s3 bucket") + start := time.Now() defer close(items) defer close(errs) bucket, prefixKey, err := parseURI(repoURI) if err != nil { - errs <- err + log.Errorf("parse uri: %s", err) return } client := s3.New(s.session) var continuationToken *string + var wg sync.WaitGroup + for { + log.Info("listing objects") listOut, err := client.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{ Bucket: aws.String(bucket), Prefix: aws.String(prefixKey), ContinuationToken: continuationToken, }) if err != nil { - errs <- errors.Wrap(err, "list s3 bucket objects") + log.Errorf("list s3 objects: %s", err) return } - for _, obj := range listOut.Contents { - // We need to make object key relative to repo root. - key := strings.TrimPrefix(*obj.Key, prefixKey) - // Additionally trim prefix slash if exists, because repos can be: - // s3://bucket/repo/subdir OR s3://bucket/repo/subdir/ - key = strings.TrimPrefix(key, "/") - - if strings.Contains(key, "/") { - // This is a subfolder. Ignore it, because chart repository - // is flat and cannot contain nested directories. 
- continue - } + log.Info("listOut.Contents: ", len(listOut.Contents)) - if !strings.HasSuffix(key, ".tgz") { - // Ignore any file that isn't a chart - // This could include index.yaml - // or any other kind of file that might be in the repo - continue - } + wg.Add(1) - metaOut, err := client.HeadObjectWithContext(ctx, &s3.HeadObjectInput{ - Bucket: aws.String(bucket), - Key: obj.Key, - }) - if err != nil { - errs <- fmt.Errorf("head s3 object %q: %s", key, err) - return + go func() { + defer wg.Done() + // Process objects in parallel + for _, obj := range listOut.Contents { + processS3Object(ctx, client, bucket, obj, items, prefixKey) + log.Info("processing object: ", *obj.Key) } - - reindexItem := ChartInfo{Filename: key} - - serializedChartMeta, hasMeta := metaOut.Metadata[strings.Title(metaChartMetadata)] - chartDigest, hasDigest := metaOut.Metadata[strings.Title(metaChartDigest)] - if !hasMeta || !hasDigest { - // Some charts in the repository can have no metadata. - // - // This might happen in few cases: - // - Chart was uploaded manually, not using 'helm s3 push'; - // - Chart was pushed before we started adding metadata to objects; - // - Chart metadata was too big to add to the S3 object metadata (see issues - // https://github.com/hypnoglow/helm-s3/issues/120 and - // https://github.com/hypnoglow/helm-s3/issues/112 ) - // - // In this case we have to download the ch file itself. 
- objectOut, err := client.GetObjectWithContext(ctx, &s3.GetObjectInput{ - Bucket: aws.String(bucket), - Key: obj.Key, - }) - if err != nil { - errs <- fmt.Errorf("get s3 object %q: %s", key, err) - return - } - - buf := &bytes.Buffer{} - tr := io.TeeReader(objectOut.Body, buf) - - ch, err := helmutil.LoadArchive(tr) - objectOut.Body.Close() - if err != nil { - errs <- fmt.Errorf("load archive from s3 object %q: %s", key, err) - return - } - - digest, err := helmutil.Digest(buf) - if err != nil { - errs <- fmt.Errorf("get chart hash for %q: %s", key, err) - return - } - - reindexItem.Meta = ch.Metadata() - reindexItem.Hash = digest - } else { - meta := helmutil.NewChartMetadata() - if err := meta.UnmarshalJSON([]byte(*serializedChartMeta)); err != nil { - errs <- fmt.Errorf("unserialize chart meta for %q: %s", key, err) - return - } - - reindexItem.Meta = meta - reindexItem.Hash = *chartDigest - } - - // process meta and hash - items <- reindexItem - } + }() // Decide if need to load more objects. if listOut.NextContinuationToken == nil { + log.Info("all objects processed") break } continuationToken = listOut.NextContinuationToken } + wg.Wait() + + log.Info("traverse took: ", time.Since(start)) +} + +func processS3Object(ctx context.Context, client *s3.S3, bucket string, obj *s3.Object, items chan<- ChartInfo, prefixKey string) { + log.Info("processing object: ", *obj.Key) + // We need to make object key relative to repo root. + key := strings.TrimPrefix(*obj.Key, prefixKey) + // Additionally trim prefix slash if exists, because repos can be: + // s3://bucket/repo/subdir OR s3://bucket/repo/subdir/ + key = strings.TrimPrefix(key, "/") + + if strings.Contains(key, "/") { + // This is a subfolder. Ignore it, because chart repository + // is flat and cannot contain nested directories. 
+ return + } + + if !strings.HasSuffix(key, ".tgz") { + // Ignore any file that isn't a chart + // This could include index.yaml + // or any other kind of file that might be in the repo + return + } + + metaOut, err := client.HeadObjectWithContext(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: obj.Key, + }) + if err != nil { + log.Errorf("head s3 object %q: %s", key, err) + return + } + + reindexItem := ChartInfo{Filename: key} + + serializedChartMeta, hasMeta := metaOut.Metadata[strings.Title(metaChartMetadata)] + chartDigest, hasDigest := metaOut.Metadata[strings.Title(metaChartDigest)] + if !hasMeta || !hasDigest { + // Some charts in the repository can have no metadata. + // + // This might happen in few cases: + // - Chart was uploaded manually, not using 'helm s3 push'; + // - Chart was pushed before we started adding metadata to objects; + // - Chart metadata was too big to add to the S3 object metadata (see issues + // https://github.com/hypnoglow/helm-s3/issues/120 and + // https://github.com/hypnoglow/helm-s3/issues/112 ) + // + // In this case we have to download the ch file itself. 
+ objectOut, err := client.GetObjectWithContext(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: obj.Key, + }) + if err != nil { + log.Errorf("get s3 object %q: %s", key, err) + return + } + + buf := &bytes.Buffer{} + tr := io.TeeReader(objectOut.Body, buf) + + ch, err := helmutil.LoadArchive(tr) + objectOut.Body.Close() + if err != nil { + log.Errorf("load archive from s3 object %q: %s", key, err) + return + } + + digest, err := helmutil.Digest(buf) + if err != nil { + log.Errorf("get chart hash for %q: %s", key, err) + return + } + + reindexItem.Meta = ch.Metadata() + reindexItem.Hash = digest + } else { + meta := helmutil.NewChartMetadata() + if err := meta.UnmarshalJSON([]byte(*serializedChartMeta)); err != nil { + log.Errorf("unserialize chart meta for %q: %s", key, err) + return + } + + reindexItem.Meta = meta + reindexItem.Hash = *chartDigest + } + + // process meta and hash + items <- reindexItem } // ChartInfo contains info about particular chart. @@ -246,7 +295,7 @@ func (s *Storage) Exists(ctx context.Context, uri string) (bool, error) { // PutChart puts the chart file to the storage. // uri must be in the form of s3 protocol: s3://bucket-name/key[...]. 
-func (s *Storage) PutChart(ctx context.Context, uri string, r io.Reader, chartMeta, acl string, chartDigest string, contentType string) (string, error) { +func (s *Storage) PutChart(ctx context.Context, uri string, r io.Reader, chartMeta, acl string, chartDigest string, contentType string, tags string) (string, error) { bucket, key, err := parseURI(uri) if err != nil { return "", err } @@ -261,6 +310,7 @@ func (s *Storage) PutChart(ctx context.Context, uri string, r io.Reader, chartMe ServerSideEncryption: getSSE(), Body: r, Metadata: assembleObjectMetadata(chartMeta, chartDigest), + Tagging: &tags, }, ) if err != nil { @@ -298,6 +348,31 @@ func (s *Storage) PutIndex(ctx context.Context, uri string, acl string, r io.Rea return nil } +func (s *Storage) GetIndex(ctx context.Context, uri string, acl string, r io.Reader) error { + bucket, key, err := parseURI(uri) + if err != nil { + return err + } + + index, err := s3.New(s.session).GetObjectWithContext( + ctx, + &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }, + ) + if err != nil { + return errors.Wrap(err, "get object from s3") + } + + indexBody, err := io.ReadAll(index.Body) + if err != nil { + return errors.Wrap(err, "read index body") + } + + return yaml.Unmarshal(indexBody, r) +} + // IndexExists returns true if index file exists in the storage for repository // with the provided uri. // uri must be in the form of s3 protocol: s3://bucket-name/key[...].