Skip to content

Commit

Permalink
Fixing MinObject related issues in MRDWrapper (#2936)
Browse files Browse the repository at this point in the history
Ensure MRD's minObject is updated when inode's minObject is updated.

Ensure that we set MRD to nil if there were any errors while creating it. (We observed that NewMultiRangeDownloader returned a non-nil value for MRD even in case of an error.)

b/391534052
  • Loading branch information
abhishek10004 authored Jan 31, 2025
1 parent 45a8f47 commit d0c8162
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 9 deletions.
18 changes: 17 additions & 1 deletion internal/fs/inode/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,11 @@ func NewFileInode(
unlinked: false,
config: cfg,
globalMaxWriteBlocksSem: globalMaxBlocksSem,
MRDWrapper: gcsx.NewMultiRangeDownloaderWrapper(bucket, &minObj),
}
var err error
f.MRDWrapper, err = gcsx.NewMultiRangeDownloaderWrapper(bucket, &f.src)
if err != nil {
logger.Errorf("NewFileInode: Error in creating MRDWrapper %v", err)
}

f.lc.Init(id)
Expand Down Expand Up @@ -697,6 +701,7 @@ func (f *FileInode) SetMtime(
minObj = *minObjPtr
}
f.src = minObj
f.updateMRDWrapper()
return
}

