use iterators for digesting and staging files (#104)
srerickson authored Nov 12, 2024
1 parent 0b68cd1 commit 2a6ddf4
Showing 22 changed files with 1,110 additions and 619 deletions.
backend/s3/fs.go (12 additions, 2 deletions)
@@ -81,9 +81,19 @@ func (f *BucketFS) RemoveAll(ctx context.Context, name string) error {
     return removeAll(ctx, f.S3, f.Bucket, name)
 }

-func (f *BucketFS) Files(ctx context.Context, dir string) ocfl.FileSeq {
+func (f *BucketFS) WalkFiles(ctx context.Context, dir string) (ocfl.FileSeq, func() error) {
     f.debugLog(ctx, "s3:list_files", "bucket", f.Bucket, "prefix", dir)
-    return filesIter(ctx, f.S3, f.Bucket, dir)
+    files, errFn := walkFiles(ctx, f.S3, f.Bucket, dir)
+    // FIXME: the values yielded by walkfiles don't include the FS, we need to
+    // add it here.
+    return func(yield func(*ocfl.FileRef) bool) {
+        for file := range files {
+            file.FS = f
+            if !yield(file) {
+                break
+            }
+        }
+    }, errFn
 }

 type S3API interface {
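Note on the new API shape: WalkFiles now returns an iterator plus a deferred error function instead of yielding (FileInfo, error) pairs. A minimal consumer sketch, assuming ocfl.FileSeq is a Go 1.23 range-over-func sequence of *ocfl.FileRef (as the yield signature above suggests) and that the import path below matches the module layout (it is not shown in this commit):

package example

import (
    "context"
    "fmt"

    "github.com/srerickson/ocfl-go/backend/s3" // path assumed, not confirmed by this diff
)

// listObjectFiles is a hypothetical helper: range over the sequence,
// then call errFn to learn whether iteration stopped on an error.
func listObjectFiles(ctx context.Context, fsys *s3.BucketFS, dir string) error {
    files, errFn := fsys.WalkFiles(ctx, dir)
    for ref := range files {
        fmt.Println(ref.FullPath()) // FullPath is used the same way in fs_test.go below
    }
    return errFn()
}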
backend/s3/fs_test.go (15 additions, 19 deletions)
@@ -28,10 +28,10 @@ const (
 )

 var (
-    _ ocfl.FS      = (*s3.BucketFS)(nil)
-    _ ocfl.CopyFS  = (*s3.BucketFS)(nil)
-    _ ocfl.WriteFS = (*s3.BucketFS)(nil)
-    _ ocfl.FilesFS = (*s3.BucketFS)(nil)
+    _ ocfl.FS         = (*s3.BucketFS)(nil)
+    _ ocfl.CopyFS     = (*s3.BucketFS)(nil)
+    _ ocfl.WriteFS    = (*s3.BucketFS)(nil)
+    _ ocfl.FileWalker = (*s3.BucketFS)(nil)
 )

 func TestOpenFile(t *testing.T) {
@@ -428,14 +428,14 @@ func TestCopy(t *testing.T) {
     }
 }

-func TestFiles(t *testing.T) {
+func TestWalkFiles(t *testing.T) {
     ctx := context.Background()
     type testCase struct {
         desc   string
         mock   func(t *testing.T) *mock.S3API
         bucket string
         dir    string
-        expect func(*testing.T, *mock.S3API, []ocfl.FileInfo, error)
+        expect func(*testing.T, *mock.S3API, []*ocfl.FileRef, error)
     }
     cases := []testCase{
         {
@@ -451,11 +451,12 @@ func TestFiles(t *testing.T) {
                 )
             },
             bucket: bucket,
-            expect: func(t *testing.T, state *mock.S3API, files []ocfl.FileInfo, err error) {
+            expect: func(t *testing.T, state *mock.S3API, files []*ocfl.FileRef, err error) {
                 be.NilErr(t, err)
                 be.Equal(t, 5, len(files))
                 for _, f := range files {
-                    be.True(t, strings.HasPrefix(f.Path, "obj/"))
+                    be.Nonzero(t, f.Info)
+                    be.True(t, strings.HasPrefix(f.FullPath(), "obj/"))
                 }
             },
         },
@@ -466,7 +467,7 @@ func TestFiles(t *testing.T) {
                 return mock.New(bucket)
             },
             bucket: bucket,
-            expect: func(t *testing.T, state *mock.S3API, files []ocfl.FileInfo, err error) {
+            expect: func(t *testing.T, state *mock.S3API, files []*ocfl.FileRef, err error) {
                 isInvalidPathError(t, err)
             },
         },
@@ -478,17 +479,12 @@ func TestFiles(t *testing.T) {
                 api = tcase.mock(t)
             }
             fsys := s3.BucketFS{Bucket: tcase.bucket, S3: api}
-            files := []ocfl.FileInfo{}
-            var iterErr error
-            fsys.Files(ctx, tcase.dir)(func(info ocfl.FileInfo, err error) bool {
-                if err != nil {
-                    iterErr = err
-                    return false
-                }
+            files := []*ocfl.FileRef{}
+            fileInfos, errFn := fsys.WalkFiles(ctx, tcase.dir)
+            for info := range fileInfos {
                 files = append(files, info)
-                return true
-            })
-            tcase.expect(t, api, files, iterErr)
+            }
+            tcase.expect(t, api, files, errFn())
         })
     }
 }
backend/s3/s3.go (23 additions, 11 deletions)
@@ -306,12 +306,13 @@ func removeAll(ctx context.Context, api RemoveAllAPI, buck string, name string)
     return nil
 }

-// filesIter returns an iterator that yields PathInfo for files in the dir
-func filesIter(ctx context.Context, api FilesAPI, buck string, dir string) func(func(ocfl.FileInfo, error) bool) {
-    return func(yield func(ocfl.FileInfo, error) bool) {
+// walkFiles returns an iterator that yields PathInfo for files in the dir
+func walkFiles(ctx context.Context, api FilesAPI, buck string, dir string) (ocfl.FileSeq, func() error) {
+    var err error
+    seq := func(yield func(*ocfl.FileRef) bool) {
         const op = "list_files"
         if !fs.ValidPath(dir) {
-            yield(ocfl.FileInfo{}, pathErr(op, dir, fs.ErrInvalid))
+            err = pathErr(op, dir, fs.ErrInvalid)
             return
         }
         params := &s3v2.ListObjectsV2Input{
@@ -322,18 +323,28 @@ func filesIter(ctx context.Context, api FilesAPI, buck string, dir string) func(
             params.Prefix = aws.String(dir + "/")
         }
         for {
-            listPage, err := api.ListObjectsV2(ctx, params)
+            var listPage *s3v2.ListObjectsV2Output
+            listPage, err = api.ListObjectsV2(ctx, params)
             if err != nil {
-                yield(ocfl.FileInfo{}, pathErr(op, dir, err))
+                err = pathErr(op, dir, err)
                 return
             }
             for _, s3obj := range listPage.Contents {
-                info := ocfl.FileInfo{
-                    Path: *s3obj.Key,
-                    Size: *s3obj.Size,
-                    Type: fs.ModeIrregular,
+                refPath := *s3obj.Key
+                if dir != "." {
+                    refPath = strings.TrimPrefix(refPath, dir+"/")
                 }
-                if !yield(info, nil) {
+                info := &ocfl.FileRef{
+                    BaseDir: dir,
+                    Path:    refPath,
+                    Info: &iofsInfo{
+                        name:    path.Base(*s3obj.Key),
+                        size:    *s3obj.Size,
+                        mode:    fs.ModeIrregular,
+                        modTime: *s3obj.LastModified,
+                    },
+                }
+                if !yield(info) {
                     return
                 }
             }
@@ -343,6 +354,7 @@ func filesIter(ctx context.Context, api FilesAPI, buck string, dir string) func(
             }
         }
     }
+    return seq, func() error { return err }
 }

 // s3File implements fs.File
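The new FileRef values carry an Info built from iofsInfo, a type whose definition is not part of the hunks shown here. A sketch of what it presumably looks like, inferred only from the field names and types used above (the actual definition in the package may differ):

package s3

import (
    "io/fs"
    "time"
)

// Sketch only: a minimal fs.FileInfo implementation matching the fields
// populated by walkFiles (name, size, mode, modTime).
type iofsInfo struct {
    name    string
    size    int64
    mode    fs.FileMode
    modTime time.Time
}

func (i *iofsInfo) Name() string { return i.name }
func (i *iofsInfo) Size() int64 { return i.size }
func (i *iofsInfo) Mode() fs.FileMode { return i.mode }
func (i *iofsInfo) ModTime() time.Time { return i.modTime }
func (i *iofsInfo) IsDir() bool { return i.mode.IsDir() }
func (i *iofsInfo) Sys() any { return nil }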
digest.go (0 additions, 71 deletions)

This file was deleted.

digest/digester.go (6 additions, 0 deletions)
@@ -102,6 +102,12 @@ func (s Set) ConflictsWith(other Set) []string {
     return keys
 }

+func (s Set) Delete(id string) string {
+    val := s[id]
+    delete(s, id)
+    return val
+}
+
 // Validate digests the reader using all algorithms used in s found in reg.
 // An error is returned in the resulting digests values conflict with those
 // in s.
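The added Set.Delete removes the entry for an algorithm ID and returns the value that was stored (the empty string if the ID was absent). A short usage sketch, assuming Set is the digest package's map of algorithm IDs to digest values and that the import path matches the module layout (not confirmed by this diff):

package example

import (
    "fmt"

    "github.com/srerickson/ocfl-go/digest" // path assumed
)

func dropFixity(set digest.Set, alg string) {
    // Delete returns the removed digest value, or "" if alg was not present.
    removed := set.Delete(alg)
    fmt.Printf("removed %s digest: %s\n", alg, removed)
}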
digest_test.go (0 additions, 65 deletions)

This file was deleted.

(Diffs for the remaining changed files are not shown here.)
