Add logs agent integ test #433

Open
wants to merge 29 commits into
base: main
Changes from 19 commits
Commits
21aa051
update config files
varunch77 Nov 18, 2024
734f8e0
Add integ test for Logs agent
varunch77 Nov 18, 2024
5ed1a93
Have the agent sleep after starting it
varunch77 Nov 19, 2024
96522cd
update flush time to 90s
varunch77 Nov 19, 2024
b98c612
Update flush time to 120 seconds
varunch77 Nov 19, 2024
bdd40b9
Add delay to ensure tag deletion propagates
varunch77 Nov 19, 2024
43d2ca6
Change order of tests
varunch77 Nov 19, 2024
9f28f03
Change order of tests again
varunch77 Nov 19, 2024
e1bb9ce
Enable instance metadata tags
varunch77 Nov 19, 2024
fd59713
Clean up unused code
varunch77 Nov 19, 2024
6e00757
Differentiate sleep times
varunch77 Nov 19, 2024
bb229f7
Merge branch 'main' into add-logs-agent-integ-test
varunch77 Nov 19, 2024
84cc3d3
Address nits and make minor changes
varunch77 Nov 22, 2024
7a3b428
UGet region from EC2 metadata instead of hardcoding
varunch77 Nov 22, 2024
e37d84a
Revert EC2 metadata change
varunch77 Nov 22, 2024
79744f1
Revert EC2 metadata change
varunch77 Nov 22, 2024
7950725
Address misc comments
varunch77 Nov 22, 2024
37cb8a5
update code to search for log group directly
varunch77 Nov 22, 2024
006a057
Revert to search by prefix
varunch77 Nov 22, 2024
eb95406
address comments
varunch77 Dec 4, 2024
df7abce
remove unused packageS
varunch77 Dec 4, 2024
8dcc2d4
use clients from the awsservice package
varunch77 Dec 4, 2024
9f5722b
Merge remote-tracking branch 'origin/main' into add-logs-agent-integ-…
varunch77 Dec 4, 2024
6f1e4cf
Make ValidateEntity a common package
varunch77 Dec 4, 2024
5bd4a65
modify ValidateLogs to be a function in a common package
varunch77 Dec 4, 2024
d116feb
Merge remote-tracking branch 'origin/main' into add-logs-agent-integ-…
varunch77 Dec 4, 2024
6c272ee
Increase flush time
varunch77 Dec 5, 2024
58f0a88
run linter
varunch77 Dec 5, 2024
a555d2c
Merge branch 'main' into add-logs-agent-integ-test
varunch77 Jan 9, 2025
320 changes: 318 additions & 2 deletions test/cloudwatchlogs/publish_logs_test.go
@@ -6,6 +6,8 @@
package cloudwatchlogs

import (
"context"
"errors"
"fmt"
"log"
"os"
@@ -15,7 +17,13 @@ import (
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2Types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"

"github.com/aws/amazon-cloudwatch-agent-test/environment"
@@ -29,9 +37,20 @@ const (
logLineId2 = "bar"
logFilePath = "/tmp/cwagent_log_test.log" // TODO: not sure how well this will work on Windows
sleepForFlush = 20 * time.Second // default flush interval is 5 seconds
sleepForExtendedFlush = 180 * time.Second // increase flush time for the two main tests
retryWaitTime = 30 * time.Second
configPathAutoRemoval = "resources/config_auto_removal.json"
standardLogGroupClass = "STANDARD"
infrequentAccessLogGroupClass = "INFREQUENT_ACCESS"
cwlPerfEndpoint = "https://logs.us-west-2.amazonaws.com"
Contributor

This relies on the fact that we are running the test in us-west-2...

I know we only run our tests in us-west-2 but can we detect the region on the host and then populate the endpoint accordingly so that in the future we can support other regions?

Member Author

I think we may already be constrained by the region being hardcoded elsewhere: https://github.com/aws/amazon-cloudwatch-agent-test/blob/main/util/awsservice/constant.go#L58C1-L60C78. We would need to make edits there but I'm a little wary of what the blast radius of that would look like for our testing

Contributor

We don't need to hardcode the endpoint here. The code I wrote originally had to use a custom endpoint because it was a beta endpoint but this is the public us-west-2 endpoint which the client will automatically resolve
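
As a rough illustration of the thread above: once the region is known, the public endpoint follows from it, so a hardcoded endpoint constant carries no extra information. The sketch below is stdlib-only and does not use the SDK; `resolveRegion`, `logsEndpoint`, and `defaultRegion` are hypothetical names, and the endpoint pattern is assumed to apply only to the standard `aws` partition.

```go
package main

import (
	"fmt"
	"os"
)

// defaultRegion is a hypothetical fallback mirroring the region the tests run in today.
const defaultRegion = "us-west-2"

// resolveRegion returns the first non-empty value among AWS_REGION and
// AWS_DEFAULT_REGION, falling back to defaultRegion.
// getenv is injected so the lookup can be exercised without touching the process environment.
func resolveRegion(getenv func(string) string) string {
	for _, key := range []string{"AWS_REGION", "AWS_DEFAULT_REGION"} {
		if v := getenv(key); v != "" {
			return v
		}
	}
	return defaultRegion
}

// logsEndpoint builds the public CloudWatch Logs endpoint for a region in the
// standard "aws" partition (assumption: other partitions use a different suffix).
func logsEndpoint(region string) string {
	return fmt.Sprintf("https://logs.%s.amazonaws.com", region)
}

func main() {
	region := resolveRegion(os.Getenv)
	fmt.Println(region, logsEndpoint(region))
}
```

In the test itself, dropping the `BaseEndpoint` override and letting the client resolve the endpoint from its configured region would likely achieve the same result.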

pdxRegionalCode = "us-west-2"
Contributor

region hardcoding is not needed since entity feature is GA


entityType = "@entity.KeyAttributes.Type"
entityName = "@entity.KeyAttributes.Name"
entityEnvironment = "@entity.KeyAttributes.Environment"
entityPlatform = "@entity.Attributes.PlatformType"
entityInstanceId = "@entity.Attributes.EC2.InstanceId"
queryString = "fields @message, @entity.KeyAttributes.Type, @entity.KeyAttributes.Name, @entity.KeyAttributes.Environment, @entity.Attributes.PlatformType, @entity.Attributes.EC2.InstanceId"
)

var (
@@ -70,6 +89,9 @@ var (
logGroupClass: types.LogGroupClassInfrequentAccess,
},
}
resourceNotFoundException *types.ResourceNotFoundException
cwlClient *cloudwatchlogs.Client
ec2Client *ec2.Client
)

type writeToCloudWatchTestInput struct {
@@ -86,8 +108,28 @@ type cloudWatchLogGroupClassTestInput struct {
logGroupClass types.LogGroupClass
}

type expectedEntity struct {
entityType string
name string
environment string
platformType string
instanceId string
}

func init() {
environment.RegisterEnvironmentMetaDataFlags()
awsCfg, err := config.LoadDefaultConfig(
context.Background(),
config.WithRegion(pdxRegionalCode),
)
if err != nil {
log.Fatalf("Failed to load default config: %v", err)
}

cwlClient = cloudwatchlogs.NewFromConfig(awsCfg, func(o *cloudwatchlogs.Options) {
Contributor

We don't need to create a new client. We have a globally available client that already uses us-west-2 as the region: https://github.com/aws/amazon-cloudwatch-agent-test/blob/main/util/awsservice/constant.go#L74

o.BaseEndpoint = aws.String(cwlPerfEndpoint)
})
ec2Client = ec2.NewFromConfig(awsCfg)
Contributor

same as above

}

// TestWriteLogsToCloudWatch writes N number of logs, and then validates that N logs
@@ -119,9 +161,9 @@ func TestWriteLogsToCloudWatch(t *testing.T) {

// ensure that there is enough time from the "start" time and the first log line,
// so we don't miss it in the GetLogEvents call
time.Sleep(sleepForFlush)
time.Sleep(sleepForExtendedFlush)
Contributor

Why not just increase sleepForFlush? Do we use sleepForFlush anywhere else in the code?

Member Author

Yes, we use sleepForFlush in other places too. For the other test cases, waiting 180 seconds instead of 20 would be overkill and sometimes results in a timeout, because it can cause all the test cases in the file to take over an hour to run. We call sleepForFlush before and after the agent runs, and some of the test cases loop multiple times, so the extra 160 seconds adds up quickly. We only want the tests to wait 180 seconds in two places, which is why I created another constant.

writeLogLines(t, f, param.iterations)
time.Sleep(sleepForFlush)
time.Sleep(sleepForExtendedFlush)
common.StopAgent()
end := time.Now()

@@ -139,6 +181,129 @@ func TestWriteLogsToCloudWatch(t *testing.T) {
}
}

// TestWriteLogsWithEntityInfo writes logs and validates that the
// log events are associated with entities from CloudWatch Logs
func TestWriteLogsWithEntityInfo(t *testing.T) {
instanceId := awsservice.GetInstanceId()
log.Printf("Found instance id %s", instanceId)

// Define tags to create for EC2 test case
tagsToCreate := []ec2Types.Tag{
{
Key: aws.String("service"),
Value: aws.String("service-test"),
},
}

testCases := map[string]struct {
agentConfigPath string
iterations int
useEC2Tag bool
expectedEntity expectedEntity
}{
"IAMRole": {
agentConfigPath: filepath.Join("resources", "config_log.json"),
iterations: 1000,
expectedEntity: expectedEntity{
entityType: "Service",
name: "cwa-e2e-iam-role", //should match the name of the IAM role used in our testing
environment: "ec2:default",
platformType: "AWS::EC2",
instanceId: instanceId,
},
},
"ServiceInConfig": {
agentConfigPath: filepath.Join("resources", "config_log_service_name.json"),
iterations: 1000,
expectedEntity: expectedEntity{
entityType: "Service",
name: "service-in-config", //should match the service.name value in the config file
environment: "environment-in-config", //should match the deployment.environment value in the config file
platformType: "AWS::EC2",
instanceId: instanceId,
},
},
"EC2Tags": {
agentConfigPath: filepath.Join("resources", "config_log.json"),
iterations: 1000,
useEC2Tag: true,
expectedEntity: expectedEntity{
entityType: "Service",
name: "service-test", //should match the value in tagsToCreate
environment: "ec2:default",
platformType: "AWS::EC2",
instanceId: instanceId,
},
},
}
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
t.Cleanup(func() {
// delete the log group/stream after each test case
awsservice.DeleteLogGroupAndStream(instanceId, instanceId)

// delete EC2 tags added to the instance for the test
if testCase.useEC2Tag {
input := &ec2.DeleteTagsInput{
Resources: []string{instanceId},
Tags: tagsToCreate,
}
_, err := ec2Client.DeleteTags(context.TODO(), input)
assert.NoError(t, err)
// Add a short delay to ensure tag deletion propagates
time.Sleep(5 * time.Second)
}
})
if testCase.useEC2Tag {
// enable instance metadata tags
modifyInput := &ec2.ModifyInstanceMetadataOptionsInput{
InstanceId: aws.String(instanceId),
InstanceMetadataTags: ec2Types.InstanceMetadataTagsStateEnabled,
}
_, modifyErr := ec2Client.ModifyInstanceMetadataOptions(context.TODO(), modifyInput)
assert.NoError(t, modifyErr)

input := &ec2.CreateTagsInput{
Resources: []string{instanceId},
Tags: tagsToCreate,
}
_, createErr := ec2Client.CreateTags(context.TODO(), input)
assert.NoError(t, createErr)
}
id := uuid.New()
f, err := os.Create(logFilePath + "-" + id.String())
if err != nil {
t.Fatalf("Error occurred creating log file for writing: %v", err)
}

// Defer file closing and removal with error handling
defer func() {
if err := f.Close(); err != nil {
t.Errorf("Error occurred closing log file: %v", err)
}
if err := os.Remove(logFilePath + "-" + id.String()); err != nil {
t.Errorf("Error occurred removing log file: %v", err)
}
}()

common.DeleteFile(common.AgentLogFile)
common.TouchFile(common.AgentLogFile)

common.CopyFile(testCase.agentConfigPath, configOutputPath)

common.StartAgent(configOutputPath, true, false)
time.Sleep(sleepForExtendedFlush)
writeLogLines(t, f, testCase.iterations)
time.Sleep(sleepForExtendedFlush)
common.StopAgent()
end := time.Now()
begin := end.Add(-sleepForExtendedFlush * 4)

ValidateEntity(t, instanceId, instanceId, &begin, &end, testCase.expectedEntity)
})
}
}
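
The test above writes to `logFilePath + "-" + id.String()`, while the agent config in this PR changes `file_path` to `/tmp/cwagent_log_test.log*`. The sketch below checks that a suffixed file name falls under that wildcard. It uses `filepath.Match` as an approximation, since the agent's own wildcard handling may differ in detail, and the suffix shown is a hypothetical stand-in for a UUID.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// matchesPattern reports whether name matches the glob pattern.
// A malformed pattern is treated as "no match".
func matchesPattern(pattern, name string) bool {
	ok, err := filepath.Match(pattern, name)
	return err == nil && ok
}

func main() {
	const pattern = "/tmp/cwagent_log_test.log*"
	names := []string{
		"/tmp/cwagent_log_test.log",          // the original, unsuffixed path
		"/tmp/cwagent_log_test.log-1b4e28ba", // hypothetical suffix standing in for a UUID
		"/tmp/other.log",                     // unrelated file
	}
	for _, name := range names {
		fmt.Println(name, matchesPattern(pattern, name))
	}
}
```

The first two names match and the third does not, so the existing tests that write to the bare path should still be picked up after the config change.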

// TestAutoRemovalStopAgent configures agent to monitor a file with auto removal on.
// Then it restarts the agent.
// Verify the file is NOT removed.
@@ -332,3 +497,154 @@ func checkData(t *testing.T, start time.Time, lineCount int) {
)
assert.NoError(t, err)
}

func ValidateEntity(t *testing.T, logGroup, logStream string, begin, end *time.Time, expectedEntity expectedEntity) {
Contributor

Nonblocking but recommended: this function overlaps a lot with this one: https://github.com/aws/amazon-cloudwatch-agent-test/blob/main/test/entity/entity_test.go#L241

It may be a good idea to move ValidateEntity to a common package and use it for any entity validation tests in the future.

log.Printf("Validating entity for log group: %s, stream: %s", logGroup, logStream)

logGroupInfo, err := getLogGroup(logGroup)
Contributor

You can just use IsLogGroupExists from our common package: https://github.com/aws/amazon-cloudwatch-agent-test/blob/main/util/awsservice/cloudwatchlogs.go#L137

I wrote these helper functions because we had to create a new client in us-east-1 for the beta, but they are no longer needed.

for _, lg := range logGroupInfo {
Contributor

Is there no way to get a specific log group when you provide the name?

Seems expensive to get all log groups in an account. We have a ton in our test account..

Member Author

I don't think there's a way to just search for one log group (why not is beyond me..) but I updated this so it passes in the log group as a prefix to search by (commit 006a057)

if *lg.LogGroupName == logGroup {
log.Println("Log group " + *lg.LogGroupName + " exists")
break
}
}
assert.NoError(t, err)

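// use format verbs rather than building the format string by concatenation
log.Printf("Query start time is %s and end time is %s", begin, end)
queryId, err := getLogQueryId(logGroup, begin, end)
assert.NoError(t, err)
// aws.ToString is nil-safe, so a failed StartQuery does not panic here
log.Printf("queryId is %s", aws.ToString(queryId))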
result, err := getQueryResult(queryId)
Contributor

getLogQueryId and getQueryResult are not needed anymore since we can just use default client now. There's a helper function here: https://github.com/aws/amazon-cloudwatch-agent-test/blob/main/util/awsservice/cloudwatchlogs.go#L202

assert.NoError(t, err)
if !assert.NotZero(t, len(result)) {
return
}
requiredEntityFields := map[string]bool{
Contributor

Not sure I understand why we need this...when we pass in the query:

fields @message, @entity.KeyAttributes.Type, @entity.KeyAttributes.Name, @entity.KeyAttributes.Environment, @entity.Attributes.PlatformType, @entity.Attributes.EC2.InstanceId

Doesn't the fact that there are results mean that the logs all have the required fields?

Member Author

Line 510 isn't checking the content of the logs but instead checking to see that our query actually returned logs at all. If our query came back empty handed, we want to fail immediately instead of attempting to validate it and check that its fields match our expectations.

entityType: false,
entityName: false,
entityEnvironment: false,
entityPlatform: false,
entityInstanceId: false,
}
for _, field := range result[0] {
switch aws.ToString(field.Field) {
case entityType:
requiredEntityFields[entityType] = true
assert.Equal(t, expectedEntity.entityType, aws.ToString(field.Value))
case entityName:
requiredEntityFields[entityName] = true
assert.Equal(t, expectedEntity.name, aws.ToString(field.Value))
case entityEnvironment:
requiredEntityFields[entityEnvironment] = true
assert.Equal(t, expectedEntity.environment, aws.ToString(field.Value))
case entityPlatform:
requiredEntityFields[entityPlatform] = true
assert.Equal(t, expectedEntity.platformType, aws.ToString(field.Value))
case entityInstanceId:
requiredEntityFields[entityInstanceId] = true
assert.Equal(t, expectedEntity.instanceId, aws.ToString(field.Value))
}
fmt.Printf("%s: %s\n", aws.ToString(field.Field), aws.ToString(field.Value))
}
allEntityFieldsFound := true
for field, value := range requiredEntityFields {
if !value {
log.Printf("Missing required entity field: %s", field)
allEntityFieldsFound = false
}
}
assert.True(t, allEntityFieldsFound)
}
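
If ValidateEntity does move to a common package as suggested in review, the field check could be expressed without the SDK types. The following is a minimal sketch under that assumption: `resultField` is a hypothetical stand-in for `types.ResultField`, and `checkFields` is not a function in the repository.

```go
package main

import "fmt"

// resultField is a hypothetical stand-in for the SDK's types.ResultField.
type resultField struct {
	Field string
	Value string
}

// checkFields compares one query result row against the expected values and
// returns a description of every missing or mismatched field.
func checkFields(row []resultField, expected map[string]string) []string {
	seen := make(map[string]bool, len(expected))
	var problems []string
	for _, f := range row {
		want, ok := expected[f.Field]
		if !ok {
			continue // fields such as @message are not part of the entity
		}
		seen[f.Field] = true
		if f.Value != want {
			problems = append(problems, fmt.Sprintf("%s: got %q, want %q", f.Field, f.Value, want))
		}
	}
	for field := range expected {
		if !seen[field] {
			problems = append(problems, "missing field: "+field)
		}
	}
	return problems
}

func main() {
	row := []resultField{
		{Field: "@message", Value: "foo"},
		{Field: "@entity.KeyAttributes.Type", Value: "Service"},
	}
	expected := map[string]string{
		"@entity.KeyAttributes.Type": "Service",
		"@entity.KeyAttributes.Name": "service-in-config",
	}
	for _, p := range checkFields(row, expected) {
		fmt.Println(p)
	}
}
```

Returning the list of problems rather than asserting inside the helper keeps it independent of `testing.T`, which may make it easier to share between test packages.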

func getLogQueryId(logGroup string, since, until *time.Time) (*string, error) {
var queryId *string
params := &cloudwatchlogs.StartQueryInput{
QueryString: aws.String(queryString),
LogGroupName: aws.String(logGroup),
}
if since != nil {
params.StartTime = aws.Int64(since.UnixMilli())
}
if until != nil {
params.EndTime = aws.Int64(until.UnixMilli())
}
attempts := 0

for {
output, err := cwlClient.StartQuery(context.Background(), params)
attempts += 1

if err != nil {
if errors.As(err, &resourceNotFoundException) && attempts <= awsservice.StandardRetries {
// The log group/stream hasn't been created yet, so wait and retry
time.Sleep(retryWaitTime)
continue
}

// if the error is not a ResourceNotFoundException, we should fail here.
return queryId, err
}
queryId = output.QueryId
return queryId, err
}
}

func getQueryResult(queryId *string) ([][]types.ResultField, error) {
params := &cloudwatchlogs.GetQueryResultsInput{
// pass the pointer through; dereferencing a nil queryId would panic
QueryId: queryId,
}
for attempts := 0; attempts <= awsservice.StandardRetries; attempts++ {
result, err := cwlClient.GetQueryResults(context.Background(), params)
// check the error before touching result, which is nil when the call fails
if err != nil {
if errors.As(err, &resourceNotFoundException) {
// The log group/stream hasn't been created yet, so wait and retry
time.Sleep(retryWaitTime)
continue
}

// if the error is not a ResourceNotFoundException, we should fail here.
return nil, err
}
log.Printf("GetQueryResult status is: %v", result.Status)
if result.Status != types.QueryStatusComplete {
log.Printf("GetQueryResult: sleeping for 5 seconds until status is complete")
time.Sleep(5 * time.Second)
continue
}
log.Printf("GetQueryResult: result length is %d", len(result.Results))
return result.Results, nil
}
return nil, errors.New("exceeded retry count")
}
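
The polling in getQueryResult can be sketched independently of the SDK. The version below checks the error before reading the status and also stops early on terminal failure states. The status strings are assumed to match the CloudWatch Logs query statuses; `poll` and `fetchFunc` are hypothetical names used only for this illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// fetchFunc is a hypothetical stand-in for one GetQueryResults call.
// It returns the query status, the number of result rows, and any error.
type fetchFunc func() (status string, rows int, err error)

// poll calls fetch until the query completes, ends in a terminal failure
// state, returns an error, or maxAttempts is reached.
func poll(maxAttempts int, wait time.Duration, fetch fetchFunc) (int, error) {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		status, rows, err := fetch()
		if err != nil {
			return 0, err // check the error before trusting status or rows
		}
		switch status {
		case "Complete":
			return rows, nil
		case "Failed", "Cancelled", "Timeout":
			return 0, fmt.Errorf("query ended with status %s", status)
		}
		time.Sleep(wait) // still scheduled or running
	}
	return 0, errors.New("exceeded retry count")
}

func main() {
	n := 0
	rows, err := poll(5, time.Millisecond, func() (string, int, error) {
		n++
		if n < 3 {
			return "Running", 0, nil
		}
		return "Complete", 7, nil
	})
	fmt.Println(rows, err) // prints "7 <nil>"
}
```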

func getLogGroup(logGroupName string) ([]types.LogGroup, error) {
attempts := 0
var logGroups []types.LogGroup
params := &cloudwatchlogs.DescribeLogGroupsInput{
LogGroupNamePrefix: aws.String(logGroupName),
}
for {
output, err := cwlClient.DescribeLogGroups(context.Background(), params)

attempts += 1

if err != nil {
if errors.As(err, &resourceNotFoundException) && attempts <= awsservice.StandardRetries {
// The log group/stream hasn't been created yet, so wait and retry
time.Sleep(retryWaitTime)
continue
}

// if the error is not a ResourceNotFoundException, we should fail here.
return logGroups, err
}
logGroups = output.LogGroups
return logGroups, err
}
}
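
Because getLogGroup searches by prefix, the response can include groups whose names merely start with the requested name, so the caller still needs an exact comparison. A minimal sketch of that check, with hypothetical group names and a hypothetical helper `hasExactName`:

```go
package main

import "fmt"

// hasExactName reports whether names, assumed to be the result of a prefix
// search, contains want exactly.
func hasExactName(names []string, want string) bool {
	for _, n := range names {
		if n == want {
			return true
		}
	}
	return false
}

func main() {
	// Hypothetical names returned for the prefix "i-0abc".
	names := []string{"i-0abc-archive", "i-0abc"}

	// prints "true false"
	fmt.Println(hasExactName(names, "i-0abc"), hasExactName(names, "i-0ab"))
}
```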
2 changes: 1 addition & 1 deletion test/cloudwatchlogs/resources/config_log.json
@@ -8,7 +8,7 @@
"files": {
"collect_list": [
{
"file_path": "/tmp/cwagent_log_test.log",
"file_path": "/tmp/cwagent_log_test.log*",
"log_group_name": "{instance_id}",
"log_stream_name": "{instance_id}",
"timezone": "UTC"