Skip to content

Commit

Permalink
Add massupload command
Browse files Browse the repository at this point in the history
  • Loading branch information
thePanz authored and mweibel committed Apr 2, 2019
1 parent 05194c3 commit e8e1816
Show file tree
Hide file tree
Showing 14 changed files with 578 additions and 335 deletions.
80 changes: 80 additions & 0 deletions cmd/rokka/cli/batch/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package batch

import (
"sync"

"github.com/rokka-io/rokka-go/rokka"
"gopkg.in/cheggaaa/pb.v1"
)

// Options are used for the CLI flags for all batch commands
type Options struct {
DryRun bool
Concurrency int
NoProgress bool
Force bool
}

// OperationResult contains the result of the operation
type OperationResult struct {
NotOK int
OK int
Error error
}

// Reader allows to read from an arbitrary location and inserts the image identifications to the channel for concurrent processing.
// The values within the images channel are string, and therefore can be pretty much anything. The writer used after reading
// needs to know and understand what an image string actually represents. E.g. in the case of copy-all it's an existing
// image hash. In the case of massUpload it's a filesystem path to an image.
type Reader interface {
Read(client *rokka.Client, images chan string, bar *pb.ProgressBar) error
}

// Writer operates on the previously found image list and executes a write operation. This can be pretty much anything.
// For example it could create source images on rokka. Or delete a source image on rokka.
type Writer interface {
Write(client *rokka.Client, images []string) OperationResult
}

// ProgressCounter returns the total images to be processed. It is used for the progress bar and confirmation messages of the batch
// CLIs. It's an optional interface to implement because not all operations allow to know beforehand how many images there are.
type ProgressCounter interface {
Count(client *rokka.Client) (int, error)
}

// WriteImages creates a group of goroutines bound by the concurrency option. It executes the Writer.Write command for each flushInterval
// amount of images.
func WriteImages(client *rokka.Client, images chan string, results chan OperationResult, w Writer, concurrency int, flushInterval int) {
waitGroup := sync.WaitGroup{}
waitGroup.Add(concurrency)
defer close(results)

for i := 0; i < concurrency; i++ {
go func() {
defer waitGroup.Done()

fileNames := make([]string, 0)

for fileName := range images {
fileNames = append(fileNames, fileName)

if len(fileNames) >= flushInterval {
results <- w.Write(client, fileNames)
fileNames = make([]string, 0)
}
}

// flush remaining items
if len(fileNames) > 0 {
results <- w.Write(client, fileNames)
}
}()
}
waitGroup.Wait()
}

// ReadImages is a simple wrapper around the Reader.Read call. In the future there may be things we can move here.
func ReadImages(client *rokka.Client, images chan string, r Reader, bar *pb.ProgressBar) error {
defer close(images)
return r.Read(client, images, bar)
}
8 changes: 8 additions & 0 deletions cmd/rokka/cli/batch/file_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// +build !windows

package batch

// Fixpath returns an absolute path on the current OS which is mostly relevant for windows.
func Fixpath(name string) string {
return name
}
29 changes: 29 additions & 0 deletions cmd/rokka/cli/batch/file_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package batch

import (
"path/filepath"
"strings"
)

