Skip to content

Commit

Permalink
try fix controller
Browse files Browse the repository at this point in the history
  • Loading branch information
josvazg committed Feb 7, 2025
1 parent 6e5fc49 commit ae9904f
Show file tree
Hide file tree
Showing 31 changed files with 599 additions and 1,225 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ limitations under the License.
import (
"context"
"errors"
"fmt"
"reflect"
"time"

Expand All @@ -41,15 +40,14 @@ import (

"github.com/mongodb/mongodb-atlas-kubernetes/v2/api"
akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/api/v1"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/api/v1/provider"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/api/v1/status"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlas"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/customresource"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/reconciler"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/statushandler"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/workflow"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/indexer"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/networkcontainer"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/networkpeering"
)

Expand Down Expand Up @@ -86,13 +84,6 @@ func NewAtlasNetworkPeeringsReconciler(
}
}

type reconcileRequest struct {
workflowCtx *workflow.Context
service networkpeering.NetworkPeeringService
projectID string
networkPeering *akov2.AtlasNetworkPeering
}

//+kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasnetworkpeerings,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasnetworkpeerings/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasnetworkpeerings/finalizers,verbs=update
Expand Down Expand Up @@ -137,237 +128,22 @@ func (r *AtlasNetworkPeeringReconciler) Reconcile(ctx context.Context, req ctrl.
if err != nil {
return r.terminate(workflowCtx, &akoNetworkPeering, workflow.AtlasAPIAccessNotConfigured, err), nil
}
sdkClient, orgID, err := r.AtlasProvider.SdkClient(ctx, credentials, r.Log)
sdkClientSet, orgID, err := r.AtlasProvider.SdkClientSet(ctx, credentials, r.Log)
if err != nil {
return r.terminate(workflowCtx, &akoNetworkPeering, workflow.AtlasAPIAccessNotConfigured, err), nil
}
project, err := r.ResolveProject(ctx, sdkClient, &akoNetworkPeering, orgID)
project, err := r.ResolveProject(ctx, sdkClientSet.SdkClient20231115008, &akoNetworkPeering, orgID)
if err != nil {
return r.terminate(workflowCtx, &akoNetworkPeering, workflow.AtlasAPIAccessNotConfigured, err), nil
}
return r.handle(&reconcileRequest{
workflowCtx: workflowCtx,
service: networkpeering.NewNetworkPeeringService(sdkClient.NetworkPeeringApi),
projectID: project.ID,
networkPeering: &akoNetworkPeering,
return r.handle(workflowCtx, &reconcileRequest{
service: networkpeering.NewNetworkPeeringServiceFromClientSet(sdkClientSet),
containerService: networkcontainer.NewNetworkContainerServiceFromClientSet(sdkClientSet),
projectID: project.ID,
networkPeering: &akoNetworkPeering,
}), nil
}

func (r *AtlasNetworkPeeringReconciler) handle(req *reconcileRequest) ctrl.Result {
r.Log.Infow("handling network peering reconcile request",
"service set", (req.service != nil), "projectID", req.projectID, "networkPeering", req.networkPeering)
var atlasPeer *networkpeering.NetworkPeer
if req.networkPeering.Status.ID != "" {
peer, err := req.service.GetPeer(req.workflowCtx.Context, req.projectID, req.networkPeering.Status.ID)
if err != nil && !errors.Is(err, networkpeering.ErrNotFound) {
return r.terminate(req.workflowCtx, req.networkPeering, workflow.Internal, err)
}
atlasPeer = peer
}
inAtlas := atlasPeer != nil
deleted := req.networkPeering.DeletionTimestamp != nil

switch {
case !deleted && !inAtlas:
return r.create(req)
case !deleted && inAtlas:
return r.sync(req, atlasPeer)
case deleted && inAtlas:
return r.delete(req, atlasPeer)
default:
return r.unmanage(req) // deleted && !inAtlas
}
}

func (r *AtlasNetworkPeeringReconciler) create(req *reconcileRequest) ctrl.Result {
container, err := r.handleContainer(req)
if err != nil {
return r.terminate(req.workflowCtx, req.networkPeering, workflow.Internal, err)
}
req.workflowCtx.EnsureStatusOption(updateContainerStatusOption(container))
specPeer := networkpeering.NewNetworkPeeringSpec(&req.networkPeering.Spec.AtlasNetworkPeeringConfig)
specPeer.ContainerID = container.ID
newPeer, err := req.service.CreatePeer(req.workflowCtx.Context, req.projectID, specPeer)
if err != nil {
return r.terminate(req.workflowCtx, req.networkPeering, workflow.Internal, err)
}
req.workflowCtx.EnsureStatusOption(updatePeeringStatusOption(newPeer))
return workflow.InProgress(
workflow.NetworkPeeringConnectionCreating,
fmt.Sprintf("Network Peering Connection %s is %s",
req.networkPeering.Status.ID, req.networkPeering.Status.Status),
).ReconcileResult()
}

func (r *AtlasNetworkPeeringReconciler) sync(req *reconcileRequest, atlasPeer *networkpeering.NetworkPeer) ctrl.Result {
container, err := r.handleContainer(req)
if err != nil {
return r.terminate(req.workflowCtx, req.networkPeering, workflow.Internal, err)
}
req.workflowCtx.EnsureStatusOption(updateContainerStatusOption(container))
switch {
case atlasPeer.Failed():
err := fmt.Errorf("peer connection failed: %s", atlasPeer.ErrorMessage)
return r.terminate(req.workflowCtx, req.networkPeering, workflow.Internal, err)
case !atlasPeer.Available():
return r.inProgress(req, atlasPeer)
}
return r.ready(req, atlasPeer)
}

func (r *AtlasNetworkPeeringReconciler) delete(req *reconcileRequest, atlasPeer *networkpeering.NetworkPeer) ctrl.Result {
id := req.networkPeering.Status.ID
peer := atlasPeer
if id != "" && !atlasPeer.Closing() {
if err := req.service.DeletePeer(req.workflowCtx.Context, req.projectID, id); err != nil {
wrappedErr := fmt.Errorf("failed to delete peer connection %s: %w", id, err)
return r.terminate(req.workflowCtx, req.networkPeering, workflow.Internal, wrappedErr)
}
closingPeer, err := req.service.GetPeer(req.workflowCtx.Context, req.projectID, id)
if err != nil {
wrappedErr := fmt.Errorf("failed to get closing peer connection %s: %w", id, err)
return r.terminate(req.workflowCtx, req.networkPeering, workflow.Internal, wrappedErr)
}
peer = closingPeer
}
return r.inProgress(req, peer)
}

func (r *AtlasNetworkPeeringReconciler) unmanage(req *reconcileRequest) ctrl.Result {
req.workflowCtx.EnsureStatusOption(clearPeeringStatusOption())
if _, err := r.handleContainer(req); err != nil {
containerID := containerID(req.networkPeering)
if !errors.Is(err, networkpeering.ErrContainerInUse) {
wrappedErr := fmt.Errorf("failed to clear container %s: %w", containerID, err)
return r.terminate(req.workflowCtx, req.networkPeering, workflow.Internal, wrappedErr)
}
}
req.workflowCtx.EnsureStatusOption(clearContainerStatusOption())
if err := customresource.ManageFinalizer(req.workflowCtx.Context, r.Client, req.networkPeering, customresource.UnsetFinalizer); err != nil {
return r.terminate(req.workflowCtx, req.networkPeering, workflow.AtlasFinalizerNotRemoved, err)
}

return workflow.Deleted().ReconcileResult()
}

func (r *AtlasNetworkPeeringReconciler) inProgress(req *reconcileRequest, peer *networkpeering.NetworkPeer) ctrl.Result {
req.workflowCtx.EnsureStatusOption(updatePeeringStatusOption(peer))

return workflow.InProgress(
workflow.NetworkPeeringConnectionPending,
fmt.Sprintf("Network Peering Connection %s is %s", peer.ID, peer.Status),
).ReconcileResult()
}

func (r *AtlasNetworkPeeringReconciler) ready(req *reconcileRequest, peer *networkpeering.NetworkPeer) ctrl.Result {
if err := customresource.ManageFinalizer(req.workflowCtx.Context, r.Client, req.networkPeering, customresource.SetFinalizer); err != nil {
return r.terminate(req.workflowCtx, req.networkPeering, workflow.AtlasFinalizerNotSet, err)
}

req.workflowCtx.EnsureStatusOption(updatePeeringStatusOption(peer))
req.workflowCtx.SetConditionTrue(api.ReadyType)

if req.networkPeering.Spec.ExternalProjectRef != nil {
return workflow.Requeue(r.independentSyncPeriod).ReconcileResult()
}

return workflow.OK().ReconcileResult()
}

func (r *AtlasNetworkPeeringReconciler) terminate(
ctx *workflow.Context,
resource api.AtlasCustomResource,
reason workflow.ConditionReason,
err error,
) ctrl.Result {
condition := api.ReadyType
r.Log.Errorf("resource %T(%s/%s) failed on condition %s: %s",
resource, resource.GetNamespace(), resource.GetName(), condition, err)
result := workflow.Terminate(reason, err)
ctx.SetConditionFalse(api.ReadyType).SetConditionFromResult(condition, result)

return result.ReconcileResult()
}

func updateContainerStatusOption(container *networkpeering.ProviderContainer) status.AtlasNetworkPeeringStatusOption {
return func(peeringStatus *status.AtlasNetworkPeeringStatus) {
applyContainerStatus(peeringStatus, container)
}
}

func applyContainerStatus(peeringStatus *status.AtlasNetworkPeeringStatus, container *networkpeering.ProviderContainer) {
peeringStatus.ContainerID = container.ID
peeringStatus.ContainerProvisioned = container.Provisioned
providerName := provider.ProviderName(container.Provider)
switch {
case providerName == provider.ProviderAWS && container.AWSStatus != nil:
if peeringStatus.AWSStatus == nil {
peeringStatus.AWSStatus = &status.AWSStatus{}
}
peeringStatus.AWSStatus.ContainerVpcID = container.AWSStatus.VpcID
case providerName == provider.ProviderAzure && container.AzureStatus != nil:
if peeringStatus.AzureStatus == nil {
peeringStatus.AzureStatus = &status.AzureStatus{}
}
peeringStatus.AzureStatus.AzureSubscriptionID = container.AzureStatus.AzureSubscriptionID
peeringStatus.AzureStatus.VnetName = container.AzureStatus.VnetName
case providerName == provider.ProviderGCP && container.GoogleStatus != nil:
if peeringStatus.GoogleStatus == nil {
peeringStatus.GoogleStatus = &status.GoogleStatus{}
}
peeringStatus.GoogleStatus.GCPProjectID = container.GoogleStatus.GCPProjectID
peeringStatus.GoogleStatus.NetworkName = container.GoogleStatus.NetworkName
}
}

func updatePeeringStatusOption(peer *networkpeering.NetworkPeer) status.AtlasNetworkPeeringStatusOption {
return func(peeringStatus *status.AtlasNetworkPeeringStatus) {
applyPeeringStatus(peeringStatus, peer)
}
}

func applyPeeringStatus(peeringStatus *status.AtlasNetworkPeeringStatus, peer *networkpeering.NetworkPeer) {
peeringStatus.ID = peer.ID
peeringStatus.Status = peer.Status
peeringStatus.Error = peer.ErrorMessage
providerName := provider.ProviderName(peer.Provider)
if providerName == provider.ProviderAWS && peer.AWSStatus != nil {
if peeringStatus.AWSStatus == nil {
peeringStatus.AWSStatus = &status.AWSStatus{}
}
peeringStatus.AWSStatus.ConnectionID = peer.AWSStatus.ConnectionID
}
}

func clearPeeringStatusOption() status.AtlasNetworkPeeringStatusOption {
return func(peeringStatus *status.AtlasNetworkPeeringStatus) {
clearPeeringStatus(peeringStatus)
}
}

func clearPeeringStatus(peeringStatus *status.AtlasNetworkPeeringStatus) {
peeringStatus.ID = ""
peeringStatus.Status = ""
peeringStatus.Error = ""
if peeringStatus.AWSStatus != nil {
peeringStatus.AWSStatus.ConnectionID = ""
}
}

func clearContainerStatusOption() status.AtlasNetworkPeeringStatusOption {
return func(peeringStatus *status.AtlasNetworkPeeringStatus) {
clearContainerStatus(peeringStatus)
}
}

func clearContainerStatus(peeringStatus *status.AtlasNetworkPeeringStatus) {
peeringStatus.ContainerID = ""
peeringStatus.ContainerProvisioned = false
peeringStatus.AWSStatus = nil
peeringStatus.AzureStatus = nil
peeringStatus.GoogleStatus = nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *AtlasNetworkPeeringReconciler) SetupWithManager(mgr ctrl.Manager, skipNameValidation bool) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
103 changes: 0 additions & 103 deletions internal/controller/atlasnetworkpeering/container.go

This file was deleted.

Loading

0 comments on commit ae9904f

Please sign in to comment.