Skip to content

Commit

Permalink
Merge pull request versity#926 from versity/listbuckets-pagination
Browse files Browse the repository at this point in the history
Listbuckets pagination
  • Loading branch information
benmcclelland authored Oct 28, 2024
2 parents c2f6e48 + 24fea30 commit a53667c
Show file tree
Hide file tree
Showing 14 changed files with 334 additions and 119 deletions.
82 changes: 45 additions & 37 deletions backend/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ func (az *Azure) String() string {

func (az *Azure) CreateBucket(ctx context.Context, input *s3.CreateBucketInput, acl []byte) error {
meta := map[string]*string{
string(keyAclCapital): backend.GetStringPtr(encodeBytes(acl)),
string(keyOwnership): backend.GetStringPtr(encodeBytes([]byte(input.ObjectOwnership))),
string(keyAclCapital): backend.GetPtrFromString(encodeBytes(acl)),
string(keyOwnership): backend.GetPtrFromString(encodeBytes([]byte(input.ObjectOwnership))),
}

acct, ok := ctx.Value("account").(auth.Account)
Expand All @@ -170,7 +170,7 @@ func (az *Azure) CreateBucket(ctx context.Context, input *s3.CreateBucketInput,
return fmt.Errorf("parse default bucket lock state: %w", err)
}

meta[string(keyBucketLock)] = backend.GetStringPtr(encodeBytes(defaultLockParsed))
meta[string(keyBucketLock)] = backend.GetPtrFromString(encodeBytes(defaultLockParsed))
}

_, err := az.client.CreateContainer(ctx, *input.Bucket, &container.CreateOptions{Metadata: meta})
Expand All @@ -195,48 +195,56 @@ func (az *Azure) CreateBucket(ctx context.Context, input *s3.CreateBucketInput,
return azureErrToS3Err(err)
}

func (az *Azure) ListBuckets(ctx context.Context, owner string, isAdmin bool) (s3response.ListAllMyBucketsResult, error) {
func (az *Azure) ListBuckets(ctx context.Context, input s3response.ListBucketsInput) (s3response.ListAllMyBucketsResult, error) {
fmt.Printf("%+v\n", input)
pager := az.client.NewListContainersPager(
&service.ListContainersOptions{
Include: service.ListContainersInclude{
Metadata: true,
},
Marker: &input.ContinuationToken,
MaxResults: &input.MaxBuckets,
Prefix: &input.Prefix,
})

var buckets []s3response.ListAllMyBucketsEntry
var result s3response.ListAllMyBucketsResult
result := s3response.ListAllMyBucketsResult{
Prefix: input.Prefix,
}

for pager.More() {
resp, err := pager.NextPage(ctx)
if err != nil {
return result, azureErrToS3Err(err)
}
for _, v := range resp.ContainerItems {
if isAdmin {
resp, err := pager.NextPage(ctx)
if err != nil {
return result, azureErrToS3Err(err)
}
for _, v := range resp.ContainerItems {
if input.IsAdmin {
buckets = append(buckets, s3response.ListAllMyBucketsEntry{
Name: *v.Name,
// TODO: using modification date here instead of creation, is that ok?
CreationDate: *v.Properties.LastModified,
})
} else {
acl, err := getAclFromMetadata(v.Metadata, keyAclLower)
if err != nil {
return result, err
}

if acl.Owner == input.Owner {
buckets = append(buckets, s3response.ListAllMyBucketsEntry{
Name: *v.Name,
// TODO: using modification date here instead of creation, is that ok?
CreationDate: *v.Properties.LastModified,
})
} else {
acl, err := getAclFromMetadata(v.Metadata, keyAclLower)
if err != nil {
return result, err
}

if acl.Owner == owner {
buckets = append(buckets, s3response.ListAllMyBucketsEntry{
Name: *v.Name,
// TODO: using modification date here instead of creation, is that ok?
CreationDate: *v.Properties.LastModified,
})
}
}
}
}

if resp.NextMarker != nil {
result.ContinuationToken = *resp.NextMarker
}

result.Buckets.Bucket = buckets
result.Owner.ID = owner
result.Owner.ID = input.Owner

return result, nil
}
Expand Down Expand Up @@ -303,13 +311,13 @@ func (az *Azure) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respon
opts.HTTPHeaders.BlobContentDisposition = po.ContentDisposition
if strings.HasSuffix(*po.Key, "/") {
// Hardcode "application/x-directory" for direcoty objects
opts.HTTPHeaders.BlobContentType = backend.GetStringPtr(backend.DirContentType)
opts.HTTPHeaders.BlobContentType = backend.GetPtrFromString(backend.DirContentType)
} else {
opts.HTTPHeaders.BlobContentType = po.ContentType
}

if opts.HTTPHeaders.BlobContentType == nil {
opts.HTTPHeaders.BlobContentType = backend.GetStringPtr(backend.DefaultContentType)
opts.HTTPHeaders.BlobContentType = backend.GetPtrFromString(backend.DefaultContentType)
}

uploadResp, err := az.client.UploadStream(ctx, *po.Bucket, *po.Key, po.Body, opts)
Expand Down Expand Up @@ -408,7 +416,7 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.G

contentType := blobDownloadResponse.ContentType
if contentType == nil {
contentType = backend.GetStringPtr(backend.DefaultContentType)
contentType = backend.GetPtrFromString(backend.DefaultContentType)
}

return &s3.GetObjectOutput{
Expand Down Expand Up @@ -712,8 +720,8 @@ func (az *Azure) DeleteObjects(ctx context.Context, input *s3.DeleteObjectsInput
} else {
errs = append(errs, types.Error{
Key: obj.Key,
Code: backend.GetStringPtr("InternalError"),
Message: backend.GetStringPtr(err.Error()),
Code: backend.GetPtrFromString("InternalError"),
Message: backend.GetPtrFromString(err.Error()),
})
}
}
Expand Down Expand Up @@ -849,7 +857,7 @@ func (az *Azure) CreateMultipartUpload(ctx context.Context, input *s3.CreateMult

// set blob legal hold status in metadata
if input.ObjectLockLegalHoldStatus == types.ObjectLockLegalHoldStatusOn {
meta[string(keyObjLegalHold)] = backend.GetStringPtr("1")
meta[string(keyObjLegalHold)] = backend.GetPtrFromString("1")
}

// set blob retention date
Expand All @@ -862,7 +870,7 @@ func (az *Azure) CreateMultipartUpload(ctx context.Context, input *s3.CreateMult
if err != nil {
return s3response.InitiateMultipartUploadResult{}, azureErrToS3Err(err)
}
meta[string(keyObjRetention)] = backend.GetStringPtr(string(retParsed))
meta[string(keyObjRetention)] = backend.GetPtrFromString(string(retParsed))
}

uploadId := uuid.New().String()
Expand Down Expand Up @@ -1319,12 +1327,12 @@ func (az *Azure) PutObjectRetention(ctx context.Context, bucket, object, version
meta := blobProps.Metadata
if meta == nil {
meta = map[string]*string{
string(keyObjRetention): backend.GetStringPtr(string(retention)),
string(keyObjRetention): backend.GetPtrFromString(string(retention)),
}
} else {
objLockCfg, ok := meta[string(keyObjRetention)]
if !ok {
meta[string(keyObjRetention)] = backend.GetStringPtr(string(retention))
meta[string(keyObjRetention)] = backend.GetPtrFromString(string(retention))
} else {
var lockCfg types.ObjectLockRetention
if err := json.Unmarshal([]byte(*objLockCfg), &lockCfg); err != nil {
Expand All @@ -1342,7 +1350,7 @@ func (az *Azure) PutObjectRetention(ctx context.Context, bucket, object, version
}
}

meta[string(keyObjRetention)] = backend.GetStringPtr(string(retention))
meta[string(keyObjRetention)] = backend.GetPtrFromString(string(retention))
}
}

Expand Down Expand Up @@ -1690,7 +1698,7 @@ func (az *Azure) setContainerMetaData(ctx context.Context, bucket, key string, v
}

str := encodeBytes(value)
mdmap[key] = backend.GetStringPtr(str)
mdmap[key] = backend.GetPtrFromString(str)

_, err = client.SetMetadata(ctx, &container.SetMetadataOptions{Metadata: mdmap})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Backend interface {
Shutdown()

// bucket operations
ListBuckets(_ context.Context, owner string, isAdmin bool) (s3response.ListAllMyBucketsResult, error)
ListBuckets(context.Context, s3response.ListBucketsInput) (s3response.ListAllMyBucketsResult, error)
HeadBucket(context.Context, *s3.HeadBucketInput) (*s3.HeadBucketOutput, error)
GetBucketAcl(context.Context, *s3.GetBucketAclInput) ([]byte, error)
CreateBucket(_ context.Context, _ *s3.CreateBucketInput, defaultACL []byte) error
Expand Down Expand Up @@ -108,7 +108,7 @@ func (BackendUnsupported) Shutdown() {}
func (BackendUnsupported) String() string {
return "Unsupported"
}
func (BackendUnsupported) ListBuckets(context.Context, string, bool) (s3response.ListAllMyBucketsResult, error) {
func (BackendUnsupported) ListBuckets(context.Context, s3response.ListBucketsInput) (s3response.ListAllMyBucketsResult, error) {
return s3response.ListAllMyBucketsResult{}, s3err.GetAPIError(s3err.ErrNotImplemented)
}
func (BackendUnsupported) HeadBucket(context.Context, *s3.HeadBucketInput) (*s3.HeadBucketOutput, error) {
Expand Down
11 changes: 7 additions & 4 deletions backend/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,20 @@ func (d ByObjectName) Len() int { return len(d) }
func (d ByObjectName) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d ByObjectName) Less(i, j int) bool { return *d[i].Key < *d[j].Key }

func GetStringPtr(s string) *string {
return &s
}

func GetPtrFromString(str string) *string {
if str == "" {
return nil
}
return &str
}

func GetStringFromPtr(str *string) string {
if str == nil {
return ""
}
return *str
}

func GetTimePtr(t time.Time) *time.Time {
return &t
}
Expand Down
33 changes: 24 additions & 9 deletions backend/posix/posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,15 @@ func (p *Posix) doesBucketAndObjectExist(bucket, object string) error {
return nil
}

func (p *Posix) ListBuckets(_ context.Context, owner string, isAdmin bool) (s3response.ListAllMyBucketsResult, error) {
func (p *Posix) ListBuckets(_ context.Context, input s3response.ListBucketsInput) (s3response.ListAllMyBucketsResult, error) {
entries, err := os.ReadDir(".")
if err != nil {
return s3response.ListAllMyBucketsResult{},
fmt.Errorf("readdir buckets: %w", err)
}

var cToken string

var buckets []s3response.ListAllMyBucketsEntry
for _, entry := range entries {
fi, err := entry.Info()
Expand All @@ -236,8 +238,21 @@ func (p *Posix) ListBuckets(_ context.Context, owner string, isAdmin bool) (s3re
continue
}

if !strings.HasPrefix(fi.Name(), input.Prefix) {
continue
}

if len(buckets) == int(input.MaxBuckets) {
cToken = buckets[len(buckets)-1].Name
break
}

if fi.Name() <= input.ContinuationToken {
continue
}

// return all the buckets for admin users
if isAdmin {
if input.IsAdmin {
buckets = append(buckets, s3response.ListAllMyBucketsEntry{
Name: entry.Name(),
CreationDate: fi.ModTime(),
Expand All @@ -260,23 +275,23 @@ func (p *Posix) ListBuckets(_ context.Context, owner string, isAdmin bool) (s3re
return s3response.ListAllMyBucketsResult{}, fmt.Errorf("parse acl tag: %w", err)
}

if acl.Owner == owner {
if acl.Owner == input.Owner {
buckets = append(buckets, s3response.ListAllMyBucketsEntry{
Name: entry.Name(),
CreationDate: fi.ModTime(),
})
}
}

sort.Sort(backend.ByBucketName(buckets))

return s3response.ListAllMyBucketsResult{
Buckets: s3response.ListAllMyBucketsList{
Bucket: buckets,
},
Owner: s3response.CanonicalUser{
ID: owner,
ID: input.Owner,
},
Prefix: input.Prefix,
ContinuationToken: cToken,
}, nil
}

Expand Down Expand Up @@ -926,7 +941,7 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc {
// Check to see if the null versionId object is delete marker or not
if isDel {
nullObjDelMarker = &types.DeleteMarkerEntry{
VersionId: backend.GetStringPtr("null"),
VersionId: backend.GetPtrFromString("null"),
LastModified: backend.GetTimePtr(nf.ModTime()),
Key: &path,
IsLatest: getBoolPtr(false),
Expand All @@ -948,7 +963,7 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc {
Key: &path,
LastModified: backend.GetTimePtr(nf.ModTime()),
Size: &size,
VersionId: backend.GetStringPtr("null"),
VersionId: backend.GetPtrFromString("null"),
IsLatest: getBoolPtr(false),
StorageClass: types.ObjectVersionStorageClassStandard,
}
Expand Down Expand Up @@ -3250,7 +3265,7 @@ func (p *Posix) CopyObject(ctx context.Context, input *s3.CopyObjectInput) (*s3.
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
version = backend.GetStringPtr(string(vId))
version = backend.GetPtrFromString(string(vId))
} else {
contentLength := fi.Size()
res, err := p.PutObject(ctx,
Expand Down
24 changes: 15 additions & 9 deletions backend/s3proxy/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,12 @@ func New(access, secret, endpoint, region string, disableChecksum, sslSkipVerify
return s, nil
}

func (s *S3Proxy) ListBuckets(ctx context.Context, owner string, isAdmin bool) (s3response.ListAllMyBucketsResult, error) {
output, err := s.client.ListBuckets(ctx, &s3.ListBucketsInput{})
func (s *S3Proxy) ListBuckets(ctx context.Context, input s3response.ListBucketsInput) (s3response.ListAllMyBucketsResult, error) {
output, err := s.client.ListBuckets(ctx, &s3.ListBucketsInput{
ContinuationToken: &input.ContinuationToken,
MaxBuckets: &input.MaxBuckets,
Prefix: &input.Prefix,
})
if err != nil {
return s3response.ListAllMyBucketsResult{}, handleError(err)
}
Expand All @@ -96,6 +100,8 @@ func (s *S3Proxy) ListBuckets(ctx context.Context, owner string, isAdmin bool) (
Buckets: s3response.ListAllMyBucketsList{
Bucket: buckets,
},
ContinuationToken: backend.GetStringFromPtr(output.ContinuationToken),
Prefix: backend.GetStringFromPtr(output.Prefix),
}, nil
}

Expand All @@ -112,8 +118,8 @@ func (s *S3Proxy) CreateBucket(ctx context.Context, input *s3.CreateBucketInput,

var tagSet []types.Tag
tagSet = append(tagSet, types.Tag{
Key: backend.GetStringPtr(aclKey),
Value: backend.GetStringPtr(base64Encode(acl)),
Key: backend.GetPtrFromString(aclKey),
Value: backend.GetPtrFromString(base64Encode(acl)),
})

_, err = s.client.PutBucketTagging(ctx, &s3.PutBucketTaggingInput{
Expand Down Expand Up @@ -525,17 +531,17 @@ func (s *S3Proxy) PutBucketAcl(ctx context.Context, bucket string, data []byte)
for i, tag := range tagout.TagSet {
if *tag.Key == aclKey {
tagout.TagSet[i] = types.Tag{
Key: backend.GetStringPtr(aclKey),
Value: backend.GetStringPtr(base64Encode(data)),
Key: backend.GetPtrFromString(aclKey),
Value: backend.GetPtrFromString(base64Encode(data)),
}
found = true
break
}
}
if !found {
tagout.TagSet = append(tagout.TagSet, types.Tag{
Key: backend.GetStringPtr(aclKey),
Value: backend.GetStringPtr(base64Encode(data)),
Key: backend.GetPtrFromString(aclKey),
Value: backend.GetPtrFromString(base64Encode(data)),
})
}

Expand Down Expand Up @@ -595,7 +601,7 @@ func (s *S3Proxy) DeleteObjectTagging(ctx context.Context, bucket, object string
func (s *S3Proxy) PutBucketPolicy(ctx context.Context, bucket string, policy []byte) error {
_, err := s.client.PutBucketPolicy(ctx, &s3.PutBucketPolicyInput{
Bucket: &bucket,
Policy: backend.GetStringPtr(string(policy)),
Policy: backend.GetPtrFromString(string(policy)),
})
return handleError(err)
}
Expand Down
Loading

0 comments on commit a53667c

Please sign in to comment.