Skip to content

Commit

Permalink
Adds MultiPart{Upload|Download} to OCI backend
Browse files Browse the repository at this point in the history
Signed-off-by: Edward McClanahan <[email protected]>
  • Loading branch information
edmc-ss committed Jan 23, 2025
1 parent 81bc96b commit d6f6e89
Show file tree
Hide file tree
Showing 9 changed files with 1,027 additions and 44 deletions.
10 changes: 5 additions & 5 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,12 @@ test:short:azure:
- make test-short

test:short:oci:
<<: *test_short_skip_scheduled_def
<<: *test_short_def
variables:
BUCKET: "oc://ais-ci"
script:
- ${SCRIPTS_DIR}/clean_deploy.sh --target-cnt $NUM_TARGET --proxy-cnt $NUM_PROXY --mountpath-cnt $FS_CNT --oci
- make test-short
- . ais/test/oci/mp_tuning.env && ${SCRIPTS_DIR}/clean_deploy.sh --target-cnt $NUM_TARGET --proxy-cnt $NUM_PROXY --mountpath-cnt $FS_CNT --oci
- . ais/test/oci/mp_tuning.env && make test-short

test:long:
<<: *test_long_skip_scheduled_def
Expand Down Expand Up @@ -356,8 +356,8 @@ test:long:oci:
variables:
BUCKET: "oc://ais-ci"
script:
- ${SCRIPTS_DIR}/clean_deploy.sh --target-cnt $NUM_TARGET --proxy-cnt $NUM_PROXY --mountpath-cnt $FS_CNT --oci --ht
- make test-long
- . ais/test/oci/mp_tuning.env && ${SCRIPTS_DIR}/clean_deploy.sh --target-cnt $NUM_TARGET --proxy-cnt $NUM_PROXY --mountpath-cnt $FS_CNT --oci --ht
- . ais/test/oci/mp_tuning.env && make test-long

test:long:aisloader:
stage: test-long
Expand Down
139 changes: 101 additions & 38 deletions ais/backend/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@

// Package backend contains implementation of various backend providers.
/*
* Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved.
*/
package backend

// Outstanding [TODO] items:
// 4) Multi-Segment-Upload utilization (for fast/large object PUTs)... if practical
// 5) Multi-Segment-Download utilization (for fast/large object GETs)... if practical
// 1) Resolve if ais/test/mp_tuning.env is *not* the way to do this
// 2) Avoid per MPU Child make([]byte) invocation (see e.g. memsys/)
// 3) Resolve proper place/way to stress/bench test in pipeline(s)
// 4) Add MPU stress (and benchmarking?)
// 5) Add MPD stress (and benchmarking?)
// 6) Add support for object versioning
// 7) Resolve test:long:oci CI Pipeline failure in TestMultiProxy
// 8) Support "bucket props" that also avoids ENV OCI_COMPARTMENT_OCID
// 9) Define our own "rcFile" equivalent if desired (preferably in something like JSON)
// 10) Add in support for OCI SDK's rcFile if desired (perhaps in lieu of 9)
// 10) Add in support for OCI SDK's rcFile if desired (perhaps in lieu of 9)

