Skip to content

Commit

Permalink
Merge pull request #493 from netgroup-polito/qcfe/operators-parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
kingmakerbot authored Jul 8, 2021
2 parents 8892add + 66609e0 commit 26a907b
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 70 deletions.
2 changes: 2 additions & 0 deletions operators/cmd/instance-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func main() {
var containerKaniko string
var containerEnvFileBrowserImg string
var containerEnvFileBrowserImgTag string
var maxConcurrentReconciles int

flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
Expand All @@ -96,6 +97,7 @@ func main() {
flag.StringVar(&containerKaniko, "container-kaniko-img", "gcr.io/kaniko-project/executor", "The image for the Kaniko container to be deployed")
flag.StringVar(&containerEnvFileBrowserImg, "container-env-filebrowser-img", "filebrowser/filebrowser", "The image name for the filebrowser image (sidecar for gui-based file manager)")
flag.StringVar(&containerEnvFileBrowserImgTag, "container-env-filebrowser-img-tag", "latest", "The tag for the FileBrowser container (the gui-based file manager)")
flag.IntVar(&maxConcurrentReconciles, "max-concurrent-reconciles", 8, "The maximum number of concurrent Reconciles which can be run")
klog.InitFlags(nil)
flag.Parse()

Expand Down
62 changes: 9 additions & 53 deletions operators/cmd/tenant-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ package main
import (
"context"
"flag"
"fmt"
"strings"
"time"

"github.com/Nerzal/gocloak/v7"
"github.com/go-resty/resty/v2"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -60,6 +58,7 @@ func main() {
var ncURL string
var ncTnOpUser string
var ncTnOpPsw string
var maxConcurrentReconciles int

flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
Expand All @@ -75,6 +74,7 @@ func main() {
flag.StringVar(&ncURL, "nc-url", "", "The base URL for the nextcloud actor.")
flag.StringVar(&ncTnOpUser, "nc-tenant-operator-user", "", "The username of the acting account for nextcloud.")
flag.StringVar(&ncTnOpPsw, "nc-tenant-operator-psw", "", "The password of the acting account for nextcloud.")
flag.IntVar(&maxConcurrentReconciles, "max-concurrent-reconciles", 8, "The maximum number of concurrent Reconciles which can be run")
klog.InitFlags(nil)
flag.Parse()

Expand Down Expand Up @@ -106,12 +106,12 @@ func main() {
klog.Fatal("Unable to start manager", err)
}

kcA, err := newKcActor(kcURL, kcTnOpUser, kcTnOpPsw, kcTargetRealm, kcTargetClient, kcLoginRealm)
kcA, err := controllers.NewKcActor(kcURL, kcTnOpUser, kcTnOpPsw, kcTargetRealm, kcTargetClient, kcLoginRealm)
if err != nil {
klog.Fatal("Error when setting up keycloak", err)
}

go checkAndRenewTokenPeriodically(context.Background(), kcA.Client, kcA.Token, kcTnOpUser, kcTnOpPsw, kcLoginRealm, 2*time.Minute, 5*time.Minute)
go checkAndRenewTokenPeriodically(context.Background(), kcA, kcTnOpUser, kcTnOpPsw, kcLoginRealm, 2*time.Minute, 5*time.Minute)

httpClient := resty.New().SetCookieJar(nil)
NcA := controllers.NcActor{TnOpUser: ncTnOpUser, TnOpPsw: ncTnOpPsw, Client: httpClient, BaseURL: ncURL}
Expand All @@ -122,6 +122,7 @@ func main() {
NcA: &NcA,
TargetLabelKey: targetLabelKey,
TargetLabelValue: targetLabelValue,
Concurrency: maxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
klog.Fatal("Unable to create controller for Tenant", err)
}
Expand Down Expand Up @@ -152,37 +153,14 @@ func main() {
}
}

// newKcActor sets up a keycloak client with the specified parameters and performs the first login.
func newKcActor(kcURL, kcUser, kcPsw, targetRealmName, targetClient, loginRealm string) (*controllers.KcActor, error) {
kcClient := gocloak.NewClient(kcURL)
token, err := kcClient.LoginAdmin(context.Background(), kcUser, kcPsw, loginRealm)
if err != nil {
klog.Error("Unable to login as admin on keycloak", err)
return nil, err
}
kcTargetClientID, err := getClientID(context.Background(), kcClient, token.AccessToken, targetRealmName, targetClient)
if err != nil {
klog.Errorf("Error when getting client id for %s", targetClient)
return nil, err
}
return &controllers.KcActor{
Client: kcClient,
Token: token,
TargetClientID: kcTargetClientID,
TargetRealm: targetRealmName,
UserRequiredActions: []string{"UPDATE_PASSWORD", "VERIFY_EMAIL"},
EmailActionsLifeSpanS: 60 * 60 * 24 * 30, // 30 Days
}, nil
}

// checkAndRenewTokenPeriodically checks every intervalCheck if the token is about to expire in less than expireLimit seconds or is already expired, if so it renews it.
func checkAndRenewTokenPeriodically(ctx context.Context, kcClient gocloak.GoCloak, token *gocloak.JWT, kcAdminUser, kcAdminPsw, loginRealm string, intervalCheck, expireLimit time.Duration) {
func checkAndRenewTokenPeriodically(ctx context.Context, kcA *controllers.KcActor, kcAdminUser, kcAdminPsw, loginRealm string, intervalCheck, expireLimit time.Duration) {
kcRenewTokenTicker := time.NewTicker(intervalCheck)
for {
// wait intervalCheck
<-kcRenewTokenTicker.C
// take expiration date of token from tokenJWT claims
_, claims, err := kcClient.DecodeAccessToken(ctx, token.AccessToken, loginRealm, "")
_, claims, err := kcA.Client.DecodeAccessToken(ctx, kcA.GetAccessToken(), loginRealm, "")
if err != nil {
klog.Fatal("Error when decoding token", err)
}
Expand All @@ -192,34 +170,12 @@ func checkAndRenewTokenPeriodically(ctx context.Context, kcClient gocloak.GoCloa

// if token is about to expire, renew it
if tokenExpiresIn < expireLimit {
newToken, err := kcClient.LoginAdmin(ctx, kcAdminUser, kcAdminPsw, loginRealm)
newToken, err := kcA.Client.LoginAdmin(ctx, kcAdminUser, kcAdminPsw, loginRealm)
if err != nil {
klog.Fatal("Error when renewing token", err)
}
*token = *newToken
kcA.SetToken(newToken)
klog.Info("Keycloak token renewed")
}
}
}

// getClientID returns the ID of the target client given the human id, to be used with the gocloak library.
func getClientID(ctx context.Context, kcClient gocloak.GoCloak, token, realmName, targetClient string) (string, error) {
clients, err := kcClient.GetClients(ctx, token, realmName, gocloak.GetClientsParams{ClientID: &targetClient})
if err != nil {
klog.Errorf("Error when getting clientID for client %s", targetClient)
klog.Error(err)
return "", err
}

switch len(clients) {
case 0:
klog.Error(nil, "Error, no clientID for client %s", targetClient)
return "", fmt.Errorf("no client ID for client %s", targetClient)
case 1:
targetClientID := *clients[0].ID
return targetClientID, nil
default:
klog.Error(nil, "Error, got too many clientIDs for client %s", targetClient)
return "", fmt.Errorf("too many clientIDs for client %s", targetClient)
}
}
5 changes: 5 additions & 0 deletions operators/pkg/instance-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/predicate"

crownlabsv1alpha2 "github.com/netgroup-polito/CrownLabs/operators/api/v1alpha2"
Expand Down Expand Up @@ -59,6 +60,7 @@ type InstanceReconciler struct {
Oauth2ProxyImage string
OidcClientSecret string
OidcProviderURL string
Concurrency int
ContainerEnvOpts ContainerEnvOpts

// This function, if configured, is deferred at the beginning of the Reconcile.
Expand Down Expand Up @@ -160,6 +162,9 @@ func (r *InstanceReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Also Deployments are watched in order to better handle container environment.
Owns(&appsv1.Deployment{}).
Owns(&cdiv1.DataVolume{}, builder.WithPredicates(dataVolumePredicate())).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.Concurrency,
}).
Complete(r)
}

Expand Down
90 changes: 76 additions & 14 deletions operators/pkg/tenant-controller/keycloak_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"sync"

gocloak "github.com/Nerzal/gocloak/v7"
"k8s.io/klog/v2"
Expand All @@ -13,13 +14,74 @@ import (
// KcActor contains the needed objects and infos to use keycloak functionalities.
type KcActor struct {
Client gocloak.GoCloak
Token *gocloak.JWT
token *gocloak.JWT
tokenMutex sync.RWMutex
TargetRealm string
TargetClientID string
UserRequiredActions []string
EmailActionsLifeSpanS int
}

// GetAccessToken thread-safely returns the access token stored into the Token field.
func (kcA *KcActor) GetAccessToken() string {
kcA.tokenMutex.RLock()
defer kcA.tokenMutex.RUnlock()
return kcA.token.AccessToken
}

// SetToken thread-safely stores a new JWT inside the KcActor struct.
func (kcA *KcActor) SetToken(newToken *gocloak.JWT) {
kcA.tokenMutex.Lock()
defer kcA.tokenMutex.Unlock()
*kcA.token = *newToken
}

// NewKcActor sets up a keycloak client with the specified parameters and performs the first login.
func NewKcActor(kcURL, kcUser, kcPsw, targetRealmName, targetClient, loginRealm string) (*KcActor, error) {
kcClient := gocloak.NewClient(kcURL)
token, err := kcClient.LoginAdmin(context.Background(), kcUser, kcPsw, loginRealm)
if err != nil {
klog.Error("Unable to login as admin on keycloak", err)
return nil, err
}
kcTargetClientID, err := getClientID(context.Background(), kcClient, token.AccessToken, targetRealmName, targetClient)
if err != nil {
klog.Errorf("Error when getting client id for %s", targetClient)
return nil, err
}
return &KcActor{
Client: kcClient,
TargetClientID: kcTargetClientID,
TargetRealm: targetRealmName,
token: token,
tokenMutex: sync.RWMutex{},
UserRequiredActions: []string{"UPDATE_PASSWORD", "VERIFY_EMAIL"},
EmailActionsLifeSpanS: 60 * 60 * 24 * 30, // 30 Days
}, nil
}

// getClientID returns the ID of the target client given the human id, to be used with the gocloak library.
func getClientID(ctx context.Context, kcClient gocloak.GoCloak, token, realmName, targetClient string) (string, error) {
clients, err := kcClient.GetClients(ctx, token, realmName, gocloak.GetClientsParams{ClientID: &targetClient})
if err != nil {
klog.Errorf("Error when getting clientID for client %s", targetClient)
klog.Error(err)
return "", err
}

switch len(clients) {
case 0:
klog.Error(nil, "Error, no clientID for client %s", targetClient)
return "", fmt.Errorf("no client ID for client %s", targetClient)
case 1:
targetClientID := *clients[0].ID
return targetClientID, nil
default:
klog.Error(nil, "Error, got too many clientIDs for client %s", targetClient)
return "", fmt.Errorf("too many clientIDs for client %s", targetClient)
}
}

// createKcRoles takes as argument a map with each pair with the roleName as the key and its description as value.
func (kcA *KcActor) createKcRoles(ctx context.Context, rolesToCreate map[string]string) error {
for newRoleName, newRoleDescr := range rolesToCreate {
Expand All @@ -36,12 +98,12 @@ func (kcA *KcActor) createKcRole(ctx context.Context, newRoleName, newRoleDescr
roleAfter := gocloak.Role{Name: &newRoleName, Description: &newRoleDescr, ClientRole: &tr}

// check if keycloak role already esists
role, err := kcA.Client.GetClientRole(ctx, kcA.Token.AccessToken, kcA.TargetRealm, kcA.TargetClientID, newRoleName)
role, err := kcA.Client.GetClientRole(ctx, kcA.GetAccessToken(), kcA.TargetRealm, kcA.TargetClientID, newRoleName)
if err != nil && strings.Contains(err.Error(), "Could not find role") {
// role didn't exist
// need to create new role
klog.Infof("Role didn't exist %s", newRoleName)
createdRoleName, errCreate := kcA.Client.CreateClientRole(ctx, kcA.Token.AccessToken, kcA.TargetRealm, kcA.TargetClientID, roleAfter)
createdRoleName, errCreate := kcA.Client.CreateClientRole(ctx, kcA.GetAccessToken(), kcA.TargetRealm, kcA.TargetClientID, roleAfter)
if errCreate != nil {
klog.Errorf("Error when creating role -> %s", errCreate)
return errCreate
Expand All @@ -55,7 +117,7 @@ func (kcA *KcActor) createKcRole(ctx context.Context, newRoleName, newRoleDescr

if *role.Name == newRoleName {
klog.Infof("Role already existed %s", newRoleName)
err := kcA.Client.UpdateRole(ctx, kcA.Token.AccessToken, kcA.TargetRealm, kcA.TargetClientID, roleAfter)
err := kcA.Client.UpdateRole(ctx, kcA.GetAccessToken(), kcA.TargetRealm, kcA.TargetClientID, roleAfter)
if err != nil {
klog.Errorf("Error when creating role -> %s", err)
return err
Expand All @@ -69,7 +131,7 @@ func (kcA *KcActor) createKcRole(ctx context.Context, newRoleName, newRoleDescr

func (kcA *KcActor) deleteKcRoles(ctx context.Context, rolesToDelete map[string]string) error {
for role := range rolesToDelete {
if err := kcA.Client.DeleteClientRole(ctx, kcA.Token.AccessToken, kcA.TargetRealm, kcA.TargetClientID, role); err != nil {
if err := kcA.Client.DeleteClientRole(ctx, kcA.GetAccessToken(), kcA.TargetRealm, kcA.TargetClientID, role); err != nil {
if !strings.Contains(err.Error(), "404") {
klog.Errorf("Could not delete user role %s -> %s", role, err)
return err
Expand All @@ -81,7 +143,7 @@ func (kcA *KcActor) deleteKcRoles(ctx context.Context, rolesToDelete map[string]

func (kcA *KcActor) getUserInfo(ctx context.Context, username string) (userID, email *string, err error) {
// using Exact in the GetUsersParams deosn't work cause keycloak doesn't offer the field in the API
usersFound, err := kcA.Client.GetUsers(ctx, kcA.Token.AccessToken, kcA.TargetRealm, gocloak.GetUsersParams{Username: &username})
usersFound, err := kcA.Client.GetUsers(ctx, kcA.GetAccessToken(), kcA.TargetRealm, gocloak.GetUsersParams{Username: &username})
if err != nil {
klog.Errorf("Error when trying to find user %s -> %s", username, err)
return nil, nil, err
Expand Down Expand Up @@ -123,13 +185,13 @@ func (kcA *KcActor) createKcUser(ctx context.Context, username, firstName, lastN
Enabled: &tr,
EmailVerified: &fa,
}
newUserID, err := kcA.Client.CreateUser(ctx, kcA.Token.AccessToken, kcA.TargetRealm, newUser)
newUserID, err := kcA.Client.CreateUser(ctx, kcA.GetAccessToken(), kcA.TargetRealm, newUser)
if err != nil {
klog.Errorf("Error when creating user %s -> %s", username, err)
return nil, err
}
klog.Infof("User %s created", username)
if err = kcA.Client.ExecuteActionsEmail(ctx, kcA.Token.AccessToken, kcA.TargetRealm, gocloak.ExecuteActionsEmail{
if err = kcA.Client.ExecuteActionsEmail(ctx, kcA.GetAccessToken(), kcA.TargetRealm, gocloak.ExecuteActionsEmail{
UserID: &newUserID,
Lifespan: &kcA.EmailActionsLifeSpanS,
Actions: &kcA.UserRequiredActions,
Expand All @@ -155,13 +217,13 @@ func (kcA *KcActor) updateKcUser(ctx context.Context, userID, firstName, lastNam
if requireUserActions {
updatedUser.EmailVerified = &fa
}
err := kcA.Client.UpdateUser(ctx, kcA.Token.AccessToken, kcA.TargetRealm, updatedUser)
err := kcA.Client.UpdateUser(ctx, kcA.GetAccessToken(), kcA.TargetRealm, updatedUser)
if err != nil {
klog.Errorf("Error when updating user %s %s -> %s", firstName, lastName, err)
return err
}
if requireUserActions {
if err = kcA.Client.ExecuteActionsEmail(ctx, kcA.Token.AccessToken, kcA.TargetRealm, gocloak.ExecuteActionsEmail{
if err = kcA.Client.ExecuteActionsEmail(ctx, kcA.GetAccessToken(), kcA.TargetRealm, gocloak.ExecuteActionsEmail{
UserID: &userID,
Lifespan: &kcA.EmailActionsLifeSpanS,
Actions: &kcA.UserRequiredActions,
Expand All @@ -179,7 +241,7 @@ func (kcA *KcActor) updateUserRoles(ctx context.Context, roleNames []string, use
// convert workspaces to actual keyloak role
for i, roleName := range roleNames {
// check if role exists and get roleID to use with gocloak
gotRole, err := kcA.Client.GetClientRole(ctx, kcA.Token.AccessToken, kcA.TargetRealm, kcA.TargetClientID, roleName)
gotRole, err := kcA.Client.GetClientRole(ctx, kcA.GetAccessToken(), kcA.TargetRealm, kcA.TargetClientID, roleName)
if err != nil {
klog.Errorf("Error when getting info on client role %s -> %s", roleName, err)
return err
Expand All @@ -188,22 +250,22 @@ func (kcA *KcActor) updateUserRoles(ctx context.Context, roleNames []string, use
rolesToSet[i].Name = gotRole.Name
}
// get current roles of user
userCurrentRoles, err := kcA.Client.GetClientRolesByUserID(ctx, kcA.Token.AccessToken, kcA.TargetRealm, kcA.TargetClientID, userID)
userCurrentRoles, err := kcA.Client.GetClientRolesByUserID(ctx, kcA.GetAccessToken(), kcA.TargetRealm, kcA.TargetClientID, userID)
if err != nil {
klog.Errorf("Error when getting roles of user with ID %s -> %s", userID, err)
return err
}
rolesToDelete := subtractRoles(userCurrentRoles, rolesToSet, editOnlyPrefix)
if len(rolesToDelete) > 0 {
// this is idempotent
err = kcA.Client.DeleteClientRoleFromUser(ctx, kcA.Token.AccessToken, kcA.TargetRealm, kcA.TargetClientID, userID, rolesToDelete)
err = kcA.Client.DeleteClientRoleFromUser(ctx, kcA.GetAccessToken(), kcA.TargetRealm, kcA.TargetClientID, userID, rolesToDelete)
if err != nil {
klog.Errorf("Error when removing user roles to user with ID %s -> %s", userID, err)
return err
}
}
// // this is idempotent
err = kcA.Client.AddClientRoleToUser(ctx, kcA.Token.AccessToken, kcA.TargetRealm, kcA.TargetClientID, userID, rolesToSet)
err = kcA.Client.AddClientRoleToUser(ctx, kcA.GetAccessToken(), kcA.TargetRealm, kcA.TargetClientID, userID, rolesToSet)
if err != nil {
klog.Errorf("Error when adding user roles to user with ID %s -> %s", userID, err)
return err
Expand Down
2 changes: 1 addition & 1 deletion operators/pkg/tenant-controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var reqActions = []string{"UPDATE_PASSWORD", "VERIFY_EMAIL"}
var emailActionLifespan = 60 * 60 * 24 * 30
var kcA = KcActor{
Client: mKcClient,
Token: mToken,
token: mToken,
TargetRealm: kcTargetRealm,
TargetClientID: kcTargetClientID,
UserRequiredActions: reqActions,
Expand Down
Loading

0 comments on commit 26a907b

Please sign in to comment.