list-objects and friends: micro-optimizations
Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Mar 8, 2025
1 parent 753b928 commit 3206a6b
Showing 4 changed files with 29 additions and 18 deletions.
xact/xs/lrit.go: 4 changes (3 additions, 1 deletion)
@@ -262,7 +262,7 @@ func (r *lrit) _prefix(wi lrwi, smap *meta.Smap) error {
         ecode   int
         lst     *cmn.LsoRes
         lsmsg   = &apc.LsoMsg{Prefix: r.prefix, Props: apc.GetPropsStatus}
-        npg     = newNpgCtx(r.bck, lsmsg, noopCb, nil /*core.LsoInvCtx bucket inventory*/)
+        npg     = newNpgCtx(r.bck, lsmsg, noopCb, nil /*inventory*/, nil /*bp: see below*/)
         bremote = r.bck.IsRemote()
     )
     lsmsg.SetFlag(apc.LsNoDirs)
@@ -272,6 +272,8 @@ func (r *lrit) _prefix(wi lrwi, smap *meta.Smap) error {
     }
     if !bremote {
         smap = nil // not needed
+    } else {
+        npg.bp = core.T.Backend(r.bck)
     }
     for {
         if r.done() {
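The lrit.go change above is the gist of the whole commit: resolve the bucket's backend interface once, store it on the next-page context, and reuse it for every page instead of looking it up again on each iteration. A minimal, self-contained sketch of that pattern, with illustrative names only (backendLister, pageCtx, listAll are not aistore code):

```go
package main

import "fmt"

// backendLister stands in for a per-provider backend interface.
type backendLister interface {
	ListPage(token string) (entries []string, nextToken string, err error)
}

// mockBackend returns three one-object pages, then stops.
type mockBackend struct{ calls int }

func (m *mockBackend) ListPage(string) ([]string, string, error) {
	m.calls++
	next := fmt.Sprintf("token-%d", m.calls)
	if m.calls >= 3 {
		next = "" // last page
	}
	return []string{fmt.Sprintf("obj-%d", m.calls)}, next, nil
}

// pageCtx mirrors the idea behind npgCtx.bp: the backend handle is injected once.
type pageCtx struct {
	bp backendLister
}

// listAll pages through the bucket via the stored handle; no per-page lookup.
func listAll(npg *pageCtx) ([]string, error) {
	var (
		all   []string
		token string
	)
	for {
		entries, next, err := npg.bp.ListPage(token)
		if err != nil {
			return nil, err
		}
		all = append(all, entries...)
		if next == "" {
			return all, nil
		}
		token = next
	}
}

func main() {
	all, err := listAll(&pageCtx{bp: &mockBackend{}})
	fmt.Println(all, err) // [obj-1 obj-2 obj-3] <nil>
}
```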
xact/xs/lso.go: 30 changes (18 additions, 12 deletions)
@@ -43,29 +43,31 @@
         streamingF
     }
     LsoXact struct {
-        msg       *apc.LsoMsg
-        msgCh     chan *apc.LsoMsg // incoming requests
-        lpis      lpi.Lpis
+        msg    *apc.LsoMsg      // first message
+        msgCh  chan *apc.LsoMsg // next messages
         respCh    chan *LsoRsp     // responses - next pages
         remtCh    chan *LsoRsp     // remote paging by the responsible target
-        stopCh    cos.StopCh       // to stop xaction
-        token     string           // continuation token -> last responded page
-        ctx       *core.LsoInvCtx  // bucket inventory
         nextToken string           // next continuation token -> next pages
+        token     string           // continuation token -> last responded page
+        stopCh    cos.StopCh       // to stop xaction
         page      cmn.LsoEntries   // current page (contents)
         walk      struct {
             pageCh chan *cmn.LsoEnt // channel to accumulate listed object entries
             stopCh *cos.StopCh      // to abort bucket walk
             wi     *walkInfo        // walking context and state
+            bp     core.Backend     // t.Backend(bck)
             wg     sync.WaitGroup   // wait until this walk finishes
             done   bool             // done walking (indication)
             wor    bool             // wantOnlyRemote
             dontPopulate bool       // when listing remote obj-s: don't include local MD (in re: LsDontAddRemote)
             this   bool             // r.msg.SID == core.T.SID(): true when this target does remote paging
             last   bool             // last remote page
+            remote bool             // list remote
         }
         streamingX
-        lensgl int64
+        ctx    *core.LsoInvCtx
+        lpis   lpi.Lpis
+        lensgl int64 // channel to accumulate listed object entries
     }
     LsoRsp struct {
         Err error
@@ -190,8 +192,11 @@ func (p *lsoFactory) beginStreams(r *LsoXact) error {
 func (r *LsoXact) Run(wg *sync.WaitGroup) {
     wg.Done()

-    if !lsoIsRemote(r.p.Bck, r.msg.IsFlagSet(apc.LsCached)) {
+    r.walk.remote = lsoIsRemote(r.p.Bck, r.msg.IsFlagSet(apc.LsCached))
+    if !r.walk.remote {
         r.initWalk()
+    } else {
+        r.walk.bp = core.T.Backend(r.p.Bck)
     }
 loop:
     for {
@@ -203,8 +208,9 @@ loop:
             r.msg.PageSize = msg.PageSize

             // cannot change
-            debug.Assert((r.msg.SID == core.T.SID()) == r.walk.this)
-            debug.Assert(r.walk.wor == r.msg.WantOnlyRemoteProps())
+            debug.Assert(r.msg.SID == msg.SID, r.msg.SID, " vs ", msg.SID)
+            debug.Assert(r.walk.wor == msg.WantOnlyRemoteProps(), msg.Str(r.p.Bck.Cname("")))
+            debug.Assert(r.walk.remote == lsoIsRemote(r.p.Bck, msg.IsFlagSet(apc.LsCached)), msg.Str(r.p.Bck.Cname("")))

             r.IncPending()
             resp := r.doPage()
@@ -345,7 +351,7 @@ func (r *LsoXact) Do(msg *apc.LsoMsg) *LsoRsp {
 }

 func (r *LsoXact) doPage() *LsoRsp {
-    if lsoIsRemote(r.p.Bck, r.msg.IsFlagSet(apc.LsCached)) {
+    if r.walk.remote {
         if r.msg.ContinuationToken == "" || r.msg.ContinuationToken != r.token {
             // can't extract the next-to-list object name from the remotely generated
             // continuation token, keeping and returning the entire last page
@@ -405,7 +411,7 @@ func (r *LsoXact) havePage(token string, cnt int64) bool {
 func (r *LsoXact) nextPageR() (err error) {
     var (
         page *cmn.LsoRes
-        npg  = newNpgCtx(r.p.Bck, r.msg, r.LomAdd, r.ctx)
+        npg  = newNpgCtx(r.p.Bck, r.msg, r.LomAdd, r.ctx, r.walk.bp)
         smap = core.T.Sowner().Get()
         tsi  = smap.GetActiveNode(r.msg.SID)
     )
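In lso.go the same idea is extended to request-invariant state: r.walk.remote is computed once when the xaction starts (and, for remote buckets, r.walk.bp is resolved once as well), while subsequent messages are only debug-asserted against those cached values instead of re-running lsoIsRemote on every page. A rough standalone sketch of that shape, with illustrative names rather than the real LsoXact:

```go
package main

import "fmt"

type lsoMsg struct {
	SID        string // ID of the target responsible for remote paging
	CachedOnly bool   // list only objects already present in the cluster
}

// xctn caches what cannot change between pages of the same list-objects run.
type xctn struct {
	firstSID string
	remote   bool // decided once, before the message loop
	msgCh    chan *lsoMsg
}

func isRemote(bucketRemote, cachedOnly bool) bool { return bucketRemote && !cachedOnly }

func (r *xctn) run(first *lsoMsg, bucketRemote bool) {
	r.firstSID = first.SID
	r.remote = isRemote(bucketRemote, first.CachedOnly)
	for msg := range r.msgCh {
		// invariants: a follow-up page request may change page size and the like,
		// but not the responsible target and not the remote/local decision
		assert(msg.SID == r.firstSID, "SID changed mid-run")
		assert(isRemote(bucketRemote, msg.CachedOnly) == r.remote, "remote-ness changed mid-run")
		fmt.Println("serving next page, remote:", r.remote) // doPage equivalent
	}
}

func assert(cond bool, msg string) {
	if !cond {
		panic(msg)
	}
}

func main() {
	r := &xctn{msgCh: make(chan *lsoMsg, 2)}
	first := &lsoMsg{SID: "t[abc]"}
	r.msgCh <- &lsoMsg{SID: "t[abc]"}
	r.msgCh <- &lsoMsg{SID: "t[abc]"}
	close(r.msgCh)
	r.run(first, true /*bucketRemote*/)
}
```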
xact/xs/nextpage.go: 10 changes (6 additions, 4 deletions)
@@ -18,15 +18,17 @@ import (
 )

 type npgCtx struct {
+    bp   core.Backend
     bck  *meta.Bck
     ctx  *core.LsoInvCtx
     wi   walkInfo
     page cmn.LsoRes
     idx  int
 }

-func newNpgCtx(bck *meta.Bck, msg *apc.LsoMsg, cb lomVisitedCb, ctx *core.LsoInvCtx) (npg *npgCtx) {
+func newNpgCtx(bck *meta.Bck, msg *apc.LsoMsg, cb lomVisitedCb, ctx *core.LsoInvCtx, bp core.Backend) (npg *npgCtx) {
     npg = &npgCtx{
+        bp:  bp,
         bck: bck,
         wi: walkInfo{
             msg: msg.Clone(),
@@ -89,13 +91,13 @@ func (npg *npgCtx) nextPageR(nentries cmn.LsoEntries) (lst *cmn.LsoRes, err error) {
     lst = &cmn.LsoRes{Entries: nentries}
     if npg.ctx != nil {
         if npg.ctx.Lom == nil {
-            _, err = core.T.Backend(npg.bck).GetBucketInv(npg.bck, npg.ctx)
+            _, err = npg.bp.GetBucketInv(npg.bck, npg.ctx)
         }
         if err == nil {
-            err = core.T.Backend(npg.bck).ListObjectsInv(npg.bck, npg.wi.msg, lst, npg.ctx)
+            err = npg.bp.ListObjectsInv(npg.bck, npg.wi.msg, lst, npg.ctx)
         }
     } else {
-        _, err = core.T.Backend(npg.bck).ListObjects(npg.bck, npg.wi.msg, lst)
+        _, err = npg.bp.ListObjects(npg.bck, npg.wi.msg, lst)
     }
     if err != nil {
         freeLsoEntries(nentries)
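nextpage.go replaces the core.T.Backend(npg.bck) lookup at three call sites with a single stored field; the handle now arrives through the extra newNpgCtx parameter. A tiny constructor-injection sketch of that trade-off, again with illustrative names rather than the real npgCtx:

```go
package main

import "fmt"

// backend stands in for the provider-specific interface a registry lookup would return.
type backend interface {
	ListObjects(bucket string) []string
}

type mockAWS struct{}

func (mockAWS) ListObjects(bucket string) []string {
	return []string{bucket + "/a", bucket + "/b"}
}

// npgLike receives its backend at construction time (the extra parameter),
// so every page uses one stored field instead of a registry lookup per call.
type npgLike struct {
	bck string
	bp  backend
}

func newNpgLike(bck string, bp backend) *npgLike { return &npgLike{bck: bck, bp: bp} }

func (npg *npgLike) nextPage() []string {
	return npg.bp.ListObjects(npg.bck) // plain field access, nothing resolved here
}

func main() {
	npg := newNpgLike("example-bucket", mockAWS{})
	fmt.Println(npg.nextPage())
	fmt.Println(npg.nextPage())
}
```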
xact/xs/nsumm.go: 3 changes (2 additions, 1 deletion)
@@ -351,8 +351,9 @@ func (r *XactNsumm) visitObj(lom *core.LOM, _ []byte) error {
 func (r *XactNsumm) runCloudBck(bck *meta.Bck, res *cmn.BsummResult) {
     lsmsg := &apc.LsoMsg{Props: apc.GetPropsSize, Prefix: r.p.msg.Prefix}
     lsmsg.SetFlag(apc.LsNameSize | apc.LsNoDirs)
+    bp := core.T.Backend(bck)
     for !r.IsAborted() {
-        npg := newNpgCtx(bck, lsmsg, noopCb, nil) // TODO: inventory offset
+        npg := newNpgCtx(bck, lsmsg, noopCb, nil, bp) // TODO: inventory offset
         nentries := allocLsoEntries()
         lst, err := npg.nextPageR(nentries)
         if err != nil {