Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Supports PRIVATE_LINK networking type in mongodbatlas_stream_connection resource and data sources #2940

Open
wants to merge 12 commits into
base: CLOUDP-288973-stream-privatelink
Choose a base branch
from
3 changes: 3 additions & 0 deletions .changelog/2940.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/mongodbatlas_stream_connection: Supports Privatelink networking access type for Kafka Stream Connections
```
4 changes: 2 additions & 2 deletions docs/data-sources/stream_connection.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ If `type` is of value `Kafka` the following additional attributes are defined:
* `access` - Information about the networking access. See [access](#access).

### Access
* `name` - Id of the vpc peer when the type is `VPC`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about name?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

attribute exists in the API and SDK but is not a used attribute. It was created some time ago but it won't be used going forward https://jira.mongodb.org/browse/CLOUDP-294716 . When we first implemented the networking attribute for VPC peering, this attribute existed but was later removed. We removed it from the code but remained in the docs

* `type` - Selected networking type. Either `PUBLIC` or `VPC`. Defaults to `PUBLIC`.
* `type` - Selected networking type. Either `PUBLIC`, `VPC` or `PRIVATE_LINK`. Defaults to `PUBLIC`.
* `connection_id` - Id of the Private Link connection when type is `PRIVATE_LINK`.

To learn more, see: [MongoDB Atlas API - Stream Connection](https://www.mongodb.com/docs/atlas/reference/api-resources-spec/#tag/Streams/operation/getStreamConnection) Documentation.
The [Terraform Provider Examples Section](https://github.com/mongodb/terraform-provider-mongodbatlas/blob/master/examples/mongodbatlas_stream_instance/atlas-streams-user-journey.md) also contains details on the overall support for Atlas Streams Processing in Terraform.
4 changes: 2 additions & 2 deletions docs/data-sources/stream_connections.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ If `type` is of value `Kafka` the following additional attributes are defined:
* `access` - Information about the networking access. See [access](#access).

### Access
* `name` - Id of the vpc peer when the type is `VPC`.
* `type` - Networking type. Either `PUBLIC` or `VPC`. Default is `PUBLIC`.
* `type` - Selected networking type. Either `PUBLIC`, `VPC` or `PRIVATE_LINK`. Defaults to `PUBLIC`.
* `connection_id` - Id of the Private Link connection when type is `PRIVATE_LINK`.

To learn more, see: [MongoDB Atlas API - Stream Connection](https://www.mongodb.com/docs/atlas/reference/api-resources-spec/#tag/Streams/operation/listStreamConnections) Documentation.
The [Terraform Provider Examples Section](https://github.com/mongodb/terraform-provider-mongodbatlas/blob/master/examples/mongodbatlas_stream_instance/atlas-streams-user-journey.md) also contains details on the overall support for Atlas Streams Processing in Terraform.
4 changes: 2 additions & 2 deletions docs/resources/stream_connection.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ If `type` is of value `Kafka` the following additional arguments are defined:
* `access` - Information about the networking access. See [access](#access).

### Access
* `name` - Id of the vpc peer when the type is `VPC`.
* `type` - Selected networking type. Either `PUBLIC` or `VPC`. Defaults to `PUBLIC`.
* `type` - Selected networking type. Either `PUBLIC`, `VPC` or `PRIVATE_LINK`. Defaults to `PUBLIC`.
* `connection_id` - Id of the Private Link connection when type is `PRIVATE_LINK`.

## Import

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (d *streamConnectionDS) Read(ctx context.Context, req datasource.ReadReques
return
}

newStreamConnectionModel, diags := NewTFStreamConnection(ctx, projectID, instanceName, nil, apiResp)
newStreamConnectionModel, diags := NewTFStreamConnection(ctx, projectID, instanceName, nil, nil, apiResp)
if diags.HasError() {
resp.Diagnostics.Append(diags...)
return
Expand Down
18 changes: 14 additions & 4 deletions internal/service/streamconnection/model_stream_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,16 @@ func NewStreamConnectionReq(ctx context.Context, plan *TFStreamConnectionModel)
}
streamConnection.Networking = &admin.StreamsKafkaNetworking{
Access: &admin.StreamsKafkaNetworkingAccess{
Type: networkingModel.Access.Type.ValueStringPointer(),
Type: networkingModel.Access.Type.ValueStringPointer(),
ConnectionId: networkingModel.Access.ConnectionID.ValueStringPointer(),
},
}
}

return &streamConnection, nil
}

func NewTFStreamConnection(ctx context.Context, projID, instanceName string, currAuthConfig *types.Object, apiResp *admin.StreamsConnection) (*TFStreamConnectionModel, diag.Diagnostics) {
func NewTFStreamConnection(ctx context.Context, projID, instanceName string, currAuthConfig, currNetworkingConfig *types.Object, apiResp *admin.StreamsConnection) (*TFStreamConnectionModel, diag.Diagnostics) {
EspenAlbert marked this conversation as resolved.
Show resolved Hide resolved
rID := fmt.Sprintf("%s-%s-%s", instanceName, projID, conversion.SafeString(apiResp.Name))
connectionModel := TFStreamConnectionModel{
ID: types.StringValue(rID),
Expand Down Expand Up @@ -128,9 +129,18 @@ func NewTFStreamConnection(ctx context.Context, projID, instanceName string, cur

connectionModel.Networking = types.ObjectNull(NetworkingObjectType.AttrTypes)
if apiResp.Networking != nil {
connectionID := types.StringNull()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use the apiResp.Networking?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After testing this I realized that the API does not return the ConnectionID, this will be fixed in https://jira.mongodb.org/browse/CLOUDP-294715 and meanwhile I have implemented this to be able to test and use the feature

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This extra implementation will be removed once connectionID is returned, and as you said in the other comment, this affects the import which makes the user experience lack a certain feature. Will push for this to be fixed soon

if currNetworkingConfig != nil && !currNetworkingConfig.IsNull() { // if config is available (create & update of resource) connectionID value from the config is set in new state
configNetworkingModel := &TFNetworkingModel{}
if diags := currNetworkingConfig.As(ctx, configNetworkingModel, basetypes.ObjectAsOptions{}); diags.HasError() {
return nil, diags
}
connectionID = configNetworkingModel.Access.ConnectionID
}
networkingModel, diags := types.ObjectValueFrom(ctx, NetworkingObjectType.AttrTypes, TFNetworkingModel{
Access: TFNetworkingAccessModel{
Type: types.StringPointerValue(apiResp.Networking.Access.Type),
Type: types.StringPointerValue(apiResp.Networking.Access.Type),
ConnectionID: connectionID,
},
})
if diags.HasError() {
Expand Down Expand Up @@ -175,7 +185,7 @@ func NewTFStreamConnections(ctx context.Context,
for i := range input {
projectID := streamConnectionsConfig.ProjectID.ValueString()
instanceName := streamConnectionsConfig.InstanceName.ValueString()
connectionModel, diags := NewTFStreamConnection(ctx, projectID, instanceName, nil, &input[i])
connectionModel, diags := NewTFStreamConnection(ctx, projectID, instanceName, nil, nil, &input[i])
if diags.HasError() {
return nil, diags
}
Expand Down
118 changes: 83 additions & 35 deletions internal/service/streamconnection/model_stream_connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,41 @@ import (
)

const (
connectionName = "Connection"
typeValue = ""
clusterName = "Cluster0"
dummyProjectID = "111111111111111111111111"
instanceName = "InstanceName"
authMechanism = "PLAIN"
authUsername = "user1"
securityProtocol = "SSL"
bootstrapServers = "localhost:9092,another.host:9092"
dbRole = "customRole"
dbRoleType = "CUSTOM"
sampleConnectionName = "sample_stream_solar"
networkingType = "PUBLIC"
connectionName = "Connection"
EspenAlbert marked this conversation as resolved.
Show resolved Hide resolved
typeValue = ""
clusterName = "Cluster0"
dummyProjectID = "111111111111111111111111"
instanceName = "InstanceName"
authMechanism = "PLAIN"
authUsername = "user1"
securityProtocol = "SSL"
bootstrapServers = "localhost:9092,another.host:9092"
dbRole = "customRole"
dbRoleType = "CUSTOM"
sampleConnectionName = "sample_stream_solar"
networkingType = "PUBLIC"
privatelinkNetworkingType = "PRIVATE_LINK"
)

var connectionID = "connectionID"

var configMap = map[string]string{
"auto.offset.reset": "earliest",
}

type sdkToTFModelTestCase struct {
SDKResp *admin.StreamsConnection
providedProjID string
providedInstanceName string
providedAuthConfig *types.Object
expectedTFModel *streamconnection.TFStreamConnectionModel
name string
SDKResp *admin.StreamsConnection
providedProjID string
providedInstanceName string
providedAuthConfig *types.Object
providedNetworkingConfig *types.Object
expectedTFModel *streamconnection.TFStreamConnectionModel
name string
}

func TestStreamConnectionSDKToTFModel(t *testing.T) {
var authConfigWithPasswordDefined = tfAuthenticationObject(t, authMechanism, authUsername, "raw password")
var privateLinkNetworkingConfig = tfNetworkingObject(t, privatelinkNetworkingType, &connectionID)

testCases := []sdkToTFModelTestCase{
{
Expand All @@ -55,9 +60,10 @@ func TestStreamConnectionSDKToTFModel(t *testing.T) {
Type: admin.PtrString(dbRoleType),
},
},
providedProjID: dummyProjectID,
providedInstanceName: instanceName,
providedAuthConfig: nil,
providedProjID: dummyProjectID,
providedInstanceName: instanceName,
providedAuthConfig: nil,
providedNetworkingConfig: nil,
expectedTFModel: &streamconnection.TFStreamConnectionModel{
ProjectID: types.StringValue(dummyProjectID),
InstanceName: types.StringValue(instanceName),
Expand Down Expand Up @@ -87,9 +93,10 @@ func TestStreamConnectionSDKToTFModel(t *testing.T) {
BrokerPublicCertificate: admin.PtrString(DummyCACert),
},
},
providedProjID: dummyProjectID,
providedInstanceName: instanceName,
providedAuthConfig: &authConfigWithPasswordDefined,
providedProjID: dummyProjectID,
providedInstanceName: instanceName,
providedAuthConfig: &authConfigWithPasswordDefined,
providedNetworkingConfig: nil,
expectedTFModel: &streamconnection.TFStreamConnectionModel{
ProjectID: types.StringValue(dummyProjectID),
InstanceName: types.StringValue(instanceName),
Expand All @@ -109,9 +116,10 @@ func TestStreamConnectionSDKToTFModel(t *testing.T) {
Name: admin.PtrString(connectionName),
Type: admin.PtrString("Kafka"),
},
providedProjID: dummyProjectID,
providedInstanceName: instanceName,
providedAuthConfig: nil,
providedProjID: dummyProjectID,
providedInstanceName: instanceName,
providedAuthConfig: nil,
providedNetworkingConfig: nil,
expectedTFModel: &streamconnection.TFStreamConnectionModel{
ProjectID: types.StringValue(dummyProjectID),
InstanceName: types.StringValue(instanceName),
Expand Down Expand Up @@ -140,9 +148,10 @@ func TestStreamConnectionSDKToTFModel(t *testing.T) {
BrokerPublicCertificate: admin.PtrString(DummyCACert),
},
},
providedProjID: dummyProjectID,
providedInstanceName: instanceName,
providedAuthConfig: nil,
providedProjID: dummyProjectID,
providedInstanceName: instanceName,
providedAuthConfig: nil,
providedNetworkingConfig: nil,
expectedTFModel: &streamconnection.TFStreamConnectionModel{
ProjectID: types.StringValue(dummyProjectID),
InstanceName: types.StringValue(instanceName),
Expand All @@ -156,6 +165,44 @@ func TestStreamConnectionSDKToTFModel(t *testing.T) {
Networking: types.ObjectNull(streamconnection.NetworkingObjectType.AttrTypes),
},
},
{
name: "Kafka connection type SDK response with Private link networking config",
SDKResp: &admin.StreamsConnection{
Name: admin.PtrString(connectionName),
Type: admin.PtrString("Kafka"),
Authentication: &admin.StreamsKafkaAuthentication{
Mechanism: admin.PtrString(authMechanism),
Username: admin.PtrString(authUsername),
},
BootstrapServers: admin.PtrString(bootstrapServers),
Config: &configMap,
Security: &admin.StreamsKafkaSecurity{
Protocol: admin.PtrString(securityProtocol),
BrokerPublicCertificate: admin.PtrString(DummyCACert),
},
Networking: &admin.StreamsKafkaNetworking{
Access: &admin.StreamsKafkaNetworkingAccess{
Type: admin.PtrString(privatelinkNetworkingType),
},
},
},
providedProjID: dummyProjectID,
providedInstanceName: instanceName,
providedAuthConfig: nil,
providedNetworkingConfig: &privateLinkNetworkingConfig,
expectedTFModel: &streamconnection.TFStreamConnectionModel{
ProjectID: types.StringValue(dummyProjectID),
InstanceName: types.StringValue(instanceName),
ConnectionName: types.StringValue(connectionName),
Type: types.StringValue("Kafka"),
Authentication: tfAuthenticationObjectWithNoPassword(t, authMechanism, authUsername),
BootstrapServers: types.StringValue(bootstrapServers),
Config: tfConfigMap(t, configMap),
Security: tfSecurityObject(t, DummyCACert, securityProtocol),
DBRoleToExecute: types.ObjectNull(streamconnection.DBRoleToExecuteObjectType.AttrTypes),
Networking: tfNetworkingObject(t, privatelinkNetworkingType, &connectionID),
},
},
{
name: "Sample connection type sample_stream_solar sample",
SDKResp: &admin.StreamsConnection{
Expand All @@ -180,7 +227,7 @@ func TestStreamConnectionSDKToTFModel(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
resultModel, diags := streamconnection.NewTFStreamConnection(context.Background(), tc.providedProjID, tc.providedInstanceName, tc.providedAuthConfig, tc.SDKResp)
resultModel, diags := streamconnection.NewTFStreamConnection(context.Background(), tc.providedProjID, tc.providedInstanceName, tc.providedAuthConfig, tc.providedNetworkingConfig, tc.SDKResp)
if diags.HasError() {
t.Fatalf("unexpected errors found: %s", diags.Errors()[0].Summary())
}
Expand Down Expand Up @@ -264,7 +311,7 @@ func TestStreamConnectionsSDKToTFModel(t *testing.T) {
Config: tfConfigMap(t, configMap),
Security: tfSecurityObject(t, DummyCACert, securityProtocol),
DBRoleToExecute: types.ObjectNull(streamconnection.DBRoleToExecuteObjectType.AttrTypes),
Networking: tfNetworkingObject(t, networkingType),
Networking: tfNetworkingObject(t, networkingType, nil),
},
{
ID: types.StringValue(fmt.Sprintf("%s-%s-%s", instanceName, dummyProjectID, connectionName)),
Expand Down Expand Up @@ -485,11 +532,12 @@ func tfDBRoleToExecuteObject(t *testing.T, role, roleType string) types.Object {
return auth
}

func tfNetworkingObject(t *testing.T, networkingType string) types.Object {
func tfNetworkingObject(t *testing.T, networkingType string, connectionID *string) types.Object {
t.Helper()
networking, diags := types.ObjectValueFrom(context.Background(), streamconnection.NetworkingObjectType.AttrTypes, streamconnection.TFNetworkingModel{
Access: streamconnection.TFNetworkingAccessModel{
Type: types.StringValue(networkingType),
Type: types.StringValue(networkingType),
ConnectionID: types.StringPointerValue(connectionID),
},
})
if diags.HasError() {
Expand Down
3 changes: 3 additions & 0 deletions internal/service/streamconnection/resource_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ func ResourceSchema(ctx context.Context) schema.Schema {
"type": schema.StringAttribute{
Required: true,
},
"connection_id": schema.StringAttribute{
Optional: true,
},
},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,13 @@ var DBRoleToExecuteObjectType = types.ObjectType{AttrTypes: map[string]attr.Type
}}

type TFNetworkingAccessModel struct {
Type types.String `tfsdk:"type"`
Type types.String `tfsdk:"type"`
ConnectionID types.String `tfsdk:"connection_id"`
}

var NetworkingAccessObjectType = types.ObjectType{AttrTypes: map[string]attr.Type{
"type": types.StringType,
"type": types.StringType,
"connection_id": types.StringType,
}}

type TFNetworkingModel struct {
Expand Down Expand Up @@ -122,7 +124,7 @@ func (r *streamConnectionRS) Create(ctx context.Context, req resource.CreateRequ
return
}

newStreamConnectionModel, diags := NewTFStreamConnection(ctx, projectID, instanceName, &streamConnectionPlan.Authentication, apiResp)
newStreamConnectionModel, diags := NewTFStreamConnection(ctx, projectID, instanceName, &streamConnectionPlan.Authentication, &streamConnectionPlan.Networking, apiResp)
if diags.HasError() {
resp.Diagnostics.Append(diags...)
return
Expand Down Expand Up @@ -151,7 +153,7 @@ func (r *streamConnectionRS) Read(ctx context.Context, req resource.ReadRequest,
return
}

newStreamConnectionModel, diags := NewTFStreamConnection(ctx, projectID, instanceName, &streamConnectionState.Authentication, apiResp)
newStreamConnectionModel, diags := NewTFStreamConnection(ctx, projectID, instanceName, &streamConnectionState.Authentication, &streamConnectionState.Networking, apiResp)
if diags.HasError() {
resp.Diagnostics.Append(diags...)
return
Expand Down Expand Up @@ -181,7 +183,7 @@ func (r *streamConnectionRS) Update(ctx context.Context, req resource.UpdateRequ
return
}

newStreamConnectionModel, diags := NewTFStreamConnection(ctx, projectID, instanceName, &streamConnectionPlan.Authentication, apiResp)
newStreamConnectionModel, diags := NewTFStreamConnection(ctx, projectID, instanceName, &streamConnectionPlan.Authentication, &streamConnectionPlan.Networking, apiResp)
if diags.HasError() {
resp.Diagnostics.Append(diags...)
return
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//nolint:gocritic
package streamprivatelinkendpoint

import (
Expand Down
Loading