diff --git a/ais/backend/ais.go b/ais/backend/ais.go index 93e3033c95b..59e9b2e422a 100644 --- a/ais/backend/ais.go +++ b/ais/backend/ais.go @@ -446,7 +446,7 @@ func (m *AISBackendProvider) ListObjects(remoteBck *meta.Bck, msg *apc.LsoMsg, l unsetUUID(&bck) var lstRes *cmn.LsoResult - if lstRes, err = api.ListObjectsPage(remAis.bp, bck, remoteMsg); err != nil { + if lstRes, err = api.ListObjectsPage(remAis.bp, bck, remoteMsg, api.ListArgs{}); err != nil { errCode, err = extractErrCode(err, remAis.uuid) return } diff --git a/ais/backend/aws.go b/ais/backend/aws.go index 075c3022fc0..ef82f2e42ce 100644 --- a/ais/backend/aws.go +++ b/ais/backend/aws.go @@ -129,16 +129,18 @@ func (*awsProvider) HeadBucket(_ ctx, bck *meta.Bck) (bckProps cos.StrKVs, errCo return } +// // LIST OBJECTS via INVENTORY -func (awsp *awsProvider) ListObjectsInv(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoResult, - ctx *core.LsoInventoryCtx) (errCode int, err error) { +// + +func (awsp *awsProvider) ListObjectsInv(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoResult, ctx *core.LsoInventoryCtx) (errCode int, err error) { var ( svc *s3.Client fqn string fh *os.File cloudBck = bck.RemoteBck() ) - debug.Assert(msg.IsFlagSet(apc.LsInventory)) + debug.Assert(ctx != nil) svc, _, err = newClient(sessConf{bck: cloudBck}, "[list_objects]") if err != nil { if cmn.Rom.FastV(4, cos.SmoduleBackend) { @@ -198,8 +200,6 @@ func (*awsProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoResu return } } - debug.Assert(!msg.IsFlagSet(apc.LsInventory)) - params := &s3.ListObjectsV2Input{Bucket: aws.String(cloudBck.Name)} if msg.Prefix != "" { params.Prefix = aws.String(msg.Prefix) diff --git a/ais/backend/awsinv.go b/ais/backend/awsinv.go index 6e93bc02cf6..42848bd11b0 100644 --- a/ais/backend/awsinv.go +++ b/ais/backend/awsinv.go @@ -12,14 +12,12 @@ import ( "fmt" "io" "os" - "path" "path/filepath" "strconv" "strings" "time" "github.com/NVIDIA/aistore/api/apc" - "github.com/NVIDIA/aistore/api/env" "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/cos" "github.com/NVIDIA/aistore/cmn/debug" @@ -33,10 +31,9 @@ import ( // TODO -- FIXME: // 1. len(line) == 4: expecting strictly (bucket, objname, size, etag) - use manifests -// 2: the offset must correspond to the previously returned ContinuationToken +// 2: the offset must correspond to the previously returned ContinuationToken ====> recover or fail? // 3: cleanup older inventories // 4: cached inventory must be stored with its own content-type (or, might disappear during pagination) -// 5: opt-out ListObjectsV2 when ctx.Offset > 0 const ( invSizeSGL = cos.MiB @@ -44,6 +41,7 @@ const ( ) const ( + invName = ".inventory" invSrcExt = ".csv.gz" invDstExt = ".csv" ) @@ -59,14 +57,17 @@ func sinceInv(t1, t2 time.Time) time.Duration { return t2.Sub(t1) } -// env "S3_BUCKET_INVENTORY" or const -func prefixInv(cloudBck *cmn.Bck) string { - return path.Join(env.BucketInventory(), cloudBck.Name) +func prefixInv(cloudBck *cmn.Bck, ctx *core.LsoInventoryCtx) (prefix string) { + prefix = cos.Either(ctx.Name, invName) + cos.PathSeparator + cloudBck.Name + if ctx.ID != "" { + prefix += cos.PathSeparator + ctx.ID + } + return prefix } func checkInventory(cloudBck *cmn.Bck, latest time.Time, ctx *core.LsoInventoryCtx) (fqn string, _ bool, _ error) { // 2. 
one bucket, one inventory (and one statically produced name) - prefix := prefixInv(cloudBck) + prefix := prefixInv(cloudBck, ctx) mi, _, err := fs.Hrw(prefix) if err != nil { return "", false, err @@ -96,7 +97,7 @@ func (awsp *awsProvider) getInventory(cloudBck *cmn.Bck, svc *s3.Client, ctx *co found string bn = aws.String(cloudBck.Name) params = &s3.ListObjectsV2Input{Bucket: bn} - prefix = prefixInv(cloudBck) + prefix = prefixInv(cloudBck, ctx) ) params.Prefix = aws.String(prefix) params.MaxKeys = aws.Int32(apc.MaxPageSizeAWS) // no more than 1000 manifests @@ -179,7 +180,7 @@ func (awsp *awsProvider) getInventory(cloudBck *cmn.Bck, svc *s3.Client, ctx *co func (*awsProvider) listInventory(cloudBck *cmn.Bck, fh *os.File, sgl *memsys.SGL, ctx *core.LsoInventoryCtx, msg *apc.LsoMsg, lst *cmn.LsoResult) error { msg.PageSize = calcPageSize(msg.PageSize, invMaxPage) - for j := uint(len(lst.Entries)); j < msg.PageSize; j++ { + for j := len(lst.Entries); j < int(msg.PageSize); j++ { lst.Entries = append(lst.Entries, &cmn.LsoEntry{}) } @@ -198,7 +199,7 @@ func (*awsProvider) listInventory(cloudBck *cmn.Bck, fh *os.File, sgl *memsys.SG } var ( - i uint + i int64 off = ctx.Offset skip = msg.ContinuationToken != "" lbuf = make([]byte, 256) diff --git a/ais/backend/common.go b/ais/backend/common.go index cb02f97d72d..af0bdfa784c 100644 --- a/ais/backend/common.go +++ b/ais/backend/common.go @@ -18,8 +18,8 @@ type ctx = context.Context // used when omitted for shortness sake func fmtTime(t time.Time) string { return t.Format(time.RFC3339) } -func calcPageSize(pageSize, maxPageSize uint) uint { - debug.Assert(int(pageSize) >= 0, pageSize) +func calcPageSize(pageSize, maxPageSize int64) int64 { + debug.Assert(pageSize >= 0, pageSize) if pageSize == 0 { return maxPageSize } diff --git a/ais/backend/hdfs.go b/ais/backend/hdfs.go index f4684f910f6..485d4a251cb 100644 --- a/ais/backend/hdfs.go +++ b/ais/backend/hdfs.go @@ -144,7 +144,7 @@ func (hp *hdfsProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.Lso } return err } - if uint(len(lst.Entries)) >= msg.PageSize { + if len(lst.Entries) >= int(msg.PageSize) { return skipDir(fi) } objName := strings.TrimPrefix(strings.TrimPrefix(path, bck.Props.Extra.HDFS.RefDirectory), string(filepath.Separator)) @@ -197,7 +197,7 @@ func (hp *hdfsProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.Lso } lst.Entries = lst.Entries[:idx] // Set continuation token only if we reached the page size. - if uint(len(lst.Entries)) >= msg.PageSize { + if len(lst.Entries) >= int(msg.PageSize) { lst.ContinuationToken = lst.Entries[len(lst.Entries)-1].Name } return 0, nil diff --git a/ais/bucketmeta_internal_test.go b/ais/bucketmeta_internal_test.go index 5012567af29..f2c4d7ee15e 100644 --- a/ais/bucketmeta_internal_test.go +++ b/ais/bucketmeta_internal_test.go @@ -7,14 +7,11 @@ package ais import ( "fmt" "net/http" - "os" - "path/filepath" "time" "github.com/NVIDIA/aistore/api/apc" "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/cos" - "github.com/NVIDIA/aistore/cmn/fname" "github.com/NVIDIA/aistore/cmn/jsp" "github.com/NVIDIA/aistore/core/meta" . 
"github.com/onsi/ginkgo" @@ -119,22 +116,6 @@ var _ = Describe("BMD marshal and unmarshal", func() { } } }) - - It("should correctly detect bmd corruption "+node, func() { - bmdFullPath := filepath.Join(mpath, fname.Bmd) - f, err := os.OpenFile(bmdFullPath, os.O_RDWR, 0) - Expect(err).NotTo(HaveOccurred()) - _, err = f.WriteAt([]byte("xxxxxxxxxxxx"), 10) - Expect(err).NotTo(HaveOccurred()) - Expect(f.Close()).NotTo(HaveOccurred()) - - fmt.Println("NOTE: error on screen is expected at this point...") - fmt.Println("") - - loaded := newBucketMD() - _, err = jsp.Load(testpath, loaded, loaded.JspOpts()) - Expect(err).To(HaveOccurred()) - }) }) } }) diff --git a/ais/emd_internal_test.go b/ais/emd_internal_test.go index 15e654a162e..5d22f02da08 100644 --- a/ais/emd_internal_test.go +++ b/ais/emd_internal_test.go @@ -6,15 +6,12 @@ package ais import ( "fmt" - "os" - "path/filepath" "testing" "time" "github.com/NVIDIA/aistore/api/apc" "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/cos" - "github.com/NVIDIA/aistore/cmn/fname" "github.com/NVIDIA/aistore/cmn/jsp" "github.com/NVIDIA/aistore/ext/etl" . "github.com/onsi/ginkgo" @@ -152,23 +149,6 @@ var _ = Describe("EtlMD marshal and unmarshal", func() { } } }) - - It("should correctly detect etlMD corruption "+node, func() { - eowner.init() - etlMDFullPath := filepath.Join(mpath, fname.Emd) - f, err := os.OpenFile(etlMDFullPath, os.O_RDWR, 0) - Expect(err).NotTo(HaveOccurred()) - _, err = f.WriteAt([]byte("xxxxxxxxxxxx"), 10) - Expect(err).NotTo(HaveOccurred()) - Expect(f.Close()).NotTo(HaveOccurred()) - - fmt.Println("NOTE: error on screen is expected at this point...") - fmt.Println("") - - loaded := newEtlMD() - _, err = jsp.Load(etlMDFullPath, loaded, loaded.JspOpts()) - Expect(err).To(HaveOccurred()) - }) }) } }) diff --git a/ais/plstcx.go b/ais/plstcx.go index 89636856ad2..d9a894a183a 100644 --- a/ais/plstcx.go +++ b/ais/plstcx.go @@ -35,6 +35,7 @@ type ( amsg *apc.ActMsg // orig config *cmn.Config smap *smapX + hdr http.Header // work tsi *meta.Snode xid string // x-tco @@ -107,7 +108,7 @@ func (c *lstcx) do() (string, error) { // 2. 
ls 1st page var lst *cmn.LsoResult - lst, err = c.p.lsObjsR(c.bckFrom, &c.lsmsg, c.smap, tsi /*designated target*/, c.config, true) + lst, err = c.p.lsObjsR(c.bckFrom, &c.lsmsg, c.hdr, c.smap, tsi /*designated target*/, c.config, true) if err != nil { return "", err } @@ -169,7 +170,7 @@ func (c *lstcx) pages(s string, cnt int) { // next page func (c *lstcx) _page() (int, error) { - lst, err := c.p.lsObjsR(c.bckFrom, &c.lsmsg, c.smap, c.tsi, c.config, true) + lst, err := c.p.lsObjsR(c.bckFrom, &c.lsmsg, c.hdr, c.smap, c.tsi, c.config, true) if err != nil { return 0, err } diff --git a/ais/proxy.go b/ais/proxy.go index 968e9ff96dc..c7b7a373207 100644 --- a/ais/proxy.go +++ b/ais/proxy.go @@ -1535,7 +1535,7 @@ func (p *proxy) listObjects(w http.ResponseWriter, r *http.Request, bck *meta.Bc // do page beg := mono.NanoTime() - lst, err := p.lsPage(bck, amsg, lsmsg, p.owner.smap.get()) + lst, err := p.lsPage(bck, amsg, lsmsg, r.Header, p.owner.smap.get()) if err != nil { p.writeErr(w, r, err) return @@ -1563,7 +1563,7 @@ func (p *proxy) listObjects(w http.ResponseWriter, r *http.Request, bck *meta.Bc } // one page; common code (native, s3 api) -func (p *proxy) lsPage(bck *meta.Bck, amsg *apc.ActMsg, lsmsg *apc.LsoMsg, smap *smapX) (*cmn.LsoResult, error) { +func (p *proxy) lsPage(bck *meta.Bck, amsg *apc.ActMsg, lsmsg *apc.LsoMsg, hdr http.Header, smap *smapX) (*cmn.LsoResult, error) { var ( nl nl.Listener err error @@ -1612,7 +1612,7 @@ func (p *proxy) lsPage(bck *meta.Bck, amsg *apc.ActMsg, lsmsg *apc.LsoMsg, smap } config := cmn.GCO.Get() - lst, err = p.lsObjsR(bck, lsmsg, smap, tsi, config, wantOnlyRemote) + lst, err = p.lsObjsR(bck, lsmsg, hdr, smap, tsi, config, wantOnlyRemote) // TODO: `status == http.StatusGone`: at this point we know that this // remote bucket exists and is offline. We should somehow try to list @@ -2183,7 +2183,7 @@ end: Entries: entries, Flags: flags, } - if uint(len(entries)) >= pageSize { + if len(entries) >= int(pageSize) { allEntries.ContinuationToken = entries[len(entries)-1].Name } // By default, recursion is always enabled. 
When disabled the result will include @@ -2195,7 +2195,7 @@ end: return allEntries, nil } -func (p *proxy) lsObjsR(bck *meta.Bck, lsmsg *apc.LsoMsg, smap *smapX, tsi *meta.Snode, config *cmn.Config, +func (p *proxy) lsObjsR(bck *meta.Bck, lsmsg *apc.LsoMsg, hdr http.Header, smap *smapX, tsi *meta.Snode, config *cmn.Config, wantOnlyRemote bool) (*cmn.LsoResult, error) { var ( results sliceResults @@ -2203,12 +2203,13 @@ func (p *proxy) lsObjsR(bck *meta.Bck, lsmsg *apc.LsoMsg, smap *smapX, tsi *meta args = allocBcArgs() timeout = config.Client.ListObjTimeout.D() ) - if lsmsg.IsFlagSet(apc.LsInventory) && lsmsg.ContinuationToken == "" { + if cos.IsParseBool(hdr.Get(apc.HdrInventory)) && lsmsg.ContinuationToken == "" /*first page*/ { timeout = config.Client.TimeoutLong.D() } args.req = cmn.HreqArgs{ Method: http.MethodGet, Path: apc.URLPathBuckets.Join(bck.Name), + Header: hdr, Query: bck.NewQuery(), Body: cos.MustMarshal(aisMsg), } diff --git a/ais/prxlso.go b/ais/prxlso.go index 5a9799df14c..e90ba4457f8 100644 --- a/ais/prxlso.go +++ b/ais/prxlso.go @@ -207,7 +207,7 @@ func (b *lsobjBuffer) mergeTargetBuffers() (filled bool) { return true } -func (b *lsobjBuffer) get(token string, size uint) (entries cmn.LsoEntries, hasEnough bool) { +func (b *lsobjBuffer) get(token string, size int64) (entries cmn.LsoEntries, hasEnough bool) { b.lastAccess.Store(mono.NanoTime()) // If user requested something before what we have currently in the buffer @@ -227,13 +227,13 @@ func (b *lsobjBuffer) get(token string, size uint) (entries cmn.LsoEntries, hasE }) entries = b.currentBuff[idx:] - if size > uint(len(entries)) { + if size > int64(len(entries)) { // In case we don't have enough entries and we haven't filled anything then // we must request more (if filled then we don't have enough because it's end). if !filled { return nil, false } - size = uint(len(entries)) + size = int64(len(entries)) } // Move buffer after returned entries. 
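// (editorial note, hedged -- not part of the patch) Page sizes are now signed
// throughout: apc.LsoMsg.PageSize, api.ListArgs.Limit, the bucket MaxPageSize
// property and the proxy-side buffers/caches all moved from uint to int64, with
// explicit int()/int64() conversions at slice-length comparisons. A minimal
// sketch of the resulting convention (illustrative helper, not a function in
// this diff; the clamp to the maximum is an assumption):
//
//	func normalizePageSize(pageSize, maxPageSize int64) int64 {
//		if pageSize == 0 {
//			return maxPageSize // zero means "use the provider maximum"
//		}
//		return min(pageSize, maxPageSize) // assumption: clamp to the maximum
//	}
//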
@@ -246,13 +246,13 @@ func (b *lsobjBuffer) get(token string, size uint) (entries cmn.LsoEntries, hasE return entries, true } -func (b *lsobjBuffer) set(id string, entries cmn.LsoEntries, size uint) { +func (b *lsobjBuffer) set(id string, entries cmn.LsoEntries, size int64) { if b.leftovers == nil { b.leftovers = make(map[string]*lsobjBufferTarget, 5) } b.leftovers[id] = &lsobjBufferTarget{ entries: entries, - done: uint(len(entries)) < size, + done: len(entries) < int(size), } b.lastAccess.Store(mono.NanoTime()) } @@ -273,12 +273,12 @@ func (b *lsobjBuffers) last(id, token string) string { return last } -func (b *lsobjBuffers) get(id, token string, size uint) (entries cmn.LsoEntries, hasEnough bool) { +func (b *lsobjBuffers) get(id, token string, size int64) (entries cmn.LsoEntries, hasEnough bool) { v, _ := b.buffers.LoadOrStore(id, &lsobjBuffer{}) return v.(*lsobjBuffer).get(token, size) } -func (b *lsobjBuffers) set(id, targetID string, entries cmn.LsoEntries, size uint) { +func (b *lsobjBuffers) set(id, targetID string, entries cmn.LsoEntries, size int64) { v, _ := b.buffers.LoadOrStore(id, &lsobjBuffer{}) v.(*lsobjBuffer).set(targetID, entries, size) } @@ -309,7 +309,7 @@ func (ci *cacheInterval) contains(token string) bool { return false } -func (ci *cacheInterval) get(token string, objCnt uint, params reqParams) (entries cmn.LsoEntries, hasEnough bool) { +func (ci *cacheInterval) get(token string, objCnt int64, params reqParams) (entries cmn.LsoEntries, hasEnough bool) { ci.lastAccess = mono.NanoTime() entries = ci.entries @@ -336,7 +336,7 @@ func (ci *cacheInterval) get(token string, objCnt uint, params reqParams) (entri } entries = entries[start:] - end := min(uint(len(entries)), objCnt) + end := min(len(entries), int(objCnt)) if params.prefix != "" { // Move `end-1` to last entry that starts with `params.prefix`. 
for ; end > 0; end-- { @@ -344,7 +344,7 @@ func (ci *cacheInterval) get(token string, objCnt uint, params reqParams) (entri break } } - if !ci.last && end < uint(len(entries)) { + if !ci.last && end < len(entries) { // We filtered out entries that start with `params.prefix` and // the entries are fully contained in the interval, examples: // * interval = ["a", "ma", "mb", "z"], token = "", objCnt = 4, prefix = "m" @@ -354,7 +354,7 @@ func (ci *cacheInterval) get(token string, objCnt uint, params reqParams) (entri } entries = entries[:end] - if ci.last || uint(len(entries)) >= objCnt { + if ci.last || len(entries) >= int(objCnt) { return entries, true } return nil, false @@ -435,7 +435,7 @@ func (c *lsobjCache) removeInterval(ci *cacheInterval) { } } -func (c *lsobjCache) get(token string, objCnt uint, params reqParams) (entries cmn.LsoEntries, hasEnough bool) { +func (c *lsobjCache) get(token string, objCnt int64, params reqParams) (entries cmn.LsoEntries, hasEnough bool) { c.mtx.RLock() if interval := c.findInterval(token); interval != nil { entries, hasEnough = interval.get(token, objCnt, params) @@ -444,13 +444,13 @@ func (c *lsobjCache) get(token string, objCnt uint, params reqParams) (entries c return } -func (c *lsobjCache) set(token string, entries cmn.LsoEntries, size uint) { +func (c *lsobjCache) set(token string, entries cmn.LsoEntries, size int64) { var ( end *cacheInterval cur = &cacheInterval{ token: token, entries: entries, - last: uint(len(entries)) < size, + last: len(entries) < int(size), lastAccess: mono.NanoTime(), } ) @@ -473,7 +473,7 @@ func (c *lsobjCache) invalidate() { // lsobjCaches // ///////////////// -func (c *lsobjCaches) get(reqID cacheReqID, token string, objCnt uint) (entries cmn.LsoEntries, hasEnough bool) { +func (c *lsobjCaches) get(reqID cacheReqID, token string, objCnt int64) (entries cmn.LsoEntries, hasEnough bool) { if v, ok := c.caches.Load(reqID); ok { if entries, hasEnough = v.(*lsobjCache).get(token, objCnt, reqParams{}); hasEnough { return @@ -494,7 +494,7 @@ func (c *lsobjCaches) get(reqID cacheReqID, token string, objCnt uint) (entries return nil, false } -func (c *lsobjCaches) set(reqID cacheReqID, token string, entries cmn.LsoEntries, size uint) { +func (c *lsobjCaches) set(reqID cacheReqID, token string, entries cmn.LsoEntries, size int64) { v, _ := c.caches.LoadOrStore(reqID, &lsobjCache{}) v.(*lsobjCache).set(token, entries, size) } diff --git a/ais/prxs3.go b/ais/prxs3.go index b67856ef43c..e5400cdf9d3 100644 --- a/ais/prxs3.go +++ b/ais/prxs3.go @@ -346,7 +346,7 @@ func (p *proxy) listObjectsS3(w http.ResponseWriter, r *http.Request, bucket str // - "encoding-type" s3.FillLsoMsg(q, lsmsg) - lst, err := p.lsAllPagesS3(bck, amsg, lsmsg) + lst, err := p.lsAllPagesS3(bck, amsg, lsmsg, r.Header) if cmn.Rom.FastV(5, cos.SmoduleS3) { nlog.Infoln("lsoS3", bck.Cname(""), len(lst.Entries), err) } @@ -371,11 +371,11 @@ func (p *proxy) listObjectsS3(w http.ResponseWriter, r *http.Request, bucket str lst = nil } -func (p *proxy) lsAllPagesS3(bck *meta.Bck, amsg *apc.ActMsg, lsmsg *apc.LsoMsg) (lst *cmn.LsoResult, _ error) { +func (p *proxy) lsAllPagesS3(bck *meta.Bck, amsg *apc.ActMsg, lsmsg *apc.LsoMsg, hdr http.Header) (lst *cmn.LsoResult, _ error) { smap := p.owner.smap.get() for pageNum := 1; ; pageNum++ { beg := mono.NanoTime() - page, err := p.lsPage(bck, amsg, lsmsg, smap) + page, err := p.lsPage(bck, amsg, lsmsg, hdr, smap) if err != nil { return lst, err } diff --git a/ais/s3/types.go b/ais/s3/types.go index 7cd5bf9c970..c6e326832be 100644 
--- a/ais/s3/types.go +++ b/ais/s3/types.go @@ -120,7 +120,7 @@ func ObjName(items []string) string { return path.Join(items[1:]...) } func FillLsoMsg(query url.Values, msg *apc.LsoMsg) { mxStr := query.Get(QparamMaxKeys) if pageSize, err := strconv.Atoi(mxStr); err == nil && pageSize > 0 { - msg.PageSize = uint(pageSize) + msg.PageSize = int64(pageSize) } if prefix := query.Get(QparamPrefix); prefix != "" { msg.Prefix = prefix diff --git a/ais/test/bucket_test.go b/ais/test/bucket_test.go index aea42b2b767..4d9a403e8ef 100644 --- a/ais/test/bucket_test.go +++ b/ais/test/bucket_test.go @@ -778,7 +778,7 @@ func TestListObjectsGoBack(t *testing.T) { tlog.Logln("listing couple pages to move iterator on targets") for page := 0; page < m.num/int(msg.PageSize); page++ { tokens = append(tokens, msg.ContinuationToken) - objPage, err := api.ListObjectsPage(baseParams, m.bck, msg) + objPage, err := api.ListObjectsPage(baseParams, m.bck, msg, api.ListArgs{}) tassert.CheckFatal(t, err) expectedEntries = append(expectedEntries, objPage.Entries...) } @@ -787,7 +787,7 @@ func TestListObjectsGoBack(t *testing.T) { for i := len(tokens) - 1; i >= 0; i-- { msg.ContinuationToken = tokens[i] - objPage, err := api.ListObjectsPage(baseParams, m.bck, msg) + objPage, err := api.ListObjectsPage(baseParams, m.bck, msg, api.ListArgs{}) tassert.CheckFatal(t, err) entries = append(entries, objPage.Entries...) } @@ -852,7 +852,7 @@ func TestListObjectsRerequestPage(t *testing.T) { prevToken := msg.ContinuationToken for i := 0; i < rerequests; i++ { msg.ContinuationToken = prevToken - objList, err = api.ListObjectsPage(baseParams, m.bck, msg) + objList, err = api.ListObjectsPage(baseParams, m.bck, msg, api.ListArgs{}) tassert.CheckFatal(t, err) } totalCnt += len(objList.Entries) @@ -1119,7 +1119,7 @@ func TestListObjectsRandProxy(t *testing.T) { } for { baseParams := tools.BaseAPIParams() - objList, err := api.ListObjectsPage(baseParams, m.bck, msg) + objList, err := api.ListObjectsPage(baseParams, m.bck, msg, api.ListArgs{}) tassert.CheckFatal(t, err) totalCnt += len(objList.Entries) if objList.ContinuationToken == "" { @@ -1158,15 +1158,15 @@ func TestListObjectsRandPageSize(t *testing.T) { defer m.del() } for { - msg.PageSize = uint(rand.Intn(50) + 50) + msg.PageSize = rand.Int63n(50) + 50 - objList, err := api.ListObjectsPage(baseParams, m.bck, msg) + objList, err := api.ListObjectsPage(baseParams, m.bck, msg, api.ListArgs{}) tassert.CheckFatal(t, err) totalCnt += len(objList.Entries) if objList.ContinuationToken == "" { break } - tassert.Errorf(t, uint(len(objList.Entries)) == msg.PageSize, "wrong page size %d (expected %d)", + tassert.Errorf(t, len(objList.Entries) == int(msg.PageSize), "wrong page size %d (expected %d)", len(objList.Entries), msg.PageSize, ) } @@ -1203,11 +1203,11 @@ func TestListObjects(t *testing.T) { } tests := []struct { - pageSize uint + pageSize int64 }{ {pageSize: 0}, {pageSize: 2000}, - {pageSize: uint(rand.Intn(15000))}, + {pageSize: rand.Int63n(15000)}, } for _, test := range tests { @@ -1409,8 +1409,8 @@ func TestListObjectsPrefix(t *testing.T) { tests := []struct { name string prefix string - pageSize uint - limit uint + pageSize int64 + limit int64 expected int }{ { @@ -1504,7 +1504,7 @@ func TestListObjectsCache(t *testing.T) { for iter := 0; iter < totalIters; iter++ { var ( started = time.Now() - msg = &apc.LsoMsg{PageSize: uint(rand.Intn(20)) + 4} + msg = &apc.LsoMsg{PageSize: rand.Int63n(20) + 4} ) if useCache { msg.SetFlag(apc.UseListObjsCache) @@ -3438,7 +3438,7 @@ func 
TestBucketListAndSummary(t *testing.T) { apc.ActSummaryBck, xid, m.bck, summary.ObjCount, expectedFiles) } } else { - msg := &apc.LsoMsg{PageSize: uint(min(m.num/3, 256))} // mult. pages + msg := &apc.LsoMsg{PageSize: int64(min(m.num/3, 256))} // mult. pages if test.cached { msg.Flags = apc.LsObjCached } diff --git a/ais/test/regression_test.go b/ais/test/regression_test.go index 7a2d99bc809..5ae2c31127a 100644 --- a/ais/test/regression_test.go +++ b/ais/test/regression_test.go @@ -76,7 +76,7 @@ func TestListObjectsLocalGetLocation(t *testing.T) { m.puts() msg := &apc.LsoMsg{Props: apc.GetPropsLocation} - lst, err := api.ListObjects(baseParams, m.bck, msg, api.ListArgs{Limit: uint(m.num)}) + lst, err := api.ListObjects(baseParams, m.bck, msg, api.ListArgs{Limit: int64(m.num)}) tassert.CheckFatal(t, err) if len(lst.Entries) != m.num { @@ -128,7 +128,7 @@ func TestListObjectsLocalGetLocation(t *testing.T) { // Ensure no target URLs are returned when the property is not requested msg.Props = "" - lst, err = api.ListObjects(baseParams, m.bck, msg, api.ListArgs{Limit: uint(m.num)}) + lst, err = api.ListObjects(baseParams, m.bck, msg, api.ListArgs{Limit: int64(m.num)}) tassert.CheckFatal(t, err) if len(lst.Entries) != m.num { diff --git a/ais/tgtbck.go b/ais/tgtbck.go index b893e8555f8..b51f10a742f 100644 --- a/ais/tgtbck.go +++ b/ais/tgtbck.go @@ -266,12 +266,12 @@ func (t *target) listObjects(w http.ResponseWriter, r *http.Request, bck *meta.B var ( xctn core.Xact - rns = xreg.RenewLso(bck, lsmsg.UUID, lsmsg) + rns = xreg.RenewLso(bck, lsmsg.UUID, lsmsg, r.Header) ) // check that xaction hasn't finished prior to this page read, restart if needed if rns.Err == xs.ErrGone { runtime.Gosched() - rns = xreg.RenewLso(bck, lsmsg.UUID, lsmsg) + rns = xreg.RenewLso(bck, lsmsg.UUID, lsmsg, r.Header) } if rns.Err != nil { t.writeErr(w, r, rns.Err) diff --git a/api/apc/headers.go b/api/apc/headers.go index b7646722ff8..facc0ccdd2d 100644 --- a/api/apc/headers.go +++ b/api/apc/headers.go @@ -1,25 +1,31 @@ // Package apc: API control messages and constants /* - * Copyright (c) 2018-2022, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved. */ package apc import "strings" +// AIS http header conventions: +// - always starts with the prefix "ais-" +// - all words separated with "-" +// - no '.' periods, no underscores. // For standard and provider-specific HTTP headers, see cmn/cos/const_http.go const HdrError = "Hdr-Error" -// Header Key conventions: -// - starts with a prefix "ais-", -// - all words separated with "-": no dots and underscores. 
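// (editorial sketch, hedged -- not part of the patch) The const block just
// below adds the bucket-inventory request headers. On the client they are
// plain "ais-" headers set on the request and parsed server-side with
// cos.IsParseBool, e.g.:
//
//	hdr := http.Header{}
//	hdr.Set(apc.HdrInventory, "true")       // required to engage inventory-based listing
//	hdr.Set(apc.HdrInvName, "my-inventory") // optional; hypothetical name overriding the default
//	hdr.Set(apc.HdrInvID, "1234")           // optional; hypothetical inventory ID
//
// and they travel to the target via api.ListArgs.Header (see api/ls.go further below).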
const ( HeaderPrefix = "ais-" + // bucket inventory - an alternative way to list (very large) buckets + HdrInventory = HeaderPrefix + "bucket-inventory" // must be present and must be "true" (or "y", "yes", "on" case-insensitive) + HdrInvName = HeaderPrefix + "inv-name" // optional; name of the inventory (to override the system default) + HdrInvID = HeaderPrefix + "inv-id" // optional; inventory ID (ditto) + // GET via x-blob-download - HdrBlobDownload = HeaderPrefix + ActBlobDl - HdrBlobChunk = HeaderPrefix + "blob-chunk" // e.g., 1mb, 2MIB, 3m, or 1234567 (bytes) - HdrBlobWorkers = HeaderPrefix + "blob-workers" // (dfltNumWorkers in xs/blob_download.go) + HdrBlobDownload = HeaderPrefix + ActBlobDl // must be present and must be "true" (or "y", "yes", "on" case-insensitive) + HdrBlobChunk = HeaderPrefix + "blob-chunk" // optional; e.g., 1mb, 2MIB, 3m, or 1234567 (bytes) + HdrBlobWorkers = HeaderPrefix + "blob-workers" // optional; the default number of workers is dfltNumWorkers in xs/blob_download.go // Bucket props headers HdrBucketProps = HeaderPrefix + "bucket-props" // => cmn.Bprops diff --git a/api/apc/lsmsg.go b/api/apc/lsmsg.go index 209a88d04b3..d4bb8e7580b 100644 --- a/api/apc/lsmsg.go +++ b/api/apc/lsmsg.go @@ -5,6 +5,7 @@ package apc import ( + "net/http" "strings" "github.com/NVIDIA/aistore/cmn/cos" @@ -77,9 +78,6 @@ const ( // and if it does: // - check whether remote version differs from its in-cluster copy LsVerChanged - - // (new & experimental) - LsInventory ) // max page sizes @@ -143,15 +141,16 @@ var ( ) type LsoMsg struct { - UUID string `json:"uuid"` // ID to identify a single multi-page request - Props string `json:"props"` // comma-delimited, e.g. "checksum,size,custom" (see GetProps* enum) - TimeFormat string `json:"time_format"` // RFC822 is the default - Prefix string `json:"prefix"` // return obj names starting with prefix (TODO: e.g. "A.tar/tutorials/") - StartAfter string `json:"start_after"` // start listing after (AIS buckets only) - ContinuationToken string `json:"continuation_token"` // => LsoResult.ContinuationToken => LsoMsg.ContinuationToken - SID string `json:"target"` // selected target to solely execute backend.list-objects - Flags uint64 `json:"flags,string"` // enum {LsObjCached, ...} - "LsoMsg flags" above - PageSize uint `json:"pagesize"` // max entries returned by list objects call + UUID string `json:"uuid"` // ID to identify a single multi-page request + Props string `json:"props"` // comma-delimited, e.g. "checksum,size,custom" (see GetProps* enum) + TimeFormat string `json:"time_format,omitempty"` // RFC822 is the default + Prefix string `json:"prefix"` // return obj names starting with prefix (TODO: e.g. 
"A.tar/tutorials/") + StartAfter string `json:"start_after,omitempty"` // start listing after (AIS buckets only) + ContinuationToken string `json:"continuation_token"` // => LsoResult.ContinuationToken => LsoMsg.ContinuationToken + SID string `json:"target"` // selected target to solely execute backend.list-objects + Flags uint64 `json:"flags,string"` // enum {LsObjCached, ...} - "LsoMsg flags" above + PageSize int64 `json:"pagesize"` // max entries returned by list objects call + Header http.Header `json:"hdr,omitempty"` // (for pointers, see `ListArgs` in api/ls.go) } //////////// diff --git a/api/env/aws.go b/api/env/aws.go index c9bc37308b5..40cee2221aa 100644 --- a/api/env/aws.go +++ b/api/env/aws.go @@ -15,13 +15,6 @@ func AwsDefaultRegion() (region string) { return region } -func BucketInventory() (inv string) { - if inv = os.Getenv(AWS.Inventory); inv == "" { - inv = "inv-all" // TODO -- FIXME: change to ".inventory" - } - return inv -} - // use S3_ENDPOINT to globally override the default 'https://s3.amazonaws.com' endpoint // NOTE: the same can be done on a per-bucket basis, via bucket prop `Extra.AWS.Endpoint` // (bucket override will always take precedence) @@ -35,9 +28,8 @@ var ( Profile string Inventory string }{ - Endpoint: "S3_ENDPOINT", - Region: "AWS_REGION", - Profile: "AWS_PROFILE", - Inventory: "S3_BUCKET_INVENTORY", + Endpoint: "S3_ENDPOINT", + Region: "AWS_REGION", + Profile: "AWS_PROFILE", } ) diff --git a/api/ls.go b/api/ls.go index 7c7de6079e6..760b74cd919 100644 --- a/api/ls.go +++ b/api/ls.go @@ -39,7 +39,8 @@ type ( ListArgs struct { Callback LsoCB CallAfter time.Duration - Limit uint + Header http.Header // to optimize listing very large buckets, e.g.: Header.Set(apc.HdrInventory, "true") + Limit int64 } ) @@ -103,30 +104,15 @@ func QueryBuckets(bp BaseParams, qbck cmn.QueryBcks, fltPresence int) (bool, err // listed page along with associated _continuation token_. // // See also: -// - `ListObjectsPage` -// - usage examples in CLI docs under docs/cli. 
+// - docs/cli/* for CLI usage examples +// - `apc.LsoMsg` +// - `api.ListObjectsPage` func ListObjects(bp BaseParams, bck cmn.Bck, lsmsg *apc.LsoMsg, args ListArgs) (*cmn.LsoResult, error) { - var ( - path = apc.URLPathBuckets.Join(bck.Name) - hdr = http.Header{ - cos.HdrAccept: []string{cos.ContentMsgPack}, - cos.HdrContentType: []string{cos.ContentJSON}, - } - ) - bp.Method = http.MethodGet + reqParams := lsoReq(bp, bck, &args) if lsmsg == nil { lsmsg = &apc.LsoMsg{} - } - lsmsg.UUID = "" - lsmsg.ContinuationToken = "" - reqParams := AllocRp() - { - reqParams.BaseParams = bp - reqParams.Path = path - reqParams.Header = hdr - reqParams.Query = bck.AddToQuery(nil) - - reqParams.buf = allocMbuf() // mem-pool msgpack + } else { + lsmsg.UUID, lsmsg.ContinuationToken = "", "" // new } lst, err := lso(reqParams, lsmsg, args) @@ -135,6 +121,25 @@ func ListObjects(bp BaseParams, bck cmn.Bck, lsmsg *apc.LsoMsg, args ListArgs) ( return lst, err } +func lsoReq(bp BaseParams, bck cmn.Bck, args *ListArgs) *ReqParams { + hdr := args.Header + if hdr == nil { + hdr = make(http.Header, 2) + } + hdr.Set(cos.HdrAccept, cos.ContentMsgPack) + hdr.Set(cos.HdrContentType, cos.ContentJSON) + bp.Method = http.MethodGet + reqParams := AllocRp() + { + reqParams.BaseParams = bp + reqParams.Path = apc.URLPathBuckets.Join(bck.Name) + reqParams.Header = hdr + reqParams.Query = bck.NewQuery() + reqParams.buf = allocMbuf() // msgpack + } + return reqParams +} + // `toRead` holds the remaining number of objects to list (that is, unless we are listing // the entire bucket). Each iteration lists a page of objects and reduces `toRead` // accordingly. When the latter gets below page size, we perform the final @@ -180,7 +185,7 @@ func lso(reqParams *ReqParams, lsmsg *apc.LsoMsg, args ListArgs) (lst *cmn.LsoRe if page.ContinuationToken == "" { // listed all pages break } - toRead = uint(max(int(toRead)-len(page.Entries), 0)) + toRead = max(toRead-int64(len(page.Entries)), 0) lsmsg.ContinuationToken = page.ContinuationToken } return lst, nil @@ -206,29 +211,18 @@ func lsoPage(reqParams *ReqParams) (_ *cmn.LsoResult, err error) { // ListObjectsPage returns the first page of bucket objects. // On success the function updates `lsmsg.ContinuationToken` which client then can reuse // to fetch the next page. 
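// (editorial sketch, hedged -- not part of the patch) With the extra ListArgs
// parameter, a simple paging loop looks like this; baseParams and bck are
// assumed to exist, and error handling is elided:
//
//	msg := &apc.LsoMsg{PageSize: 1000}
//	for {
//		page, err := api.ListObjectsPage(baseParams, bck, msg, api.ListArgs{})
//		if err != nil {
//			return err
//		}
//		// ... consume page.Entries ...
//		if page.ContinuationToken == "" {
//			break // listed everything
//		}
//	}
//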
-// See also: CLI and CLI usage examples -// See also: `apc.LsoMsg` -// See also: `api.ListObjectsInvalidateCache` -// See also: `api.ListObjects` -func ListObjectsPage(bp BaseParams, bck cmn.Bck, lsmsg *apc.LsoMsg) (*cmn.LsoResult, error) { - bp.Method = http.MethodGet +// See also: +// - docs/cli/* for CLI usage examples +// - `apc.LsoMsg` +// - `api.ListObjects` +func ListObjectsPage(bp BaseParams, bck cmn.Bck, lsmsg *apc.LsoMsg, args ListArgs) (*cmn.LsoResult, error) { + reqParams := lsoReq(bp, bck, &args) if lsmsg == nil { lsmsg = &apc.LsoMsg{} } actMsg := apc.ActMsg{Action: apc.ActList, Value: lsmsg} - reqParams := AllocRp() - { - reqParams.BaseParams = bp - reqParams.Path = apc.URLPathBuckets.Join(bck.Name) - reqParams.Header = http.Header{ - cos.HdrAccept: []string{cos.ContentMsgPack}, - cos.HdrContentType: []string{cos.ContentJSON}, - } - reqParams.Query = bck.AddToQuery(url.Values{}) - reqParams.Body = cos.MustMarshal(actMsg) + reqParams.Body = cos.MustMarshal(actMsg) - reqParams.buf = allocMbuf() // mem-pool msgpack - } // no need to preallocate bucket entries slice (msgpack does it) page := &cmn.LsoResult{} _, err := reqParams.DoReqAny(page) diff --git a/bench/microbenchmarks/apitests/listobj_test.go b/bench/microbenchmarks/apitests/listobj_test.go index 937788000c8..3620e94ca9f 100644 --- a/bench/microbenchmarks/apitests/listobj_test.go +++ b/bench/microbenchmarks/apitests/listobj_test.go @@ -19,8 +19,8 @@ import ( ) type testConfig struct { - objectCnt uint - pageSize uint + objectCnt int + pageSize int useCache bool } @@ -31,14 +31,14 @@ func (tc testConfig) name() string { ) } -func createAndFillBucket(b *testing.B, objCnt uint, u string) cmn.Bck { +func createAndFillBucket(b *testing.B, objCnt int, u string) cmn.Bck { const workerCount = 10 var ( bck = cmn.Bck{Name: trand.String(10), Provider: apc.AIS} baseParams = tools.BaseAPIParams(u) wg = &sync.WaitGroup{} - objCntPerWorker = int(objCnt) / workerCount + objCntPerWorker = objCnt / workerCount ) tools.CreateBucket(b, baseParams.URL, bck, nil, true /*cleanup*/) @@ -82,14 +82,14 @@ func BenchmarkListObject(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - msg := &apc.LsoMsg{PageSize: test.pageSize} + msg := &apc.LsoMsg{PageSize: int64(test.pageSize)} if test.useCache { msg.SetFlag(apc.UseListObjsCache) } objs, err := api.ListObjects(baseParams, bck, msg, api.ListArgs{}) tassert.CheckFatal(b, err) tassert.Errorf( - b, len(objs.Entries) == int(test.objectCnt), + b, len(objs.Entries) == test.objectCnt, "expected %d objects got %d", test.objectCnt, len(objs.Entries), ) } diff --git a/cmd/cli/cli/get.go b/cmd/cli/cli/get.go index 557370d683c..0e2fe1d4bf2 100644 --- a/cmd/cli/cli/get.go +++ b/cmd/cli/cli/get.go @@ -184,7 +184,7 @@ func getMultiObj(c *cli.Context, bck cmn.Bck, archpath, outFile string, extract } } - // setup list-objects control msg and api call + // setup lsmsg msg := &apc.LsoMsg{Prefix: prefix} msg.AddProps(apc.GetPropsMinimal...) 
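// (editorial note, hedged) In the hunks below, the former msg.SetFlag(apc.LsInventory)
// is gone: the CLI now requests inventory-based listing out-of-band, by setting
// apc.HdrInventory (and, for now, a hard-coded apc.HdrInvName) on api.ListArgs.Header.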
if flagIsSet(c, listArchFlag) || extract || archpath != "" { @@ -193,17 +193,23 @@ func getMultiObj(c *cli.Context, bck cmn.Bck, archpath, outFile string, extract if flagIsSet(c, getObjCachedFlag) { msg.SetFlag(apc.LsObjCached) } - if flagIsSet(c, useInventoryFlag) { - msg.SetFlag(apc.LsInventory) - } pageSize, limit, err := _setPage(c, bck) if err != nil { return err } - msg.PageSize = uint(pageSize) + msg.PageSize = pageSize + + // setup lsargs + lsargs := api.ListArgs{Limit: limit} + if flagIsSet(c, useInventoryFlag) { + lsargs.Header = http.Header{ + apc.HdrInventory: []string{"true"}, + apc.HdrInvName: []string{"inv-all"}, // TODO -- FIXME: remove; provide via flag + } + } // list-objects - objList, err := api.ListObjects(apiBP, bck, msg, api.ListArgs{Limit: uint(limit)}) + objList, err := api.ListObjects(apiBP, bck, msg, lsargs) if err != nil { return V(err) } @@ -244,8 +250,8 @@ func getMultiObj(c *cli.Context, bck cmn.Bck, archpath, outFile string, extract out = " to standard output" } else { out = outFile - if out != "" && cos.IsLastB(out, filepath.Separator) { - out = out[:len(out)-1] + if out != "" { + out = cos.TrimLastB(out, filepath.Separator) } } if flagIsSet(c, lengthFlag) { @@ -536,15 +542,11 @@ func getObject(c *cli.Context, bck cmn.Bck, objName, archpath, outFile string, q out = " to standard output" case extract: out = " to " + outFile - if cos.IsLastB(out, filepath.Separator) { - out = out[:len(out)-1] - } + out = cos.TrimLastB(out, filepath.Separator) out = strings.TrimSuffix(out, mime) + "/" default: out = " as " + outFile - if cos.IsLastB(out, filepath.Separator) { - out = out[:len(out)-1] - } + out = cos.TrimLastB(out, filepath.Separator) } switch { case flagIsSet(c, lengthFlag): diff --git a/cmd/cli/cli/ls.go b/cmd/cli/cli/ls.go index 5c90b19ebe4..8acf6d72e08 100644 --- a/cmd/cli/cli/ls.go +++ b/cmd/cli/cli/ls.go @@ -308,9 +308,6 @@ func listObjects(c *cli.Context, bck cmn.Bck, prefix string, listArch bool) erro if listArch { msg.SetFlag(apc.LsArchDir) } - if flagIsSet(c, useInventoryFlag) { - msg.SetFlag(apc.LsInventory) - } var ( props []string @@ -372,13 +369,22 @@ func listObjects(c *cli.Context, bck cmn.Bck, prefix string, listArch bool) erro if err != nil { return err } - msg.PageSize = uint(pageSize) + msg.PageSize = pageSize - // list page by page, print pages one at a time + // finally, setup lsargs + lsargs := api.ListArgs{Limit: limit} + if flagIsSet(c, useInventoryFlag) { + lsargs.Header = http.Header{ + apc.HdrInventory: []string{"true"}, + apc.HdrInvName: []string{"inv-all"}, // TODO -- FIXME: remove; provide via flag + } + } + + // list (and immediately show) pages, one page at a time if flagIsSet(c, pagedFlag) { - pageCounter, maxPages, toShow := 0, parseIntFlag(c, maxPagesFlag), limit + pageCounter, maxPages, toShow := 0, parseIntFlag(c, maxPagesFlag), int(limit) for { - objList, err := api.ListObjectsPage(apiBP, bck, msg) + objList, err := api.ListObjectsPage(apiBP, bck, msg, lsargs) if err != nil { return lsoErr(msg, err) } @@ -417,7 +423,7 @@ func listObjects(c *cli.Context, bck cmn.Bck, prefix string, listArch bool) erro } } - // list all pages up to a limit, show progress + // alternatively (when `--paged` not specified) list all pages up to a limit, show progress var ( callAfter = listObjectsWaitTime u = &_listed{c: c, bck: &bck} @@ -425,8 +431,9 @@ func listObjects(c *cli.Context, bck cmn.Bck, prefix string, listArch bool) erro if flagIsSet(c, refreshFlag) { callAfter = parseDurationFlag(c, refreshFlag) } - args := api.ListArgs{Callback: 
u.cb, CallAfter: callAfter, Limit: uint(limit)} - objList, err := api.ListObjects(apiBP, bck, msg, args) + lsargs.Callback = u.cb + lsargs.CallAfter = callAfter + objList, err := api.ListObjects(apiBP, bck, msg, lsargs) if err != nil { return lsoErr(msg, err) } @@ -444,22 +451,22 @@ func lsoErr(msg *apc.LsoMsg, err error) error { return V(err) } -func _setPage(c *cli.Context, bck cmn.Bck) (pageSize, limit int, err error) { +func _setPage(c *cli.Context, bck cmn.Bck) (pageSize, limit int64, err error) { b := meta.CloneBck(&bck) if flagIsSet(c, pageSizeFlag) { - pageSize = parseIntFlag(c, pageSizeFlag) + pageSize = int64(parseIntFlag(c, pageSizeFlag)) if pageSize < 0 { err = fmt.Errorf("invalid %s: page size (%d) cannot be negative", qflprn(pageSizeFlag), pageSize) return } - if uint(pageSize) > b.MaxPageSize() { + if pageSize > b.MaxPageSize() { if b.Props == nil { if b.Props, err = headBucket(bck, true /* don't add */); err != nil { return } } // still? - if uint(pageSize) > b.MaxPageSize() { + if pageSize > b.MaxPageSize() { err = fmt.Errorf("invalid %s: page size (%d) cannot exceed the maximum (%d)", qflprn(pageSizeFlag), pageSize, b.MaxPageSize()) return @@ -467,7 +474,7 @@ func _setPage(c *cli.Context, bck cmn.Bck) (pageSize, limit int, err error) { } } - limit = parseIntFlag(c, objLimitFlag) + limit = int64(parseIntFlag(c, objLimitFlag)) if limit < 0 { err = fmt.Errorf("invalid %s: max number of listed objects (%d) cannot be negative", qflprn(objLimitFlag), limit) return @@ -477,7 +484,7 @@ func _setPage(c *cli.Context, bck cmn.Bck) (pageSize, limit int, err error) { } // when limit "wins" - if limit < pageSize || (uint(limit) < b.MaxPageSize() && pageSize == 0) { + if limit < pageSize || (limit < b.MaxPageSize() && pageSize == 0) { pageSize = limit } return @@ -647,9 +654,14 @@ func (u *_listed) cb(ctx *api.LsoCounter) { s := "Listed " + cos.FormatBigNum(ctx.Count()) + " objects" if u.l == 0 { u.l = len(s) + 3 - if u.bck.IsRemote() && !flagIsSet(u.c, listObjCachedFlag) { - note := fmt.Sprintf("listing remote objects in %s may take a while (tip: use %s to speed up)\n", - u.bck.Cname(""), qflprn(listObjCachedFlag)) + if u.bck.IsRemote() { + var tip string + if flagIsSet(u.c, listObjCachedFlag) { + tip = fmt.Sprintf("use %s to show pages immediately - one page at a time", qflprn(pagedFlag)) + } else { + tip = fmt.Sprintf("use %s to speed up and/or %s to show pages", qflprn(listObjCachedFlag), qflprn(pagedFlag)) + } + note := fmt.Sprintf("listing remote objects in %s may take a while\n(Tip: %s)\n", u.bck.Cname(""), tip) actionNote(u.c, note) } } else if len(s) > u.l { diff --git a/cmd/cli/cli/utils.go b/cmd/cli/cli/utils.go index 59f217cdb19..a8c1cb9e8ad 100644 --- a/cmd/cli/cli/utils.go +++ b/cmd/cli/cli/utils.go @@ -657,7 +657,7 @@ func isBucketEmpty(bck cmn.Bck, cached bool) (bool, error) { msg.SetFlag(apc.LsObjCached) } msg.SetFlag(apc.LsNameOnly) - objList, err := api.ListObjectsPage(apiBP, bck, msg) + objList, err := api.ListObjectsPage(apiBP, bck, msg, api.ListArgs{}) if err != nil { return false, V(err) } diff --git a/cmd/cli/go.mod b/cmd/cli/go.mod index 63519c1d6fd..672a8b16fe5 100644 --- a/cmd/cli/go.mod +++ b/cmd/cli/go.mod @@ -3,7 +3,7 @@ module github.com/NVIDIA/aistore/cmd/cli go 1.22 require ( - github.com/NVIDIA/aistore v1.3.23-0.20240307002008-c965c8a7d89c + github.com/NVIDIA/aistore v1.3.23-0.20240308185015-d250511d14a5 github.com/fatih/color v1.16.0 github.com/json-iterator/go v1.1.12 github.com/onsi/ginkgo v1.16.5 diff --git a/cmd/cli/go.sum b/cmd/cli/go.sum index 
7f4eef00996..43474182873 100644 --- a/cmd/cli/go.sum +++ b/cmd/cli/go.sum @@ -1,7 +1,7 @@ code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= -github.com/NVIDIA/aistore v1.3.23-0.20240307002008-c965c8a7d89c h1:sHfN0IUduCYqpLtyIvNlVm5LhSOJi3pyHXk0EvRFWKk= -github.com/NVIDIA/aistore v1.3.23-0.20240307002008-c965c8a7d89c/go.mod h1:V9DB29cNiY0k7bd5f5uBBKprzMiyk4ZdR4wdIsFjt3E= +github.com/NVIDIA/aistore v1.3.23-0.20240308185015-d250511d14a5 h1:J3bZmSGL5/IPPxyWSoyoi6fqxp1G7zJW6SCq0hGHF/4= +github.com/NVIDIA/aistore v1.3.23-0.20240308185015-d250511d14a5/go.mod h1:V9DB29cNiY0k7bd5f5uBBKprzMiyk4ZdR4wdIsFjt3E= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= diff --git a/cmn/api.go b/cmn/api.go index eb0fb929523..9032aafef27 100644 --- a/cmn/api.go +++ b/cmn/api.go @@ -87,13 +87,13 @@ type ( // - https://docs.aws.amazon.com/cli/latest/userguide/cli-usage-pagination.html#cli-usage-pagination-serverside // vs OpenStack Swift: 10,000 // - https://docs.openstack.org/swift/latest/api/pagination.html - MaxPageSize uint `json:"max_pagesize,omitempty"` + MaxPageSize int64 `json:"max_pagesize,omitempty"` } ExtraPropsAWSToSet struct { CloudRegion *string `json:"cloud_region"` Endpoint *string `json:"endpoint"` Profile *string `json:"profile"` - MaxPageSize *uint `json:"max_pagesize"` + MaxPageSize *int64 `json:"max_pagesize"` } ExtraPropsHTTP struct { diff --git a/cmn/bck.go b/cmn/bck.go index b811a5bb7a2..03d1f422482 100644 --- a/cmn/bck.go +++ b/cmn/bck.go @@ -402,7 +402,7 @@ func (b *Bck) HasProvider() bool { return b.Provider != "" } // func (b *Bck) NewQuery() (q url.Values) { - q = make(url.Values) + q = make(url.Values, 1) if b.Provider != "" { q.Set(apc.QparamProvider, b.Provider) } @@ -415,7 +415,7 @@ func (b *Bck) NewQuery() (q url.Values) { func (b *Bck) AddToQuery(query url.Values) url.Values { if b.Provider != "" { if query == nil { - query = make(url.Values) + query = make(url.Values, 1) } query.Set(apc.QparamProvider, b.Provider) } diff --git a/cmn/cos/convert.go b/cmn/cos/convert.go index 62f8fcaf889..2e1dab32e84 100644 --- a/cmn/cos/convert.go +++ b/cmn/cos/convert.go @@ -38,9 +38,9 @@ func ParseBool(s string) (bool, error) { // add. options s = strings.ToLower(s) switch s { - case "y", "yes", "on": + case "y", "yes", "on", "true": return true, nil - case "n", "no", "off": + case "n", "no", "off", "false": return false, nil } // gen. 
case diff --git a/cmn/tests/iter_fields_test.go b/cmn/tests/iter_fields_test.go index 506e6b2e68b..b8a6076827d 100644 --- a/cmn/tests/iter_fields_test.go +++ b/cmn/tests/iter_fields_test.go @@ -95,7 +95,7 @@ var _ = Describe("IterFields", func() { "extra.aws.cloud_region": "us-central", "extra.aws.endpoint": "", "extra.aws.profile": "", - "extra.aws.max_pagesize": uint(0), + "extra.aws.max_pagesize": int64(0), "access": apc.AccessAttrs(0), "features": feat.Flags(0), @@ -161,7 +161,7 @@ var _ = Describe("IterFields", func() { "extra.aws.cloud_region": (*string)(nil), "extra.aws.endpoint": (*string)(nil), "extra.aws.profile": (*string)(nil), - "extra.aws.max_pagesize": (*uint)(nil), + "extra.aws.max_pagesize": (*int64)(nil), "extra.http.original_url": (*string)(nil), }, ), diff --git a/core/backend.go b/core/backend.go index d3c07482cb4..3230d059d06 100644 --- a/core/backend.go +++ b/core/backend.go @@ -24,6 +24,8 @@ type ( ErrCode int } LsoInventoryCtx struct { + Name string + ID string Offset int64 Size int64 } diff --git a/core/meta/bck.go b/core/meta/bck.go index 091e3dfd2bf..367c1b1ef30 100644 --- a/core/meta/bck.go +++ b/core/meta/bck.go @@ -252,7 +252,7 @@ func (b *Bck) checkAccess(bit apc.AccessAttrs) (err error) { return } -func (b *Bck) MaxPageSize() uint { +func (b *Bck) MaxPageSize() int64 { switch b.Provider { case apc.AIS: return apc.MaxPageSizeAIS diff --git a/docs/blob_downloader.md b/docs/blob_downloader.md index 66478767f09..5172d42d122 100644 --- a/docs/blob_downloader.md +++ b/docs/blob_downloader.md @@ -115,7 +115,7 @@ To meet this motivation, AIS now supports `GET` request with additional (and opt | Header | Values (examples) | Comments | | --- | --- | --- | -| `ais-blob-download` | "true", "" | NOTE: to engage blob downloader, this http header must be present and must be "true" (or "yes", etc.) 
| +| `ais-blob-download` | "true", "" | NOTE: to engage blob downloader, this http header must be present and must be "true" (or "y", "yes", "on" case-insensitive) | | `ais-blob-chunk` | "1mb", "1234567", "128KiB" | [system defaults](#blob-downloader) above | | `ais-blob-workers` | "3", "7", "16" | ditto | diff --git a/tools/client.go b/tools/client.go index ca52d0ac76b..7ad03bdf0e6 100644 --- a/tools/client.go +++ b/tools/client.go @@ -124,7 +124,7 @@ func PutObject(t *testing.T, bck cmn.Bck, objName string, reader readers.Reader) } // ListObjectNames returns a slice of object names of all objects that match the prefix in a bucket -func ListObjectNames(proxyURL string, bck cmn.Bck, prefix string, objectCountLimit uint, cached bool) ([]string, error) { +func ListObjectNames(proxyURL string, bck cmn.Bck, prefix string, objectCountLimit int64, cached bool) ([]string, error) { var ( bp = BaseAPIParams(proxyURL) msg = &apc.LsoMsg{Prefix: prefix} diff --git a/xact/xreg/bucket.go b/xact/xreg/bucket.go index 9519956ad7f..3c0486efbd0 100644 --- a/xact/xreg/bucket.go +++ b/xact/xreg/bucket.go @@ -5,6 +5,8 @@ package xreg import ( + "net/http" + "github.com/NVIDIA/aistore/api/apc" "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/debug" @@ -43,6 +45,10 @@ type ( Tag string Copies int } + LsoArgs struct { + Msg *apc.LsoMsg + Hdr http.Header + } ) ////////////// @@ -142,7 +148,11 @@ func RenewBckRename(bckFrom, bckTo *meta.Bck, uuid string, rmdVersion int64, pha return RenewBucketXact(apc.ActMoveBck, bckTo, Args{Custom: custom, UUID: uuid}) } -func RenewLso(bck *meta.Bck, uuid string, msg *apc.LsoMsg) RenewRes { - e := dreg.bckXacts[apc.ActList].New(Args{UUID: uuid, Custom: msg}, bck) +func RenewLso(bck *meta.Bck, uuid string, msg *apc.LsoMsg, hdr http.Header) RenewRes { + custom := &LsoArgs{ + Msg: msg, + Hdr: hdr, + } + e := dreg.bckXacts[apc.ActList].New(Args{UUID: uuid, Custom: custom}, bck) return dreg.renewByID(e, bck) } diff --git a/xact/xs/lso.go b/xact/xs/lso.go index a84a6d088fb..c4f8dceb68f 100644 --- a/xact/xs/lso.go +++ b/xact/xs/lso.go @@ -39,6 +39,7 @@ import ( type ( lsoFactory struct { msg *apc.LsoMsg + hdr http.Header streamingF } LsoXact struct { @@ -88,9 +89,11 @@ var ( ) func (*lsoFactory) New(args xreg.Args, bck *meta.Bck) xreg.Renewable { + custom := args.Custom.(*xreg.LsoArgs) p := &lsoFactory{ streamingF: streamingF{RenewBase: xreg.RenewBase{Args: args, Bck: bck}, kind: apc.ActList}, - msg: args.Custom.(*apc.LsoMsg), + msg: custom.Msg, + hdr: custom.Hdr, } return p } @@ -122,8 +125,8 @@ func (p *lsoFactory) Start() (err error) { debug.Assert(!r.walk.dontPopulate || p.msg.IsFlagSet(apc.LsDontAddRemote)) if r.listRemote() { - if p.msg.IsFlagSet(apc.LsInventory) { - r.ctx = &core.LsoInventoryCtx{} + if cos.IsParseBool(p.hdr.Get(apc.HdrInventory)) { + r.ctx = &core.LsoInventoryCtx{Name: p.hdr.Get(apc.HdrInvName), ID: p.hdr.Get(apc.HdrInvID)} } if !r.walk.wor { smap := core.T.Sowner().Get() @@ -329,8 +332,8 @@ func (r *LsoXact) doPage() *LsoRsp { lst = r.lastPage[idx:] page *cmn.LsoResult ) - debug.Assert(uint(len(lst)) >= cnt || r.walk.done) - if uint(len(lst)) >= cnt { + debug.Assert(int64(len(lst)) >= cnt || r.walk.done) + if int64(len(lst)) >= cnt { entries := lst[:cnt] page = &cmn.LsoResult{UUID: r.msg.UUID, Entries: entries, ContinuationToken: entries[cnt-1].Name} } else { @@ -342,21 +345,21 @@ func (r *LsoXact) doPage() *LsoRsp { // `ais show job` will report the sum of non-replicated obj numbers and // sum of obj sizes - for all visited objects // Returns the 
index of the first object in the page that follows the continuation `token` -func (r *LsoXact) findToken(token string) uint { +func (r *LsoXact) findToken(token string) int { if r.listRemote() && r.token == token { return 0 } - return uint(sort.Search(len(r.lastPage), func(i int) bool { // TODO: revisit + return sort.Search(len(r.lastPage), func(i int) bool { // TODO: revisit return !cmn.TokenGreaterEQ(token, r.lastPage[i].Name) - })) + }) } -func (r *LsoXact) havePage(token string, cnt uint) bool { +func (r *LsoXact) havePage(token string, cnt int64) bool { if r.walk.done { return true } idx := r.findToken(token) - return idx+cnt < uint(len(r.lastPage)) + return idx+int(cnt) < len(r.lastPage) } func (r *LsoXact) nextPageR() (err error) { @@ -490,7 +493,7 @@ func (r *LsoXact) nextPageA() { if r.havePage(r.token, r.msg.PageSize) { return } - for cnt := uint(0); cnt < r.msg.PageSize; { + for cnt := int64(0); cnt < r.msg.PageSize; { obj, ok := <-r.walk.pageCh if !ok { r.walk.done = true @@ -517,18 +520,18 @@ func (r *LsoXact) shiftLastPage(token string) { if j == 0 { return } - l := uint(len(r.lastPage)) + l := len(r.lastPage) // (all sent) if j == l { - r.gcLastPage(0, int(l)) + r.gcLastPage(0, l) r.lastPage = r.lastPage[:0] return } // otherwise, shift the not-yet-transmitted entries and fix the slice copy(r.lastPage[0:], r.lastPage[j:]) - r.gcLastPage(int(l-j), int(l)) + r.gcLastPage(l-j, l) r.lastPage = r.lastPage[:l-j] } diff --git a/xact/xs/nextpage.go b/xact/xs/nextpage.go index 43702cb7347..31e029dbec3 100644 --- a/xact/xs/nextpage.go +++ b/xact/xs/nextpage.go @@ -83,8 +83,7 @@ func (npg *npgCtx) cb(fqn string, de fs.DirEntry) error { func (npg *npgCtx) nextPageR(nentries cmn.LsoEntries, inclStatusLocalMD bool) (lst *cmn.LsoResult, err error) { debug.Assert(!npg.wi.msg.IsFlagSet(apc.LsObjCached)) lst = &cmn.LsoResult{Entries: nentries} - if npg.wi.msg.IsFlagSet(apc.LsInventory) { - debug.Assert(npg.ctx != nil) + if npg.ctx != nil { _, err = core.T.Backend(npg.bck).ListObjectsInv(npg.bck, npg.wi.msg, lst, npg.ctx) } else { _, err = core.T.Backend(npg.bck).ListObjects(npg.bck, npg.wi.msg, lst)
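// (editorial summary, hedged -- not part of the patch) End to end, "list via
// inventory" is now driven by HTTP headers instead of the removed apc.LsInventory
// flag: api.ListArgs.Header -> proxy lsObjsR (which also bumps the timeout for the
// first page) -> target RenewLso(bck, uuid, msg, r.Header) -> lsoFactory ->
// core.LsoInventoryCtx{Name, ID} -> awsProvider.ListObjectsInv. Note that
// cos.ParseBool now also accepts "true"/"false". A standalone sketch of the
// target-side decision, loosely mirroring lsoFactory.Start above:

package sketch // hypothetical package, for illustration only

import (
	"net/http"

	"github.com/NVIDIA/aistore/api/apc"
	"github.com/NVIDIA/aistore/cmn/cos"
	"github.com/NVIDIA/aistore/core"
)

// invCtxFromHeader returns a non-nil inventory context only when the caller
// explicitly asked for inventory-based listing via the ais-bucket-inventory header.
func invCtxFromHeader(hdr http.Header) *core.LsoInventoryCtx {
	if !cos.IsParseBool(hdr.Get(apc.HdrInventory)) {
		return nil // regular listing path (e.g., ListObjectsV2 for AWS buckets)
	}
	return &core.LsoInventoryCtx{
		Name: hdr.Get(apc.HdrInvName), // optional; empty => server default ".inventory"
		ID:   hdr.Get(apc.HdrInvID),   // optional
	}
}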