Skip to content

Commit

Permalink
Minor refactors (#183)
Browse files Browse the repository at this point in the history
Improving names, constants, and comments

Signed-off-by: Max Lambrecht <[email protected]>
  • Loading branch information
Max Lambrecht authored May 31, 2023
1 parent 98449d9 commit e3f19df
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 79 deletions.
2 changes: 2 additions & 0 deletions pkg/common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package constants

const (
TCPProtocol = "tcp"
HTTPSScheme = "https"
DefaultLogLevel = "INFO"
JSONContentType = "application/json"
Galadriel = "galadriel"
GaladrielServerName = "galadriel-server"
)
71 changes: 36 additions & 35 deletions pkg/harvester/bundlemanager/fedbundles.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ import (
"google.golang.org/grpc/codes"
)

// FederatedBundlesSynchronizer is responsible for periodically syncing the federated bundles in SPIRE Server
// with the Federated bundles fetched from Galadriel Server.
// This process involves:
// 1. Fetching the federated bundles from Galadriel Server.
// 2. Verifying the integrity of the bundles.
// 3. Setting the new bundles to SPIRE Server.
// 4. Deleting the bundles from SPIRE Server for relationships that no longer exist.
// The deletion of bundles in SPIRE Server is done using the DISSOCIATE mode deletes the federated bundles
// dissociating the registration entries from the federated trust domain.
// FederatedBundlesSynchronizer is responsible for periodically synchronizing the federated bundles
// in the SPIRE Server with those fetched from the Galadriel Server. The synchronization process consists of the following steps:
// 1. Fetch the federated bundles from the Galadriel Server.
// 2. Verify the integrity of these bundles using the provided bundle verifiers.
// 3. Update the SPIRE Server with the new bundles.
// 4. If any relationships no longer exist, remove the corresponding bundles from the SPIRE Server.
//
// The removal of bundles is done in DISSOCIATE mode, which dissociates the registration entries
// from the non-existent federated trust domains. It also maintains a last-known state of federated
// bundles fetched from the Galadriel Server to optimize synchronizations.
type FederatedBundlesSynchronizer struct {
spireClient spireclient.Client
galadrielClient galadrielclient.Client
Expand All @@ -36,7 +37,7 @@ type FederatedBundlesSynchronizer struct {
logger logrus.FieldLogger

// last state of Federated Bundles fetched from Galadriel Server
lastFederatesBundlesDigests map[spiffeid.TrustDomain][]byte
lastFederatedBundleDigests map[spiffeid.TrustDomain][]byte
}

// FederatedBundlesSynchronizerConfig holds the configuration for FederatedBundlesSynchronizer.
Expand All @@ -58,8 +59,8 @@ func NewFederatedBundlesSynchronizer(config *FederatedBundlesSynchronizerConfig)
}
}

