diff --git a/.licenses-gomod.sha256 b/.licenses-gomod.sha256 index 2d9f23e4c4..c4257748a3 100644 --- a/.licenses-gomod.sha256 +++ b/.licenses-gomod.sha256 @@ -1 +1 @@ -100644 b0aa86d2e7007d35deadc3f5a230e6ae6795e63e go.mod +100644 eebc1d45861288f3d533cf011a65dfd4c822945a go.mod diff --git a/.mockery.yaml b/.mockery.yaml index 3b84385c74..a82f64d301 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -11,4 +11,5 @@ packages: github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/dbuser: github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/deployment: github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/customroles: + github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/datafederation: github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/teams: diff --git a/config/crd/bases/atlas.mongodb.com_atlasdatafederations.yaml b/config/crd/bases/atlas.mongodb.com_atlasdatafederations.yaml index cbc30186df..fe6c2dc1b2 100644 --- a/config/crd/bases/atlas.mongodb.com_atlasdatafederations.yaml +++ b/config/crd/bases/atlas.mongodb.com_atlasdatafederations.yaml @@ -84,10 +84,16 @@ spec: privateEndpoints: items: properties: + comment: + type: string + customerEndpointDNSName: + type: string endpointId: type: string provider: type: string + region: + type: string type: type: string type: object @@ -105,6 +111,8 @@ spec: required: - name type: object + skipRoleValidation: + type: boolean storage: properties: databases: @@ -126,6 +134,10 @@ spec: type: string databaseRegex: type: string + datasetName: + type: string + datasetPrefix: + type: string defaultFormat: enum: - .avro @@ -153,6 +165,8 @@ spec: type: string storeName: type: string + trimLevel: + type: integer urls: items: type: string @@ -187,8 +201,14 @@ spec: items: type: string type: array + allowInsecure: + type: boolean bucket: type: string + clusterName: + type: string + defaultFormat: + type: string delimiter: type: string includeTags: @@ -201,8 +221,35 
@@ spec: type: string public: type: boolean + readConcern: + properties: + level: + type: string + type: object + readPreference: + properties: + maxStalenessSeconds: + type: integer + mode: + type: string + tagSets: + items: + items: + properties: + name: + type: string + value: + type: string + type: object + type: array + type: array + type: object region: type: string + urls: + items: + type: string + type: array type: object type: array type: object diff --git a/go.mod b/go.mod index b0aa86d2e7..eebc1d4586 100644 --- a/go.mod +++ b/go.mod @@ -108,7 +108,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/go-querystring v1.1.0 // indirect - github.com/google/gofuzz v1.2.0 // indirect + github.com/google/gofuzz v1.2.0 github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect github.com/googleapis/gax-go/v2 v2.13.0 // indirect github.com/imdario/mergo v0.3.12 // indirect diff --git a/internal/mocks/translation/data_federation_service.go b/internal/mocks/translation/data_federation_service.go new file mode 100644 index 0000000000..ccda3410fb --- /dev/null +++ b/internal/mocks/translation/data_federation_service.go @@ -0,0 +1,240 @@ +// Code generated by mockery. DO NOT EDIT. 
+ +package translation + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + + datafederation "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/datafederation" +) + +// DataFederationServiceMock is an autogenerated mock type for the DataFederationService type +type DataFederationServiceMock struct { + mock.Mock +} + +type DataFederationServiceMock_Expecter struct { + mock *mock.Mock +} + +func (_m *DataFederationServiceMock) EXPECT() *DataFederationServiceMock_Expecter { + return &DataFederationServiceMock_Expecter{mock: &_m.Mock} +} + +// Create provides a mock function with given fields: ctx, df +func (_m *DataFederationServiceMock) Create(ctx context.Context, df *datafederation.DataFederation) error { + ret := _m.Called(ctx, df) + + if len(ret) == 0 { + panic("no return value specified for Create") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *datafederation.DataFederation) error); ok { + r0 = rf(ctx, df) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DataFederationServiceMock_Create_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Create' +type DataFederationServiceMock_Create_Call struct { + *mock.Call +} + +// Create is a helper method to define mock.On call +// - ctx context.Context +// - df *datafederation.DataFederation +func (_e *DataFederationServiceMock_Expecter) Create(ctx interface{}, df interface{}) *DataFederationServiceMock_Create_Call { + return &DataFederationServiceMock_Create_Call{Call: _e.mock.On("Create", ctx, df)} +} + +func (_c *DataFederationServiceMock_Create_Call) Run(run func(ctx context.Context, df *datafederation.DataFederation)) *DataFederationServiceMock_Create_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*datafederation.DataFederation)) + }) + return _c +} + +func (_c *DataFederationServiceMock_Create_Call) Return(_a0 error) *DataFederationServiceMock_Create_Call { + 
_c.Call.Return(_a0) + return _c +} + +func (_c *DataFederationServiceMock_Create_Call) RunAndReturn(run func(context.Context, *datafederation.DataFederation) error) *DataFederationServiceMock_Create_Call { + _c.Call.Return(run) + return _c +} + +// Delete provides a mock function with given fields: ctx, projectID, name +func (_m *DataFederationServiceMock) Delete(ctx context.Context, projectID string, name string) error { + ret := _m.Called(ctx, projectID, name) + + if len(ret) == 0 { + panic("no return value specified for Delete") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, projectID, name) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DataFederationServiceMock_Delete_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Delete' +type DataFederationServiceMock_Delete_Call struct { + *mock.Call +} + +// Delete is a helper method to define mock.On call +// - ctx context.Context +// - projectID string +// - name string +func (_e *DataFederationServiceMock_Expecter) Delete(ctx interface{}, projectID interface{}, name interface{}) *DataFederationServiceMock_Delete_Call { + return &DataFederationServiceMock_Delete_Call{Call: _e.mock.On("Delete", ctx, projectID, name)} +} + +func (_c *DataFederationServiceMock_Delete_Call) Run(run func(ctx context.Context, projectID string, name string)) *DataFederationServiceMock_Delete_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *DataFederationServiceMock_Delete_Call) Return(_a0 error) *DataFederationServiceMock_Delete_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *DataFederationServiceMock_Delete_Call) RunAndReturn(run func(context.Context, string, string) error) *DataFederationServiceMock_Delete_Call { + _c.Call.Return(run) + return _c +} + +// Get provides a mock function with given fields: ctx, 
projectID, name +func (_m *DataFederationServiceMock) Get(ctx context.Context, projectID string, name string) (*datafederation.DataFederation, error) { + ret := _m.Called(ctx, projectID, name) + + if len(ret) == 0 { + panic("no return value specified for Get") + } + + var r0 *datafederation.DataFederation + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) (*datafederation.DataFederation, error)); ok { + return rf(ctx, projectID, name) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string) *datafederation.DataFederation); ok { + r0 = rf(ctx, projectID, name) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datafederation.DataFederation) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { + r1 = rf(ctx, projectID, name) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DataFederationServiceMock_Get_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Get' +type DataFederationServiceMock_Get_Call struct { + *mock.Call +} + +// Get is a helper method to define mock.On call +// - ctx context.Context +// - projectID string +// - name string +func (_e *DataFederationServiceMock_Expecter) Get(ctx interface{}, projectID interface{}, name interface{}) *DataFederationServiceMock_Get_Call { + return &DataFederationServiceMock_Get_Call{Call: _e.mock.On("Get", ctx, projectID, name)} +} + +func (_c *DataFederationServiceMock_Get_Call) Run(run func(ctx context.Context, projectID string, name string)) *DataFederationServiceMock_Get_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *DataFederationServiceMock_Get_Call) Return(_a0 *datafederation.DataFederation, _a1 error) *DataFederationServiceMock_Get_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *DataFederationServiceMock_Get_Call) RunAndReturn(run func(context.Context, string, 
string) (*datafederation.DataFederation, error)) *DataFederationServiceMock_Get_Call { + _c.Call.Return(run) + return _c +} + +// Update provides a mock function with given fields: ctx, df +func (_m *DataFederationServiceMock) Update(ctx context.Context, df *datafederation.DataFederation) error { + ret := _m.Called(ctx, df) + + if len(ret) == 0 { + panic("no return value specified for Update") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *datafederation.DataFederation) error); ok { + r0 = rf(ctx, df) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DataFederationServiceMock_Update_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Update' +type DataFederationServiceMock_Update_Call struct { + *mock.Call +} + +// Update is a helper method to define mock.On call +// - ctx context.Context +// - df *datafederation.DataFederation +func (_e *DataFederationServiceMock_Expecter) Update(ctx interface{}, df interface{}) *DataFederationServiceMock_Update_Call { + return &DataFederationServiceMock_Update_Call{Call: _e.mock.On("Update", ctx, df)} +} + +func (_c *DataFederationServiceMock_Update_Call) Run(run func(ctx context.Context, df *datafederation.DataFederation)) *DataFederationServiceMock_Update_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*datafederation.DataFederation)) + }) + return _c +} + +func (_c *DataFederationServiceMock_Update_Call) Return(_a0 error) *DataFederationServiceMock_Update_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *DataFederationServiceMock_Update_Call) RunAndReturn(run func(context.Context, *datafederation.DataFederation) error) *DataFederationServiceMock_Update_Call { + _c.Call.Return(run) + return _c +} + +// NewDataFederationServiceMock creates a new instance of DataFederationServiceMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. 
+// The first argument is typically a *testing.T value. +func NewDataFederationServiceMock(t interface { + mock.TestingT + Cleanup(func()) +}) *DataFederationServiceMock { + mock := &DataFederationServiceMock{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/translation/datafederation_private_endpoint_service.go b/internal/mocks/translation/datafederation_private_endpoint_service.go new file mode 100644 index 0000000000..c00af20b12 --- /dev/null +++ b/internal/mocks/translation/datafederation_private_endpoint_service.go @@ -0,0 +1,191 @@ +// Code generated by mockery. DO NOT EDIT. + +package translation + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + + datafederation "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/datafederation" +) + +// DatafederationPrivateEndpointServiceMock is an autogenerated mock type for the DatafederationPrivateEndpointService type +type DatafederationPrivateEndpointServiceMock struct { + mock.Mock +} + +type DatafederationPrivateEndpointServiceMock_Expecter struct { + mock *mock.Mock +} + +func (_m *DatafederationPrivateEndpointServiceMock) EXPECT() *DatafederationPrivateEndpointServiceMock_Expecter { + return &DatafederationPrivateEndpointServiceMock_Expecter{mock: &_m.Mock} +} + +// Create provides a mock function with given fields: _a0, _a1 +func (_m *DatafederationPrivateEndpointServiceMock) Create(_a0 context.Context, _a1 *datafederation.DatafederationPrivateEndpointEntry) error { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for Create") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *datafederation.DatafederationPrivateEndpointEntry) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DatafederationPrivateEndpointServiceMock_Create_Call is a *mock.Call that shadows Run/Return methods with type explicit version 
for method 'Create' +type DatafederationPrivateEndpointServiceMock_Create_Call struct { + *mock.Call +} + +// Create is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *datafederation.DatafederationPrivateEndpointEntry +func (_e *DatafederationPrivateEndpointServiceMock_Expecter) Create(_a0 interface{}, _a1 interface{}) *DatafederationPrivateEndpointServiceMock_Create_Call { + return &DatafederationPrivateEndpointServiceMock_Create_Call{Call: _e.mock.On("Create", _a0, _a1)} +} + +func (_c *DatafederationPrivateEndpointServiceMock_Create_Call) Run(run func(_a0 context.Context, _a1 *datafederation.DatafederationPrivateEndpointEntry)) *DatafederationPrivateEndpointServiceMock_Create_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*datafederation.DatafederationPrivateEndpointEntry)) + }) + return _c +} + +func (_c *DatafederationPrivateEndpointServiceMock_Create_Call) Return(_a0 error) *DatafederationPrivateEndpointServiceMock_Create_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *DatafederationPrivateEndpointServiceMock_Create_Call) RunAndReturn(run func(context.Context, *datafederation.DatafederationPrivateEndpointEntry) error) *DatafederationPrivateEndpointServiceMock_Create_Call { + _c.Call.Return(run) + return _c +} + +// Delete provides a mock function with given fields: _a0, _a1 +func (_m *DatafederationPrivateEndpointServiceMock) Delete(_a0 context.Context, _a1 *datafederation.DatafederationPrivateEndpointEntry) error { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for Delete") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *datafederation.DatafederationPrivateEndpointEntry) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DatafederationPrivateEndpointServiceMock_Delete_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Delete' +type 
DatafederationPrivateEndpointServiceMock_Delete_Call struct { + *mock.Call +} + +// Delete is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *datafederation.DatafederationPrivateEndpointEntry +func (_e *DatafederationPrivateEndpointServiceMock_Expecter) Delete(_a0 interface{}, _a1 interface{}) *DatafederationPrivateEndpointServiceMock_Delete_Call { + return &DatafederationPrivateEndpointServiceMock_Delete_Call{Call: _e.mock.On("Delete", _a0, _a1)} +} + +func (_c *DatafederationPrivateEndpointServiceMock_Delete_Call) Run(run func(_a0 context.Context, _a1 *datafederation.DatafederationPrivateEndpointEntry)) *DatafederationPrivateEndpointServiceMock_Delete_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*datafederation.DatafederationPrivateEndpointEntry)) + }) + return _c +} + +func (_c *DatafederationPrivateEndpointServiceMock_Delete_Call) Return(_a0 error) *DatafederationPrivateEndpointServiceMock_Delete_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *DatafederationPrivateEndpointServiceMock_Delete_Call) RunAndReturn(run func(context.Context, *datafederation.DatafederationPrivateEndpointEntry) error) *DatafederationPrivateEndpointServiceMock_Delete_Call { + _c.Call.Return(run) + return _c +} + +// List provides a mock function with given fields: ctx, projectID +func (_m *DatafederationPrivateEndpointServiceMock) List(ctx context.Context, projectID string) ([]*datafederation.DatafederationPrivateEndpointEntry, error) { + ret := _m.Called(ctx, projectID) + + if len(ret) == 0 { + panic("no return value specified for List") + } + + var r0 []*datafederation.DatafederationPrivateEndpointEntry + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) ([]*datafederation.DatafederationPrivateEndpointEntry, error)); ok { + return rf(ctx, projectID) + } + if rf, ok := ret.Get(0).(func(context.Context, string) []*datafederation.DatafederationPrivateEndpointEntry); ok { + r0 = rf(ctx, 
projectID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*datafederation.DatafederationPrivateEndpointEntry) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, projectID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DatafederationPrivateEndpointServiceMock_List_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'List' +type DatafederationPrivateEndpointServiceMock_List_Call struct { + *mock.Call +} + +// List is a helper method to define mock.On call +// - ctx context.Context +// - projectID string +func (_e *DatafederationPrivateEndpointServiceMock_Expecter) List(ctx interface{}, projectID interface{}) *DatafederationPrivateEndpointServiceMock_List_Call { + return &DatafederationPrivateEndpointServiceMock_List_Call{Call: _e.mock.On("List", ctx, projectID)} +} + +func (_c *DatafederationPrivateEndpointServiceMock_List_Call) Run(run func(ctx context.Context, projectID string)) *DatafederationPrivateEndpointServiceMock_List_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *DatafederationPrivateEndpointServiceMock_List_Call) Return(_a0 []*datafederation.DatafederationPrivateEndpointEntry, _a1 error) *DatafederationPrivateEndpointServiceMock_List_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *DatafederationPrivateEndpointServiceMock_List_Call) RunAndReturn(run func(context.Context, string) ([]*datafederation.DatafederationPrivateEndpointEntry, error)) *DatafederationPrivateEndpointServiceMock_List_Call { + _c.Call.Return(run) + return _c +} + +// NewDatafederationPrivateEndpointServiceMock creates a new instance of DatafederationPrivateEndpointServiceMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewDatafederationPrivateEndpointServiceMock(t interface { + mock.TestingT + Cleanup(func()) +}) *DatafederationPrivateEndpointServiceMock { + mock := &DatafederationPrivateEndpointServiceMock{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/translation/deployment.go b/internal/mocks/translation/deployment.go index afed78b109..34a567e1d6 100644 --- a/internal/mocks/translation/deployment.go +++ b/internal/mocks/translation/deployment.go @@ -5,7 +5,6 @@ package translation import ( mock "github.com/stretchr/testify/mock" - deployment "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/deployment" v1 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" status "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/status" ) @@ -24,19 +23,19 @@ func (_m *DeploymentMock) EXPECT() *DeploymentMock_Expecter { } // GetConnection provides a mock function with given fields: -func (_m *DeploymentMock) GetConnection() *deployment.Connection { +func (_m *DeploymentMock) GetConnection() *status.ConnectionStrings { ret := _m.Called() if len(ret) == 0 { panic("no return value specified for GetConnection") } - var r0 *deployment.Connection - if rf, ok := ret.Get(0).(func() *deployment.Connection); ok { + var r0 *status.ConnectionStrings + if rf, ok := ret.Get(0).(func() *status.ConnectionStrings); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*deployment.Connection) + r0 = ret.Get(0).(*status.ConnectionStrings) } } @@ -60,12 +59,12 @@ func (_c *DeploymentMock_GetConnection_Call) Run(run func()) *DeploymentMock_Get return _c } -func (_c *DeploymentMock_GetConnection_Call) Return(_a0 *deployment.Connection) *DeploymentMock_GetConnection_Call { +func (_c *DeploymentMock_GetConnection_Call) Return(_a0 *status.ConnectionStrings) *DeploymentMock_GetConnection_Call { _c.Call.Return(_a0) return _c } -func (_c *DeploymentMock_GetConnection_Call) RunAndReturn(run 
func() *deployment.Connection) *DeploymentMock_GetConnection_Call { +func (_c *DeploymentMock_GetConnection_Call) RunAndReturn(run func() *status.ConnectionStrings) *DeploymentMock_GetConnection_Call { _c.Call.Return(run) return _c } diff --git a/internal/translation/datafederation/conversion.go b/internal/translation/datafederation/conversion.go new file mode 100644 index 0000000000..cac17533ba --- /dev/null +++ b/internal/translation/datafederation/conversion.go @@ -0,0 +1,377 @@ +package datafederation + +import ( + "fmt" + "reflect" + + "go.mongodb.org/atlas-sdk/v20231115008/admin" + + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/cmp" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer" + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/common" +) + +type DataFederation struct { + *akov2.DataFederationSpec + ProjectID string + Hostnames []string +} + +// SpecEqualsTo returns true if the spec of the data federation instance semantically equals to the given one. +// Note: it assumes the spec is already normalized. +func (df *DataFederation) SpecEqualsTo(target *DataFederation) bool { + var dfSpecCopy, targetSpecCopy *akov2.DataFederationSpec + if df != nil { + dfSpecCopy = df.DataFederationSpec.DeepCopy() + } + if target != nil { + targetSpecCopy = target.DataFederationSpec.DeepCopy() + } + return reflect.DeepEqual(pruneSpec(dfSpecCopy), pruneSpec(targetSpecCopy)) +} + +func pruneSpec(spec *akov2.DataFederationSpec) *akov2.DataFederationSpec { + if spec == nil { + return nil + } + + // Atlas embeds AWS config as a value, AKO embeds AWS config as a pointer, + // hence treat the absence of both in AKO equally. + if spec.CloudProviderConfig == nil || spec.CloudProviderConfig.AWS == nil { + spec.CloudProviderConfig = nil + } + + // ignore project references, they are not sent to/from Atlas. 
+ var emptyRef common.ResourceRefNamespaced + spec.Project = emptyRef + + // private endpoints are sub-resources, they have their own conversion and are not part of the data federation entity. + spec.PrivateEndpoints = nil + + // skip "SkipRoleValidation" field as it is a request parameter, not a returned body from/to Atlas. + spec.SkipRoleValidation = false + + // normalize nested empty stores/database slices + if spec.Storage != nil && (len(spec.Storage.Stores) == 0 && len(spec.Storage.Databases) == 0) { + spec.Storage = nil + } + + return spec +} + +func NewDataFederation(spec *akov2.DataFederationSpec, projectID string, hostnames []string) (*DataFederation, error) { + if spec == nil { + return nil, nil + } + + specCopy := spec.DeepCopy() + if err := cmp.Normalize(specCopy); err != nil { + return nil, fmt.Errorf("failed to normalize data federation spec: %w", err) + } + + return &DataFederation{ + DataFederationSpec: specCopy, + ProjectID: projectID, + Hostnames: hostnames, + }, nil +} + +func toAtlas(df *DataFederation) *admin.DataLakeTenant { + if df == nil || df.DataFederationSpec == nil { + return nil + } + + return &admin.DataLakeTenant{ + GroupId: pointer.MakePtrOrNil(df.ProjectID), + CloudProviderConfig: cloudProviderConfigToAtlas(df.CloudProviderConfig), + DataProcessRegion: dataProcessRegionToAtlas(df.DataProcessRegion), + Name: pointer.MakePtrOrNil(df.Name), + Storage: storageToAtlas(df.Storage), + } +} + +func fromAtlas(federation *admin.DataLakeTenant) (*DataFederation, error) { + if federation == nil { + return nil, nil + } + + return NewDataFederation( + &akov2.DataFederationSpec{ + CloudProviderConfig: cloudProviderConfigFromAtlas(federation.CloudProviderConfig), + DataProcessRegion: dataProcessRegionFromAtlas(federation.DataProcessRegion), + Name: federation.GetName(), + Storage: storageFromAtlas(federation.Storage), + }, + federation.GetGroupId(), + federation.GetHostnames(), + ) +} + +func storageFromAtlas(storage *admin.DataLakeStorage) 
*akov2.Storage { + if storage == nil { + return nil + } + + result := &akov2.Storage{} + for _, atlasDB := range storage.GetDatabases() { + db := akov2.Database{ + MaxWildcardCollections: atlasDB.GetMaxWildcardCollections(), + Name: atlasDB.GetName(), + } + for _, atlasCollection := range atlasDB.GetCollections() { + collection := akov2.Collection{ + Name: atlasCollection.GetName(), + } + for _, atlasDataSource := range atlasCollection.GetDataSources() { + dataSource := akov2.DataSource{ + AllowInsecure: atlasDataSource.GetAllowInsecure(), + Collection: atlasDataSource.GetCollection(), + CollectionRegex: atlasDataSource.GetCollectionRegex(), + Database: atlasDataSource.GetDatabase(), + DatabaseRegex: atlasDataSource.GetDatabaseRegex(), + DatasetName: atlasDataSource.GetDatasetName(), + DatasetPrefix: atlasDataSource.GetDatasetPrefix(), + DefaultFormat: atlasDataSource.GetDefaultFormat(), + Path: atlasDataSource.GetPath(), + ProvenanceFieldName: atlasDataSource.GetProvenanceFieldName(), + StoreName: atlasDataSource.GetStoreName(), + TrimLevel: atlasDataSource.GetTrimLevel(), + } + dataSource.Urls = append(dataSource.Urls, atlasDataSource.GetUrls()...) 
+ collection.DataSources = append(collection.DataSources, dataSource) + } + db.Collections = append(db.Collections, collection) + } + for _, atlasView := range atlasDB.GetViews() { + db.Views = append(db.Views, akov2.View{ + Name: atlasView.GetName(), + Pipeline: atlasView.GetPipeline(), + Source: atlasView.GetSource(), + }) + } + result.Databases = append(result.Databases, db) + } + + for _, atlasStore := range storage.GetStores() { + store := akov2.Store{ + Name: atlasStore.GetName(), + Provider: atlasStore.GetProvider(), + Bucket: atlasStore.GetBucket(), + Delimiter: atlasStore.GetDelimiter(), + IncludeTags: atlasStore.GetIncludeTags(), + Prefix: atlasStore.GetPrefix(), + Public: atlasStore.GetPublic(), + Region: atlasStore.GetRegion(), + ClusterName: atlasStore.GetClusterName(), + AllowInsecure: atlasStore.GetAllowInsecure(), + DefaultFormat: atlasStore.GetDefaultFormat(), + ReadConcern: readConcernFromAtlas(atlasStore.ReadConcern), + ReadPreference: readPreferenceFromAtlas(atlasStore.ReadPreference), + } + store.Urls = append(store.Urls, atlasStore.GetUrls()...) + store.AdditionalStorageClasses = append(store.AdditionalStorageClasses, atlasStore.GetAdditionalStorageClasses()...) 
+ result.Stores = append(result.Stores, store) + } + return result +} + +func storageToAtlas(storage *akov2.Storage) *admin.DataLakeStorage { + if storage == nil { + return nil + } + + result := &admin.DataLakeStorage{} + databases := make([]admin.DataLakeDatabaseInstance, 0, len(storage.Databases)) + for _, db := range storage.Databases { + atlasDB := admin.DataLakeDatabaseInstance{ + MaxWildcardCollections: pointer.MakePtrOrNil(db.MaxWildcardCollections), + Name: pointer.MakePtrOrNil(db.Name), + } + atlasCollections := make([]admin.DataLakeDatabaseCollection, 0, len(db.Collections)) + for _, collection := range db.Collections { + atlasCollection := admin.DataLakeDatabaseCollection{ + Name: pointer.MakePtrOrNil(collection.Name), + } + atlasDataSources := make([]admin.DataLakeDatabaseDataSourceSettings, 0, len(collection.DataSources)) + for _, dataSource := range collection.DataSources { + atlasDataSource := admin.DataLakeDatabaseDataSourceSettings{ + AllowInsecure: pointer.MakePtr(dataSource.AllowInsecure), + Collection: pointer.MakePtrOrNil(dataSource.Collection), + CollectionRegex: pointer.MakePtrOrNil(dataSource.CollectionRegex), + Database: pointer.MakePtrOrNil(dataSource.Database), + DatabaseRegex: pointer.MakePtrOrNil(dataSource.DatabaseRegex), + DatasetName: pointer.MakePtrOrNil(dataSource.DatasetName), + DatasetPrefix: pointer.MakePtrOrNil(dataSource.DatasetPrefix), + DefaultFormat: pointer.MakePtrOrNil(dataSource.DefaultFormat), + Path: pointer.MakePtrOrNil(dataSource.Path), + ProvenanceFieldName: pointer.MakePtrOrNil(dataSource.ProvenanceFieldName), + StoreName: pointer.MakePtrOrNil(dataSource.StoreName), + TrimLevel: pointer.MakePtrOrNil(dataSource.TrimLevel), + } + atlasDataSource.Urls = pointer.GetOrNilIfEmpty(append([]string{}, dataSource.Urls...)) + atlasDataSources = append(atlasDataSources, atlasDataSource) + } + atlasCollection.DataSources = pointer.GetOrNilIfEmpty(atlasDataSources) + atlasCollections = append(atlasCollections, atlasCollection) + 
} + atlasDB.Collections = pointer.GetOrNilIfEmpty(atlasCollections) + atlasViews := make([]admin.DataLakeApiBase, 0, len(db.Views)) + for _, view := range db.Views { + atlasViews = append(atlasViews, admin.DataLakeApiBase{ + Name: pointer.MakePtrOrNil(view.Name), + Pipeline: pointer.MakePtrOrNil(view.Pipeline), + Source: pointer.MakePtrOrNil(view.Source), + }) + } + atlasDB.Views = pointer.GetOrNilIfEmpty(atlasViews) + databases = append(databases, atlasDB) + } + result.Databases = pointer.GetOrNilIfEmpty(databases) + + stores := make([]admin.DataLakeStoreSettings, 0, len(storage.Stores)) + for _, store := range storage.Stores { + atlasStore := admin.DataLakeStoreSettings{ + Name: pointer.MakePtrOrNil(store.Name), + Provider: store.Provider, + Bucket: pointer.MakePtrOrNil(store.Bucket), + Delimiter: pointer.MakePtrOrNil(store.Delimiter), + IncludeTags: pointer.MakePtr(store.IncludeTags), + Prefix: pointer.MakePtrOrNil(store.Prefix), + Public: pointer.MakePtr(store.Public), + Region: pointer.MakePtrOrNil(store.Region), + ClusterName: pointer.MakePtrOrNil(store.ClusterName), + AllowInsecure: pointer.MakePtr(store.AllowInsecure), + DefaultFormat: pointer.MakePtrOrNil(store.DefaultFormat), + ReadConcern: readConcernToAtlas(store.ReadConcern), + ReadPreference: readPreferenceToAtlas(store.ReadPreference), + } + atlasStore.Urls = pointer.GetOrNilIfEmpty(append([]string{}, store.Urls...)) + additionalStorageClasses := make([]string, 0, len(store.AdditionalStorageClasses)) + additionalStorageClasses = append(additionalStorageClasses, store.AdditionalStorageClasses...) 
+ atlasStore.AdditionalStorageClasses = pointer.GetOrNilIfEmpty(additionalStorageClasses) + stores = append(stores, atlasStore) + } + result.Stores = pointer.GetOrNilIfEmpty(stores) + return result +} + +func readPreferenceFromAtlas(preference *admin.DataLakeAtlasStoreReadPreference) *akov2.ReadPreference { + if preference == nil { + return nil + } + result := &akov2.ReadPreference{ + MaxStalenessSeconds: preference.GetMaxStalenessSeconds(), + Mode: preference.GetMode(), + } + for _, tagset := range preference.GetTagSets() { + var akoTags []akov2.ReadPreferenceTag + if len(tagset) > 0 { + akoTags = make([]akov2.ReadPreferenceTag, 0, len(tagset)) + for _, tag := range tagset { + akoTags = append(akoTags, akov2.ReadPreferenceTag{ + Name: tag.GetName(), + Value: tag.GetValue(), + }) + } + } + result.TagSets = append(result.TagSets, akoTags) + } + return result +} + +func readPreferenceToAtlas(preference *akov2.ReadPreference) *admin.DataLakeAtlasStoreReadPreference { + if preference == nil { + return nil + } + + var atlasTagSets [][]admin.DataLakeAtlasStoreReadPreferenceTag + if len(preference.TagSets) > 0 { + atlasTagSets = make([][]admin.DataLakeAtlasStoreReadPreferenceTag, 0, len(preference.TagSets)) + for _, tagset := range preference.TagSets { + var atlasTags []admin.DataLakeAtlasStoreReadPreferenceTag + if len(tagset) > 0 { + atlasTags = make([]admin.DataLakeAtlasStoreReadPreferenceTag, 0, len(tagset)) + for _, tag := range tagset { + atlasTags = append(atlasTags, admin.DataLakeAtlasStoreReadPreferenceTag{ + Name: pointer.MakePtrOrNil(tag.Name), + Value: pointer.MakePtrOrNil(tag.Value), + }) + } + } + atlasTagSets = append(atlasTagSets, atlasTags) + } + } + + return &admin.DataLakeAtlasStoreReadPreference{ + MaxStalenessSeconds: pointer.MakePtrOrNil(preference.MaxStalenessSeconds), + Mode: pointer.MakePtrOrNil(preference.Mode), + TagSets: pointer.GetOrNilIfEmpty(atlasTagSets), + } +} + +func readConcernFromAtlas(concern *admin.DataLakeAtlasStoreReadConcern) 
*akov2.ReadConcern { + if concern == nil { + return nil + } + + return &akov2.ReadConcern{ + Level: concern.GetLevel(), + } +} + +func readConcernToAtlas(concern *akov2.ReadConcern) *admin.DataLakeAtlasStoreReadConcern { + if concern == nil { + return nil + } + + return &admin.DataLakeAtlasStoreReadConcern{ + Level: pointer.MakePtrOrNil(concern.Level), + } +} + +func dataProcessRegionFromAtlas(region *admin.DataLakeDataProcessRegion) *akov2.DataProcessRegion { + if region == nil { + return nil + } + return &akov2.DataProcessRegion{ + CloudProvider: region.GetCloudProvider(), + Region: region.GetRegion(), + } +} + +func dataProcessRegionToAtlas(region *akov2.DataProcessRegion) *admin.DataLakeDataProcessRegion { + if region == nil { + return nil + } + return &admin.DataLakeDataProcessRegion{ + CloudProvider: region.CloudProvider, + Region: region.Region, + } +} + +func cloudProviderConfigFromAtlas(config *admin.DataLakeCloudProviderConfig) *akov2.CloudProviderConfig { + if config == nil { + return nil + } + result := &akov2.CloudProviderConfig{} + if aws, ok := config.GetAwsOk(); ok { + result.AWS = &akov2.AWSProviderConfig{ + RoleID: aws.GetRoleId(), + TestS3Bucket: aws.GetTestS3Bucket(), + } + } + return result +} + +func cloudProviderConfigToAtlas(config *akov2.CloudProviderConfig) *admin.DataLakeCloudProviderConfig { + if config == nil || config.AWS == nil { + return nil + } + return &admin.DataLakeCloudProviderConfig{ + Aws: admin.DataLakeAWSCloudProviderConfig{ + RoleId: config.AWS.RoleID, + TestS3Bucket: config.AWS.TestS3Bucket, + }, + } +} diff --git a/internal/translation/datafederation/conversion_endpoints.go b/internal/translation/datafederation/conversion_endpoints.go new file mode 100644 index 0000000000..ffd26b681a --- /dev/null +++ b/internal/translation/datafederation/conversion_endpoints.go @@ -0,0 +1,66 @@ +package datafederation + +import ( + "fmt" + + "go.mongodb.org/atlas-sdk/v20231115008/admin" + + 
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/cmp" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer" + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" +) + +type DatafederationPrivateEndpointEntry struct { + *akov2.DataFederationPE + ProjectID string +} + +func NewDatafederationPrivateEndpointEntry(pe *akov2.DataFederationPE, projectID string) *DatafederationPrivateEndpointEntry { + if pe == nil { + return nil + } + return &DatafederationPrivateEndpointEntry{DataFederationPE: pe, ProjectID: projectID} +} + +func endpointsFromAtlas(endpoints []admin.PrivateNetworkEndpointIdEntry, projectID string) ([]*DatafederationPrivateEndpointEntry, error) { + result := make([]*DatafederationPrivateEndpointEntry, 0, len(endpoints)) + for _, entry := range endpoints { + result = append(result, endpointFromAtlas(&entry, projectID)) + } + if err := cmp.Normalize(result); err != nil { + return nil, fmt.Errorf("error normalizing data federation private endpoints: %w", err) + } + return result, nil +} + +func endpointFromAtlas(endpoint *admin.PrivateNetworkEndpointIdEntry, projectID string) *DatafederationPrivateEndpointEntry { + result := &DatafederationPrivateEndpointEntry{ + ProjectID: projectID, + } + if endpoint != nil { + result.DataFederationPE = &akov2.DataFederationPE{ + EndpointID: endpoint.GetEndpointId(), + Provider: endpoint.GetProvider(), + Type: endpoint.GetType(), + Comment: endpoint.GetComment(), + CustomerEndpointDNSName: endpoint.GetCustomerEndpointDNSName(), + Region: endpoint.GetRegion(), + } + } + return result +} + +func endpointToAtlas(ep *DatafederationPrivateEndpointEntry) *admin.PrivateNetworkEndpointIdEntry { + if ep == nil || ep.DataFederationPE == nil { + return nil + } + + return &admin.PrivateNetworkEndpointIdEntry{ + EndpointId: ep.EndpointID, + Provider: pointer.MakePtrOrNil(ep.Provider), + Type: pointer.MakePtrOrNil(ep.Type), + Comment: pointer.MakePtrOrNil(ep.Comment), + CustomerEndpointDNSName: 
pointer.MakePtrOrNil(ep.CustomerEndpointDNSName), + Region: pointer.MakePtrOrNil(ep.Region), + } +} diff --git a/internal/translation/datafederation/conversion_endpoints_test.go b/internal/translation/datafederation/conversion_endpoints_test.go new file mode 100644 index 0000000000..5d850b97e9 --- /dev/null +++ b/internal/translation/datafederation/conversion_endpoints_test.go @@ -0,0 +1,30 @@ +package datafederation + +import ( + "reflect" + "testing" + + "github.com/google/go-cmp/cmp" + fuzz "github.com/google/gofuzz" + "github.com/stretchr/testify/require" +) + +func TestRoundtrip_DataFederationPE(t *testing.T) { + f := fuzz.New() + + for i := 0; i < 100; i++ { + fuzzed := &DatafederationPrivateEndpointEntry{} + f.Fuzz(fuzzed) + // ignore non-Atlas fields + fuzzed.ProjectID = "" + + toAtlasResult := endpointToAtlas(fuzzed) + fromAtlasResult := endpointFromAtlas(toAtlasResult, "") + + equals := reflect.DeepEqual(fuzzed, fromAtlasResult) + if !equals { + t.Log(cmp.Diff(fuzzed, fromAtlasResult)) + } + require.True(t, equals) + } +} diff --git a/internal/translation/datafederation/conversion_test.go b/internal/translation/datafederation/conversion_test.go new file mode 100644 index 0000000000..c074824423 --- /dev/null +++ b/internal/translation/datafederation/conversion_test.go @@ -0,0 +1,30 @@ +package datafederation + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + fuzz "github.com/google/gofuzz" + "github.com/stretchr/testify/require" +) + +func TestRoundtrip_DataFederation(t *testing.T) { + f := fuzz.New() + + for i := 0; i < 100; i++ { + fuzzed := &DataFederation{} + f.Fuzz(fuzzed) + fuzzed, err := NewDataFederation(fuzzed.DataFederationSpec, fuzzed.ProjectID, fuzzed.Hostnames) + require.NoError(t, err) + + toAtlasResult := toAtlas(fuzzed) + fromAtlasResult, err := fromAtlas(toAtlasResult) + require.NoError(t, err) + + equals := fuzzed.SpecEqualsTo(fromAtlasResult) + if !equals { + t.Log(cmp.Diff(fuzzed, fromAtlasResult)) + } + require.True(t, 
equals) + } +} diff --git a/internal/translation/datafederation/datafederation.go b/internal/translation/datafederation/datafederation.go new file mode 100644 index 0000000000..7afb0923b1 --- /dev/null +++ b/internal/translation/datafederation/datafederation.go @@ -0,0 +1,86 @@ +package datafederation + +import ( + "context" + "errors" + "fmt" + "net/http" + + "go.mongodb.org/atlas-sdk/v20231115008/admin" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/types" + + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas" +) + +var ( + ErrorNotFound = errors.New("data federation not found") +) + +type DataFederationService interface { + Get(ctx context.Context, projectID, name string) (*DataFederation, error) + Create(ctx context.Context, df *DataFederation) error + Update(ctx context.Context, df *DataFederation) error + Delete(ctx context.Context, projectID, name string) error +} + +type AtlasDataFederationService struct { + api admin.DataFederationApi +} + +func NewAtlasDataFederationService(ctx context.Context, provider atlas.Provider, secretRef *types.NamespacedName, log *zap.SugaredLogger) (*AtlasDataFederationService, error) { + client, err := translation.NewVersionedClient(ctx, provider, secretRef, log) + if err != nil { + return nil, fmt.Errorf("failed to create versioned client: %w", err) + } + return &AtlasDataFederationService{client.DataFederationApi}, nil +} + +func (dfs *AtlasDataFederationService) Get(ctx context.Context, projectID, name string) (*DataFederation, error) { + atlasDataFederation, resp, err := dfs.api.GetFederatedDatabase(ctx, projectID, name).Execute() + + if resp != nil && resp.StatusCode == http.StatusNotFound { + return nil, errors.Join(ErrorNotFound, err) + } + + if err != nil { + return nil, fmt.Errorf("failed to get data federation database %q: %w", name, err) + } + return fromAtlas(atlasDataFederation) +} + +func (dfs *AtlasDataFederationService) 
Create(ctx context.Context, df *DataFederation) error { + atlasDataFederation := toAtlas(df) + _, _, err := dfs.api. + CreateFederatedDatabase(ctx, df.ProjectID, atlasDataFederation). + SkipRoleValidation(df.SkipRoleValidation). + Execute() + if err != nil { + return fmt.Errorf("failed to create data federation database %q: %w", df.ProjectID, err) + } + return nil +} + +func (dfs *AtlasDataFederationService) Update(ctx context.Context, df *DataFederation) error { + atlasDataFederation := toAtlas(df) + _, _, err := dfs.api. + UpdateFederatedDatabase(ctx, df.ProjectID, df.Name, atlasDataFederation). + SkipRoleValidation(df.SkipRoleValidation). + Execute() + if err != nil { + return fmt.Errorf("failed to update data federation database %q: %w", df.ProjectID, err) + } + return nil +} + +func (dfs *AtlasDataFederationService) Delete(ctx context.Context, projectID, name string) error { + _, resp, err := dfs.api.DeleteFederatedDatabase(ctx, projectID, name).Execute() + if resp != nil && resp.StatusCode == http.StatusNotFound { + return errors.Join(ErrorNotFound, err) + } + if err != nil { + return fmt.Errorf("failed to delete data federation database %q: %w", projectID, err) + } + return nil +} diff --git a/internal/translation/datafederation/datafederation_test.go b/internal/translation/datafederation/datafederation_test.go new file mode 100644 index 0000000000..5ca2640de8 --- /dev/null +++ b/internal/translation/datafederation/datafederation_test.go @@ -0,0 +1,52 @@ +package datafederation + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/types" + + "go.mongodb.org/atlas-sdk/v20231115008/admin" + "go.uber.org/zap" + "sigs.k8s.io/controller-runtime/pkg/client" + + atlasmocks "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/mocks/atlas" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas" +) + +func TestNewDatafederationService(t *testing.T) { + for _, tt := range []struct { + 
name string + provider atlas.Provider + wantErr string + }{ + { + name: "success", + provider: &atlasmocks.TestProvider{ + SdkClientFunc: func(_ *client.ObjectKey, _ *zap.SugaredLogger) (*admin.APIClient, string, error) { + return &admin.APIClient{}, "", nil + }, + }, + }, + { + name: "failure", + provider: &atlasmocks.TestProvider{ + SdkClientFunc: func(_ *client.ObjectKey, _ *zap.SugaredLogger) (*admin.APIClient, string, error) { + return nil, "", errors.New("fake error") + }, + }, + wantErr: "failed to create versioned client: failed to instantiate Versioned Atlas client: fake error", + }, + } { + t.Run(tt.name, func(t *testing.T) { + _, err := NewAtlasDataFederationService(context.Background(), tt.provider, &types.NamespacedName{}, zap.S()) + gotErr := "" + if err != nil { + gotErr = err.Error() + } + require.Equal(t, tt.wantErr, gotErr) + }) + } +} diff --git a/internal/translation/datafederation/datafederationendpoints.go b/internal/translation/datafederation/datafederationendpoints.go new file mode 100644 index 0000000000..9e950ca802 --- /dev/null +++ b/internal/translation/datafederation/datafederationendpoints.go @@ -0,0 +1,57 @@ +package datafederation + +import ( + "context" + "fmt" + + "go.mongodb.org/atlas-sdk/v20231115008/admin" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/types" + + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas" +) + +type DatafederationPrivateEndpointService interface { + List(ctx context.Context, projectID string) ([]*DatafederationPrivateEndpointEntry, error) + Create(context.Context, *DatafederationPrivateEndpointEntry) error + Delete(context.Context, *DatafederationPrivateEndpointEntry) error +} + +type DatafederationPrivateEndpoints struct { + api admin.DataFederationApi +} + +func NewDatafederationPrivateEndpointService(ctx context.Context, provider atlas.Provider, secretRef *types.NamespacedName, log *zap.SugaredLogger) 
(*DatafederationPrivateEndpoints, error) { + client, err := translation.NewVersionedClient(ctx, provider, secretRef, log) + if err != nil { + return nil, fmt.Errorf("failed to create versioned client: %w", err) + } + return &DatafederationPrivateEndpoints{client.DataFederationApi}, nil +} + +func (d *DatafederationPrivateEndpoints) List(ctx context.Context, projectID string) ([]*DatafederationPrivateEndpointEntry, error) { + paginatedResponse, _, err := d.api.ListDataFederationPrivateEndpoints(ctx, projectID).Execute() + if err != nil { + return nil, fmt.Errorf("failed to list data federation private endpoints from Atlas: %w", err) + } + + return endpointsFromAtlas(paginatedResponse.GetResults(), projectID) +} + +func (d *DatafederationPrivateEndpoints) Create(ctx context.Context, aep *DatafederationPrivateEndpointEntry) error { + ep := endpointToAtlas(aep) + _, _, err := d.api.CreateDataFederationPrivateEndpoint(ctx, aep.ProjectID, ep).Execute() + if err != nil { + return fmt.Errorf("failed to create data federation private endpoint: %w", err) + } + return nil +} + +func (d *DatafederationPrivateEndpoints) Delete(ctx context.Context, aep *DatafederationPrivateEndpointEntry) error { + _, _, err := d.api.DeleteDataFederationPrivateEndpoint(ctx, aep.ProjectID, aep.EndpointID).Execute() + if err != nil { + return fmt.Errorf("failed to delete data federation private endpoint: %w", err) + } + return nil +} diff --git a/licenses.csv b/licenses.csv index 5eb6d499bd..ac3f233405 100644 --- a/licenses.csv +++ b/licenses.csv @@ -113,7 +113,7 @@ golang.org/x/exp,https://cs.opensource.google/go/x/exp/+/8a7402ab:LICENSE,BSD-3- golang.org/x/net,https://cs.opensource.google/go/x/net/+/v0.30.0:LICENSE,BSD-3-Clause golang.org/x/oauth2,https://cs.opensource.google/go/x/oauth2/+/v0.23.0:LICENSE,BSD-3-Clause golang.org/x/sync,https://cs.opensource.google/go/x/sync/+/v0.8.0:LICENSE,BSD-3-Clause -golang.org/x/sys,https://cs.opensource.google/go/x/sys/+/v0.26.0:LICENSE,BSD-3-Clause 
+golang.org/x/sys/unix,https://cs.opensource.google/go/x/sys/+/v0.26.0:LICENSE,BSD-3-Clause golang.org/x/term,https://cs.opensource.google/go/x/term/+/v0.25.0:LICENSE,BSD-3-Clause golang.org/x/text,https://cs.opensource.google/go/x/text/+/v0.19.0:LICENSE,BSD-3-Clause golang.org/x/time/rate,https://cs.opensource.google/go/x/time/+/v0.7.0:LICENSE,BSD-3-Clause diff --git a/pkg/api/v1/atlasdatafederation_types.go b/pkg/api/v1/atlasdatafederation_types.go index c365f5da39..229c018f01 100644 --- a/pkg/api/v1/atlasdatafederation_types.go +++ b/pkg/api/v1/atlasdatafederation_types.go @@ -17,12 +17,9 @@ limitations under the License. package v1 import ( - "go.mongodb.org/atlas/mongodbatlas" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/compat" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/kube" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/common" @@ -49,6 +46,9 @@ type DataFederationSpec struct { // +optional PrivateEndpoints []DataFederationPE `json:"privateEndpoints,omitempty"` + + // +optional + SkipRoleValidation bool `json:"skipRoleValidation,omitempty"` } type CloudProviderConfig struct { @@ -96,30 +96,57 @@ type DataSource struct { CollectionRegex string `json:"collectionRegex,omitempty"` Database string `json:"database,omitempty"` DatabaseRegex string `json:"databaseRegex,omitempty"` + DatasetName string `json:"datasetName,omitempty"` + DatasetPrefix string `json:"datasetPrefix,omitempty"` // +kubebuilder:validation:Enum:=.avro;.avro.bz2;.avro.gz;.bson;.bson.bz2;.bson.gz;.bsonx;.csv;.csv.bz2;.csv.gz;.json;.json.bz2;.json.gz;.orc;.parquet;.tsv;.tsv.bz2;.tsv.gz DefaultFormat string `json:"defaultFormat,omitempty"` Path string `json:"path,omitempty"` ProvenanceFieldName string `json:"provenanceFieldName,omitempty"` StoreName string `json:"storeName,omitempty"` + TrimLevel int 
`json:"trimLevel,omitempty"` Urls []string `json:"urls,omitempty"` } type Store struct { - Name string `json:"name,omitempty"` - Provider string `json:"provider,omitempty"` - AdditionalStorageClasses []string `json:"additionalStorageClasses,omitempty"` - Bucket string `json:"bucket,omitempty"` - Delimiter string `json:"delimiter,omitempty"` - IncludeTags bool `json:"includeTags,omitempty"` - Prefix string `json:"prefix,omitempty"` - Public bool `json:"public,omitempty"` - Region string `json:"region,omitempty"` + Name string `json:"name,omitempty"` + Provider string `json:"provider,omitempty"` + AdditionalStorageClasses []string `json:"additionalStorageClasses,omitempty"` + Bucket string `json:"bucket,omitempty"` + Delimiter string `json:"delimiter,omitempty"` + IncludeTags bool `json:"includeTags,omitempty"` + Prefix string `json:"prefix,omitempty"` + Public bool `json:"public,omitempty"` + Region string `json:"region,omitempty"` + ClusterName string `json:"clusterName,omitempty"` + AllowInsecure bool `json:"allowInsecure,omitempty"` + DefaultFormat string `json:"defaultFormat,omitempty"` + ReadConcern *ReadConcern `json:"readConcern,omitempty"` + ReadPreference *ReadPreference `json:"readPreference,omitempty"` + Urls []string `json:"urls,omitempty"` +} + +type ReadPreference struct { + MaxStalenessSeconds int `json:"maxStalenessSeconds,omitempty"` + Mode string `json:"mode,omitempty"` + TagSets [][]ReadPreferenceTag `json:"tagSets,omitempty"` +} + +type ReadPreferenceTag struct { + Name string `json:"name,omitempty"` + Value string `json:"value,omitempty"` +} + +type ReadConcern struct { + Level string `json:"level,omitempty"` } type DataFederationPE struct { - EndpointID string `json:"endpointId,omitempty"` - Provider string `json:"provider,omitempty"` - Type string `json:"type,omitempty"` + EndpointID string `json:"endpointId,omitempty"` + Provider string `json:"provider,omitempty"` + Type string `json:"type,omitempty"` + Comment string 
`json:"comment,omitempty"` + CustomerEndpointDNSName string `json:"customerEndpointDNSName,omitempty"` + Region string `json:"region,omitempty"` } func (pe DataFederationPE) Identifier() interface{} { @@ -177,12 +204,6 @@ func (c *AtlasDataFederation) UpdateStatus(conditions []api.Condition, options . } } -func (c *AtlasDataFederation) ToAtlas() (*mongodbatlas.DataFederationInstance, error) { - result := &mongodbatlas.DataFederationInstance{} - err := compat.JSONCopy(result, c.Spec) - return result, err -} - func NewDataFederationInstance(projectName, instanceName, namespace string) *AtlasDataFederation { return &AtlasDataFederation{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/api/v1/atlasdatafederation_types_test.go b/pkg/api/v1/atlasdatafederation_types_test.go deleted file mode 100644 index fc644201c6..0000000000 --- a/pkg/api/v1/atlasdatafederation_types_test.go +++ /dev/null @@ -1,180 +0,0 @@ -package v1 - -import ( - "encoding/json" - "fmt" - "reflect" - "testing" - - "github.com/stretchr/testify/assert" - "go.mongodb.org/atlas/mongodbatlas" - - "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/common" -) - -func TestAtlasDataFederation_ToAtlas(t *testing.T) { - type fields struct { - Spec DataFederationSpec - } - tests := []struct { - name string - fields fields - want *mongodbatlas.DataFederationInstance - wantErr assert.ErrorAssertionFunc - }{ - { - name: "Should convert all fields", - fields: struct{ Spec DataFederationSpec }{Spec: DataFederationSpec{ - Project: common.ResourceRefNamespaced{ - Name: "testName", - Namespace: "testNamespace", - }, - Name: "testName", - CloudProviderConfig: &CloudProviderConfig{AWS: &AWSProviderConfig{ - RoleID: "testRoleID", - TestS3Bucket: "testS3Bucket", - }}, - DataProcessRegion: &DataProcessRegion{ - CloudProvider: "AWS", - Region: "SYDNEY_AUS", - }, - Storage: &Storage{ - Databases: []Database{ - { - Collections: []Collection{ - { - 
DataSources: []DataSource{ - { - AllowInsecure: true, - Collection: "test-collection-1", - CollectionRegex: "test-collection-regex", - Database: "test-db-1", - DatabaseRegex: "test-db-regex", - DefaultFormat: "test-format", - Path: "test-path", - ProvenanceFieldName: "test-field-name", - StoreName: "http-test", - Urls: []string{"https://data.cityofnewyork.us/api/views/vfnx-vebw/rows.csv"}, - }, - }, - Name: "test-collection-1", - }, - }, - MaxWildcardCollections: 0, - Name: "test-db-1", - Views: []View{ - { - Name: "test-view-1", - Pipeline: "test-pipeline-1", - Source: "test-store-source", - }, - }, - }, - }, - Stores: []Store{ - { - Name: "http-test", - Provider: "http", - AdditionalStorageClasses: []string{"test-storage-class"}, - Bucket: "test-bucket", - Delimiter: ",", - IncludeTags: true, - Prefix: "test-prefix", - Public: true, - Region: "SYDNEY_AUS", - }, - }, - }, - PrivateEndpoints: []DataFederationPE{ - { - EndpointID: "test-id", - Provider: "AWS", - Type: "DATA_LAKE", - }, - }, - }}, - want: &mongodbatlas.DataFederationInstance{ - CloudProviderConfig: &mongodbatlas.CloudProviderConfig{AWSConfig: mongodbatlas.AwsCloudProviderConfig{ - ExternalID: "", - IAMAssumedRoleARN: "", - IAMUserARN: "", - RoleID: "testRoleID", - TestS3Bucket: "testS3Bucket", - }}, - DataProcessRegion: &mongodbatlas.DataProcessRegion{ - CloudProvider: "AWS", - Region: "SYDNEY_AUS", - }, - Storage: &mongodbatlas.DataFederationStorage{ - Databases: []*mongodbatlas.DataFederationDatabase{ - { - Collections: []*mongodbatlas.DataFederationCollection{ - { - DataSources: []*mongodbatlas.DataFederationDataSource{ - { - AllowInsecure: pointer.MakePtr(true), - Collection: "test-collection-1", - CollectionRegex: "test-collection-regex", - Database: "test-db-1", - DatabaseRegex: "test-db-regex", - DefaultFormat: "test-format", - Path: "test-path", - ProvenanceFieldName: "test-field-name", - StoreName: "http-test", - Urls: 
[]*string{pointer.MakePtr[string]("https://data.cityofnewyork.us/api/views/vfnx-vebw/rows.csv")}, - }, - }, - Name: "test-collection-1", - }, - }, - MaxWildcardCollections: 0, - Name: "test-db-1", - Views: []*mongodbatlas.DataFederationDatabaseView{ - { - Name: "test-view-1", - Pipeline: "test-pipeline-1", - Source: "test-store-source", - }, - }, - }, - }, - Stores: []*mongodbatlas.DataFederationStore{ - { - Name: "http-test", - Provider: "http", - AdditionalStorageClasses: []*string{pointer.MakePtr[string]("test-storage-class")}, - Bucket: "test-bucket", - Delimiter: ",", - IncludeTags: pointer.MakePtr(true), - Prefix: "test-prefix", - Region: "SYDNEY_AUS", - Public: pointer.MakePtr(true), - }, - }, - }, - Name: "testName", - }, - wantErr: assert.NoError, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &AtlasDataFederation{ - Spec: tt.fields.Spec, - } - got, err := c.ToAtlas() - if !tt.wantErr(t, err, "ToAtlas()") { - return - } - if !reflect.DeepEqual(got, tt.want) { - g, _ := json.MarshalIndent(got, "", " ") - w, _ := json.MarshalIndent(tt.want, "", " ") - fmt.Println("GOT", string(g)) - fmt.Println("WANT", string(w)) - } - - assert.Equalf(t, tt.want, got, "ToAtlas()") - }) - } -} diff --git a/pkg/api/v1/zz_generated.deepcopy.go b/pkg/api/v1/zz_generated.deepcopy.go index 657519c604..92a03c181e 100644 --- a/pkg/api/v1/zz_generated.deepcopy.go +++ b/pkg/api/v1/zz_generated.deepcopy.go @@ -2353,6 +2353,62 @@ func (in *ProjectSettings) DeepCopy() *ProjectSettings { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ReadConcern) DeepCopyInto(out *ReadConcern) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReadConcern. 
+func (in *ReadConcern) DeepCopy() *ReadConcern { + if in == nil { + return nil + } + out := new(ReadConcern) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ReadPreference) DeepCopyInto(out *ReadPreference) { + *out = *in + if in.TagSets != nil { + in, out := &in.TagSets, &out.TagSets + *out = make([][]ReadPreferenceTag, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = make([]ReadPreferenceTag, len(*in)) + copy(*out, *in) + } + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReadPreference. +func (in *ReadPreference) DeepCopy() *ReadPreference { + if in == nil { + return nil + } + out := new(ReadPreference) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ReadPreferenceTag) DeepCopyInto(out *ReadPreferenceTag) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReadPreferenceTag. +func (in *ReadPreferenceTag) DeepCopy() *ReadPreferenceTag { + if in == nil { + return nil + } + out := new(ReadPreferenceTag) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *Resource) DeepCopyInto(out *Resource) { *out = *in @@ -2707,6 +2763,21 @@ func (in *Store) DeepCopyInto(out *Store) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.ReadConcern != nil { + in, out := &in.ReadConcern, &out.ReadConcern + *out = new(ReadConcern) + **out = **in + } + if in.ReadPreference != nil { + in, out := &in.ReadPreference, &out.ReadPreference + *out = new(ReadPreference) + (*in).DeepCopyInto(*out) + } + if in.Urls != nil { + in, out := &in.Urls, &out.Urls + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Store. diff --git a/pkg/controller/atlasdatafederation/connectionsecrets.go b/pkg/controller/atlasdatafederation/connectionsecrets.go index 50b2aaf6e3..993c1eab17 100644 --- a/pkg/controller/atlasdatafederation/connectionsecrets.go +++ b/pkg/controller/atlasdatafederation/connectionsecrets.go @@ -8,20 +8,21 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/stringutil" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/datafederation" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/connectionsecret" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" ) -func (r *AtlasDataFederationReconciler) ensureConnectionSecrets(ctx *workflow.Context, project *akov2.AtlasProject, df *akov2.AtlasDataFederation) workflow.Result { +func (r *AtlasDataFederationReconciler) ensureConnectionSecrets(ctx *workflow.Context, federationService datafederation.DataFederationService, project *akov2.AtlasProject, df *akov2.AtlasDataFederation) workflow.Result { databaseUsers := akov2.AtlasDatabaseUserList{} err := r.Client.List(ctx.Context, &databaseUsers, &client.ListOptions{}) if err != nil { return 
workflow.Terminate(workflow.Internal, err.Error()) } - atlasDF, _, err := ctx.Client.DataFederation.Get(ctx.Context, project.ID(), df.Spec.Name) + atlasDF, err := federationService.Get(ctx.Context, project.ID(), df.Spec.Name) if err != nil { return workflow.Terminate(workflow.Internal, err.Error()) } diff --git a/pkg/controller/atlasdatafederation/datafederation.go b/pkg/controller/atlasdatafederation/datafederation.go index 5b4b4fd26d..5f9652c6e2 100644 --- a/pkg/controller/atlasdatafederation/datafederation.go +++ b/pkg/controller/atlasdatafederation/datafederation.go @@ -1,42 +1,29 @@ package atlasdatafederation import ( - "net/http" + "errors" - "go.mongodb.org/atlas/mongodbatlas" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "go.uber.org/zap" - - "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/compat" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/datafederation" akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/common" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" ) -func (r *AtlasDataFederationReconciler) ensureDataFederation(ctx *workflow.Context, project *akov2.AtlasProject, dataFederation *akov2.AtlasDataFederation) workflow.Result { - log := ctx.Log - +func (r *AtlasDataFederationReconciler) ensureDataFederation(ctx *workflow.Context, project *akov2.AtlasProject, dataFederation *akov2.AtlasDataFederation, federationService datafederation.DataFederationService) workflow.Result { projectID := project.ID() operatorSpec := &dataFederation.Spec - dataFederationToAtlas, err := dataFederation.ToAtlas() + akoDataFederation, err := datafederation.NewDataFederation(&dataFederation.Spec, projectID, nil) if err != nil { - return workflow.Terminate(workflow.Internal, "can not convert DataFederation (operator -> atlas)") + return workflow.Terminate(workflow.Internal, err.Error()) } - atlasSpec, resp, 
err := ctx.Client.DataFederation.Get(ctx.Context, projectID, operatorSpec.Name) + atlasDataFederation, err := federationService.Get(ctx.Context, projectID, operatorSpec.Name) if err != nil { - if resp == nil { + if !errors.Is(err, datafederation.ErrorNotFound) { return workflow.Terminate(workflow.Internal, err.Error()) } - if resp.StatusCode != http.StatusNotFound { - return workflow.Terminate(workflow.DataFederationNotCreatedInAtlas, err.Error()) - } - - _, _, err = ctx.Client.DataFederation.Create(ctx.Context, projectID, dataFederationToAtlas) + err = federationService.Create(ctx.Context, akoDataFederation) if err != nil { return workflow.Terminate(workflow.DataFederationNotCreatedInAtlas, err.Error()) } @@ -44,57 +31,14 @@ func (r *AtlasDataFederationReconciler) ensureDataFederation(ctx *workflow.Conte return workflow.InProgress(workflow.DataFederationCreating, "Data Federation is being created") } - dfFromAtlas, err := DataFederationFromAtlas(atlasSpec) - if err != nil { - return workflow.Terminate(workflow.Internal, "can not convert DataFederation (atlas -> operator)") - } - - if areEqual, _ := dataFederationEqual(*dfFromAtlas, *operatorSpec, log); areEqual { + if akoDataFederation.SpecEqualsTo(atlasDataFederation) { return workflow.OK() } - _, _, err = ctx.Client.DataFederation.Update(ctx.Context, projectID, dataFederation.Spec.Name, dataFederationToAtlas, nil) + err = federationService.Update(ctx.Context, akoDataFederation) if err != nil { return workflow.Terminate(workflow.DataFederationNotUpdatedInAtlas, err.Error()) } return workflow.InProgress(workflow.DataFederationUpdating, "Data Federation is being updated") } - -func DataFederationFromAtlas(atlasDF *mongodbatlas.DataFederationInstance) (*akov2.DataFederationSpec, error) { - dfSpec := &akov2.DataFederationSpec{} - err := compat.JSONCopy(dfSpec, atlasDF) - return dfSpec, err -} - -func dataFederationEqual(atlasSpec, operatorSpec akov2.DataFederationSpec, log *zap.SugaredLogger) (areEqual bool, diff 
string) { - mergedSpec, err := getMergedSpec(atlasSpec, operatorSpec) - if err != nil { - log.Errorf("failed to merge Data Federation specs: %s", err.Error()) - return false, "" - } - - d := cmp.Diff(atlasSpec, mergedSpec, cmpopts.EquateEmpty()) - if d != "" { - log.Debugf("Data Federation diff: \n%s", d) - } - - return d == "", d -} - -func getMergedSpec(atlasSpec, operatorSpec akov2.DataFederationSpec) (akov2.DataFederationSpec, error) { - mergedSpec := akov2.DataFederationSpec{} - - operatorSpec.PrivateEndpoints = []akov2.DataFederationPE{} - - if err := compat.JSONCopy(&mergedSpec, atlasSpec); err != nil { - return mergedSpec, err - } - if err := compat.JSONCopy(&mergedSpec, operatorSpec); err != nil { - return mergedSpec, err - } - - mergedSpec.Project = common.ResourceRefNamespaced{} - - return mergedSpec, nil -} diff --git a/pkg/controller/atlasdatafederation/datafederation_controller.go b/pkg/controller/atlasdatafederation/datafederation_controller.go index 2906162cda..4833f2eca3 100644 --- a/pkg/controller/atlasdatafederation/datafederation_controller.go +++ b/pkg/controller/atlasdatafederation/datafederation_controller.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" - "go.mongodb.org/atlas/mongodbatlas" "go.uber.org/zap" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -20,6 +19,7 @@ import ( "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/kube" "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/datafederation" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas" @@ -93,26 +93,31 @@ func (r *AtlasDataFederationReconciler) Reconcile(context context.Context, req c return result.ReconcileResult(), nil } - atlasClient, orgID, err := r.AtlasProvider.Client(ctx.Context, 
project.ConnectionSecretObjectKey(), log)
+	endpointService, err := datafederation.NewDatafederationPrivateEndpointService(ctx.Context, r.AtlasProvider, project.ConnectionSecretObjectKey(), log)
 	if err != nil {
-		result := workflow.Terminate(workflow.AtlasAPIAccessNotConfigured, err.Error())
-		ctx.SetConditionFromResult(api.DataFederationReadyType, result)
+		result = workflow.Terminate(workflow.AtlasAPIAccessNotConfigured, err.Error())
+		ctx.SetConditionFromResult(api.DataFederationReadyType, result)
+		return result.ReconcileResult(), nil
+	}
+
+	dataFederationService, err := datafederation.NewAtlasDataFederationService(ctx.Context, r.AtlasProvider, project.ConnectionSecretObjectKey(), log)
+	if err != nil {
+		result = workflow.Terminate(workflow.AtlasAPIAccessNotConfigured, err.Error())
+		ctx.SetConditionFromResult(api.DataFederationReadyType, result)
 		return result.ReconcileResult(), nil
 	}
 
-	ctx.OrgID = orgID
-	ctx.Client = atlasClient
-	if result = r.ensureDataFederation(ctx, project, dataFederation); !result.IsOk() {
+	if result = r.ensureDataFederation(ctx, project, dataFederation, dataFederationService); !result.IsOk() {
 		ctx.SetConditionFromResult(api.DataFederationReadyType, result)
 		return result.ReconcileResult(), nil
 	}
 
-	if result = r.ensurePrivateEndpoints(ctx, project, dataFederation); !result.IsOk() {
+	if result = r.ensurePrivateEndpoints(ctx, project, dataFederation, endpointService); !result.IsOk() {
 		ctx.SetConditionFromResult(api.DataFederationPEReadyType, result)
 		return result.ReconcileResult(), nil
 	}
 
-	if result = r.ensureConnectionSecrets(ctx, project, dataFederation); !result.IsOk() {
+	if result = r.ensureConnectionSecrets(ctx, dataFederationService, project, dataFederation); !result.IsOk() {
 		return result.ReconcileResult(), nil
 	}
 
@@ -133,7 +138,7 @@ func (r *AtlasDataFederationReconciler) Reconcile(context context.Context, req c
 	}
 
 	if !dataFederation.GetDeletionTimestamp().IsZero() {
-		return r.handleDelete(ctx, log, dataFederation, project, 
atlasClient).ReconcileResult(), nil + return r.handleDelete(ctx, log, dataFederation, project, dataFederationService).ReconcileResult(), nil } err = customresource.ApplyLastConfigApplied(context, project, r.Client) @@ -149,7 +154,7 @@ func (r *AtlasDataFederationReconciler) Reconcile(context context.Context, req c return workflow.OK().ReconcileResult(), nil } -func (r *AtlasDataFederationReconciler) handleDelete(ctx *workflow.Context, log *zap.SugaredLogger, dataFederation *akov2.AtlasDataFederation, project *akov2.AtlasProject, atlasClient *mongodbatlas.Client) workflow.Result { +func (r *AtlasDataFederationReconciler) handleDelete(ctx *workflow.Context, log *zap.SugaredLogger, dataFederation *akov2.AtlasDataFederation, project *akov2.AtlasProject, service datafederation.DataFederationService) workflow.Result { if customresource.HaveFinalizer(dataFederation, customresource.FinalizerLabel) { if customresource.IsResourcePolicyKeepOrDefault(dataFederation, r.ObjectDeletionProtection) { log.Info("Not removing AtlasDataFederation from Atlas as per configuration") @@ -160,7 +165,7 @@ func (r *AtlasDataFederationReconciler) handleDelete(ctx *workflow.Context, log ctx.SetConditionFromResult(api.DataFederationReadyType, result) return result } - if err := r.deleteDataFederationFromAtlas(ctx.Context, atlasClient, dataFederation, project, log); err != nil { + if err := r.deleteDataFederationFromAtlas(ctx.Context, service, dataFederation, project, log); err != nil { log.Errorf("failed to remove DataFederation from Atlas: %s", err) result := workflow.Terminate(workflow.Internal, err.Error()) ctx.SetConditionFromResult(api.DataFederationReadyType, result) @@ -177,13 +182,12 @@ func (r *AtlasDataFederationReconciler) handleDelete(ctx *workflow.Context, log return workflow.OK() } -func (r *AtlasDataFederationReconciler) deleteDataFederationFromAtlas(ctx context.Context, client *mongodbatlas.Client, df *akov2.AtlasDataFederation, project *akov2.AtlasProject, log 
*zap.SugaredLogger) error { +func (r *AtlasDataFederationReconciler) deleteDataFederationFromAtlas(ctx context.Context, service datafederation.DataFederationService, df *akov2.AtlasDataFederation, project *akov2.AtlasProject, log *zap.SugaredLogger) error { log.Infof("Deleting DataFederation instance: %s from Atlas", df.Spec.Name) - _, err := client.DataFederation.Delete(ctx, project.ID(), df.Spec.Name) + err := service.Delete(ctx, project.ID(), df.Spec.Name) - var apiError *mongodbatlas.ErrorResponse - if errors.As(err, &apiError) && apiError.Error() == "DATA_LAKE_TENANT_NOT_FOUND_FOR_NAME" { + if errors.Is(err, datafederation.ErrorNotFound) { log.Info("DataFederation doesn't exist or is already deleted") return nil } diff --git a/pkg/controller/atlasdatafederation/datafederation_controller_test.go b/pkg/controller/atlasdatafederation/datafederation_controller_test.go index a0ac520422..9d90e24c54 100644 --- a/pkg/controller/atlasdatafederation/datafederation_controller_test.go +++ b/pkg/controller/atlasdatafederation/datafederation_controller_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/stretchr/testify/assert" - "go.mongodb.org/atlas/mongodbatlas" + "github.com/stretchr/testify/mock" "go.uber.org/zap/zaptest" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -14,6 +14,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/mocks/translation" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/datafederation" akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/common" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/status" @@ -22,17 +24,10 @@ import ( "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" ) -type DataFederationMock struct { - mongodbatlas.DataFederationService -} - -func (m 
*DataFederationMock) Delete(context.Context, string, string) (*mongodbatlas.Response, error) { - return nil, nil -} - func TestDeleteConnectionSecrets(t *testing.T) { for _, tc := range []struct { name string + service func(serviceMock *translation.DataFederationServiceMock) datafederation.DataFederationService atlasProject *akov2.AtlasProject dataFederation *akov2.AtlasDataFederation connectionSecrets []*corev1.Secret @@ -71,6 +66,10 @@ func TestDeleteConnectionSecrets(t *testing.T) { }, { name: "federation object without secrets", + service: func(serviceMock *translation.DataFederationServiceMock) datafederation.DataFederationService { + serviceMock.EXPECT().Delete(context.Background(), mock.Anything, mock.Anything).Return(nil) + return serviceMock + }, atlasProject: &akov2.AtlasProject{ ObjectMeta: metav1.ObjectMeta{Name: "fooProject", Namespace: "bar"}, }, @@ -92,6 +91,10 @@ func TestDeleteConnectionSecrets(t *testing.T) { }, { name: "federation object without secrets", + service: func(serviceMock *translation.DataFederationServiceMock) datafederation.DataFederationService { + serviceMock.EXPECT().Delete(context.Background(), mock.Anything, mock.Anything).Return(nil) + return serviceMock + }, atlasProject: &akov2.AtlasProject{ ObjectMeta: metav1.ObjectMeta{Name: "fooProject", Namespace: "bar"}, }, @@ -115,6 +118,10 @@ func TestDeleteConnectionSecrets(t *testing.T) { }, { name: "federation object with secrets", + service: func(serviceMock *translation.DataFederationServiceMock) datafederation.DataFederationService { + serviceMock.EXPECT().Delete(context.Background(), mock.Anything, mock.Anything).Return(nil) + return serviceMock + }, atlasProject: &akov2.AtlasProject{ ObjectMeta: metav1.ObjectMeta{Name: "fooProject", Namespace: "bar"}, Status: status.AtlasProjectStatus{ID: "123"}, @@ -201,15 +208,17 @@ func TestDeleteConnectionSecrets(t *testing.T) { WithObjects(objects...). 
Build() project := &akov2.AtlasProject{} - atlasClient := &mongodbatlas.Client{ - DataFederation: &DataFederationMock{}, - } r := &AtlasDataFederationReconciler{ Client: fakeClient, Log: logger, } - gotResult := r.handleDelete(ctx, logger, tc.dataFederation, project, atlasClient) + + var svc datafederation.DataFederationService + if tc.service != nil { + svc = tc.service(translation.NewDataFederationServiceMock(t)) + } + gotResult := r.handleDelete(ctx, logger, tc.dataFederation, project, svc) assert.Equal(t, tc.wantResult, gotResult) gotDataFederation := &akov2.AtlasDataFederation{} diff --git a/pkg/controller/atlasdatafederation/private_endpoint.go b/pkg/controller/atlasdatafederation/private_endpoint.go index d7f27f43db..d616f2e34c 100644 --- a/pkg/controller/atlasdatafederation/private_endpoint.go +++ b/pkg/controller/atlasdatafederation/private_endpoint.go @@ -3,24 +3,28 @@ package atlasdatafederation import ( "context" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/datafederation" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/set" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" ) -func (r *AtlasDataFederationReconciler) ensurePrivateEndpoints(ctx *workflow.Context, project *akov2.AtlasProject, dataFederation *akov2.AtlasDataFederation) workflow.Result { - clientDF := NewClient(ctx.Client) - +func (r *AtlasDataFederationReconciler) ensurePrivateEndpoints(ctx *workflow.Context, project *akov2.AtlasProject, dataFederation *akov2.AtlasDataFederation, service datafederation.DatafederationPrivateEndpointService) workflow.Result { projectID := project.ID() - specPEs := dataFederation.Spec.PrivateEndpoints + specPEs := make([]*datafederation.DatafederationPrivateEndpointEntry, 0, len(dataFederation.Spec.PrivateEndpoints)) + for _, pe := range dataFederation.Spec.PrivateEndpoints { + 
specPEs = append(specPEs, datafederation.NewDatafederationPrivateEndpointEntry(&pe, projectID))
+	}
 
-	atlasPEs, err := getAllDataFederationPEs(ctx.Context, clientDF, projectID)
+	// NOTE(review): &pe above takes the address of the range variable; on Go <1.22 every entry aliases one variable — confirm go.mod version or copy pe per iteration.
+	atlasPEs, err := getAllDataFederationPEs(ctx.Context, service, projectID)
 	if err != nil {
 		ctx.Log.Debugw("getAllDataFederationPEs error", "err", err.Error())
 	}
 
-	result := syncPrivateEndpointsWithAtlas(ctx, clientDF, projectID, specPEs, atlasPEs)
+	result := syncPrivateEndpointsWithAtlas(ctx, service, projectID, specPEs, atlasPEs)
 	if !result.IsOk() {
 		ctx.SetConditionFromResult(api.DataFederationPEReadyType, result)
 		return result
@@ -29,12 +33,12 @@ func (r *AtlasDataFederationReconciler) ensurePrivateEndpoints(ctx *workflow.Con
 	return workflow.OK()
 }
 
-func syncPrivateEndpointsWithAtlas(ctx *workflow.Context, clientDF *DataFederationServiceOp, projectID string, specPEs, atlasPEs []akov2.DataFederationPE) workflow.Result {
+func syncPrivateEndpointsWithAtlas(ctx *workflow.Context, service datafederation.DatafederationPrivateEndpointService, projectID string, specPEs, atlasPEs []*datafederation.DatafederationPrivateEndpointEntry) workflow.Result {
 	endpointsToCreate := set.Difference(specPEs, atlasPEs)
 	ctx.Log.Debugw("Data Federation PEs to Create", "endpoints", endpointsToCreate)
 	for _, e := range endpointsToCreate {
-		endpoint := e.(akov2.DataFederationPE)
-		if _, _, err := clientDF.CreateOnePrivateEndpoint(ctx.Context, projectID, endpoint); err != nil {
+		endpoint := e.(*datafederation.DatafederationPrivateEndpointEntry)
+		if err := service.Create(ctx.Context, endpoint); err != nil {
 			return workflow.Terminate(workflow.Internal, err.Error())
 		}
 	}
@@ -42,8 +46,8 @@ func syncPrivateEndpointsWithAtlas(ctx *workflow.Context, clientDF *DataFederati
 	endpointsToDelete := set.Difference(atlasPEs, specPEs)
 	ctx.Log.Debugw("Data Federation PEs to Delete", "endpoints", endpointsToDelete)
 	for _, item := range endpointsToDelete {
-		endpoint := 
item.(akov2.DataFederationPE) - if _, _, err := clientDF.DeleteOnePrivateEndpoint(ctx.Context, projectID, endpoint.EndpointID); err != nil { + endpoint := item.(*datafederation.DatafederationPrivateEndpointEntry) + if err := service.Delete(ctx.Context, endpoint); err != nil { return workflow.Terminate(workflow.Internal, err.Error()) } } @@ -51,10 +55,10 @@ func syncPrivateEndpointsWithAtlas(ctx *workflow.Context, clientDF *DataFederati return workflow.OK() } -func getAllDataFederationPEs(ctx context.Context, client *DataFederationServiceOp, projectID string) (endpoints []akov2.DataFederationPE, err error) { - endpoints, _, err = client.GetAllPrivateEndpoints(ctx, projectID) +func getAllDataFederationPEs(ctx context.Context, service datafederation.DatafederationPrivateEndpointService, projectID string) (endpoints []*datafederation.DatafederationPrivateEndpointEntry, err error) { + endpoints, err = service.List(ctx, projectID) if endpoints == nil { - endpoints = make([]akov2.DataFederationPE, 0) + endpoints = make([]*datafederation.DatafederationPrivateEndpointEntry, 0) } return }