S3 relay interface #833

Merged
29 commits merged on Oct 30, 2024

Changes from 8 commits

Commits (29)
6814f14
Created core framework, need to add unit tests
cody-littley Oct 23, 2024
c7ba7d6
Add unit tests for fragmentation logic.
cody-littley Oct 23, 2024
e5dcc78
Incremental progress.
cody-littley Oct 23, 2024
17ac4bc
Fix some bugs.
cody-littley Oct 24, 2024
356959c
Fixed bugs.
cody-littley Oct 24, 2024
958e6e1
Test against localstack.
cody-littley Oct 24, 2024
a2bb06f
lint
cody-littley Oct 24, 2024
95b9d75
Cleanup.
cody-littley Oct 24, 2024
9b9d57d
Merge branch 'master' into s3-relay-interface
cody-littley Oct 24, 2024
e6c7a0e
Remove TTL from upload
cody-littley Oct 25, 2024
b712c88
Merge branch 'master' into s3-relay-interface
cody-littley Oct 25, 2024
93e2310
Use v2 APIs.
cody-littley Oct 28, 2024
6e9aa2e
Made suggested changes.
cody-littley Oct 28, 2024
8323596
Made suggested changes.
cody-littley Oct 28, 2024
5d5f425
Incremental checkin.
cody-littley Oct 28, 2024
2ddd2c4
Finished migration.
cody-littley Oct 28, 2024
220d48f
Fix flags.
cody-littley Oct 28, 2024
4996fb0
Fix bug.
cody-littley Oct 29, 2024
06cb92e
Merge branch 'master' into s3-relay-interface
cody-littley Oct 29, 2024
dc5b6fc
Fix unit test.
cody-littley Oct 29, 2024
9104ab1
Fix unit test.
cody-littley Oct 29, 2024
319fca6
Fix unit test.
cody-littley Oct 29, 2024
fe9e08c
Fix unit test.
cody-littley Oct 29, 2024
8f269de
Make suggested changes.
cody-littley Oct 29, 2024
c5d5b96
Tweak unit test.
cody-littley Oct 29, 2024
a4c081d
Merge branch 'master' into s3-relay-interface
cody-littley Oct 29, 2024
c4c7e2d
Change localstack port.
cody-littley Oct 29, 2024
684dab8
Add debug code.
cody-littley Oct 30, 2024
958f593
Fiddle with inabox settings.
cody-littley Oct 30, 2024
1 change: 1 addition & 0 deletions go.mod
@@ -6,6 +6,7 @@ toolchain go1.21.1

require (
github.com/Layr-Labs/eigensdk-go v0.1.7-0.20240507215523-7e4891d5099a
github.com/aws/aws-sdk-go v1.55.5
Contributor:
Is it possible to do it with just the v2 library?

Contributor Author:
Yes, refactored to only use the v2 API.

github.com/aws/aws-sdk-go-v2 v1.26.1
github.com/aws/aws-sdk-go-v2/credentials v1.17.11
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.12
2 changes: 2 additions & 0 deletions go.sum
@@ -27,6 +27,8 @@ github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bw
github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU=
github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA=
github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU=
21 changes: 21 additions & 0 deletions relay/dataplane/client.go
@@ -0,0 +1,21 @@
package dataplane

import "time"

// S3Client is a convenience wrapper for uploading files to and downloading files from Amazon S3. It may
// break a file into smaller parts for upload (to improve latency); if so, the parts are reassembled on
// download. This tool is not intended for reading and writing files consumed by utilities that are not
// aware of the multipart upload/download process.
//
// Implementations of this interface are required to be thread-safe.
type S3Client interface {
Contributor:
Why don't we implement new methods that support fragments in the existing S3 client (common/aws/s3)?

Contributor Author:
Change made.

// Upload uploads a file to S3. The fragmentSize parameter specifies the maximum size of each
// fragment uploaded to S3. If the file is larger than fragmentSize then it will be broken into
// smaller parts and uploaded in parallel. The file will be reassembled on download.
Upload(key string, data []byte, fragmentSize int, ttl time.Duration) error
// Download downloads a file from S3, as written by Upload. The fileSize (in bytes) and fragmentSize
// must be the same as the values used in the Upload call.
Download(key string, fileSize int, fragmentSize int) ([]byte, error)
// Close closes the S3 client.
Close() error
}
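
To make the interface contract concrete, the following is a minimal usage sketch, not part of this diff. It uses NewLocalClient and mapstore, both of which appear in this PR's tests, and assumes the import path follows the repo layout. Note that Download must be given the same fileSize and fragmentSize that were passed to Upload, since the fragment keys are derived from them. (The ttl parameter matches the signature at this commit; a later commit in this PR, "Remove TTL from upload", drops it.)

package main

import (
	"fmt"
	"time"

	"github.com/Layr-Labs/eigenda/common/kvstore/mapstore"
	"github.com/Layr-Labs/eigenda/relay/dataplane"
)

func main() {
	// In-memory implementation used by this PR's tests; no AWS credentials needed.
	client := dataplane.NewLocalClient(mapstore.NewStore())
	defer func() { _ = client.Close() }()

	data := []byte("hello, fragmented world")
	fragmentSize := 8 // each stored object holds at most 8 bytes of this file

	// Upload splits data into ceil(len(data)/fragmentSize) fragments and writes them in parallel.
	if err := client.Upload("my-key", data, fragmentSize, time.Hour); err != nil {
		panic(err)
	}

	// Download must receive the same fileSize and fragmentSize used on Upload.
	roundTrip, err := client.Download("my-key", len(data), fragmentSize)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(roundTrip))
}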
164 changes: 164 additions & 0 deletions relay/dataplane/client_test.go
@@ -0,0 +1,164 @@
package dataplane

import (
"context"
"github.com/Layr-Labs/eigenda/common/kvstore/mapstore"
tu "github.com/Layr-Labs/eigenda/common/testutils"
"github.com/Layr-Labs/eigenda/inabox/deploy"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/assert"
"math/rand"
"os"
"testing"
"time"
)

var (
dockertestPool *dockertest.Pool
dockertestResource *dockertest.Resource
)

const (
localstackPort = "4566"
localstackHost = "http://0.0.0.0:4566"
)

type clientBuilder struct {
// This method is called at the beginning of the test.
start func() error
// This method is called to build a new client.
build func() (S3Client, error)
// This method is called at the end of the test when all operations are done.
finish func() error
}

var clientBuilders = []*clientBuilder{
{
start: func() error {
return nil
},
build: func() (S3Client, error) {
return NewLocalClient(mapstore.NewStore()), nil
},
finish: func() error {
return nil
},
},
{
start: func() error {
return setupLocalstack()
},
build: func() (S3Client, error) {

config := DefaultS3Config()
config.AWSConfig.Endpoint = aws.String(localstackHost)
config.AWSConfig.S3ForcePathStyle = aws.Bool(true)
config.AWSConfig.WithRegion("us-east-1")

err := os.Setenv("AWS_ACCESS_KEY_ID", "localstack")
if err != nil {
return nil, err
}
err = os.Setenv("AWS_SECRET_ACCESS_KEY", "localstack")
if err != nil {
return nil, err
}

config.Bucket = "this-is-a-test-bucket"
config.AutoCreateBucket = true

client, err := NewS3Client(context.Background(), config)
if err != nil {
return nil, err
}

return client, nil
},
finish: func() error {
teardownLocalstack()
return nil
},
},
}

func setupLocalstack() error {
var err error
dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localstackPort)
if err != nil {
teardownLocalstack()
return err
}
return nil
}

func teardownLocalstack() {
deploy.PurgeDockertestResources(dockertestPool, dockertestResource)
}

func RandomOperationsTest(t *testing.T, client S3Client) {
numberToWrite := 100
expectedData := make(map[string][]byte)

fragmentSize := rand.Intn(1000) + 1000

for i := 0; i < numberToWrite; i++ {
key := tu.RandomString(10)
fragmentMultiple := rand.Float64() * 10
dataSize := int(fragmentMultiple*float64(fragmentSize)) + 1
data := tu.RandomBytes(dataSize)
expectedData[key] = data

err := client.Upload(key, data, fragmentSize, time.Hour)
assert.NoError(t, err)
}

// Read back the data
for key, expected := range expectedData {
data, err := client.Download(key, len(expected), fragmentSize)
assert.NoError(t, err)
assert.Equal(t, expected, data)
}
}

func TestRandomOperations(t *testing.T) {
tu.InitializeRandom()
for _, builder := range clientBuilders {
err := builder.start()
assert.NoError(t, err)

client, err := builder.build()
assert.NoError(t, err)
RandomOperationsTest(t, client)
err = client.Close()
assert.NoError(t, err)

err = builder.finish()
assert.NoError(t, err)
}
}

func ReadNonExistentValueTest(t *testing.T, client S3Client) {
_, err := client.Download("nonexistent", 1000, 1000)
assert.Error(t, err)
randomKey := tu.RandomString(10)
_, err = client.Download(randomKey, 0, 0)
assert.Error(t, err)
}

func TestReadNonExistentValue(t *testing.T) {
tu.InitializeRandom()
for _, builder := range clientBuilders {
err := builder.start()
assert.NoError(t, err)

client, err := builder.build()
assert.NoError(t, err)
ReadNonExistentValueTest(t, client)
err = client.Close()
assert.NoError(t, err)

err = builder.finish()
assert.NoError(t, err)
}
}
46 changes: 46 additions & 0 deletions relay/dataplane/config.go
@@ -0,0 +1,46 @@
package dataplane

import (
"github.com/aws/aws-sdk-go/aws"
"time"
)

// S3Config is the configuration for an S3Client.
type S3Config struct {
// The AWS configuration to use when interacting with S3.
// Default uses the aws.Config default except for region which is set to "us-east-2".
AWSConfig *aws.Config
// The name of the S3 bucket to use. All data written to the S3Client will be written to this bucket.
// This is a required field.
Bucket string
// If true then the bucket will be created if it does not already exist. If false and the bucket does not
// exist, then the S3Client will return an error when the client is constructed. Default is false.
AutoCreateBucket bool
// The number of characters of the key to use as the prefix. A value of "3" for the key "ABCDEFG" would result in
// the prefix "ABC". Default is 3.
PrefixChars int
// This framework utilizes a pool of workers to help upload/download files. This value specifies the number of
// workers to use. Default is 32.
Parallelism int
Contributor:
Should we set the default to the number of CPUs on the machine, for example by using GOMAXPROCS?

Contributor Author:
I've turned this into two parameters: ParallelismFactor and ParallelismConstant. The total number of workers is set equal to ParallelismFactor * numCores + ParallelismConstant.

We will want many more workers than cores, since most of the time the workers are blocked on IO tasks. This lets us set a sane default that uses a good number of workers as the core count grows.

// The capacity of the task channel. Default is 256. It is suggested that this value exceed the number of workers.
TaskChannelCapacity int
// If a single read takes longer than this value then the read will be aborted. Default is 30 seconds.
ReadTimeout time.Duration
// If a single write takes longer than this value then the write will be aborted. Default is 30 seconds.
WriteTimeout time.Duration
}

// DefaultS3Config returns a new S3Config with default values.
func DefaultS3Config() *S3Config {
return &S3Config{
AWSConfig: &aws.Config{
Region: aws.String("us-east-2"),
},
AutoCreateBucket: false,
PrefixChars: 3,
Parallelism: 32,
TaskChannelCapacity: 256,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
}
}
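
The review thread above describes how Parallelism is later replaced by a formula. Below is a minimal sketch of that formula, assuming the parameter names from the author's reply; this diff is from an earlier commit and still shows a single Parallelism field, so the final code may differ.

package dataplane

import "runtime"

// workerCount sketches the pool sizing described in the review thread:
// total workers = ParallelismFactor * numCores + ParallelismConstant.
func workerCount(parallelismFactor int, parallelismConstant int) int {
	numCores := runtime.NumCPU()
	return parallelismFactor*numCores + parallelismConstant
}

Scaling linearly with core count while keeping a constant floor reflects the author's point that the workers spend most of their time blocked on IO, so the pool should be much larger than the number of cores.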
115 changes: 115 additions & 0 deletions relay/dataplane/fragment.go
@@ -0,0 +1,115 @@
package dataplane

import (
"fmt"
"sort"
"strings"
)

// GetFragmentCount returns the number of fragments that a file of the given size will be broken into.
func GetFragmentCount(fileSize int, fragmentSize int) int {
if fileSize < fragmentSize {
return 1
} else if fileSize%fragmentSize == 0 {
return fileSize / fragmentSize
} else {
return fileSize/fragmentSize + 1
}
}

// GetFragmentKey returns the key for the fragment at the given index.
//
// Fragment keys take the form of "prefix/body-index[f]". The prefix is the first prefixLength characters
// of the file key. The body is the file key. The index is the index of the fragment. The character "f" is appended
// to the key of the last fragment in the series.
//
// Example: fileKey="abc123", prefixLength=2, fragmentCount=3
// The keys will be "ab/abc123-0", "ab/abc123-1", "ab/abc123-2f"
func GetFragmentKey(fileKey string, prefixLength int, fragmentCount int, index int) string {
var prefix string
if prefixLength > len(fileKey) {
prefix = fileKey
} else {
prefix = fileKey[:prefixLength]
}

postfix := ""
if fragmentCount-1 == index {
postfix = "f"
}

Contributor:
Should we also validate that index < fragmentCount?

Contributor Author:
Checks added.

return fmt.Sprintf("%s/%s-%d%s", prefix, fileKey, index, postfix)
}

// Fragment is a subset of a file.
type Fragment struct {
FragmentKey string
Data []byte
Index int
}

// BreakIntoFragments breaks a file into fragments of the given size.
func BreakIntoFragments(fileKey string, data []byte, prefixLength int, fragmentSize int) []*Fragment {
fragmentCount := GetFragmentCount(len(data), fragmentSize)
fragments := make([]*Fragment, fragmentCount)
for i := 0; i < fragmentCount; i++ {
start := i * fragmentSize
end := start + fragmentSize
if end > len(data) {
end = len(data)
}
fragments[i] = &Fragment{
FragmentKey: GetFragmentKey(fileKey, prefixLength, fragmentCount, i),
Data: data[start:end],
Index: i,
}
}
return fragments
}

// GetFragmentKeys returns the keys for all fragments of a file.
func GetFragmentKeys(fileKey string, prefixLength int, fragmentCount int) []string {
keys := make([]string, fragmentCount)
for i := 0; i < fragmentCount; i++ {
keys[i] = GetFragmentKey(fileKey, prefixLength, fragmentCount, i)
}
return keys
}

// RecombineFragments recombines fragments into a single file.
// Returns an error if any fragments are missing.
func RecombineFragments(fragments []*Fragment) ([]byte, error) {

if len(fragments) == 0 {
return nil, fmt.Errorf("no fragments")
}

// Sort the fragments by index
sort.Slice(fragments, func(i, j int) bool {
return fragments[i].Index < fragments[j].Index
})

// Make sure there aren't any gaps in the fragment indices
dataSize := 0
for i, fragment := range fragments {
if fragment.Index != i {
return nil, fmt.Errorf("missing fragment with index %d", i)
}
dataSize += len(fragment.Data)
}

// Make sure we have the last fragment
if !strings.HasSuffix(fragments[len(fragments)-1].FragmentKey, "f") {
return nil, fmt.Errorf("missing final fragment")
}

fragmentSize := len(fragments[0].Data)

// Concatenate the data
result := make([]byte, dataSize)
for _, fragment := range fragments {
copy(result[fragment.Index*fragmentSize:], fragment.Data)
}

return result, nil
}
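
The helpers above compose into a simple round trip. The following is an illustrative sketch (a hypothetical example function, not part of this diff) showing BreakIntoFragments and RecombineFragments, including the "f" suffix on the final fragment key.

package dataplane

import (
	"bytes"
	"fmt"
)

// fragmentRoundTrip splits a 10-byte file into fragments of at most 4 bytes
// and reassembles it. GetFragmentCount(10, 4) == 3, so the fragment keys are
// "ab/abc123-0", "ab/abc123-1", and "ab/abc123-2f".
func fragmentRoundTrip() {
	data := []byte("0123456789")

	fragments := BreakIntoFragments("abc123", data, 2, 4)
	for _, f := range fragments {
		fmt.Println(f.FragmentKey, len(f.Data)) // 4, 4, then 2 bytes
	}

	recombined, err := RecombineFragments(fragments)
	if err != nil {
		panic(err)
	}
	fmt.Println(bytes.Equal(data, recombined)) // true
}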