Skip to content

Commit

Permalink
list very large buckets using "bucket inventory" (major update, API c…
Browse files Browse the repository at this point in the history
…hanges)

* API: introduce "ais-" http headers (soon to be documented)
* Go API: update list-objects control message ('lsmsg')
* part six, prev. commit: 51a1df4

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Mar 8, 2024
1 parent c380bdd commit 8736c91
Show file tree
Hide file tree
Showing 36 changed files with 231 additions and 248 deletions.
2 changes: 1 addition & 1 deletion ais/backend/ais.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func (m *AISBackendProvider) ListObjects(remoteBck *meta.Bck, msg *apc.LsoMsg, l
unsetUUID(&bck)

var lstRes *cmn.LsoResult
if lstRes, err = api.ListObjectsPage(remAis.bp, bck, remoteMsg); err != nil {
if lstRes, err = api.ListObjectsPage(remAis.bp, bck, remoteMsg, api.ListArgs{}); err != nil {
errCode, err = extractErrCode(err, remAis.uuid)
return
}
Expand Down
10 changes: 5 additions & 5 deletions ais/backend/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,18 @@ func (*awsProvider) HeadBucket(_ ctx, bck *meta.Bck) (bckProps cos.StrKVs, errCo
return
}

//
// LIST OBJECTS via INVENTORY
func (awsp *awsProvider) ListObjectsInv(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoResult,
ctx *core.LsoInventoryCtx) (errCode int, err error) {
//

func (awsp *awsProvider) ListObjectsInv(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoResult, ctx *core.LsoInventoryCtx) (errCode int, err error) {
var (
svc *s3.Client
fqn string
fh *os.File
cloudBck = bck.RemoteBck()
)
debug.Assert(msg.IsFlagSet(apc.LsInventory))
debug.Assert(ctx != nil)
svc, _, err = newClient(sessConf{bck: cloudBck}, "[list_objects]")
if err != nil {
if cmn.Rom.FastV(4, cos.SmoduleBackend) {
Expand Down Expand Up @@ -198,8 +200,6 @@ func (*awsProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoResu
return
}
}
debug.Assert(!msg.IsFlagSet(apc.LsInventory))

params := &s3.ListObjectsV2Input{Bucket: aws.String(cloudBck.Name)}
if msg.Prefix != "" {
params.Prefix = aws.String(msg.Prefix)
Expand Down
23 changes: 12 additions & 11 deletions ais/backend/awsinv.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@ import (
"fmt"
"io"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/api/env"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
Expand All @@ -33,17 +31,17 @@ import (

// TODO -- FIXME:
// 1. len(line) == 4: expecting strictly (bucket, objname, size, etag) - use manifests
// 2: the offset must correspond to the previously returned ContinuationToken
// 2. the offset must correspond to the previously returned ContinuationToken; on mismatch, decide whether to recover or fail
// 3. clean up older inventories
// 4. cached inventory must be stored with its own content-type (otherwise, it might disappear during pagination)
// 5. opt out of ListObjectsV2 when ctx.Offset > 0

const (
invSizeSGL = cos.MiB
invMaxPage = 8 * apc.MaxPageSizeAWS // roughly, 2MB given 256B lines
)

const (
invName = ".inventory"
invSrcExt = ".csv.gz"
invDstExt = ".csv"
)
Expand All @@ -59,14 +57,17 @@ func sinceInv(t1, t2 time.Time) time.Duration {
return t2.Sub(t1)
}

// env "S3_BUCKET_INVENTORY" or const
func prefixInv(cloudBck *cmn.Bck) string {
return path.Join(env.BucketInventory(), cloudBck.Name)
// prefixInv builds the prefix under which a bucket's inventory is looked up:
//
//	<inventory name> + PathSeparator + <bucket name> [+ PathSeparator + <inventory ID>]
//
// The inventory name is cos.Either(ctx.Name, invName) - presumably the
// caller-provided name when set, the default invName otherwise (confirm
// against cos.Either). The ID component is appended only when ctx.ID is
// non-empty.
func prefixInv(cloudBck *cmn.Bck, ctx *core.LsoInventoryCtx) (prefix string) {
	name := cos.Either(ctx.Name, invName)
	prefix = name + cos.PathSeparator + cloudBck.Name
	if ctx.ID == "" {
		// no inventory ID specified: the prefix ends with the bucket name
		return prefix
	}
	return prefix + cos.PathSeparator + ctx.ID
}

func checkInventory(cloudBck *cmn.Bck, latest time.Time, ctx *core.LsoInventoryCtx) (fqn string, _ bool, _ error) {
// 2. one bucket, one inventory (and one statically produced name)
prefix := prefixInv(cloudBck)
prefix := prefixInv(cloudBck, ctx)
mi, _, err := fs.Hrw(prefix)
if err != nil {
return "", false, err
Expand Down Expand Up @@ -96,7 +97,7 @@ func (awsp *awsProvider) getInventory(cloudBck *cmn.Bck, svc *s3.Client, ctx *co
found string
bn = aws.String(cloudBck.Name)
params = &s3.ListObjectsV2Input{Bucket: bn}
prefix = prefixInv(cloudBck)
prefix = prefixInv(cloudBck, ctx)
)
params.Prefix = aws.String(prefix)
params.MaxKeys = aws.Int32(apc.MaxPageSizeAWS) // no more than 1000 manifests
Expand Down Expand Up @@ -179,7 +180,7 @@ func (awsp *awsProvider) getInventory(cloudBck *cmn.Bck, svc *s3.Client, ctx *co
func (*awsProvider) listInventory(cloudBck *cmn.Bck, fh *os.File, sgl *memsys.SGL, ctx *core.LsoInventoryCtx, msg *apc.LsoMsg,
lst *cmn.LsoResult) error {
msg.PageSize = calcPageSize(msg.PageSize, invMaxPage)
for j := uint(len(lst.Entries)); j < msg.PageSize; j++ {
for j := len(lst.Entries); j < int(msg.PageSize); j++ {
lst.Entries = append(lst.Entries, &cmn.LsoEntry{})
}

Expand All @@ -198,7 +199,7 @@ func (*awsProvider) listInventory(cloudBck *cmn.Bck, fh *os.File, sgl *memsys.SG
}

var (
i uint
i int64
off = ctx.Offset
skip = msg.ContinuationToken != ""
lbuf = make([]byte, 256)
Expand Down
4 changes: 2 additions & 2 deletions ais/backend/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type ctx = context.Context // used when omitted for shortness sake

// fmtTime formats t using the time.RFC3339 layout.
func fmtTime(t time.Time) string {
	const layout = time.RFC3339
	return t.Format(layout)
}

func calcPageSize(pageSize, maxPageSize uint) uint {
debug.Assert(int(pageSize) >= 0, pageSize)
func calcPageSize(pageSize, maxPageSize int64) int64 {
debug.Assert(pageSize >= 0, pageSize)
if pageSize == 0 {
return maxPageSize
}
Expand Down
4 changes: 2 additions & 2 deletions ais/backend/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (hp *hdfsProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.Lso
}
return err
}
if uint(len(lst.Entries)) >= msg.PageSize {
if len(lst.Entries) >= int(msg.PageSize) {
return skipDir(fi)
}
objName := strings.TrimPrefix(strings.TrimPrefix(path, bck.Props.Extra.HDFS.RefDirectory), string(filepath.Separator))
Expand Down Expand Up @@ -197,7 +197,7 @@ func (hp *hdfsProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.Lso
}
lst.Entries = lst.Entries[:idx]
// Set continuation token only if we reached the page size.
if uint(len(lst.Entries)) >= msg.PageSize {
if len(lst.Entries) >= int(msg.PageSize) {
lst.ContinuationToken = lst.Entries[len(lst.Entries)-1].Name
}
return 0, nil
Expand Down
19 changes: 0 additions & 19 deletions ais/bucketmeta_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,11 @@ package ais
import (
"fmt"
"net/http"
"os"
"path/filepath"
"time"

"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/fname"
"github.com/NVIDIA/aistore/cmn/jsp"
"github.com/NVIDIA/aistore/core/meta"
. "github.com/onsi/ginkgo"
Expand Down Expand Up @@ -119,22 +116,6 @@ var _ = Describe("BMD marshal and unmarshal", func() {
}
}
})

It("should correctly detect bmd corruption "+node, func() {
bmdFullPath := filepath.Join(mpath, fname.Bmd)
f, err := os.OpenFile(bmdFullPath, os.O_RDWR, 0)
Expect(err).NotTo(HaveOccurred())
_, err = f.WriteAt([]byte("xxxxxxxxxxxx"), 10)
Expect(err).NotTo(HaveOccurred())
Expect(f.Close()).NotTo(HaveOccurred())

fmt.Println("NOTE: error on screen is expected at this point...")
fmt.Println("")

loaded := newBucketMD()
_, err = jsp.Load(testpath, loaded, loaded.JspOpts())
Expect(err).To(HaveOccurred())
})
})
}
})
20 changes: 0 additions & 20 deletions ais/emd_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@ package ais

import (
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/fname"
"github.com/NVIDIA/aistore/cmn/jsp"
"github.com/NVIDIA/aistore/ext/etl"
. "github.com/onsi/ginkgo"
Expand Down Expand Up @@ -152,23 +149,6 @@ var _ = Describe("EtlMD marshal and unmarshal", func() {
}
}
})

It("should correctly detect etlMD corruption "+node, func() {
eowner.init()
etlMDFullPath := filepath.Join(mpath, fname.Emd)
f, err := os.OpenFile(etlMDFullPath, os.O_RDWR, 0)
Expect(err).NotTo(HaveOccurred())
_, err = f.WriteAt([]byte("xxxxxxxxxxxx"), 10)
Expect(err).NotTo(HaveOccurred())
Expect(f.Close()).NotTo(HaveOccurred())

fmt.Println("NOTE: error on screen is expected at this point...")
fmt.Println("")

loaded := newEtlMD()
_, err = jsp.Load(etlMDFullPath, loaded, loaded.JspOpts())
Expect(err).To(HaveOccurred())
})
})
}
})
5 changes: 3 additions & 2 deletions ais/plstcx.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type (
amsg *apc.ActMsg // orig
config *cmn.Config
smap *smapX
hdr http.Header
// work
tsi *meta.Snode
xid string // x-tco
Expand Down Expand Up @@ -107,7 +108,7 @@ func (c *lstcx) do() (string, error) {

// 2. ls 1st page
var lst *cmn.LsoResult
lst, err = c.p.lsObjsR(c.bckFrom, &c.lsmsg, c.smap, tsi /*designated target*/, c.config, true)
lst, err = c.p.lsObjsR(c.bckFrom, &c.lsmsg, c.hdr, c.smap, tsi /*designated target*/, c.config, true)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -169,7 +170,7 @@ func (c *lstcx) pages(s string, cnt int) {

// next page
func (c *lstcx) _page() (int, error) {
lst, err := c.p.lsObjsR(c.bckFrom, &c.lsmsg, c.smap, c.tsi, c.config, true)
lst, err := c.p.lsObjsR(c.bckFrom, &c.lsmsg, c.hdr, c.smap, c.tsi, c.config, true)
if err != nil {
return 0, err
}
Expand Down
13 changes: 7 additions & 6 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1535,7 +1535,7 @@ func (p *proxy) listObjects(w http.ResponseWriter, r *http.Request, bck *meta.Bc

// do page
beg := mono.NanoTime()
lst, err := p.lsPage(bck, amsg, lsmsg, p.owner.smap.get())
lst, err := p.lsPage(bck, amsg, lsmsg, r.Header, p.owner.smap.get())
if err != nil {
p.writeErr(w, r, err)
return
Expand Down Expand Up @@ -1563,7 +1563,7 @@ func (p *proxy) listObjects(w http.ResponseWriter, r *http.Request, bck *meta.Bc
}

// one page; common code (native, s3 api)
func (p *proxy) lsPage(bck *meta.Bck, amsg *apc.ActMsg, lsmsg *apc.LsoMsg, smap *smapX) (*cmn.LsoResult, error) {
func (p *proxy) lsPage(bck *meta.Bck, amsg *apc.ActMsg, lsmsg *apc.LsoMsg, hdr http.Header, smap *smapX) (*cmn.LsoResult, error) {
var (
nl nl.Listener
err error
Expand Down Expand Up @@ -1612,7 +1612,7 @@ func (p *proxy) lsPage(bck *meta.Bck, amsg *apc.ActMsg, lsmsg *apc.LsoMsg, smap
}

config := cmn.GCO.Get()
lst, err = p.lsObjsR(bck, lsmsg, smap, tsi, config, wantOnlyRemote)
lst, err = p.lsObjsR(bck, lsmsg, hdr, smap, tsi, config, wantOnlyRemote)

// TODO: `status == http.StatusGone`: at this point we know that this
// remote bucket exists and is offline. We should somehow try to list
Expand Down Expand Up @@ -2183,7 +2183,7 @@ end:
Entries: entries,
Flags: flags,
}
if uint(len(entries)) >= pageSize {
if len(entries) >= int(pageSize) {
allEntries.ContinuationToken = entries[len(entries)-1].Name
}
// By default, recursion is always enabled. When disabled the result will include
Expand All @@ -2195,20 +2195,21 @@ end:
return allEntries, nil
}

func (p *proxy) lsObjsR(bck *meta.Bck, lsmsg *apc.LsoMsg, smap *smapX, tsi *meta.Snode, config *cmn.Config,
func (p *proxy) lsObjsR(bck *meta.Bck, lsmsg *apc.LsoMsg, hdr http.Header, smap *smapX, tsi *meta.Snode, config *cmn.Config,
wantOnlyRemote bool) (*cmn.LsoResult, error) {
var (
results sliceResults
aisMsg = p.newAmsgActVal(apc.ActList, &lsmsg)
args = allocBcArgs()
timeout = config.Client.ListObjTimeout.D()
)
if lsmsg.IsFlagSet(apc.LsInventory) && lsmsg.ContinuationToken == "" {
if cos.IsParseBool(hdr.Get(apc.HdrInventory)) && lsmsg.ContinuationToken == "" /*first page*/ {
timeout = config.Client.TimeoutLong.D()
}
args.req = cmn.HreqArgs{
Method: http.MethodGet,
Path: apc.URLPathBuckets.Join(bck.Name),
Header: hdr,
Query: bck.NewQuery(),
Body: cos.MustMarshal(aisMsg),
}
Expand Down
Loading

0 comments on commit 8736c91

Please sign in to comment.