Skip to content

Commit

Permalink
Fix to not send entity when flag is disabled (fluent#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathalapooja authored Oct 16, 2024
1 parent 8e6d48c commit ca0749b
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 65 deletions.
42 changes: 23 additions & 19 deletions plugins/filter_kubernetes/kube_meta.c
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,9 @@ static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx,
k = api_map.via.map.ptr[i].key;
if (k.via.str.size == 8 && !strncmp(k.via.str.ptr, "metadata", 8)) {
meta_val = api_map.via.map.ptr[i].val;
search_workload(meta,ctx,meta_val);
if(ctx ->use_pod_association) {
search_workload(meta,ctx,meta_val);
}
if (meta_val.type == MSGPACK_OBJECT_MAP) {
meta_found = FLB_TRUE;
}
Expand Down Expand Up @@ -1259,11 +1261,12 @@ static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx,
}
}
int fallback_environment_len = 0;
char *fallback_environment = find_fallback_environment(ctx,meta);
if(fallback_environment) {
fallback_environment_len = strlen(fallback_environment);
}
char *fallback_environment = NULL;
if(ctx->use_pod_association) {
fallback_environment = find_fallback_environment(ctx,meta);
if(fallback_environment) {
fallback_environment_len = strlen(fallback_environment);
}
pod_service_found = flb_hash_get(ctx->pod_hash_table,
meta->podname, meta->podname_len,
&tmp_service_attributes, &tmp_service_attr_size);
Expand All @@ -1286,12 +1289,6 @@ static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx,

/* Append Regex fields */
msgpack_pack_map(&mp_pck, map_size);
if (meta->cluster != NULL) {
msgpack_pack_str(&mp_pck, 7);
msgpack_pack_str_body(&mp_pck, "cluster", 7);
msgpack_pack_str(&mp_pck, meta->cluster_len);
msgpack_pack_str_body(&mp_pck, meta->cluster, meta->cluster_len);
}
if (meta->podname != NULL) {
msgpack_pack_str(&mp_pck, 8);
msgpack_pack_str_body(&mp_pck, "pod_name", 8);
Expand Down Expand Up @@ -1343,13 +1340,18 @@ static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx,
msgpack_pack_str(&mp_pck, platform_len);
msgpack_pack_str_body(&mp_pck, ctx->platform, platform_len);
}
}

if (meta->workload != NULL) {
msgpack_pack_str(&mp_pck, 8);
msgpack_pack_str_body(&mp_pck, "workload", 8);
msgpack_pack_str(&mp_pck, meta->workload_len);
msgpack_pack_str_body(&mp_pck, meta->workload, meta->workload_len);
if (meta->cluster != NULL) {
msgpack_pack_str(&mp_pck, 7);
msgpack_pack_str_body(&mp_pck, "cluster", 7);
msgpack_pack_str(&mp_pck, meta->cluster_len);
msgpack_pack_str_body(&mp_pck, meta->cluster, meta->cluster_len);
}
if (meta->workload != NULL) {
msgpack_pack_str(&mp_pck, 8);
msgpack_pack_str_body(&mp_pck, "workload", 8);
msgpack_pack_str(&mp_pck, meta->workload_len);
msgpack_pack_str_body(&mp_pck, meta->workload, meta->workload_len);
}
}

