Skip to content

Commit

Permalink
micro-optimize prefetch; simplify cold-get; add stats-updater i/f; al…
Browse files Browse the repository at this point in the history
…ign fields

* with refactoring

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Mar 8, 2025
1 parent 3206a6b commit a250bfd
Show file tree
Hide file tree
Showing 25 changed files with 155 additions and 124 deletions.
2 changes: 2 additions & 0 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ func (h *htrun) String() string { return h.si.String() }
func (h *htrun) Bowner() meta.Bowner { return h.owner.bmd }
func (h *htrun) Sowner() meta.Sowner { return h.owner.smap }

func (h *htrun) StatsUpdater() cos.StatsUpdater { return h.statsT }

func (h *htrun) PageMM() *memsys.MMSA { return h.gmm }
func (h *htrun) ByteMM() *memsys.MMSA { return h.smm }

Expand Down
10 changes: 5 additions & 5 deletions ais/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,7 @@ func (t *target) Run() error {
t.htrun.init(config)
t.setusr1()

tstats := t.statsT.(*stats.Trunner)

core.Tinit(t, tstats, config, true /*run hk*/)
core.Tinit(t, config, true /*run hk*/)

fatalErr, writeErr := t.checkRestarted(config)
if fatalErr != nil {
Expand Down Expand Up @@ -355,6 +353,8 @@ func (t *target) Run() error {
}
t.owner.smap.put(smap)

tstats := t.statsT.(*stats.Trunner)

if daemon.cli.target.standby {
tstats.Standby(true)
t.regstate.disabled.Store(true)
Expand Down Expand Up @@ -411,8 +411,8 @@ func (t *target) Run() error {
go t.goresilver(marked.Interrupted)
}

dsort.Tinit(t.statsT, db, config)
dload.Init(t.statsT, db, &config.Client)
dsort.Tinit(db, config)
dload.Init(db, &config.Client)

err = t.htrun.run(config)

Expand Down
46 changes: 17 additions & 29 deletions ais/tgtimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,39 +142,33 @@ func (t *target) CopyObject(lom *core.LOM, dm *bundle.DataMover, params *xs.CoiP
return size, err
}

// use `backend.GetObj`
// (compare w/ `backend.GetObjReader` via ldp and blob download)
func (t *target) GetCold(ctx context.Context, lom *core.LOM, xkind string, owt cmn.OWT) (ecode int, err error) {
// 1. lock
// lock
switch owt {
case cmn.OwtGetPrefetchLock:
// do nothing
case cmn.OwtGetTryLock, cmn.OwtGetLock:
if owt == cmn.OwtGetTryLock {
if !lom.TryLock(true) {
if cmn.Rom.FastV(4, cos.SmoduleAIS) {
nlog.Warningln(t.String(), lom.String(), owt.String(), "is busy")
}
return 0, cmn.ErrSkip // e.g. prefetch can skip it and keep on going
case cmn.OwtGetTryLock: // e.g., downloader
if !lom.TryLock(true) {
if cmn.Rom.FastV(4, cos.SmoduleAIS) {
nlog.Warningln(t.String(), lom.String(), owt.String(), "is busy")
}
} else {
lom.Lock(true)
return 0, cmn.ErrSkip // TODO: must be cmn.ErrBusy
}
case cmn.OwtGetLock: // regular usage
lom.Lock(true)
default:
// for cmn.OwtGet, see goi.getCold
// other cold-get use cases include:
// - cmn.OwtGet, goi.getCold
// - cmn.OwtGetPrefetchLock, xs/prefetch
debug.Assert(false, owt.String())
return http.StatusInternalServerError, errors.New("invalid " + owt.String())
return http.StatusInternalServerError, errors.New("cold-get: invalid " + owt.String())
}

// 2. GET remote object and store it
// cold GET
var (
started = mono.NanoTime()
backend = t.Backend(lom.Bck())
)
if ecode, err = backend.GetObj(ctx, lom, owt, nil /*origReq*/); err != nil {
if owt != cmn.OwtGetPrefetchLock {
lom.Unlock(true)
}
lom.Unlock(true)
if cmn.IsErrFailedTo(err) {
nlog.Warningln(err)
} else {
Expand All @@ -183,17 +177,11 @@ func (t *target) GetCold(ctx context.Context, lom *core.LOM, xkind string, owt c
return ecode, err
}

// 3. unlock
switch owt {
case cmn.OwtGetPrefetchLock:
// do nothing
case cmn.OwtGetTryLock, cmn.OwtGetLock:
lom.Unlock(true)
}

// 4. stats
// unlock and stats
lom.Unlock(true)
lat := mono.SinceNano(started)
t.coldstats(backend, lom.Bck().Cname(""), xkind, lom.Lsize(), lat)

return 0, nil
}

Expand Down
2 changes: 1 addition & 1 deletion ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (poi *putOI) fini() (ecode int, err error) {
case cmn.OwtGetPrefetchLock:
if !lom.TryLock(true) {
nlog.Warningln(poi.loghdr(), "is busy")
return 0, cmn.ErrSkip // e.g. prefetch can skip it and keep on going
return 0, cmn.ErrSkip // e.g. prefetch can skip it and keep on going // TODO: must be cmn.ErrBusy
}
defer lom.Unlock(true)
default:
Expand Down
2 changes: 1 addition & 1 deletion ais/tgtobj_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestMain(m *testing.M) {
t.htrun.init(config)

t.statsT = mock.NewStatsTracker()
core.Tinit(t, t.statsT, config, false)
core.Tinit(t, config, false)

bck := meta.NewBck(testBucket, apc.AIS, cmn.NsGlobal)
bmd := newBucketMD()
Expand Down
7 changes: 4 additions & 3 deletions api/etl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
)

type ETLObjArgs struct {
// ETLName specifies the running ETL instance to be used in inline transform.
ETLName string

// TransformArgs holds the arguments to be used in ETL inline transform,
// which will be sent as `apc.QparamETLArgs` query parameter in the request.
// Optional, can be omitted (nil).
TransformArgs any

// ETLName specifies the running ETL instance to be used in inline transform.
ETLName string
}

// Initiate custom ETL workload by executing one of the documented `etl.InitMsg`
Expand Down
4 changes: 2 additions & 2 deletions cmn/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (e *ErrBusy) Error() string {
return fmt.Sprintf("%s %q is currently busy%s, please try again", e.whereOrType, e.what, s)
}

func isErrBusy(err error) bool {
func IsErrBusy(err error) bool {
_, ok := err.(*ErrBusy)
return ok
}
Expand Down Expand Up @@ -1286,7 +1286,7 @@ func WriteErr(w http.ResponseWriter, r *http.Request, err error, opts ...int /*[
status = http.StatusRequestedRangeNotSatisfiable
case isErrUnsupp(err), isErrNotImpl(err):
status = http.StatusNotImplemented
case isErrBusy(err):
case IsErrBusy(err):
status = http.StatusConflict
}
}
Expand Down
12 changes: 6 additions & 6 deletions core/lcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (lchk *lchk) evict(timeout time.Duration, now time.Time, pct int) {
var (
avail = fs.GetAvail()
wg = &sync.WaitGroup{}
evicted = g.tstats.Get(LcacheEvictedCount)
evicted = T.StatsUpdater().Get(LcacheEvictedCount)
)
lchk.total.Store(0)
for _, mi := range avail {
Expand All @@ -323,7 +323,7 @@ func (lchk *lchk) evict(timeout time.Duration, now time.Time, pct int) {
go evct.do()
}
wg.Wait()
evicted = g.tstats.Get(LcacheEvictedCount) - evicted
evicted = T.StatsUpdater().Get(LcacheEvictedCount) - evicted
nlog.Infoln("hk done:", lchk.total.Load(), evicted)
}

Expand Down Expand Up @@ -368,7 +368,7 @@ func (evct *evct) f(hkey, value any) bool {
if lmd2 == md {
*md = lom0.md // zero out
}
g.tstats.Inc(LcacheEvictedCount)
T.StatsUpdater().Inc(LcacheEvictedCount)

// throttle
evct.evicted++
Expand All @@ -392,13 +392,13 @@ func _flushAtime(md *lmeta, atime time.Time, mdTime int64) {
return
}
if err = lom.flushAtime(atime); err != nil {
g.tstats.Inc(LcacheErrCount)
T.StatsUpdater().Inc(LcacheErrCount)
T.FSHC(err, lom.Mountpath(), lom.FQN)
return
}

// stats
g.tstats.Inc(LcacheFlushColdCount)
T.StatsUpdater().Inc(LcacheFlushColdCount)

if !md.isDirty() {
return
Expand All @@ -418,7 +418,7 @@ func _flushAtime(md *lmeta, atime time.Time, mdTime int64) {
continue
}
if err = fs.SetXattr(copyFQN, XattrLOM, buf); err != nil {
g.tstats.Inc(LcacheErrCount)
T.StatsUpdater().Inc(LcacheErrCount)
nlog.Errorln("set-xattr [", copyFQN, err, "]")
break
}
Expand Down
2 changes: 1 addition & 1 deletion core/ldp.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (lom *LOM) CheckRemoteMD(locked, sync bool, origReq *http.Request) (res CRM
ecode, err = 0, errDel
} else {
vlabs := map[string]string{"bucket": lom.Bck().Cname("")} // TODO -- FIXME: cannot import stats
g.tstats.IncWith(RemoteDeletedDelCount, vlabs)
T.StatsUpdater().IncWith(RemoteDeletedDelCount, vlabs)
}
debug.Assert(err != nil)
return CRMD{ErrCode: ecode, Err: err}
Expand Down
6 changes: 2 additions & 4 deletions core/lom.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ type (

type (
global struct {
tstats cos.StatsUpdater // (stats.Trunner)
pmm *memsys.MMSA
smm *memsys.MMSA
locker nameLocker
Expand Down Expand Up @@ -97,13 +96,12 @@ var (

func Pinit() { bckLocker = newNameLocker() }

func Tinit(t Target, tstats cos.StatsUpdater, config *cmn.Config, runHK bool) {
func Tinit(t Target, config *cmn.Config, runHK bool) {
bckLocker = newNameLocker()
T = t
{
g.maxLmeta.Store(xattrMaxSize)
g.locker = newNameLocker()
g.tstats = tstats
g.pmm = t.PageMM()
g.smm = t.ByteMM()
}
Expand Down Expand Up @@ -516,7 +514,7 @@ func (lom *LOM) _collide(lmd *lmeta) {
if cmn.Rom.FastV(4, cos.SmoduleCore) || lom.digest&0xf == 5 {
nlog.InfoDepth(1, LcacheCollisionCount, lom.digest, "[", *lmd.uname, "]", *lom.md.uname, lom.Cname())
}
g.tstats.Inc(LcacheCollisionCount)
T.StatsUpdater().Inc(LcacheCollisionCount)
}

func (lom *LOM) Uncache() {
Expand Down
20 changes: 11 additions & 9 deletions core/mock/target_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/core"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/fs"
Expand All @@ -31,21 +32,22 @@ var _ core.Target = (*TargetMock)(nil)

func NewTarget(bo meta.Bowner) *TargetMock {
t := &TargetMock{BO: bo}
core.Tinit(t, NewStatsTracker(), nil /*config*/, false /*run HK*/)
core.Tinit(t, nil /*config*/, false /*run HK*/)
return t
}

func (t *TargetMock) Bowner() meta.Bowner { return t.BO }
func (t *TargetMock) Sowner() meta.Sowner { return t.SO }

func (*TargetMock) SID() string { return mockID }
func (*TargetMock) String() string { return "tmock" }
func (*TargetMock) Snode() *meta.Snode { return &meta.Snode{DaeID: mockID} }
func (*TargetMock) ClusterStarted() bool { return true }
func (*TargetMock) NodeStarted() bool { return true }
func (*TargetMock) DataClient() *http.Client { return http.DefaultClient }
func (*TargetMock) PageMM() *memsys.MMSA { return memsys.PageMM() }
func (*TargetMock) ByteMM() *memsys.MMSA { return memsys.ByteMM() }
func (*TargetMock) SID() string { return mockID }
func (*TargetMock) String() string { return "tmock" }
func (*TargetMock) Snode() *meta.Snode { return &meta.Snode{DaeID: mockID} }
func (*TargetMock) ClusterStarted() bool { return true }
func (*TargetMock) NodeStarted() bool { return true }
func (*TargetMock) DataClient() *http.Client { return http.DefaultClient }
func (*TargetMock) StatsUpdater() cos.StatsUpdater { return NewStatsTracker() }
func (*TargetMock) PageMM() *memsys.MMSA { return memsys.PageMM() }
func (*TargetMock) ByteMM() *memsys.MMSA { return memsys.ByteMM() }

func (*TargetMock) MaxUtilLoad() (int64, float64) { return 0, 0 }

Expand Down
5 changes: 4 additions & 1 deletion core/node.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// Package core provides core metadata and in-cluster API
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
*/
package core

import (
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/memsys"
)
Expand All @@ -30,6 +31,8 @@ type (
ClusterStarted() bool
NodeStarted() bool

StatsUpdater() cos.StatsUpdater

// Memory allocators
PageMM() *memsys.MMSA
ByteMM() *memsys.MMSA
Expand Down
13 changes: 4 additions & 9 deletions ext/dload/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/NVIDIA/aistore/core"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/fs"
"github.com/NVIDIA/aistore/stats"
"github.com/NVIDIA/aistore/xact/xreg"
"golang.org/x/sync/errgroup"
)
Expand All @@ -46,9 +45,8 @@ type (
}

global struct {
tstats stats.Tracker
db kvdb.Driver
store *infoStore
db kvdb.Driver
store *infoStore

// Downloader selects one of the two clients (below) by the destination URL.
// Certification check is disabled for now and does not depend on cluster settings.
Expand All @@ -61,16 +59,13 @@ type (

var g global

func Init(tstats stats.Tracker, db kvdb.Driver, clientConf *cmn.ClientConf) {
func Init(db kvdb.Driver, clientConf *cmn.ClientConf) {
g.clientH, g.clientTLS = cmn.NewDefaultClients(clientConf.TimeoutLong.D())

if db == nil { // unit tests only
return
}
{
g.tstats = tstats
g.db = db
}
g.db = db
xreg.RegNonBckXact(&factory{})
}

Expand Down
4 changes: 2 additions & 2 deletions ext/dload/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (task *singleTask) download(lom *core.LOM) {

vlabs := map[string]string{stats.VarlabBucket: lom.Bck().Cname("")}
lsize := task.currentSize.Load()
g.tstats.AddWith(
core.T.StatsUpdater().AddWith(
cos.NamedVal64{Name: stats.DloadSize, Value: lsize, VarLabs: vlabs},
cos.NamedVal64{Name: stats.DloadLatencyTotal, Value: int64(task.ended.Load().Sub(task.started.Load())), VarLabs: vlabs},
)
Expand Down Expand Up @@ -251,7 +251,7 @@ func (task *singleTask) wrapReader(r io.ReadCloser) io.ReadCloser {
// Probably we need to extend the persistent database (db.go) so that it will contain
// also information about specific tasks.
func (task *singleTask) markFailed(statusMsg string) {
g.tstats.Inc(stats.ErrDloadCount)
core.T.StatsUpdater().Inc(stats.ErrDloadCount)
g.store.persistError(task.jobID(), task.obj.objName, statusMsg)
g.store.incErrorCnt(task.jobID())
}
Expand Down
2 changes: 1 addition & 1 deletion ext/dload/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestCompareObject(t *testing.T) {
// initialize http clients
clientConf.Timeout = 5 * cos.Duration(time.Second)
clientConf.TimeoutLong = 15 * cos.Duration(time.Second)
dload.Init(nil, nil, &clientConf)
dload.Init(nil, &clientConf)

// Modify local object to contain invalid (meta)data.
customMD := cos.StrKVs{
Expand Down
Loading

0 comments on commit a250bfd

Please sign in to comment.