import (
"container/list"
"context"
"crypto/x509"
"encoding/pem"
Expand All @@ -24,6 +28,7 @@ import (
"net/http"
"os"
"path/filepath"
"sync"
"time"

"github.com/NVIDIA/aistore/api/apc"
Expand All @@ -46,26 +51,34 @@ const (
maxPageSizeMax = 1000
maxPageSizeDefault = maxPageSizeMax
mpdSegmentMaxSizeMin = 4 * cos.KiB
mpdSegmentMaxSizeMax = 256 * cos.MiB
mpdSegmentMaxSizeDefault = mpdSegmentMaxSizeMax
mpdThresholdMin = 8 * cos.KiB
mpdThresholdMax = 512 * cos.MiB
mpdThresholdDefault = mpdThresholdMax
mpdMaxThreadsMin = 4
mpdSegmentMaxSizeMax = 5 * cos.GiB
mpdSegmentMaxSizeDefault = 256 * cos.MiB
mpdThresholdMin = 4 * cos.KiB
mpdThresholdMax = 5 * cos.GiB
mpdThresholdDefault = 512 * cos.MiB
mpdMaxThreadsMin = 1
mpdMaxThreadsMax = 64
mpdMaxThreadsDefault = 16
mpuSegmentMaxSizeMin = 4 * cos.KiB
mpuSegmentMaxSizeMax = 256 * cos.MiB
mpuSegmentMaxSizeDefault = mpuSegmentMaxSizeMax
mpuThresholdMin = 8 * cos.KiB
mpuThresholdMax = 512 * cos.MiB
mpuThresholdDefault = mpuThresholdMax
mpuMaxThreadsMin = 4
mpuSegmentMaxSizeMax = 5 * cos.GiB
mpuSegmentMaxSizeDefault = 256 * cos.MiB
mpuThresholdMin = 4 * cos.KiB
mpuThresholdMax = 5 * cos.GiB
mpuThresholdDefault = 512 * cos.MiB
mpuMaxThreadsMin = 1
mpuMaxThreadsMax = 64
mpuMaxThreadsDefault = 16
mpThreadPoolSizeMin = 1
mpThreadPoolSizeMax = 16384
mpThreadPoolSizeDefault = 1024
)

type mpChildIf interface {
Run()
}

type ocibp struct {
sync.Mutex // serializes access to .mpChildPendingList & .mpChildActiveCount
t core.TargetPut
configurationProvider ocicmn.ConfigurationProvider
compartmentOCID string
Expand All @@ -76,6 +89,9 @@ type ocibp struct {
mpuSegmentMaxSize int64
mpuThreshold int64
mpuMaxThreads int64
mpThreadPoolSize int64
mpChildPendingList *list.List
mpChildActiveCount int64
client ocios.ObjectStorageClient
namespace string
base
Expand All @@ -86,8 +102,9 @@ var _ core.Backend = (*ocibp)(nil)

func NewOCI(t core.TargetPut, tstats stats.Tracker, startingUp bool) (core.Backend, error) {
bp := &ocibp{
t: t,
base: base{provider: apc.OCI},
t: t,
mpChildPendingList: list.New(),
base: base{provider: apc.OCI},
}

if err := bp.fetchCliConfig(); err != nil {
Expand Down Expand Up @@ -240,6 +257,10 @@ func (bp *ocibp) fetchTuningENVs() (err error) {
mpuMaxThreadsDefault, &bp.mpuMaxThreads); err != nil {
return
}
if err = bp.set(env.OCIMultiPartThreadPoolSize, mpThreadPoolSizeMin, mpThreadPoolSizeMax,
mpThreadPoolSizeDefault, &bp.mpThreadPoolSize); err != nil {
return
}

return nil
}
Expand Down Expand Up @@ -274,6 +295,34 @@ func ociStatus(rawResponse *http.Response) (ecode int) {
return
}

func (bp *ocibp) pooledLauchChild(mpChild mpChildIf) {
bp.Lock()
if bp.mpChildActiveCount < bp.mpThreadPoolSize {
bp.mpChildActiveCount++
bp.Unlock()
go bp.pooledLaunchChildRunner(mpChild)
} else {
_ = bp.mpChildPendingList.PushBack(mpChild)
bp.Unlock()
}
}

func (bp *ocibp) pooledLaunchChildRunner(mpChild mpChildIf) {
for {
mpChild.Run()
bp.Lock()
le := bp.mpChildPendingList.Front()
if le == nil {
bp.mpChildActiveCount--
bp.Unlock()
return
}
bp.mpChildPendingList.Remove(le)
mpChild = le.Value.(mpChildIf)
bp.Unlock()
}
}

// as core.Backend --------------------------------------------------------------

func (bp *ocibp) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoRes) (ecode int, err error) {
Expand Down Expand Up @@ -440,23 +489,42 @@ func (bp *ocibp) ListBuckets(_ cmn.QueryBcks) (bcks cmn.Bcks, ecode int, _ error
return bcks, 0, nil
}

// [TODO] Need to implement multi-threaded PUT when "length" exceeds bp.mpuThreshold
func (bp *ocibp) PutObj(r io.ReadCloser, lom *core.LOM, _ *http.Request) (int, error) {
h := cmn.BackendHelpers.OCI
cloudBck := lom.Bck().RemoteBck()
var (
avoidingMPU bool // true if object size is not known or known to be <= bp.mpuThreshold
cloudBck = lom.Bck().RemoteBck()
err error
objectAttrs *cmn.ObjAttrs = lom.ObjAttrs()
objectSize int64
)

if objectAttrs == nil {
avoidingMPU = true
} else {
objectSize = objectAttrs.Size
avoidingMPU = (objectSize <= bp.mpuThreshold)
}

if !avoidingMPU {
return bp.putObjViaMPU(r, lom, objectSize)
}

req := ocios.PutObjectRequest{
NamespaceName: &bp.namespace,
BucketName: &cloudBck.Name,
ObjectName: &lom.ObjName,
PutObjectBody: r,
}

resp, err := bp.client.PutObject(context.Background(), req)
// Note: in case PutObject() failed to close r...
_ = r.Close()
if err != nil {
return ociStatus(resp.RawResponse), err
}

h := cmn.BackendHelpers.OCI

lom.SetCustomKey(apc.HdrBackendProvider, apc.OCI)
if v, ok := h.EncodeETag(resp.ETag); ok {
lom.SetCustomKey(cmn.ETag, v)
Expand Down Expand Up @@ -548,22 +616,24 @@ func (bp *ocibp) GetObj(ctx context.Context, lom *core.LOM, owt cmn.OWT, _ *http
return 0, err
}

// [TODO]
// 1. Need to implement multi-threaded GET when "length" exceeds bp.mpdThreshold
// 2. Consider setting req.IfMatch to lom.GetCustomKey(cmn.ETag) if present
// [TODO] Consider setting req.IfMatch to lom.GetCustomKey(cmn.ETag) if present
func (bp *ocibp) GetObjReader(ctx context.Context, lom *core.LOM, offset, length int64) (res core.GetReaderResult) {
var (
cloudBck = lom.Bck().RemoteBck()
h = cmn.BackendHelpers.OCI
rangeHeader string
attemptingMPD = (length == 0)
cloudBck = lom.Bck().RemoteBck()
err error
rangeHeader string
)

req := ocios.GetObjectRequest{
NamespaceName: &bp.namespace,
BucketName: &cloudBck.Name,
ObjectName: &lom.ObjName,
}
if length > 0 {
if attemptingMPD {
rangeHeader = cmn.MakeRangeHdr(0, bp.mpdThreshold)
req.Range = &rangeHeader
} else {
rangeHeader = cmn.MakeRangeHdr(offset, length)
req.Range = &rangeHeader
}
Expand All @@ -575,18 +645,11 @@ func (bp *ocibp) GetObjReader(ctx context.Context, lom *core.LOM, offset, length
return
}

if length == 0 {
lom.ObjAttrs().Size = *resp.ContentLength
lom.SetCustomKey(cmn.SourceObjMD, apc.OCI)
if v, ok := h.EncodeETag(resp.ETag); ok {
lom.SetCustomKey(cmn.ETag, v)
}
if v, ok := h.EncodeCksum(resp.ContentMd5); ok {
lom.SetCustomKey(cmn.MD5ObjMD, v)
}
if attemptingMPD {
return bp.getObjReaderViaMPD(ctx, lom, &resp)
}

res.R = resp.Content
res.Size = *resp.ContentLength
return res
return
}
Loading

0 comments on commit d6f6e89

Please sign in to comment.