// Fixpath returns an absolute path on windows to be able to open long
// file names. See https://github.com/restic/restic/blob/master/internal/fs/file_windows.go
func Fixpath(name string) string {
abspath, err := filepath.Abs(name)
if err == nil {
// Check if \\?\UNC\ already exist
if strings.HasPrefix(abspath, `\\?\UNC\`) {
return abspath
}
// Check if \\?\ already exist
if strings.HasPrefix(abspath, `\\?\`) {
return abspath
}
// Check if path starts with \\
if strings.HasPrefix(abspath, `\\`) {
return strings.Replace(abspath, `\\`, `\\?\UNC\`, 1)
}
// Normal path
return `\\?\` + abspath
}
return name
}
102 changes: 102 additions & 0 deletions cmd/rokka/cli/batch/massupload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package batch

import (
"os"
"path/filepath"
"sort"
"strings"

"github.com/rokka-io/rokka-go/rokka"
"gopkg.in/cheggaaa/pb.v1"
)

// MassUploadOptions are specific CLI flags for the mass upload CLI cmd.
type MassUploadOptions struct {
Recursive bool
Extensions []string
}

// MassUploader is both a Reader and Writer which reads from the fileSystem and creates source images in the writer.
type MassUploader struct {
BasePath string
Recursive bool
Extensions []string
Organization string
UserMetadata map[string]interface{}
}

// Read walks the directory specified in the CLI and adds the found images (filtered by extensions) to the image channel.
func (mu *MassUploader) Read(client *rokka.Client, images chan string, bar *pb.ProgressBar) error {
// Keep extensions sorted to use a binary search for matching
extensions := mu.Extensions
sort.Strings(extensions)

return filepath.Walk(Fixpath(mu.BasePath), func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}

if info == nil {
return nil
}

// Skip subfolders if enabled, but still scan the root directory
if info.IsDir() && mu.BasePath != path && !mu.Recursive {
return filepath.SkipDir
}

if !info.IsDir() {
// Exclude files without extension
ext := strings.TrimPrefix(filepath.Ext(path), ".")
if ext == "" {
return nil
}

// Exclude empty files
if info.Size() == 0 {
return nil
}

i := sort.SearchStrings(extensions, ext)
if i >= len(extensions) || extensions[i] != ext {
// If the current extension is not found among the allowed ones, just skip the file
return nil
}

bar.Total++

// Add the image to the list of images to be uploaded
images <- path
}

return nil
})
}

// Write creates source images for each image.
func (mu *MassUploader) Write(client *rokka.Client, images []string) OperationResult {
var lastErr error

OK := 0
notOK := 0
for _, path := range images {
if err := mu.uploadFile(client, path); err != nil {
lastErr = err
notOK++
} else {
OK++
}
}
return OperationResult{OK: OK, NotOK: notOK, Error: lastErr}
}

func (mu *MassUploader) uploadFile(client *rokka.Client, path string) error {
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()

_, err = client.CreateSourceImageWithMetadata(mu.Organization, filepath.Base(path), file, mu.UserMetadata, nil)
return err
}
85 changes: 85 additions & 0 deletions cmd/rokka/cli/batch/source_images.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package batch

import (
"github.com/rokka-io/rokka-go/rokka"
"gopkg.in/cheggaaa/pb.v1"
)

// SourceImagesReader reads images from rokka on an organization.
type SourceImagesReader struct {
Organization string
}

// Read uses the search API to paginate through all images.
func (sir *SourceImagesReader) Read(client *rokka.Client, images chan string, bar *pb.ProgressBar) error {
cursor := ""
for {
opt := rokka.ListSourceImagesOptions{
Offset: cursor,
}
res, err := client.ListSourceImages(sir.Organization, opt)
if err != nil {
return err
}
bar.Total = int64(res.Total)

for _, element := range res.Items {
images <- element.Hash
}
if res.Cursor == "" || cursor == res.Cursor || len(res.Items) == 0 {
return nil
}
cursor = res.Cursor
}
}

// Count fetches one image in order to get the Total amount of images available.
func (sir *SourceImagesReader) Count(client *rokka.Client) (int, error) {
listSourceImagesOptions := rokka.ListSourceImagesOptions{
Limit: 1,
}
res, err := client.ListSourceImages(sir.Organization, listSourceImagesOptions)
if err != nil {
return 0, err
}
return res.Total, nil
}

// CopyAllSourceImagesWriter uses the copy all API to transfer images from one organization to a destination organization.
type CopyAllSourceImagesWriter struct {
SourceOrganization string
DestinationOrganization string
}

func (cas *CopyAllSourceImagesWriter) Write(client *rokka.Client, images []string) OperationResult {
OK, notOK, err := client.CopySourceImages(cas.SourceOrganization, images, cas.DestinationOrganization)
return OperationResult{OK: OK, NotOK: notOK, Error: err}
}

// DeleteAllSourceImagesWriter deletes source images of an organization.
type DeleteAllSourceImagesWriter struct {
Organization string
}

func (das *DeleteAllSourceImagesWriter) Write(client *rokka.Client, images []string) OperationResult {
var lastErr error

OK := 0
notOK := 0
for _, hash := range images {
lastErr = client.DeleteSourceImage(das.Organization, hash)
if lastErr != nil {
notOK++
} else {
OK++
}
}
return OperationResult{OK: OK, NotOK: notOK, Error: lastErr}
}

// NoopWriter does not do anything. It is used for the dry run.
type NoopWriter struct{}

func (nw *NoopWriter) Write(client *rokka.Client, images []string) OperationResult {
return OperationResult{OK: len(images)}
}
Loading

0 comments on commit e8e1816

Please sign in to comment.