Expand Down Expand Up @@ -832,6 +837,8 @@ func (f *FileInode) Flush(ctx context.Context) (err error) {
func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
if minObj != nil && !f.localFileCache {
f.src = *minObj
// Update MRDWrapper
f.updateMRDWrapper()
// Convert localFile to nonLocalFile after it is synced to GCS.
if f.IsLocal() {
f.local = false
Expand All @@ -848,6 +855,15 @@ func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
return
}

// updateMRDWrapper hands the inode's current source object (f.src) to the
// MRDWrapper so that the wrapper refers to the same MinObject as the inode.
// It should be invoked every time f.src is reassigned.
//
// A failure reported by SetMinObject is logged and otherwise ignored; nothing
// is returned to the caller.
func (f *FileInode) updateMRDWrapper() {
	if setErr := f.MRDWrapper.SetMinObject(&f.src); setErr != nil {
		logger.Errorf("FileInode::updateMRDWrapper Error in setting minObject %v", setErr)
	}
}

// Truncate the file to the specified size.
//
// LOCKS_REQUIRED(f.mu)
Expand Down
22 changes: 22 additions & 0 deletions internal/fs/inode/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@ func (t *FileTest) TestWriteThenSync() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Same(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
m, _, err := t.bucket.StatObject(t.ctx, statReq)
Expand Down Expand Up @@ -489,6 +492,8 @@ func (t *FileTest) TestWriteToLocalFileThenSync() {
assert.Equal(t.T(),
writeTime.UTC().Format(time.RFC3339Nano),
m.Metadata["gcsfuse_mtime"])
// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Same(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Read the object's contents.
contents, err := storageutil.ReadObject(t.ctx, t.bucket, t.in.Name().GcsObjectName())
assert.Nil(t.T(), err)
Expand Down Expand Up @@ -547,6 +552,8 @@ func (t *FileTest) TestSyncEmptyLocalFile() {
assert.Equal(t.T(), t.in.SourceGeneration().Object, m.Generation)
assert.Equal(t.T(), t.in.SourceGeneration().Metadata, m.MetaGeneration)
assert.Equal(t.T(), uint64(0), m.Size)
// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Same(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())
// Validate the mtime.
mtimeInBucket, ok := m.Metadata["gcsfuse_mtime"]
assert.True(t.T(), ok)
Expand Down Expand Up @@ -620,6 +627,9 @@ func (t *FileTest) TestAppendThenSync() {
writeTime.UTC().Format(time.RFC3339Nano),
m.Metadata["gcsfuse_mtime"])

// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Same(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())

// Read the object's contents.
contents, err := storageutil.ReadObject(t.ctx, t.bucket, t.in.Name().GcsObjectName())

Expand Down Expand Up @@ -676,6 +686,9 @@ func (t *FileTest) TestTruncateDownwardThenSync() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Same(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
m, _, err := t.bucket.StatObject(t.ctx, statReq)
Expand Down Expand Up @@ -742,6 +755,9 @@ func (t *FileTest) TestTruncateUpwardThenFlush() {
// The generation should have advanced.
assert.Less(t.T(), t.backingObj.Generation, t.in.SourceGeneration().Object)

// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Same(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())

// Stat the current object in the bucket.
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
m, _, err := t.bucket.StatObject(t.ctx, statReq)
Expand Down Expand Up @@ -1031,6 +1047,9 @@ func (t *FileTest) TestSyncFlush_Clobbered() {
err = t.in.Flush(t.ctx)
}

// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Same(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())

// Check if the error is a FileClobberedError
var fcErr *gcsfuse_errors.FileClobberedError
assert.True(t.T(), errors.As(err, &fcErr), "expected FileClobberedError but got %v", err)
Expand Down Expand Up @@ -1155,6 +1174,9 @@ func (t *FileTest) TestSetMtime_ContentDirty() {
statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
m, _, err := t.bucket.StatObject(t.ctx, statReq)

// Validate MinObject in MRDWrapper is same as the MinObject in inode.
assert.Same(t.T(), &t.in.src, t.in.MRDWrapper.GetMinObject())

assert.Nil(t.T(), err)
assert.NotNil(t.T(), m)
assert.Equal(t.T(),
Expand Down
31 changes: 28 additions & 3 deletions internal/gcsx/multi_range_downloader_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,21 @@ import (
// its refcount reaches 0.
const multiRangeDownloaderTimeout = 60 * time.Second

func NewMultiRangeDownloaderWrapper(bucket gcs.Bucket, object *gcs.MinObject) MultiRangeDownloaderWrapper {
// NewMultiRangeDownloaderWrapper builds a MultiRangeDownloaderWrapper for the
// given bucket and object using the real clock (clock.RealClock).
//
// It forwards to NewMultiRangeDownloaderWrapperWithClock and returns that
// function's wrapper and error unchanged.
func NewMultiRangeDownloaderWrapper(bucket gcs.Bucket, object *gcs.MinObject) (MultiRangeDownloaderWrapper, error) {
return NewMultiRangeDownloaderWrapperWithClock(bucket, object, clock.RealClock{})
}

func NewMultiRangeDownloaderWrapperWithClock(bucket gcs.Bucket, object *gcs.MinObject, clock clock.Clock) MultiRangeDownloaderWrapper {
// NewMultiRangeDownloaderWrapperWithClock builds a MultiRangeDownloaderWrapper
// that stores the supplied clock, bucket and object.
//
// A nil object is rejected: the function then returns a zero-value wrapper
// together with a non-nil error. The bucket is not validated here. Only the
// clock, bucket and object fields are populated; the Wrapped downloader is
// left unset by this constructor.
func NewMultiRangeDownloaderWrapperWithClock(bucket gcs.Bucket, object *gcs.MinObject, clock clock.Clock) (MultiRangeDownloaderWrapper, error) {
// The wrapper must always carry a MinObject, so refuse to build one without it.
if object == nil {
return MultiRangeDownloaderWrapper{}, fmt.Errorf("NewMultiRangeDownloaderWrapperWithClock: Missing MinObject")
}
// In case of a local inode, MRDWrapper would be created with an empty minObject (i.e. with a minObject without any information)
// and when the object is actually created, MRDWrapper would be updated using SetMinObject method.
return MultiRangeDownloaderWrapper{
clock: clock,
bucket: bucket,
object: object,
}
}, nil
}

type readResult struct {
Expand All @@ -56,6 +61,7 @@ type MultiRangeDownloaderWrapper struct {
Wrapped gcs.MultiRangeDownloader

// Bucket and object details for MultiRangeDownloader.
// Object should not be nil.
object *gcs.MinObject
bucket gcs.Bucket

Expand All @@ -69,6 +75,20 @@ type MultiRangeDownloaderWrapper struct {
clock clock.Clock
}

// SetMinObject replaces the gcs.MinObject pointer held by the wrapper with
// minObj. A nil minObj is rejected: the stored object is left untouched and a
// non-nil error is returned. On success the return value is nil.
func (mrdWrapper *MultiRangeDownloaderWrapper) SetMinObject(minObj *gcs.MinObject) error {
	if minObj != nil {
		mrdWrapper.object = minObj
		return nil
	}
	return fmt.Errorf("MultiRangeDownloaderWrapper::SetMinObject: Missing MinObject")
}

// GetMinObject returns the gcs.MinObject pointer currently stored in the
// MultiRangeDownloaderWrapper (the same pointer, not a copy of the object).
// Used only for unit testing.
func (mrdWrapper *MultiRangeDownloaderWrapper) GetMinObject() *gcs.MinObject {
return mrdWrapper.object
}

// Returns current refcount.
func (mrdWrapper *MultiRangeDownloaderWrapper) GetRefCount() int {
mrdWrapper.mu.Lock()
Expand Down Expand Up @@ -139,6 +159,10 @@ func (mrdWrapper *MultiRangeDownloaderWrapper) cleanupMultiRangeDownloader() {

// Ensures that MultiRangeDownloader exists, creating it if it does not exist.
func (mrdWrapper *MultiRangeDownloaderWrapper) ensureMultiRangeDownloader() (err error) {
if mrdWrapper.object == nil || mrdWrapper.bucket == nil {
return fmt.Errorf("ensureMultiRangeDownloader error: Missing minObject or bucket")
}

if mrdWrapper.Wrapped == nil {
mrdWrapper.Wrapped, err = mrdWrapper.bucket.NewMultiRangeDownloader(context.Background(), &gcs.MultiRangeDownloaderRequest{
Name: mrdWrapper.object.Name,
Expand All @@ -161,6 +185,7 @@ func (mrdWrapper *MultiRangeDownloaderWrapper) Read(ctx context.Context, buf []b
err = mrdWrapper.ensureMultiRangeDownloader()
if err != nil {
err = fmt.Errorf("MultiRangeDownloaderWrapper::Read: Error in creating MultiRangeDownloader: %v", err)
mrdWrapper.Wrapped = nil
return
}

Expand Down
116 changes: 115 additions & 1 deletion internal/gcsx/multi_range_downloader_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package gcsx

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -45,6 +46,7 @@ func TestMRDWrapperTestSuite(t *testing.T) {
}

func (t *mrdWrapperTest) SetupTest() {
var err error
t.object = &gcs.MinObject{
Name: "foo",
Size: 100,
Expand All @@ -54,7 +56,8 @@ func (t *mrdWrapperTest) SetupTest() {
// Create the bucket.
t.mockBucket = new(storage.TestifyMockBucket)
t.mrdTimeout = time.Millisecond
t.mrdWrapper = NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{WaitTime: t.mrdTimeout})
t.mrdWrapper, err = NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{WaitTime: t.mrdTimeout})
assert.Nil(t.T(), err, "Error in creating MRDWrapper")
t.mrdWrapper.Wrapped = fake.NewFakeMultiRangeDownloaderWithSleep(t.object, t.objectData, time.Microsecond)
t.mrdWrapper.refCount = 0
}
Expand Down Expand Up @@ -156,3 +159,114 @@ func (t *mrdWrapperTest) Test_Read() {
})
}
}

// Test_NewMultiRangeDownloaderWrapper checks the constructor's error contract:
// a non-nil MinObject yields no error, while a nil MinObject yields the
// "Missing MinObject" error with the exact expected message.
func (t *mrdWrapperTest) Test_NewMultiRangeDownloaderWrapper() {
	type scenario struct {
		name   string
		bucket gcs.Bucket
		obj    *gcs.MinObject
		err    error
	}
	scenarios := []scenario{
		{name: "ValidParameters", bucket: t.mockBucket, obj: t.object, err: nil},
		{
			name:   "NilMinObject",
			bucket: t.mockBucket,
			obj:    nil,
			err:    fmt.Errorf("NewMultiRangeDownloaderWrapperWithClock: Missing MinObject"),
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func() {
			_, gotErr := NewMultiRangeDownloaderWrapper(sc.bucket, sc.obj)

			// Failure scenarios: an error must be present and match exactly.
			if sc.err != nil {
				assert.Error(t.T(), gotErr)
				assert.EqualError(t.T(), gotErr, sc.err.Error())
				return
			}
			assert.NoError(t.T(), gotErr)
		})
	}
}

// Test_SetMinObject checks SetMinObject's error contract on the suite's
// wrapper: a non-nil MinObject is accepted without error, and a nil MinObject
// is rejected with the exact "Missing MinObject" message.
func (t *mrdWrapperTest) Test_SetMinObject() {
	type scenario struct {
		name string
		obj  *gcs.MinObject
		err  error
	}
	scenarios := []scenario{
		{name: "ValidMinObject", obj: t.object, err: nil},
		{
			name: "NilMinObject",
			obj:  nil,
			err:  fmt.Errorf("MultiRangeDownloaderWrapper::SetMinObject: Missing MinObject"),
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func() {
			gotErr := t.mrdWrapper.SetMinObject(sc.obj)

			// Failure scenarios: an error must be present and match exactly.
			if sc.err != nil {
				assert.Error(t.T(), gotErr)
				assert.EqualError(t.T(), gotErr, sc.err.Error())
				return
			}
			assert.NoError(t.T(), gotErr)
		})
	}
}

// Test_EnsureMultiRangeDownloader drives ensureMultiRangeDownloader with the
// wrapper's Wrapped downloader cleared. With a valid object and bucket it
// expects no error and a non-nil Wrapped; with either the object or the bucket
// missing it expects the "Missing minObject or bucket" error and Wrapped to
// remain nil.
func (t *mrdWrapperTest) Test_EnsureMultiRangeDownloader() {
	type scenario struct {
		name   string
		obj    *gcs.MinObject
		bucket gcs.Bucket
		err    error
	}
	scenarios := []scenario{
		{name: "ValidMinObject", obj: t.object, bucket: t.mockBucket, err: nil},
		{
			name:   "NilMinObject",
			obj:    nil,
			bucket: t.mockBucket,
			err:    fmt.Errorf("ensureMultiRangeDownloader error: Missing minObject or bucket"),
		},
		{
			name:   "NilBucket",
			obj:    t.object,
			bucket: nil,
			err:    fmt.Errorf("ensureMultiRangeDownloader error: Missing minObject or bucket"),
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func() {
			// Put the wrapper into the state under test, with no downloader yet.
			t.mrdWrapper.bucket = sc.bucket
			t.mrdWrapper.object = sc.obj
			t.mrdWrapper.Wrapped = nil
			fakeMRD := fake.NewFakeMultiRangeDownloaderWithSleep(t.object, t.objectData, time.Microsecond)
			t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fakeMRD)

			gotErr := t.mrdWrapper.ensureMultiRangeDownloader()

			// Failure scenarios: exact error, and no downloader created.
			if sc.err != nil {
				assert.Error(t.T(), gotErr)
				assert.EqualError(t.T(), gotErr, sc.err.Error())
				assert.Nil(t.T(), t.mrdWrapper.Wrapped)
				return
			}
			assert.NoError(t.T(), gotErr)
			assert.NotNil(t.T(), t.mrdWrapper.Wrapped)
		})
	}
}
12 changes: 8 additions & 4 deletions internal/gcsx/random_reader_stretchr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,8 @@ func (t *RandomReaderStretchrTest) Test_ReadAt_MRDRead() {
t.rr.wrapped.seeks = minSeeksForRandom + 1
t.object.Size = uint64(tc.dataSize)
testContent := testutil.GenerateRandomBytes(int(t.object.Size))
fakeMRDWrapper := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
fakeMRDWrapper, err := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
assert.Nil(t.T(), err, "Error in creating MRDWrapper")
t.rr.wrapped.mrdWrapper = &fakeMRDWrapper
t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, testContent, time.Microsecond)).Times(1)
t.mockBucket.On("BucketType", mock.Anything).Return(gcs.BucketType{Zonal: true}).Times(1)
Expand Down Expand Up @@ -688,7 +689,8 @@ func (t *RandomReaderStretchrTest) Test_ReadFromMultiRangeReader_ReadFull() {
t.rr.wrapped.isMRDInUse = false
t.object.Size = uint64(tc.dataSize)
testContent := testutil.GenerateRandomBytes(int(t.object.Size))
fakeMRDWrapper := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
fakeMRDWrapper, err := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
assert.Nil(t.T(), err, "Error in creating MRDWrapper")
t.rr.wrapped.mrdWrapper = &fakeMRDWrapper
t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, testContent, time.Microsecond)).Times(1)
t.mockBucket.On("BucketType", mock.Anything).Return(gcs.BucketType{Zonal: true}).Times(1)
Expand Down Expand Up @@ -722,7 +724,8 @@ func (t *RandomReaderStretchrTest) Test_ReadFromMultiRangeReader_ReadChunk() {
t.rr.wrapped.reader = nil
t.object.Size = uint64(tc.dataSize)
testContent := testutil.GenerateRandomBytes(int(t.object.Size))
fakeMRDWrapper := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
fakeMRDWrapper, err := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
assert.Nil(t.T(), err, "Error in creating MRDWrapper")
t.rr.wrapped.mrdWrapper = &fakeMRDWrapper
t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, testContent, time.Microsecond)).Times(1)
t.mockBucket.On("BucketType", mock.Anything).Return(gcs.BucketType{Zonal: true}).Times(1)
Expand Down Expand Up @@ -761,7 +764,8 @@ func (t *RandomReaderStretchrTest) Test_ReadFromMultiRangeReader_ValidateTimeout
t.rr.wrapped.isMRDInUse = false
t.object.Size = uint64(tc.dataSize)
testContent := testutil.GenerateRandomBytes(int(t.object.Size))
fakeMRDWrapper := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
fakeMRDWrapper, err := NewMultiRangeDownloaderWrapperWithClock(t.mockBucket, t.object, &clock.FakeClock{})
assert.Nil(t.T(), err, "Error in creating MRDWrapper")
t.rr.wrapped.mrdWrapper = &fakeMRDWrapper
t.mockBucket.On("NewMultiRangeDownloader", mock.Anything, mock.Anything).Return(fake.NewFakeMultiRangeDownloaderWithSleep(t.object, testContent, tc.sleepTime)).Times(1)
t.mockBucket.On("BucketType", mock.Anything).Return(gcs.BucketType{Zonal: true}).Times(1)
Expand Down

0 comments on commit d0c8162

Please sign in to comment.