diff --git a/pkg/common/constants/constants.go b/pkg/common/constants/constants.go index be1d66c4..3e783adc 100644 --- a/pkg/common/constants/constants.go +++ b/pkg/common/constants/constants.go @@ -2,7 +2,9 @@ package constants const ( TCPProtocol = "tcp" + HTTPSScheme = "https" DefaultLogLevel = "INFO" + JSONContentType = "application/json" Galadriel = "galadriel" GaladrielServerName = "galadriel-server" ) diff --git a/pkg/harvester/bundlemanager/fedbundles.go b/pkg/harvester/bundlemanager/fedbundles.go index d5da4686..20214902 100644 --- a/pkg/harvester/bundlemanager/fedbundles.go +++ b/pkg/harvester/bundlemanager/fedbundles.go @@ -19,15 +19,16 @@ import ( "google.golang.org/grpc/codes" ) -// FederatedBundlesSynchronizer is responsible for periodically syncing the federated bundles in SPIRE Server -// with the Federated bundles fetched from Galadriel Server. -// This process involves: -// 1. Fetching the federated bundles from Galadriel Server. -// 2. Verifying the integrity of the bundles. -// 3. Setting the new bundles to SPIRE Server. -// 4. Deleting the bundles from SPIRE Server for relationships that no longer exist. -// The deletion of bundles in SPIRE Server is done using the DISSOCIATE mode deletes the federated bundles -// dissociating the registration entries from the federated trust domain. +// FederatedBundlesSynchronizer is responsible for periodically synchronizing the federated bundles +// in the SPIRE Server with those fetched from the Galadriel Server. The synchronization process consists of the following steps: +// 1. Fetch the federated bundles from the Galadriel Server. +// 2. Verify the integrity of these bundles using the provided bundle verifiers. +// 3. Update the SPIRE Server with the new bundles. +// 4. If any relationships no longer exist, remove the corresponding bundles from the SPIRE Server. +// +// The removal of bundles is done in DISSOCIATE mode, which dissociates the registration entries +// from the non-existent federated trust domains. It also maintains a last-known state of federated +// bundles fetched from the Galadriel Server to optimize synchronizations. type FederatedBundlesSynchronizer struct { spireClient spireclient.Client galadrielClient galadrielclient.Client @@ -36,7 +37,7 @@ type FederatedBundlesSynchronizer struct { logger logrus.FieldLogger // last state of Federated Bundles fetched from Galadriel Server - lastFederatesBundlesDigests map[spiffeid.TrustDomain][]byte + lastFederatedBundleDigests map[spiffeid.TrustDomain][]byte } // FederatedBundlesSynchronizerConfig holds the configuration for FederatedBundlesSynchronizer. @@ -58,8 +59,8 @@ func NewFederatedBundlesSynchronizer(config *FederatedBundlesSynchronizerConfig) } } -// Run starts the synchronization process. -func (s *FederatedBundlesSynchronizer) Run(ctx context.Context) error { +// StartSyncing starts the synchronization process. +func (s *FederatedBundlesSynchronizer) StartSyncing(ctx context.Context) error { s.logger.Info("Federated Bundles Synchronizer started") ticker := time.NewTicker(s.syncInterval) @@ -68,7 +69,7 @@ func (s *FederatedBundlesSynchronizer) Run(ctx context.Context) error { for { select { case <-ticker.C: - if err := s.syncFederatedBundles(ctx); err != nil { + if err := s.synchronizeFederatedBundles(ctx); err != nil { s.logger.Errorf("Failed to sync federated bundles with Galadriel Server: %v", err) } case <-ctx.Done(): @@ -78,40 +79,40 @@ func (s *FederatedBundlesSynchronizer) Run(ctx context.Context) error { } } -func (s *FederatedBundlesSynchronizer) syncFederatedBundles(ctx context.Context) error { - s.logger.Debug("Syncing federated bundles with Galadriel Server") +func (s *FederatedBundlesSynchronizer) synchronizeFederatedBundles(ctx context.Context) error { + s.logger.Debug("Synchronize federated bundles with Galadriel Server") - spireCtx, spireCancel := context.WithTimeout(ctx, spireCallTimeout) - if spireCancel == nil { + spireCallCtx, spireCallCancel := context.WithTimeout(ctx, spireCallTimeout) + if spireCallCancel == nil { return fmt.Errorf("failed to create context for SPIRE call") } - defer spireCancel() + defer spireCallCancel() - fedBundlesInSPIRE, err := s.fetchSPIREFederatedBundles(spireCtx) + fedBundlesInSPIRE, err := s.fetchSPIREFederatedBundles(spireCallCtx) if err != nil { return fmt.Errorf("failed to fetch federated bundles from SPIRE Server: %w", err) } - galadrielCtx, galadrielCancel := context.WithTimeout(ctx, galadrielCallTimeout) - if galadrielCancel == nil { + galadrielCallCtx, galadrielCallCancel := context.WithTimeout(ctx, galadrielCallTimeout) + if galadrielCallCancel == nil { return fmt.Errorf("failed to create context for Galadriel call") } - defer galadrielCancel() + defer galadrielCallCancel() - bundles, digests, err := s.galadrielClient.SyncBundles(galadrielCtx, fedBundlesInSPIRE) + bundles, digests, err := s.galadrielClient.SyncBundles(galadrielCallCtx, fedBundlesInSPIRE) if err != nil { return fmt.Errorf("failed to sync federated bundles with Galadriel Server: %w", err) } // if the federated bundles have not changed since last server poll, skip the sync - if equalMaps(s.lastFederatesBundlesDigests, digests) { + if areMapsEqual(s.lastFederatedBundleDigests, digests) { s.logger.Debug("Federated bundles have not changed") return nil } bundlesToSet := make([]*spiffebundle.Bundle, 0) for _, b := range bundles { - if err := s.verifyBundle(b); err != nil { + if err := s.validateBundleIntegrity(b); err != nil { s.logger.Errorf("Failed to verify bundle for trust domain %q: %v", b.TrustDomainName, err) continue // skip the bundle } @@ -125,21 +126,21 @@ func (s *FederatedBundlesSynchronizer) syncFederatedBundles(ctx context.Context) bundlesToSet = append(bundlesToSet, spireBundle) } - setStatuses, err := s.spireClient.SetFederatedBundles(spireCtx, bundlesToSet) + setStatuses, err := s.spireClient.SetFederatedBundles(spireCallCtx, bundlesToSet) if err != nil { s.logger.Errorf("Failed to set federated bundles in SPIRE Server: %v", err) } else { s.logFederatedBundleSetStatuses(setStatuses) } - bundlesToDelete := s.getTrustDomainsToDelete(fedBundlesInSPIRE, digests) + bundlesToDelete := s.findTrustDomainsToDelete(fedBundlesInSPIRE, digests) if len(bundlesToDelete) == 0 { // No updates to be made, update the last state and return - s.lastFederatesBundlesDigests = digests + s.lastFederatedBundleDigests = digests return nil } - deleteStatuses, err := s.spireClient.DeleteFederatedBundles(spireCtx, bundlesToDelete) + deleteStatuses, err := s.spireClient.DeleteFederatedBundles(spireCallCtx, bundlesToDelete) if err != nil { s.logger.Errorf("Failed to delete federated bundles in SPIRE Server: %v", err) } else { @@ -147,13 +148,13 @@ func (s *FederatedBundlesSynchronizer) syncFederatedBundles(ctx context.Context) } // update the last state of federated bundles - s.lastFederatesBundlesDigests = digests + s.lastFederatedBundleDigests = digests return nil } -// getTrustDomainsToDelete returns a slice of trust domains to delete based on the provided bundles and digests map. -func (s *FederatedBundlesSynchronizer) getTrustDomainsToDelete(bundles []*entity.Bundle, digests map[spiffeid.TrustDomain][]byte) []spiffeid.TrustDomain { +// findTrustDomainsToDelete returns a slice of trust domains to delete based on the provided bundles and digests map. +func (s *FederatedBundlesSynchronizer) findTrustDomainsToDelete(bundles []*entity.Bundle, digests map[spiffeid.TrustDomain][]byte) []spiffeid.TrustDomain { trustDomainsToDelete := make([]spiffeid.TrustDomain, 0) for _, b := range bundles { if _, ok := digests[b.TrustDomainName]; !ok { @@ -164,9 +165,9 @@ func (s *FederatedBundlesSynchronizer) getTrustDomainsToDelete(bundles []*entity return trustDomainsToDelete } -// verifyBundle verifies the bundle using the given verifiers. +// validateBundleIntegrity verifies the bundle using the given verifiers. // If one of the verifiers can verify the bundle, it returns nil. -func (s *FederatedBundlesSynchronizer) verifyBundle(bundle *entity.Bundle) error { +func (s *FederatedBundlesSynchronizer) validateBundleIntegrity(bundle *entity.Bundle) error { var certChain []*x509.Certificate if len(bundle.SigningCertificate) > 0 { var err error @@ -231,7 +232,7 @@ func (s *FederatedBundlesSynchronizer) logFederatedBundleDeleteStatuses(deleteSt } } -func equalMaps(map1, map2 map[spiffeid.TrustDomain][]byte) bool { +func areMapsEqual(map1, map2 map[spiffeid.TrustDomain][]byte) bool { if len(map1) == 0 && len(map2) == 0 { return true } diff --git a/pkg/harvester/bundlemanager/manager.go b/pkg/harvester/bundlemanager/manager.go index 5180e5f6..36aa608a 100644 --- a/pkg/harvester/bundlemanager/manager.go +++ b/pkg/harvester/bundlemanager/manager.go @@ -75,8 +75,8 @@ func NewBundleManager(c *Config) *BundleManager { // Run runs the bundle synchronization processes. func (bm *BundleManager) Run(ctx context.Context) error { tasks := []func(ctx context.Context) error{ - bm.federatedBundlesSynchronizer.Run, - bm.spireBundleSynchronizer.Run, + bm.federatedBundlesSynchronizer.StartSyncing, + bm.spireBundleSynchronizer.StartSyncing, } err := util.RunTasks(ctx, tasks...) diff --git a/pkg/harvester/bundlemanager/spirebundle.go b/pkg/harvester/bundlemanager/spirebundle.go index a21d1c05..70fdcabf 100644 --- a/pkg/harvester/bundlemanager/spirebundle.go +++ b/pkg/harvester/bundlemanager/spirebundle.go @@ -14,8 +14,8 @@ import ( "github.com/spiffe/go-spiffe/v2/bundle/spiffebundle" ) -// SpireBundleSynchronizer is responsible for periodically fetching the bundle from the SPIRE Server, -// signing it, and uploading it to the Galadriel Server. +// SpireBundleSynchronizer manages the synchronization of bundles from the SPIRE server. +// It periodically fetches the bundle from the SPIRE Server, signs it, and uploads it to the Galadriel Server. type SpireBundleSynchronizer struct { spireClient spireclient.Client galadrielClient galadrielclient.Client @@ -46,8 +46,9 @@ func NewSpireSynchronizer(config *SpireSynchronizerConfig) *SpireBundleSynchroni } } -// Run starts the SPIRE bundle Synchronizer process. -func (s *SpireBundleSynchronizer) Run(ctx context.Context) error { +// StartSyncing initializes the SPIRE bundle synchronization process. +// It starts an infinite loop that periodically fetches the SPIRE bundle, signs it and uploads it to the Galadriel Server. +func (s *SpireBundleSynchronizer) StartSyncing(ctx context.Context) error { s.logger.Info("SPIRE Bundle Synchronizer started") ticker := time.NewTicker(s.syncInterval) @@ -56,7 +57,7 @@ func (s *SpireBundleSynchronizer) Run(ctx context.Context) error { for { select { case <-ticker.C: - err := s.syncSPIREBundle(ctx) + err := s.syncSpireBundleToGaladriel(ctx) if err != nil { s.logger.Errorf("Failed to sync SPIRE bundle: %v", err) } @@ -67,18 +68,19 @@ func (s *SpireBundleSynchronizer) Run(ctx context.Context) error { } } -// syncSPIREBundle periodically checks the SPIRE Server for a new bundle, signs it, and uploads the signed bundle to the Galadriel Server. -func (s *SpireBundleSynchronizer) syncSPIREBundle(ctx context.Context) error { +// syncSPIREBundle fetches a new bundle from the SPIRE Server, +// signs it if it's new, and then uploads it to the Galadriel Server. +func (s *SpireBundleSynchronizer) syncSpireBundleToGaladriel(ctx context.Context) error { s.logger.Debug("Checking SPIRE Server for a new bundle") - spireCtx, spireCancel := context.WithTimeout(ctx, spireCallTimeout) - if spireCancel == nil { + spireCallCtx, spireCallCancel := context.WithTimeout(ctx, spireCallTimeout) + if spireCallCancel == nil { return fmt.Errorf("failed to create context for SPIRE call") } - defer spireCancel() + defer spireCallCancel() // Fetch SPIRE bundle - bundle, err := s.spireClient.GetBundle(spireCtx) + bundle, err := s.spireClient.GetBundle(spireCallCtx) if err != nil { return err } @@ -91,19 +93,19 @@ func (s *SpireBundleSynchronizer) syncSPIREBundle(ctx context.Context) error { s.logger.Debug("New bundle from SPIRE Server") // Generate the bundle to upload - b, err := s.generateBundleToUpload(bundle) + bundleToUpload, err := s.prepareBundleForUpload(bundle) if err != nil { return fmt.Errorf("failed to create bundle to upload: %w", err) } - galadrielCtx, galadrielCancel := context.WithTimeout(ctx, galadrielCallTimeout) - if galadrielCancel == nil { + galadrielCallCtx, galadrielCallCancel := context.WithTimeout(ctx, galadrielCallTimeout) + if galadrielCallCancel == nil { return fmt.Errorf("failed to create context for Galadriel Server call") } - defer galadrielCancel() + defer galadrielCallCancel() // Upload the bundle to Galadriel Server - err = s.galadrielClient.PostBundle(galadrielCtx, b) + err = s.galadrielClient.PostBundle(galadrielCallCtx, bundleToUpload) if err != nil { return fmt.Errorf("failed to upload bundle to Galadriel Server: %w", err) } @@ -113,9 +115,9 @@ func (s *SpireBundleSynchronizer) syncSPIREBundle(ctx context.Context) error { return nil } -// generateBundleToUpload creates an entity.Bundle using the provided SPIRE bundle. +// prepareBundleForUpload creates an entity.Bundle using the provided SPIRE bundle. // It marshals the SPIRE bundle, generates the bundle signature, and calculates the digest. -func (s *SpireBundleSynchronizer) generateBundleToUpload(spireBundle *spiffebundle.Bundle) (*entity.Bundle, error) { +func (s *SpireBundleSynchronizer) prepareBundleForUpload(spireBundle *spiffebundle.Bundle) (*entity.Bundle, error) { bundleBytes, err := spireBundle.Marshal() if err != nil { return nil, fmt.Errorf("failed to marshal SPIRE bundle: %w", err) diff --git a/pkg/harvester/galadrielclient/client.go b/pkg/harvester/galadrielclient/client.go index a75c8004..0b1f76cb 100644 --- a/pkg/harvester/galadrielclient/client.go +++ b/pkg/harvester/galadrielclient/client.go @@ -27,9 +27,6 @@ import ( ) const ( - scheme = "https://" - jsonContentType = "application/json" - jwtRotationInterval = 5 * time.Minute onboardPath = "/trust-domain/onboard" tokenFile = "jwt-token" @@ -40,7 +37,7 @@ var ( NotOnboardedErr = errors.New("client has not been onboarded to Galadriel Server") ) -// Client represents a client to interact with the Galadriel Server +// Client represents a client to interact with the Galadriel Server API. type Client interface { SyncBundles(context.Context, []*entity.Bundle) ([]*entity.Bundle, map[spiffeid.TrustDomain][]byte, error) PostBundle(context.Context, *entity.Bundle) error @@ -48,7 +45,7 @@ type Client interface { UpdateRelationship(context.Context, uuid.UUID, entity.ConsentStatus) (*entity.Relationship, error) } -// Config is a struct that holds the configuration for the Galadriel Server client +// Config is a struct that holds the configuration for the Galadriel Server client. type Config struct { TrustDomain spiffeid.TrustDomain GaladrielServerAddress *net.TCPAddr @@ -62,12 +59,12 @@ type Config struct { type client struct { client harvester.ClientInterface trustDomain spiffeid.TrustDomain - jwtStore *jwtProvider + jwtStore *jwtStore logger logrus.FieldLogger } -// jwtProvider is a struct that holds the JWT access token -type jwtProvider struct { +// jwtStore is a struct that holds the JWT access token +type jwtStore struct { mu sync.RWMutex jwt string tokenFilePath string // File path for storing the JWT token @@ -91,7 +88,7 @@ func NewClient(ctx context.Context, cfg *Config) (Client, error) { return nil, errors.New("data dir cannot be empty") } - jp, err := newJWTProvider(cfg.DataDir, tokenFile, cfg.Logger) + jwtProvider, err := newJwtStore(cfg.DataDir, tokenFile, cfg.Logger) if err != nil { return nil, fmt.Errorf("failed to create JWT provider: %w", err) } @@ -101,12 +98,12 @@ func NewClient(ctx context.Context, cfg *Config) (Client, error) { return nil, fmt.Errorf("failed to create TLS client for server %s: %w", cfg.GaladrielServerAddress, err) } - serverAddress := fmt.Sprintf("%s%s", scheme, cfg.GaladrielServerAddress.String()) + serverAddress := fmt.Sprintf("%s://%s", constants.HTTPSScheme, cfg.GaladrielServerAddress.String()) // Create harvester client harvesterClient, err := harvester.NewClient(serverAddress, harvester.WithHTTPClient(c), - harvester.WithRequestEditorFn(createJWTTokenReqEditor(jp))) + harvester.WithRequestEditorFn(createJWTTokenReqEditor(jwtProvider))) if err != nil { return nil, fmt.Errorf("failed to create harvester client: %w", err) } @@ -115,7 +112,7 @@ func NewClient(ctx context.Context, cfg *Config) (Client, error) { trustDomain: cfg.TrustDomain, client: harvesterClient, logger: cfg.Logger, - jwtStore: jp, + jwtStore: jwtProvider, } // if the user provided a join token, try to onboard the Harvester to Galadriel Server @@ -125,7 +122,7 @@ func NewClient(ctx context.Context, cfg *Config) (Client, error) { } } - if !client.isOnboarded() { + if !client.isClientOnboarded() { // this happens if the user did not provide a join token and the Harvester cannot find a stored jwt token return nil, errors.New("harvester is not onboarded to Galadriel Server. A join token is required") } @@ -335,8 +332,8 @@ func (c *client) PostBundle(ctx context.Context, bundle *entity.Bundle) error { return nil } -// isOnboarded Check if the client has been onboarded by checking if there is a JWT token -func (c *client) isOnboarded() bool { +// isClientOnboarded Check if the client has been onboarded by checking if there is a JWT token +func (c *client) isClientOnboarded() bool { return c.jwtStore.getToken() != "" } @@ -433,7 +430,7 @@ func createTLSClient(trustBundlePath string) (*http.Client, error) { } // createEmptyTokenFile creates an empty token file -func (p *jwtProvider) createEmptyTokenFile() error { +func (p *jwtStore) createEmptyTokenFile() error { // Create the file and close it immediately to create an empty file file, err := os.Create(p.tokenFilePath) if err != nil { @@ -445,7 +442,7 @@ func (p *jwtProvider) createEmptyTokenFile() error { } // createJWTTokenReqEditor creates a request editor function that adds the JWT token to the request's Authorization header -func createJWTTokenReqEditor(jp *jwtProvider) harvester.RequestEditorFn { +func createJWTTokenReqEditor(jp *jwtStore) harvester.RequestEditorFn { return func(ctx context.Context, req *http.Request) error { if req.URL.Path == onboardPath { return nil @@ -453,7 +450,7 @@ func createJWTTokenReqEditor(jp *jwtProvider) harvester.RequestEditorFn { token := jp.getToken() req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) - req.Header.Set("Content-Type", jsonContentType) + req.Header.Set("Content-Type", constants.JSONContentType) return nil } } @@ -510,13 +507,13 @@ func createEntityBundle(trustDomainName string, b *harvester.BundlesUpdatesItem) return ret, nil } -func newJWTProvider(dataDir, tokenFileName string, logger logrus.FieldLogger) (*jwtProvider, error) { +func newJwtStore(dataDir, tokenFileName string, logger logrus.FieldLogger) (*jwtStore, error) { if logger == nil { return nil, errors.New("logger is required") } tokenStoragePath := filepath.Join(dataDir, tokenFileName) - jp := &jwtProvider{ + jp := &jwtStore{ mu: sync.RWMutex{}, jwt: "", tokenFilePath: tokenStoragePath, @@ -538,7 +535,7 @@ func newJWTProvider(dataDir, tokenFileName string, logger logrus.FieldLogger) (* return jp, nil } -func (p *jwtProvider) setToken(jwt string) { +func (p *jwtStore) setToken(jwt string) { p.mu.Lock() // Sanitize token removing leading and trailing spaces and quotes @@ -556,14 +553,14 @@ func (p *jwtProvider) setToken(jwt string) { } } -func (p *jwtProvider) getToken() string { +func (p *jwtStore) getToken() string { p.mu.RLock() defer p.mu.RUnlock() return p.jwt } // loadToken loads the JWT token from the disk storage -func (p *jwtProvider) loadToken() error { +func (p *jwtStore) loadToken() error { p.mu.Lock() defer p.mu.Unlock() @@ -580,7 +577,7 @@ func (p *jwtProvider) loadToken() error { } // saveToken saves the JWT token to disk storage -func (p *jwtProvider) saveToken() error { +func (p *jwtStore) saveToken() error { p.mu.RLock() defer p.mu.RUnlock()