Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update google-cloud-storage parameters and object retention setting #2794

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 38 additions & 64 deletions das/google_cloud_storage_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

type GoogleCloudStorageOperator interface {
Bucket(name string) *googlestorage.BucketHandle
Upload(ctx context.Context, bucket, objectPrefix string, value []byte) error
Upload(ctx context.Context, bucket, objectPrefix string, value []byte, discardAfterTimeout bool, timeout uint64) error
Download(ctx context.Context, bucket, objectPrefix string, key common.Hash) ([]byte, error)
Close(ctx context.Context) error
}
Expand All @@ -36,15 +36,21 @@ func (g *GoogleCloudStorageClient) Bucket(name string) *googlestorage.BucketHand
return g.client.Bucket(name)
}

func (g *GoogleCloudStorageClient) Upload(ctx context.Context, bucket, objectPrefix string, value []byte) error {
func (g *GoogleCloudStorageClient) Upload(ctx context.Context, bucket, objectPrefix string, value []byte, discardAfterTimeout bool, timeout uint64) error {
obj := g.client.Bucket(bucket).Object(objectPrefix + EncodeStorageServiceKey(dastree.Hash(value)))
w := obj.NewWriter(ctx)

if discardAfterTimeout && timeout <= math.MaxInt64 {
w.Retention = &googlestorage.ObjectRetention{
Mode: "Unlocked",
RetainUntil: time.Unix(int64(timeout), 0),
}
}

if _, err := fmt.Fprintln(w, value); err != nil {
return err
}
return w.Close()

}

func (g *GoogleCloudStorageClient) Download(ctx context.Context, bucket, objectPrefix string, key common.Hash) ([]byte, error) {
Expand All @@ -61,90 +67,60 @@ func (g *GoogleCloudStorageClient) Close(ctx context.Context) error {
}

type GoogleCloudStorageServiceConfig struct {
Enable bool `koanf:"enable"`
AccessToken string `koanf:"access-token"`
Bucket string `koanf:"bucket"`
ObjectPrefix string `koanf:"object-prefix"`
EnableExpiry bool `koanf:"enable-expiry"`
MaxRetention time.Duration `koanf:"max-retention"`
Enable bool `koanf:"enable"`
AccessToken string `koanf:"access-token"`
AccessTokenFile string `koanf:"access-token-file"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to leave support for access-token as well as add support for access-token-file for maximum flexibility

Bucket string `koanf:"bucket"`
ObjectPrefix string `koanf:"object-prefix"`
DiscardAfterTimeout bool `koanf:"discard-after-timeout"`
}

var DefaultGoogleCloudStorageServiceConfig = GoogleCloudStorageServiceConfig{}

func GoogleCloudConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".enable", DefaultGoogleCloudStorageServiceConfig.Enable, "EXPERIMENTAL/unsupported - enable storage/retrieval of sequencer batch data from an Google Cloud Storage bucket")
f.String(prefix+".access-token", DefaultGoogleCloudStorageServiceConfig.AccessToken, "Google Cloud Storage access token")
f.String(prefix+".access-token", DefaultGoogleCloudStorageServiceConfig.AccessToken, "Google Cloud Storage access token (JSON string)")
f.String(prefix+".access-token-file", DefaultGoogleCloudStorageServiceConfig.AccessTokenFile, "Google Cloud Storage access token (JSON file path)")
f.String(prefix+".bucket", DefaultGoogleCloudStorageServiceConfig.Bucket, "Google Cloud Storage bucket")
f.String(prefix+".object-prefix", DefaultGoogleCloudStorageServiceConfig.ObjectPrefix, "prefix to add to Google Cloud Storage objects")
f.Bool(prefix+".enable-expiry", DefaultLocalFileStorageConfig.EnableExpiry, "enable expiry of batches")
f.Duration(prefix+".max-retention", DefaultLocalFileStorageConfig.MaxRetention, "store requests with expiry times farther in the future than max-retention will be rejected")

f.Bool(prefix+".discard-after-timeout", DefaultGoogleCloudStorageServiceConfig.DiscardAfterTimeout, "discard data after its expiry timeout")
}

type GoogleCloudStorageService struct {
operator GoogleCloudStorageOperator
bucket string
objectPrefix string
enableExpiry bool
maxRetention time.Duration
operator GoogleCloudStorageOperator
bucket string
objectPrefix string
discardAfterTimeout bool
}

func NewGoogleCloudStorageService(config GoogleCloudStorageServiceConfig) (StorageService, error) {
var client *googlestorage.Client
var err error
// Note that if the credentials are not specified, the client library will find credentials using ADC(Application Default Credentials)
// https://cloud.google.com/docs/authentication/provide-credentials-adc.
if config.AccessToken == "" {
client, err = googlestorage.NewClient(context.Background())
} else {
if config.AccessToken != "" {
client, err = googlestorage.NewClient(context.Background(), option.WithCredentialsJSON([]byte(config.AccessToken)))
} else if config.AccessTokenFile != "" {
client, err = googlestorage.NewClient(context.Background(), option.WithCredentialsFile(config.AccessTokenFile))
} else {
client, err = googlestorage.NewClient(context.Background())
}
if err != nil {
return nil, fmt.Errorf("error creating Google Cloud Storage client: %w", err)
}
service := &GoogleCloudStorageService{
operator: &GoogleCloudStorageClient{client: client},
bucket: config.Bucket,
objectPrefix: config.ObjectPrefix,
enableExpiry: config.EnableExpiry,
maxRetention: config.MaxRetention,
}
if config.EnableExpiry {
lifecycleRule := googlestorage.LifecycleRule{
Action: googlestorage.LifecycleAction{Type: "Delete"},
Condition: googlestorage.LifecycleCondition{AgeInDays: int64(config.MaxRetention.Hours() / 24)}, // Objects older than 30 days
}
ctx := context.Background()
bucket := service.operator.Bucket(service.bucket)
// check if bucket exists (and others), and update expiration policy if enabled
attrs, err := bucket.Attrs(ctx)
if err != nil {
return nil, fmt.Errorf("error getting bucket attributes: %w", err)
}
attrs.Lifecycle.Rules = append(attrs.Lifecycle.Rules, lifecycleRule)

bucketAttrsToUpdate := googlestorage.BucketAttrsToUpdate{
Lifecycle: &attrs.Lifecycle,
}
if _, err := bucket.Update(ctx, bucketAttrsToUpdate); err != nil {
return nil, fmt.Errorf("failed to update bucket lifecycle: %w", err)
}
operator: &GoogleCloudStorageClient{client: client},
bucket: config.Bucket,
objectPrefix: config.ObjectPrefix,
discardAfterTimeout: config.DiscardAfterTimeout,
}
return service, nil
}

func (gcs *GoogleCloudStorageService) Put(ctx context.Context, data []byte, expiry uint64) error {
logPut("das.GoogleCloudStorageService.Store", data, expiry, gcs)
if expiry > math.MaxInt64 {
return fmt.Errorf("request expiry time (%v) exceeds max int64", expiry)
}
// #nosec G115
expiryTime := time.Unix(int64(expiry), 0)
currentTimePlusRetention := time.Now().Add(gcs.maxRetention)
if expiryTime.After(currentTimePlusRetention) {
return fmt.Errorf("requested expiry time (%v) exceeds current time plus maximum allowed retention period(%v)", expiryTime, currentTimePlusRetention)
}
if err := gcs.operator.Upload(ctx, gcs.bucket, gcs.objectPrefix, data); err != nil {
func (gcs *GoogleCloudStorageService) Put(ctx context.Context, value []byte, timeout uint64) error {
logPut("das.GoogleCloudStorageService.Store", value, timeout, gcs)

if err := gcs.operator.Upload(ctx, gcs.bucket, gcs.objectPrefix, value, gcs.discardAfterTimeout, timeout); err != nil {
log.Error("das.GoogleCloudStorageService.Store", "err", err)
return err
}
Expand All @@ -162,10 +138,10 @@ func (gcs *GoogleCloudStorageService) GetByHash(ctx context.Context, key common.
}

func (gcs *GoogleCloudStorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) {
if gcs.enableExpiry {
return daprovider.KeepForever, nil
if gcs.discardAfterTimeout {
return daprovider.DiscardAfterDataTimeout, nil
}
return daprovider.DiscardAfterDataTimeout, nil
return daprovider.KeepForever, nil
}

func (gcs *GoogleCloudStorageService) Sync(ctx context.Context) error {
Expand All @@ -184,8 +160,6 @@ func (gcs *GoogleCloudStorageService) HealthCheck(ctx context.Context) error {
bucket := gcs.operator.Bucket(gcs.bucket)
// check if we have bucket permissions
permissions := []string{
"storage.buckets.get",
"storage.buckets.list",
"storage.objects.create",
"storage.objects.delete",
"storage.objects.list",
Expand Down
5 changes: 2 additions & 3 deletions das/google_cloud_storage_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (c *mockGCSClient) Close(ctx context.Context) error {
return nil
}

func (c *mockGCSClient) Upload(ctx context.Context, bucket, objectPrefix string, value []byte) error {
func (c *mockGCSClient) Upload(ctx context.Context, bucket, objectPrefix string, value []byte, discardAfterTimeout bool, timeout uint64) error {
key := objectPrefix + EncodeStorageServiceKey(dastree.Hash(value))
c.storage[key] = value
return nil
Expand All @@ -47,7 +47,7 @@ func NewTestGoogleCloudStorageService(ctx context.Context, googleCloudStorageCon
operator: &mockGCSClient{
storage: make(map[string][]byte),
},
maxRetention: googleCloudStorageConfig.MaxRetention,
discardAfterTimeout: true,
}, nil
}

Expand All @@ -57,7 +57,6 @@ func TestNewGoogleCloudStorageService(t *testing.T) {
expiry := uint64(time.Now().Add(time.Hour).Unix())
googleCloudStorageServiceConfig := DefaultGoogleCloudStorageServiceConfig
googleCloudStorageServiceConfig.Enable = true
googleCloudStorageServiceConfig.MaxRetention = time.Hour * 24
googleCloudService, err := NewTestGoogleCloudStorageService(ctx, googleCloudStorageServiceConfig)
Require(t, err)

Expand Down
Loading