// Run starts the synchronization process.
func (s *FederatedBundlesSynchronizer) Run(ctx context.Context) error {
// StartSyncing starts the synchronization process.
func (s *FederatedBundlesSynchronizer) StartSyncing(ctx context.Context) error {
s.logger.Info("Federated Bundles Synchronizer started")

ticker := time.NewTicker(s.syncInterval)
Expand All @@ -68,7 +69,7 @@ func (s *FederatedBundlesSynchronizer) Run(ctx context.Context) error {
for {
select {
case <-ticker.C:
if err := s.syncFederatedBundles(ctx); err != nil {
if err := s.synchronizeFederatedBundles(ctx); err != nil {
s.logger.Errorf("Failed to sync federated bundles with Galadriel Server: %v", err)
}
case <-ctx.Done():
Expand All @@ -78,40 +79,40 @@ func (s *FederatedBundlesSynchronizer) Run(ctx context.Context) error {
}
}

func (s *FederatedBundlesSynchronizer) syncFederatedBundles(ctx context.Context) error {
s.logger.Debug("Syncing federated bundles with Galadriel Server")
func (s *FederatedBundlesSynchronizer) synchronizeFederatedBundles(ctx context.Context) error {
s.logger.Debug("Synchronize federated bundles with Galadriel Server")

spireCtx, spireCancel := context.WithTimeout(ctx, spireCallTimeout)
if spireCancel == nil {
spireCallCtx, spireCallCancel := context.WithTimeout(ctx, spireCallTimeout)
if spireCallCancel == nil {
return fmt.Errorf("failed to create context for SPIRE call")
}
defer spireCancel()
defer spireCallCancel()

fedBundlesInSPIRE, err := s.fetchSPIREFederatedBundles(spireCtx)
fedBundlesInSPIRE, err := s.fetchSPIREFederatedBundles(spireCallCtx)
if err != nil {
return fmt.Errorf("failed to fetch federated bundles from SPIRE Server: %w", err)
}

galadrielCtx, galadrielCancel := context.WithTimeout(ctx, galadrielCallTimeout)
if galadrielCancel == nil {
galadrielCallCtx, galadrielCallCancel := context.WithTimeout(ctx, galadrielCallTimeout)
if galadrielCallCancel == nil {
return fmt.Errorf("failed to create context for Galadriel call")
}
defer galadrielCancel()
defer galadrielCallCancel()

bundles, digests, err := s.galadrielClient.SyncBundles(galadrielCtx, fedBundlesInSPIRE)
bundles, digests, err := s.galadrielClient.SyncBundles(galadrielCallCtx, fedBundlesInSPIRE)
if err != nil {
return fmt.Errorf("failed to sync federated bundles with Galadriel Server: %w", err)
}

// if the federated bundles have not changed since last server poll, skip the sync
if equalMaps(s.lastFederatesBundlesDigests, digests) {
if areMapsEqual(s.lastFederatedBundleDigests, digests) {
s.logger.Debug("Federated bundles have not changed")
return nil
}

bundlesToSet := make([]*spiffebundle.Bundle, 0)
for _, b := range bundles {
if err := s.verifyBundle(b); err != nil {
if err := s.validateBundleIntegrity(b); err != nil {
s.logger.Errorf("Failed to verify bundle for trust domain %q: %v", b.TrustDomainName, err)
continue // skip the bundle
}
Expand All @@ -125,35 +126,35 @@ func (s *FederatedBundlesSynchronizer) syncFederatedBundles(ctx context.Context)
bundlesToSet = append(bundlesToSet, spireBundle)
}

setStatuses, err := s.spireClient.SetFederatedBundles(spireCtx, bundlesToSet)
setStatuses, err := s.spireClient.SetFederatedBundles(spireCallCtx, bundlesToSet)
if err != nil {
s.logger.Errorf("Failed to set federated bundles in SPIRE Server: %v", err)
} else {
s.logFederatedBundleSetStatuses(setStatuses)
}

bundlesToDelete := s.getTrustDomainsToDelete(fedBundlesInSPIRE, digests)
bundlesToDelete := s.findTrustDomainsToDelete(fedBundlesInSPIRE, digests)
if len(bundlesToDelete) == 0 {
// No updates to be made, update the last state and return
s.lastFederatesBundlesDigests = digests
s.lastFederatedBundleDigests = digests
return nil
}

deleteStatuses, err := s.spireClient.DeleteFederatedBundles(spireCtx, bundlesToDelete)
deleteStatuses, err := s.spireClient.DeleteFederatedBundles(spireCallCtx, bundlesToDelete)
if err != nil {
s.logger.Errorf("Failed to delete federated bundles in SPIRE Server: %v", err)
} else {
s.logFederatedBundleDeleteStatuses(deleteStatuses)
}

// update the last state of federated bundles
s.lastFederatesBundlesDigests = digests
s.lastFederatedBundleDigests = digests

return nil
}

// getTrustDomainsToDelete returns a slice of trust domains to delete based on the provided bundles and digests map.
func (s *FederatedBundlesSynchronizer) getTrustDomainsToDelete(bundles []*entity.Bundle, digests map[spiffeid.TrustDomain][]byte) []spiffeid.TrustDomain {
// findTrustDomainsToDelete returns a slice of trust domains to delete based on the provided bundles and digests map.
func (s *FederatedBundlesSynchronizer) findTrustDomainsToDelete(bundles []*entity.Bundle, digests map[spiffeid.TrustDomain][]byte) []spiffeid.TrustDomain {
trustDomainsToDelete := make([]spiffeid.TrustDomain, 0)
for _, b := range bundles {
if _, ok := digests[b.TrustDomainName]; !ok {
Expand All @@ -164,9 +165,9 @@ func (s *FederatedBundlesSynchronizer) getTrustDomainsToDelete(bundles []*entity
return trustDomainsToDelete
}

// verifyBundle verifies the bundle using the given verifiers.
// validateBundleIntegrity verifies the bundle using the given verifiers.
// If one of the verifiers can verify the bundle, it returns nil.
func (s *FederatedBundlesSynchronizer) verifyBundle(bundle *entity.Bundle) error {
func (s *FederatedBundlesSynchronizer) validateBundleIntegrity(bundle *entity.Bundle) error {
var certChain []*x509.Certificate
if len(bundle.SigningCertificate) > 0 {
var err error
Expand Down Expand Up @@ -231,7 +232,7 @@ func (s *FederatedBundlesSynchronizer) logFederatedBundleDeleteStatuses(deleteSt
}
}

func equalMaps(map1, map2 map[spiffeid.TrustDomain][]byte) bool {
func areMapsEqual(map1, map2 map[spiffeid.TrustDomain][]byte) bool {
if len(map1) == 0 && len(map2) == 0 {
return true
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/harvester/bundlemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func NewBundleManager(c *Config) *BundleManager {
// Run runs the bundle synchronization processes.
func (bm *BundleManager) Run(ctx context.Context) error {
tasks := []func(ctx context.Context) error{
bm.federatedBundlesSynchronizer.Run,
bm.spireBundleSynchronizer.Run,
bm.federatedBundlesSynchronizer.StartSyncing,
bm.spireBundleSynchronizer.StartSyncing,
}

err := util.RunTasks(ctx, tasks...)
Expand Down
38 changes: 20 additions & 18 deletions pkg/harvester/bundlemanager/spirebundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/spiffe/go-spiffe/v2/bundle/spiffebundle"
)

// SpireBundleSynchronizer is responsible for periodically fetching the bundle from the SPIRE Server,
// signing it, and uploading it to the Galadriel Server.
// SpireBundleSynchronizer manages the synchronization of bundles from the SPIRE server.
// It periodically fetches the bundle from the SPIRE Server, signs it, and uploads it to the Galadriel Server.
type SpireBundleSynchronizer struct {
spireClient spireclient.Client
galadrielClient galadrielclient.Client
Expand Down Expand Up @@ -46,8 +46,9 @@ func NewSpireSynchronizer(config *SpireSynchronizerConfig) *SpireBundleSynchroni
}
}

// Run starts the SPIRE bundle Synchronizer process.
func (s *SpireBundleSynchronizer) Run(ctx context.Context) error {
// StartSyncing initializes the SPIRE bundle synchronization process.
// It starts an infinite loop that periodically fetches the SPIRE bundle, signs it and uploads it to the Galadriel Server.
func (s *SpireBundleSynchronizer) StartSyncing(ctx context.Context) error {
s.logger.Info("SPIRE Bundle Synchronizer started")

ticker := time.NewTicker(s.syncInterval)
Expand All @@ -56,7 +57,7 @@ func (s *SpireBundleSynchronizer) Run(ctx context.Context) error {
for {
select {
case <-ticker.C:
err := s.syncSPIREBundle(ctx)
err := s.syncSpireBundleToGaladriel(ctx)
if err != nil {
s.logger.Errorf("Failed to sync SPIRE bundle: %v", err)
}
Expand All @@ -67,18 +68,19 @@ func (s *SpireBundleSynchronizer) Run(ctx context.Context) error {
}
}

// syncSPIREBundle periodically checks the SPIRE Server for a new bundle, signs it, and uploads the signed bundle to the Galadriel Server.
func (s *SpireBundleSynchronizer) syncSPIREBundle(ctx context.Context) error {
// syncSPIREBundle fetches a new bundle from the SPIRE Server,
// signs it if it's new, and then uploads it to the Galadriel Server.
func (s *SpireBundleSynchronizer) syncSpireBundleToGaladriel(ctx context.Context) error {
s.logger.Debug("Checking SPIRE Server for a new bundle")

spireCtx, spireCancel := context.WithTimeout(ctx, spireCallTimeout)
if spireCancel == nil {
spireCallCtx, spireCallCancel := context.WithTimeout(ctx, spireCallTimeout)
if spireCallCancel == nil {
return fmt.Errorf("failed to create context for SPIRE call")
}
defer spireCancel()
defer spireCallCancel()

// Fetch SPIRE bundle
bundle, err := s.spireClient.GetBundle(spireCtx)
bundle, err := s.spireClient.GetBundle(spireCallCtx)
if err != nil {
return err
}
Expand All @@ -91,19 +93,19 @@ func (s *SpireBundleSynchronizer) syncSPIREBundle(ctx context.Context) error {
s.logger.Debug("New bundle from SPIRE Server")

// Generate the bundle to upload
b, err := s.generateBundleToUpload(bundle)
bundleToUpload, err := s.prepareBundleForUpload(bundle)
if err != nil {
return fmt.Errorf("failed to create bundle to upload: %w", err)
}

galadrielCtx, galadrielCancel := context.WithTimeout(ctx, galadrielCallTimeout)
if galadrielCancel == nil {
galadrielCallCtx, galadrielCallCancel := context.WithTimeout(ctx, galadrielCallTimeout)
if galadrielCallCancel == nil {
return fmt.Errorf("failed to create context for Galadriel Server call")
}
defer galadrielCancel()
defer galadrielCallCancel()

// Upload the bundle to Galadriel Server
err = s.galadrielClient.PostBundle(galadrielCtx, b)
err = s.galadrielClient.PostBundle(galadrielCallCtx, bundleToUpload)
if err != nil {
return fmt.Errorf("failed to upload bundle to Galadriel Server: %w", err)
}
Expand All @@ -113,9 +115,9 @@ func (s *SpireBundleSynchronizer) syncSPIREBundle(ctx context.Context) error {
return nil
}

// generateBundleToUpload creates an entity.Bundle using the provided SPIRE bundle.
// prepareBundleForUpload creates an entity.Bundle using the provided SPIRE bundle.
// It marshals the SPIRE bundle, generates the bundle signature, and calculates the digest.
func (s *SpireBundleSynchronizer) generateBundleToUpload(spireBundle *spiffebundle.Bundle) (*entity.Bundle, error) {
func (s *SpireBundleSynchronizer) prepareBundleForUpload(spireBundle *spiffebundle.Bundle) (*entity.Bundle, error) {
bundleBytes, err := spireBundle.Marshal()
if err != nil {
return nil, fmt.Errorf("failed to marshal SPIRE bundle: %w", err)
Expand Down
Loading

0 comments on commit e3f19df

Please sign in to comment.