Skip to content

Commit

Permalink
list very large buckets using "bucket inventory"
Browse files Browse the repository at this point in the history
* for starters, S3 only:
  docs.aws.amazon.com/AmazonS3/latest/userguide/configure-inventory.html#rest-api-inventory
* part one

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Mar 5, 2024
1 parent 9428dd3 commit 398da3d
Show file tree
Hide file tree
Showing 15 changed files with 283 additions and 24 deletions.
22 changes: 20 additions & 2 deletions ais/backend/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (*awsProvider) HeadBucket(_ ctx, bck *meta.Bck) (bckProps cos.StrKVs, errCo
// NOTE: obtaining versioning info is extremely slow - to avoid timeouts, imposing a hard limit on the page size
const versionedPageSize = 20

func (*awsProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoResult) (errCode int, err error) {
func (awsp *awsProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoResult) (errCode int, err error) {
var (
svc *s3.Client
h = cmn.BackendHelpers.Amazon
Expand All @@ -151,6 +151,24 @@ func (*awsProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoResu
}
}

// alternative path
if msg.IsFlagSet(apc.LsInventory) {
var (
fqn string
fh *os.File
)
if fqn, err = awsp.getInventory(cloudBck, svc); err != nil {
return
}
if fh, err = os.Open(fqn); err != nil {
_, err = _errInv(err)
return
}
err = awsp.listInventory(cloudBck, fh, msg, lst)
fh.Close()
return
}

params := &s3.ListObjectsV2Input{Bucket: aws.String(cloudBck.Name)}
if msg.Prefix != "" {
params.Prefix = aws.String(msg.Prefix)
Expand Down Expand Up @@ -655,7 +673,7 @@ func getBucketLocation(svc *s3.Client, bckName string) (region string, err error
}
region = string(resp.LocationConstraint)
if region == "" {
region = env.AwsDefaultRegion()
region = env.AwsDefaultRegion() // env "AWS_REGION" or "us-east-1" - in that order
}
return
}
Expand Down
183 changes: 183 additions & 0 deletions ais/backend/awsinv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
//go:build aws

// Package backend contains implementation of various backend providers.
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
*/
package backend

import (
"bufio"
"compress/gzip"
"context"
"fmt"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/api/env"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/fs"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
)

const (
invSrcExt = ".csv.gz"
invDstExt = ".csv"
)

func (awsp *awsProvider) getInventory(cloudBck *cmn.Bck, svc *s3.Client) (string, error) {
var (
latest time.Time
found string
params = &s3.ListObjectsV2Input{Bucket: aws.String(cloudBck.Name)}
prefix = path.Join(env.BucketInventory(), cloudBck.Name) // env "S3_BUCKET_INVENTORY" or const
)
params.Prefix = aws.String(prefix)
params.MaxKeys = aws.Int32(apc.MaxPageSizeAWS)

// 1. ls inventory
resp, err := svc.ListObjectsV2(context.Background(), params)
if err != nil {
return "", err
}
for _, obj := range resp.Contents {
name := *obj.Key
if cos.Ext(name) != invSrcExt {
continue
}
mtime := *(obj.LastModified)
if mtime.After(latest) {
latest = mtime
found = name
}
}
if found == "" {
return "", cos.NewErrNotFound(cloudBck, "S3 bucket inventory")
}

// 2. one bucket, one inventory (and one statically produced name)
mi, _, err := fs.Hrw(prefix)
if err != nil {
return "", err
}
fqn := mi.MakePathFQN(cloudBck, fs.WorkfileType, prefix) + invDstExt
if cos.Stat(fqn) == nil {
return fqn, nil
}

// 3. get and save unzipped locally
input := s3.GetObjectInput{Bucket: aws.String(cloudBck.Name), Key: aws.String(found)}
obj, err := svc.GetObject(context.Background(), &input)
if err != nil {
return "", err
}
defer cos.Close(obj.Body)
gzr, err := gzip.NewReader(obj.Body)
if err != nil {
return _errInv(err)
}
if err = cos.CreateDir(filepath.Dir(fqn)); err != nil {
gzr.Close()
return _errInv(err)
}
wfh, err := os.OpenFile(fqn, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, cos.PermRWR)
if err != nil {
return _errInv(err)
}
size := *obj.ContentLength
buf, slab := awsp.t.PageMM().AllocSize(min(size, 64*cos.KiB))
_, err = cos.CopyBuffer(wfh, gzr, buf)
slab.Free(buf)
wfh.Close()

if err != nil {
return _errInv(err)
}
err = gzr.Close()
debug.AssertNoErr(err)

// 4. cleanup older inventories
b := aws.String(cloudBck.Name)
for _, obj := range resp.Contents {
name := *obj.Key
mtime := *(obj.LastModified)
if name == found || latest.Sub(mtime) < 23*time.Hour {
continue
}
_, errN := svc.DeleteObject(context.Background(), &s3.DeleteObjectInput{Bucket: b, Key: obj.Key})
if errN != nil {
nlog.Errorln(name, errN)
}
}

return fqn, nil
}

func _errInv(err error) (string, error) {
return "", fmt.Errorf("bucket-inventory: %w", err)
}

// TODO -- FIXME: len(line) == 4: expecting strictly (bucket, objname, size, etag)
func (*awsProvider) listInventory(cloudBck *cmn.Bck, fh *os.File, msg *apc.LsoMsg, lst *cmn.LsoResult) (err error) {
var (
i uint
scanner = bufio.NewScanner(fh)
skip = msg.ContinuationToken != ""
)
msg.PageSize = calcPageSize(msg.PageSize, apc.MaxPageSizeAIS /* 10k */)
for j := uint(len(lst.Entries)); j < msg.PageSize; j++ {
lst.Entries = append(lst.Entries, &cmn.LsoEntry{})
}
for scanner.Scan() {
line := strings.Split(scanner.Text(), ",")
debug.Assert(len(line) == 4)
debug.Assert(strings.Contains(line[0], cloudBck.Name))

objName := cmn.UnquoteCEV(line[1])
if msg.Prefix != "" && !strings.HasPrefix(objName, msg.Prefix) {
continue
}
if skip && objName == msg.ContinuationToken {
skip = false
}
if skip {
continue
}

// have page?
if i >= msg.PageSize {
lst.ContinuationToken = objName
break
}

// next entry
entry := lst.Entries[i]
i++
entry.Name = objName
size := cmn.UnquoteCEV(line[2])
entry.Size, err = strconv.ParseInt(size, 10, 64)
if err != nil {
return err
}
if msg.IsFlagSet(apc.LsNameOnly) || msg.IsFlagSet(apc.LsNameSize) {
continue
}
if msg.WantProp(apc.GetPropsCustom) {
custom := cos.StrKVs{cmn.ETag: cmn.UnquoteCEV(line[3])}
entry.Custom = cmn.CustomMD2S(custom)
}
}
lst.Entries = lst.Entries[:i]

debug.AssertNoErr(scanner.Err())
return nil
}
2 changes: 2 additions & 0 deletions ais/backend/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/core"
"github.com/NVIDIA/aistore/fs"
)
Expand All @@ -18,6 +19,7 @@ type ctx = context.Context // used when omitted for shortness sake
func fmtTime(t time.Time) string { return t.Format(time.RFC3339) }

func calcPageSize(pageSize, maxPageSize uint) uint {
debug.Assert(int(pageSize) >= 0, pageSize)
if pageSize == 0 {
return maxPageSize
}
Expand Down
3 changes: 3 additions & 0 deletions api/apc/lsmsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ const (
// and if it does:
// - check whether remote version differs from its in-cluster copy
LsVerChanged

// (new & experimental)
LsInventory
)

// max page sizes
Expand Down
21 changes: 15 additions & 6 deletions api/env/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ func AwsDefaultRegion() (region string) {
return region
}

func BucketInventory() (inv string) {
if inv = os.Getenv(AWS.Inventory); inv == "" {
inv = "inv-all" // TODO -- FIXME: change to ".inventory"
}
return inv
}

// use S3_ENDPOINT to globally override the default 'https://s3.amazonaws.com' endpoint
// NOTE: the same can be done on a per-bucket basis, via bucket prop `Extra.AWS.Endpoint`
// (bucket override will always take precedence)
Expand All @@ -23,12 +30,14 @@ func AwsDefaultRegion() (region string) {

var (
AWS = struct {
Endpoint string
Region string
Profile string
Endpoint string
Region string
Profile string
Inventory string
}{
Endpoint: "S3_ENDPOINT",
Region: "AWS_REGION",
Profile: "AWS_PROFILE",
Endpoint: "S3_ENDPOINT",
Region: "AWS_REGION",
Profile: "AWS_PROFILE",
Inventory: "S3_BUCKET_INVENTORY",
}
)
4 changes: 1 addition & 3 deletions api/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func QueryBuckets(bp BaseParams, qbck cmn.QueryBcks, fltPresence int) (bool, err
// - usage examples in CLI docs under docs/cli.
func ListObjects(bp BaseParams, bck cmn.Bck, lsmsg *apc.LsoMsg, args ListArgs) (*cmn.LsoResult, error) {
var (
q url.Values
path = apc.URLPathBuckets.Join(bck.Name)
hdr = http.Header{
cos.HdrAccept: []string{cos.ContentMsgPack},
Expand All @@ -118,15 +117,14 @@ func ListObjects(bp BaseParams, bck cmn.Bck, lsmsg *apc.LsoMsg, args ListArgs) (
if lsmsg == nil {
lsmsg = &apc.LsoMsg{}
}
q = bck.AddToQuery(q)
lsmsg.UUID = ""
lsmsg.ContinuationToken = ""
reqParams := AllocRp()
{
reqParams.BaseParams = bp
reqParams.Path = path
reqParams.Header = hdr
reqParams.Query = q
reqParams.Query = bck.AddToQuery(nil)

reqParams.buf = allocMbuf() // mem-pool msgpack
}
Expand Down
1 change: 1 addition & 0 deletions cmd/cli/cli/bucket_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ var (
silentFlag,
dontWaitFlag,
verChangedFlag,
useInventoryFlag,
},

cmdLRU: {
Expand Down
4 changes: 4 additions & 0 deletions cmd/cli/cli/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,10 @@ var (
indent4 + "\t- applies to remote backends that maintain at least some form of versioning information (e.g., version, checksum, ETag)\n" +
indent4 + "\t- see related: 'ais get --latest', 'ais cp --sync', 'ais prefetch --latest'",
}
useInventoryFlag = cli.BoolFlag{
Name: "inventory",
Usage: "experimental; requires s3:// backend",
}

keepMDFlag = cli.BoolFlag{Name: "keep-md", Usage: "keep bucket metadata"}
dataSlicesFlag = cli.IntFlag{Name: "data-slices,data,d", Usage: "number of data slices", Required: true}
Expand Down
3 changes: 3 additions & 0 deletions cmd/cli/cli/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ func getMultiObj(c *cli.Context, bck cmn.Bck, archpath, outFile string, extract
if flagIsSet(c, getObjCachedFlag) {
msg.SetFlag(apc.LsObjCached)
}
if flagIsSet(c, useInventoryFlag) {
msg.SetFlag(apc.LsInventory)
}
pageSize, limit, err := _setPage(c, bck)
if err != nil {
return err
Expand Down
11 changes: 11 additions & 0 deletions cmd/cli/cli/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ func listObjects(c *cli.Context, bck cmn.Bck, prefix string, listArch bool) erro
if listArch {
msg.SetFlag(apc.LsArchDir)
}
if flagIsSet(c, useInventoryFlag) {
msg.SetFlag(apc.LsInventory)
}

var (
props []string
Expand Down Expand Up @@ -505,6 +508,14 @@ func printLso(c *cli.Context, entries cmn.LsoEntries, lstFilter *lstFilter, prop
}
}

// validate props for typos
for _, prop := range propsList {
if _, ok := teb.ObjectPropsMap[prop]; !ok {
return fmt.Errorf("unknown object property %q (expecting one of: %v)",
prop, cos.StrKVs(teb.ObjectPropsMap).Keys())
}
}

tmpl := teb.LsoTemplate(propsList, hideHeader, addCachedCol, addStatusCol)
opts := teb.Opts{AltMap: teb.FuncMapUnits(units)}
if err := teb.Print(matched, tmpl, opts); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions cmd/cli/cli/object_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ var (
listArchFlag,
objLimitFlag,
unitsFlag,
verboseFlag, // client side
silentFlag, // server side
useInventoryFlag, // experimental
verboseFlag, // client side
silentFlag, // server side
},

commandPut: append(
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ module github.com/NVIDIA/aistore/cmd/cli
go 1.22

require (
github.com/NVIDIA/aistore v1.3.23-0.20240301175348-cd4020d0f129
github.com/NVIDIA/aistore v1.3.23-0.20240303161644-fa4ca2c175ab
github.com/fatih/color v1.16.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.30.0
github.com/onsi/gomega v1.31.1
github.com/urfave/cli v1.22.14
github.com/vbauerster/mpb/v4 v4.12.2
golang.org/x/sync v0.6.0
Expand Down
Loading

0 comments on commit 398da3d

Please sign in to comment.