/* Append API Server content */
Expand Down Expand Up @@ -1655,7 +1657,9 @@ static int get_and_merge_meta(struct flb_kube *ctx, struct flb_kube_meta *meta,
int ret;
char *api_buf;
size_t api_size;
get_cluster_from_environment(ctx, meta);
if(ctx->use_pod_association) {
get_cluster_from_environment(ctx, meta);
}
if (ctx->use_tag_for_meta) {
ret = merge_meta_from_tag(ctx, meta, out_buf, out_size);
return ret;
Expand Down
53 changes: 31 additions & 22 deletions plugins/out_cloudwatch_logs/cloudwatch_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf,
}
// If we are missing the service name, the entity will get rejected by the frontend anyway
// so do not emit entity unless service name is filled
if(stream->entity != NULL && stream->entity->key_attributes->name != NULL) {
if(ctx->add_entity && stream->entity != NULL && stream->entity->key_attributes->name != NULL) {
if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
"\"entity\":{", 10)) {
goto error;
Expand Down Expand Up @@ -959,45 +959,54 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map
kube_key = val.via.map.ptr[j].key;
kube_val = val.via.map.ptr[j].val;
if(strncmp(kube_key.via.str.ptr, "service_name", kube_key.via.str.size) == 0) {
if(entity->key_attributes->name == NULL) {
entity->key_attributes->name = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
if(entity->key_attributes->name != NULL) {
flb_free(entity->key_attributes->name);
}
entity->key_attributes->name = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
} else if(strncmp(kube_key.via.str.ptr, "environment", kube_key.via.str.size) == 0) {
if(entity->key_attributes->environment == NULL) {
entity->key_attributes->environment = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
if(entity->key_attributes->environment != NULL) {
flb_free(entity->key_attributes->environment);
}
entity->key_attributes->environment = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
} else if(strncmp(kube_key.via.str.ptr, "namespace_name", kube_key.via.str.size) == 0) {
if(entity->attributes->namespace == NULL) {
entity->attributes->namespace = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
if(entity->attributes->namespace != NULL) {
flb_free(entity->attributes->namespace);
}
entity->attributes->namespace = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
} else if(strncmp(kube_key.via.str.ptr, "host", kube_key.via.str.size) == 0) {
if(entity->attributes->node == NULL) {
entity->attributes->node = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
if(entity->attributes->node != NULL) {
flb_free(entity->attributes->node);
}
entity->attributes->node = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
} else if(strncmp(kube_key.via.str.ptr, "cluster", kube_key.via.str.size) == 0) {
if(entity->attributes->cluster_name == NULL) {
entity->attributes->cluster_name = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
if(entity->attributes->cluster_name != NULL) {
flb_free(entity->attributes->cluster_name);
}
entity->attributes->cluster_name = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
} else if(strncmp(kube_key.via.str.ptr, "workload", kube_key.via.str.size) == 0) {
if(entity->attributes->workload == NULL) {
entity->attributes->workload = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
if(entity->attributes->workload != NULL) {
flb_free(entity->attributes->workload);
}
entity->attributes->workload = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
} else if(strncmp(kube_key.via.str.ptr, "name_source", kube_key.via.str.size) == 0) {
if(entity->attributes->name_source == NULL) {
entity->attributes->name_source = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
if(entity->attributes->name_source != NULL) {
flb_free(entity->attributes->name_source);
}
entity->attributes->name_source = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
} else if(strncmp(kube_key.via.str.ptr, "platform", kube_key.via.str.size) == 0) {
if(entity->attributes->platform_type == NULL) {
entity->attributes->platform_type = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
if(entity->attributes->platform_type != NULL) {
flb_free(entity->attributes->platform_type);
}
entity->attributes->platform_type = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
}
}
}
}
if(strncmp(key.via.str.ptr, "ec2_instance_id",key.via.str.size ) == 0 ) {
if(entity->attributes->instance_id == NULL) {
entity->attributes->instance_id = flb_strndup(val.via.str.ptr, val.via.str.size);
if(entity->attributes->instance_id != NULL) {
flb_free(entity->attributes->instance_id);
}
entity->attributes->instance_id = flb_strndup(val.via.str.ptr, val.via.str.size);
}
}
if(entity->key_attributes->name == NULL && entity->attributes->name_source == NULL &&entity->attributes->workload != NULL) {
Expand All @@ -1007,7 +1016,6 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map
}

void update_or_create_entity(struct flb_cloudwatch *ctx, struct log_stream *stream, const msgpack_object map) {
if(ctx->kubernete_metadata_enabled) {
if(stream->entity == NULL) {
stream->entity = flb_malloc(sizeof(entity));
if (stream->entity == NULL) {
Expand All @@ -1034,7 +1042,6 @@ void update_or_create_entity(struct flb_cloudwatch *ctx, struct log_stream *stre
if (!stream->entity) {
flb_plg_warn(ctx->ins, "Failed to generate entity");
}
}
}

/*
Expand Down Expand Up @@ -1111,7 +1118,9 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
flb_plg_debug(ctx->ins, "Couldn't determine log group & stream for record with tag %s", tag);
goto error;
}
update_or_create_entity(ctx,stream,map);
if(ctx->kubernete_metadata_enabled && ctx->add_entity) {
update_or_create_entity(ctx,stream,map);
}

if (ctx->log_key) {
key_str = NULL;
Expand Down
6 changes: 6 additions & 0 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,12 @@ static struct flb_config_map config_map[] = {
"is 'd1,d2;d3', we will consider it as [[d1, d2],[d3]]."
},

{
FLB_CONFIG_MAP_BOOL, "add_entity", "false",
0, FLB_TRUE, offsetof(struct flb_cloudwatch, add_entity),
"add entity to PutLogEvent calls"
},

/* EOF */
{0}
};
Expand Down
2 changes: 2 additions & 0 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ struct flb_cloudwatch {
*/

int kubernete_metadata_enabled;

int add_entity;
};

void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx);
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
[1554141513.598656,{"log":"Fluent Bit is logging\n","stream":"stdout","kubernetes":{"pod_name":"use-kubelet-enabled","namespace_name":"options","workload":"my-deployment","pod_id":"e9f2963f-55f2-11e9-84c5-02e422b8a84a","labels":{"app.kubernetes.io/name":"fluent-bit"},"annotations":{"prometheus.io/path":"/api/v1/metrics/prometheus","prometheus.io/port":"2020","prometheus.io/scrape":"true"},"host":"ip-10-49-18-80.eu-west-1.compute.internal","container_name":"fluent-bit","docker_id":"c9898099f6d235126d564ed38a020007ea7a6fac6e25e718de683c9dd0076c16","container_hash":"fluent/fluent-bit@sha256:7ac0fd3569af866e9a6a22eb592744200d2dbe098cf066162453f8d0b06c531f","container_image":"fluent/fluent-bit:latest"}}]
[1554141513.598656,{"log":"Fluent Bit is logging\n","stream":"stdout","kubernetes":{"pod_name":"use-kubelet-enabled","namespace_name":"options","pod_id":"e9f2963f-55f2-11e9-84c5-02e422b8a84a","labels":{"app.kubernetes.io/name":"fluent-bit"},"annotations":{"prometheus.io/path":"/api/v1/metrics/prometheus","prometheus.io/port":"2020","prometheus.io/scrape":"true"},"host":"ip-10-49-18-80.eu-west-1.compute.internal","container_name":"fluent-bit","docker_id":"c9898099f6d235126d564ed38a020007ea7a6fac6e25e718de683c9dd0076c16","container_hash":"fluent/fluent-bit@sha256:7ac0fd3569af866e9a6a22eb592744200d2dbe098cf066162453f8d0b06c531f","container_image":"fluent/fluent-bit:latest"}}]
Original file line number Diff line number Diff line change
@@ -1 +1 @@
[1554141513.598656,{"log":"Fluent Bit is logging\n","stream":"stdout","kubernetes":{"cluster":"test-cluster","pod_name":"use-pod-association-enabled-fallback-env","namespace_name":"options","service_name":"test-service","environment":"eks:test-cluster/options","name_source":"Instrumentation","platform":"eks","workload":"use-pod-association-enabled-fallback-env","pod_id":"e9f2963f-55f2-11e9-84c5-02e422b8a84a","labels":{"app.kubernetes.io/name":"fluent-bit"},"annotations":{"prometheus.io/path":"/api/v1/metrics/prometheus","prometheus.io/port":"2020","prometheus.io/scrape":"true"},"host":"ip-10-49-18-80.eu-west-1.compute.internal","container_name":"fluent-bit","docker_id":"c9898099f6d235126d564ed38a020007ea7a6fac6e25e718de683c9dd0076c16","container_hash":"fluent/fluent-bit@sha256:7ac0fd3569af866e9a6a22eb592744200d2dbe098cf066162453f8d0b06c531f","container_image":"fluent/fluent-bit:latest"}}]
[1554141513.598656,{"log":"Fluent Bit is logging\n","stream":"stdout","kubernetes":{"pod_name":"use-pod-association-enabled-fallback-env","namespace_name":"options","service_name":"test-service","environment":"eks:test-cluster/options","name_source":"Instrumentation","platform":"eks","cluster":"test-cluster","workload":"use-pod-association-enabled-fallback-env","pod_id":"e9f2963f-55f2-11e9-84c5-02e422b8a84a","labels":{"app.kubernetes.io/name":"fluent-bit"},"annotations":{"prometheus.io/path":"/api/v1/metrics/prometheus","prometheus.io/port":"2020","prometheus.io/scrape":"true"},"host":"ip-10-49-18-80.eu-west-1.compute.internal","container_name":"fluent-bit","docker_id":"c9898099f6d235126d564ed38a020007ea7a6fac6e25e718de683c9dd0076c16","container_hash":"fluent/fluent-bit@sha256:7ac0fd3569af866e9a6a22eb592744200d2dbe098cf066162453f8d0b06c531f","container_image":"fluent/fluent-bit:latest"}}]
56 changes: 34 additions & 22 deletions tests/runtime/filter_kubernetes.c
Original file line number Diff line number Diff line change
Expand Up @@ -448,24 +448,48 @@ static void flb_test_options_use_kubelet_enabled_json()
flb_test_options_use_kubelet_enabled("options_use-kubelet-enabled_fluent-bit", NULL, 1);
}

#define flb_test_pod_to_service_map_use_kubelet_true(target, suffix, nExpected, platform) \
kube_test("options/" target, KUBE_POD_ASSOCIATION, suffix, nExpected, \
"use_pod_association", "true", \
"use_kubelet", "true", \
"kubelet_port", "8002", \
"Pod_Service_Preload_Cache_Dir", DPATH "/servicemap/" target, \
"pod_association_host_server_ca_file", "/tst/ca.crt", \
"pod_association_host_client_cert_file", "/tst/client.crt", \
"pod_association_host_client_key_file", "/tst/client.key", \
"set_platform", platform, \
NULL); \

#define flb_test_pod_to_service_map_use_kubelet_false(target, suffix, nExpected, platform) \
kube_test("options/" target, KUBE_POD_ASSOCIATION, suffix, nExpected, \
"use_pod_association", "true", \
"use_kubelet", "false", \
"kubelet_port", "8002", \
"Pod_Service_Preload_Cache_Dir", DPATH "/servicemap/" target, \
"pod_association_host_server_ca_file", "/tst/ca.crt", \
"pod_association_host_client_cert_file", "/tst/client.crt", \
"pod_association_host_client_key_file", "/tst/client.key", \
"set_platform", platform, \
NULL); \

static void flb_test_options_use_kubelet_enabled_replicaset_json()
{
flb_test_options_use_kubelet_enabled("options_use-kubelet-enabled-replicaset_fluent-bit", NULL, 1);
flb_test_pod_to_service_map_use_kubelet_true("options_use-kubelet-enabled-replicaset_fluent-bit", NULL, 1, NULL);
}

static void flb_test_options_use_kubelet_enabled_deployment_json()
{
flb_test_options_use_kubelet_enabled("options_use-kubelet-enabled-deployment_fluent-bit", NULL, 1);
flb_test_pod_to_service_map_use_kubelet_true("options_use-kubelet-enabled-deployment_fluent-bit", NULL, 1, NULL);
}

static void flb_test_options_use_kubelet_enabled_daemonset_json()
{
flb_test_options_use_kubelet_enabled("options_use-kubelet-enabled-daemonset_fluent-bit", NULL, 1);
flb_test_pod_to_service_map_use_kubelet_true("options_use-kubelet-enabled-daemonset_fluent-bit", NULL, 1, NULL);
}

static void flb_test_options_use_kubelet_enabled_pod_json()
{
flb_test_options_use_kubelet_enabled("options_use-kubelet-enabled-pod_fluent-bit", NULL, 1);
flb_test_pod_to_service_map_use_kubelet_true("options_use-kubelet-enabled-pod_fluent-bit", NULL, 1, NULL);
}

static void flb_test_options_use_kubelet_disabled_json()
Expand All @@ -475,22 +499,22 @@ static void flb_test_options_use_kubelet_disabled_json()

static void flb_test_options_use_kubelet_disabled_replicaset_json()
{
flb_test_options_use_kubelet_disabled("options_use-kubelet-disabled-replicaset_fluent-bit", NULL, 1);
flb_test_pod_to_service_map_use_kubelet_false("options_use-kubelet-disabled-replicaset_fluent-bit", NULL, 1, NULL);
}

static void flb_test_options_use_kubelet_disabled_deployment_json()
{
flb_test_options_use_kubelet_disabled("options_use-kubelet-disabled-deployment_fluent-bit", NULL, 1);
flb_test_pod_to_service_map_use_kubelet_false("options_use-kubelet-disabled-deployment_fluent-bit", NULL, 1, NULL);
}

static void flb_test_options_use_kubelet_disabled_daemonset_json()
{
flb_test_options_use_kubelet_disabled("options_use-kubelet-disabled-daemonset_fluent-bit", NULL, 1);
flb_test_pod_to_service_map_use_kubelet_false("options_use-kubelet-disabled-daemonset_fluent-bit", NULL, 1, NULL);
}

static void flb_test_options_use_kubelet_disabled_pod_json()
{
flb_test_options_use_kubelet_disabled("options_use-kubelet-disabled-pod_fluent-bit", NULL, 1);
flb_test_pod_to_service_map_use_kubelet_false("options_use-kubelet-disabled-pod_fluent-bit", NULL, 1, NULL);
}


Expand Down Expand Up @@ -974,27 +998,15 @@ static void flb_test_annotations_exclude_multiple_4_container_4_stderr()
flb_test_annotations_exclude("annotations-exclude_multiple-4_container-4", "stderr", 1);
}

#define flb_test_pod_to_service_map(target, suffix, nExpected, platform) \
kube_test("options/" target, KUBE_POD_ASSOCIATION, suffix, nExpected, \
"use_pod_association", "true", \
"use_kubelet", "true", \
"kubelet_port", "8002", \
"Pod_Service_Preload_Cache_Dir", DPATH "/servicemap/" target, \
"pod_association_host_server_ca_file", "/tst/ca.crt", \
"pod_association_host_client_cert_file", "/tst/client.crt", \
"pod_association_host_client_key_file", "/tst/client.key", \
"set_platform", platform, \
NULL); \

static void kube_options_use_pod_association_enabled()
{
flb_test_pod_to_service_map("options_use-pod-association-enabled_fluent-bit", NULL, 1, NULL);
flb_test_pod_to_service_map_use_kubelet_true("options_use-pod-association-enabled_fluent-bit", NULL, 1, NULL);
}

static void kube_options_use_pod_association_enabled_fallback_env()
{
setenv("CLUSTER_NAME","test-cluster", 1);
flb_test_pod_to_service_map("options_use-pod-association-enabled-fallback-env_fluent-bit", NULL, 1, "eks");
flb_test_pod_to_service_map_use_kubelet_true("options_use-pod-association-enabled-fallback-env_fluent-bit", NULL, 1, "eks");
}

#ifdef FLB_HAVE_SYSTEMD
Expand Down

0 comments on commit ca0749b

Please sign in to comment.