Skip to content

Commit

Permalink
Add support for namespace suffixes (#2291)
Browse files Browse the repository at this point in the history
* use data stream name provided in Beater config

* add the same fix for CNVM
kubasobon authored Jul 1, 2024
1 parent 7ec369d commit 20eb7f2
Showing 6 changed files with 28 additions and 22 deletions.
29 changes: 17 additions & 12 deletions internal/config/config.go
Original file line number Diff line number Diff line change
@@ -34,10 +34,11 @@ import (
)

const (
DefaultNamespace = "default"
VulnerabilityType = "vuln_mgmt"
AssetInventoryType = "asset_inventory"
ResultsDatastreamIndexPrefix = "logs-cloud_security_posture.findings"
DefaultNamespace = "default"
VulnerabilityType = "vuln_mgmt"
AssetInventoryType = "asset_inventory"
defaultFindingsIndexPrefix = "logs-cloud_security_posture.findings"
defaultVulnerabilityIndexPrefix = "logs-cloud_security_posture.vulnerabilities"
)

type Fetcher struct {
@@ -55,6 +56,7 @@ type Config struct {
BundlePath string `config:"bundle_path"`
PackagePolicyId string `config:"package_policy_id"`
PackagePolicyRevision int `config:"revision"`
Index string `config:"index"`
}

type CloudConfig struct {
@@ -117,6 +119,17 @@ const (
OrganizationAccount = "organization-account"
)

// Datastream returns the name of a Data Stream to publish Cloudbeat events to.
func (c *Config) Datastream() string {
if c.Index != "" {
return c.Index
}
if c.Type == VulnerabilityType {
return defaultVulnerabilityIndexPrefix + "-" + DefaultNamespace
}
return defaultFindingsIndexPrefix + "-" + DefaultNamespace
}

func New(cfg *config.C) (*Config, error) {
c, err := defaultConfig()
if err != nil {
@@ -181,14 +194,6 @@ func getBundlePath() (string, error) {
return filepath.Join(filepath.Dir(ex), "bundle.tar.gz"), nil
}

// Datastream function to generate the datastream value
func Datastream(namespace string, indexPrefix string) string {
if namespace == "" {
namespace = DefaultNamespace
}
return indexPrefix + "-" + namespace
}

func isSupportedBenchmark(benchmark string) bool {
for _, s := range SupportedCIS {
if benchmark == s {
2 changes: 1 addition & 1 deletion internal/flavors/benchmark/builder/builder.go
Original file line number Diff line number Diff line change
@@ -86,7 +86,7 @@ func (b *Builder) buildBase(ctx context.Context, log *logp.Logger, cfg *config.C
return nil, err
}

transformer := transformer.NewTransformer(log, b.bdp, cdp, b.idp)
transformer := transformer.NewTransformer(log, cfg, b.bdp, cdp, b.idp)
return &basebenchmark{
log: log,
manager: manager,
6 changes: 2 additions & 4 deletions internal/transformer/events_creator.go
Original file line number Diff line number Diff line change
@@ -34,8 +34,6 @@ import (
"github.com/elastic/cloudbeat/internal/resources/fetching"
)

var resultsIndex = config.Datastream("", config.ResultsDatastreamIndexPrefix)

const (
ecsCategoryConfiguration = "configuration"
ecsKindState = "state"
@@ -61,10 +59,10 @@ type ECSEvent struct {
Type []string `json:"type"`
}

func NewTransformer(log *logp.Logger, bdp dataprovider.CommonDataProvider, cdp dataprovider.ElasticCommonDataProvider, idp dataprovider.IdProvider) *Transformer {
func NewTransformer(log *logp.Logger, cfg *config.Config, bdp dataprovider.CommonDataProvider, cdp dataprovider.ElasticCommonDataProvider, idp dataprovider.IdProvider) *Transformer {
return &Transformer{
log: log,
index: resultsIndex,
index: cfg.Datastream(),
idProvider: idp,
benchmarkDataProvider: bdp,
commonDataProvider: cdp,
3 changes: 2 additions & 1 deletion internal/transformer/events_creator_test.go
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/elastic/cloudbeat/internal/config"
"github.com/elastic/cloudbeat/internal/dataprovider"
"github.com/elastic/cloudbeat/internal/evaluator"
"github.com/elastic/cloudbeat/internal/resources/fetching"
@@ -170,7 +171,7 @@ func (s *EventsCreatorTestSuite) TestTransformer_ProcessAggregatedResources() {
bdp := tt.bdpp()
idp := tt.idpp()

transformer := NewTransformer(testhelper.NewLogger(s.T()), bdp, cdp, idp)
transformer := NewTransformer(testhelper.NewLogger(s.T()), &config.Config{}, bdp, cdp, idp)
generatedEvents, _ := transformer.CreateBeatEvents(ctx, tt.input)

for _, event := range generatedEvents {
8 changes: 5 additions & 3 deletions internal/vulnerability/events_creator.go
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
"golang.org/x/exp/maps"

"github.com/elastic/cloudbeat/internal/config"
"github.com/elastic/cloudbeat/internal/dataprovider"
"github.com/elastic/cloudbeat/internal/resources/fetching"
"github.com/elastic/cloudbeat/internal/resources/providers/awslib/ec2"
@@ -165,22 +166,23 @@ const (
vulScoreSystemClass = "CVSS"
vectorHeader = "CVSS:"
vulEcsCategory = "vulnerability"
vulIndex = "logs-cloud_security_posture.vulnerabilities-default"
)

type EventsCreator struct {
log *logp.Logger
cloudDataProvider dataprovider.CommonDataProvider
commonDataProvider Enricher
ch chan []beat.Event
index string
}

func NewEventsCreator(log *logp.Logger, bdp dataprovider.CommonDataProvider, cdp dataprovider.ElasticCommonDataProvider) EventsCreator {
func NewEventsCreator(log *logp.Logger, cfg *config.Config, bdp dataprovider.CommonDataProvider, cdp dataprovider.ElasticCommonDataProvider) EventsCreator {
return EventsCreator{
log: log,
commonDataProvider: dataprovider.NewEnricher(cdp),
cloudDataProvider: bdp,
ch: make(chan []beat.Event),
index: cfg.Datastream(),
}
}

@@ -282,7 +284,7 @@ func (e EventsCreator) generateEvent(reportResult trivyTypes.Result, vul trivyTy

event := beat.Event{
// TODO: Maybe configure or get from somewhere else?
Meta: mapstr.M{libevents.FieldMetaIndex: vulIndex},
Meta: mapstr.M{libevents.FieldMetaIndex: e.index},
Timestamp: timestamp,
Fields: mapstr.M{
// TODO: Replace sequence with more generic approach
2 changes: 1 addition & 1 deletion internal/vulnerability/worker.go
Original file line number Diff line number Diff line change
@@ -75,7 +75,7 @@ func NewVulnerabilityWorker(log *logp.Logger, c *config.Config, bdp dataprovider
return nil, fmt.Errorf("VulnerabilityWorker: could not get init NewVulnerabilityScanner: %w", err)
}

eventsCreator := NewEventsCreator(log, bdp, cdp)
eventsCreator := NewEventsCreator(log, c, bdp, cdp)
cleaner := NewVulnerabilityCleaner(log, provider)

return &VulnerabilityWorker{

0 comments on commit 20eb7f2

Please sign in to comment.