diff --git a/xact/xs/lrit.go b/xact/xs/lrit.go index ecbf6d1cce..c452b18904 100644 --- a/xact/xs/lrit.go +++ b/xact/xs/lrit.go @@ -262,7 +262,7 @@ func (r *lrit) _prefix(wi lrwi, smap *meta.Smap) error { ecode int lst *cmn.LsoRes lsmsg = &apc.LsoMsg{Prefix: r.prefix, Props: apc.GetPropsStatus} - npg = newNpgCtx(r.bck, lsmsg, noopCb, nil /*core.LsoInvCtx bucket inventory*/) + npg = newNpgCtx(r.bck, lsmsg, noopCb, nil /*inventory*/, nil /*bp: see below*/) bremote = r.bck.IsRemote() ) lsmsg.SetFlag(apc.LsNoDirs) @@ -272,6 +272,8 @@ func (r *lrit) _prefix(wi lrwi, smap *meta.Smap) error { } if !bremote { smap = nil // not needed + } else { + npg.bp = core.T.Backend(r.bck) } for { if r.done() { diff --git a/xact/xs/lso.go b/xact/xs/lso.go index 8797bcc741..c25286b9a9 100644 --- a/xact/xs/lso.go +++ b/xact/xs/lso.go @@ -43,29 +43,31 @@ type ( streamingF } LsoXact struct { - msg *apc.LsoMsg - msgCh chan *apc.LsoMsg // incoming requests + lpis lpi.Lpis + msg *apc.LsoMsg // first message + msgCh chan *apc.LsoMsg // next messages respCh chan *LsoRsp // responses - next pages remtCh chan *LsoRsp // remote paging by the responsible target - stopCh cos.StopCh // to stop xaction - token string // continuation token -> last responded page + ctx *core.LsoInvCtx // bucket inventory nextToken string // next continuation token -> next pages + token string // continuation token -> last responded page + stopCh cos.StopCh // to stop xaction page cmn.LsoEntries // current page (contents) walk struct { pageCh chan *cmn.LsoEnt // channel to accumulate listed object entries stopCh *cos.StopCh // to abort bucket walk wi *walkInfo // walking context and state + bp core.Backend // t.Backend(bck) wg sync.WaitGroup // wait until this walk finishes done bool // done walking (indication) wor bool // wantOnlyRemote dontPopulate bool // when listing remote obj-s: don't include local MD (in re: LsDonAddRemote) this bool // r.msg.SID == core.T.SID(): true when this target does remote paging last bool // last remote page + remote bool // list remote } streamingX - lensgl int64 - ctx *core.LsoInvCtx - lpis lpi.Lpis + lensgl int64 // channel to accumulate listed object entries } LsoRsp struct { Err error @@ -190,8 +192,11 @@ func (p *lsoFactory) beginStreams(r *LsoXact) error { func (r *LsoXact) Run(wg *sync.WaitGroup) { wg.Done() - if !lsoIsRemote(r.p.Bck, r.msg.IsFlagSet(apc.LsCached)) { + r.walk.remote = lsoIsRemote(r.p.Bck, r.msg.IsFlagSet(apc.LsCached)) + if !r.walk.remote { r.initWalk() + } else { + r.walk.bp = core.T.Backend(r.p.Bck) } loop: for { @@ -203,8 +208,9 @@ loop: r.msg.PageSize = msg.PageSize // cannot change - debug.Assert((r.msg.SID == core.T.SID()) == r.walk.this) - debug.Assert(r.walk.wor == r.msg.WantOnlyRemoteProps()) + debug.Assert(r.msg.SID == msg.SID, r.msg.SID, " vs ", msg.SID) + debug.Assert(r.walk.wor == msg.WantOnlyRemoteProps(), msg.Str(r.p.Bck.Cname(""))) + debug.Assert(r.walk.remote == lsoIsRemote(r.p.Bck, msg.IsFlagSet(apc.LsCached)), msg.Str(r.p.Bck.Cname(""))) r.IncPending() resp := r.doPage() @@ -345,7 +351,7 @@ func (r *LsoXact) Do(msg *apc.LsoMsg) *LsoRsp { } func (r *LsoXact) doPage() *LsoRsp { - if lsoIsRemote(r.p.Bck, r.msg.IsFlagSet(apc.LsCached)) { + if r.walk.remote { if r.msg.ContinuationToken == "" || r.msg.ContinuationToken != r.token { // can't extract the next-to-list object name from the remotely generated // continuation token, keeping and returning the entire last page @@ -405,7 +411,7 @@ func (r *LsoXact) havePage(token string, cnt int64) bool { func (r *LsoXact) nextPageR() (err error) { var ( page *cmn.LsoRes - npg = newNpgCtx(r.p.Bck, r.msg, r.LomAdd, r.ctx) + npg = newNpgCtx(r.p.Bck, r.msg, r.LomAdd, r.ctx, r.walk.bp) smap = core.T.Sowner().Get() tsi = smap.GetActiveNode(r.msg.SID) ) diff --git a/xact/xs/nextpage.go b/xact/xs/nextpage.go index 2260f26391..89599ab403 100644 --- a/xact/xs/nextpage.go +++ b/xact/xs/nextpage.go @@ -18,6 +18,7 @@ import ( ) type npgCtx struct { + bp core.Backend bck *meta.Bck ctx *core.LsoInvCtx wi walkInfo @@ -25,8 +26,9 @@ type npgCtx struct { idx int } -func newNpgCtx(bck *meta.Bck, msg *apc.LsoMsg, cb lomVisitedCb, ctx *core.LsoInvCtx) (npg *npgCtx) { +func newNpgCtx(bck *meta.Bck, msg *apc.LsoMsg, cb lomVisitedCb, ctx *core.LsoInvCtx, bp core.Backend) (npg *npgCtx) { npg = &npgCtx{ + bp: bp, bck: bck, wi: walkInfo{ msg: msg.Clone(), @@ -89,13 +91,13 @@ func (npg *npgCtx) nextPageR(nentries cmn.LsoEntries) (lst *cmn.LsoRes, err erro lst = &cmn.LsoRes{Entries: nentries} if npg.ctx != nil { if npg.ctx.Lom == nil { - _, err = core.T.Backend(npg.bck).GetBucketInv(npg.bck, npg.ctx) + _, err = npg.bp.GetBucketInv(npg.bck, npg.ctx) } if err == nil { - err = core.T.Backend(npg.bck).ListObjectsInv(npg.bck, npg.wi.msg, lst, npg.ctx) + err = npg.bp.ListObjectsInv(npg.bck, npg.wi.msg, lst, npg.ctx) } } else { - _, err = core.T.Backend(npg.bck).ListObjects(npg.bck, npg.wi.msg, lst) + _, err = npg.bp.ListObjects(npg.bck, npg.wi.msg, lst) } if err != nil { freeLsoEntries(nentries) diff --git a/xact/xs/nsumm.go b/xact/xs/nsumm.go index e13f1ec4a0..20f8fec192 100644 --- a/xact/xs/nsumm.go +++ b/xact/xs/nsumm.go @@ -351,8 +351,9 @@ func (r *XactNsumm) visitObj(lom *core.LOM, _ []byte) error { func (r *XactNsumm) runCloudBck(bck *meta.Bck, res *cmn.BsummResult) { lsmsg := &apc.LsoMsg{Props: apc.GetPropsSize, Prefix: r.p.msg.Prefix} lsmsg.SetFlag(apc.LsNameSize | apc.LsNoDirs) + bp := core.T.Backend(bck) for !r.IsAborted() { - npg := newNpgCtx(bck, lsmsg, noopCb, nil) // TODO: inventory offset + npg := newNpgCtx(bck, lsmsg, noopCb, nil, bp) // TODO: inventory offset nentries := allocLsoEntries() lst, err := npg.nextPageR(nentries) if err != nil {