Skip to content

Commit

Permalink
OM-184 - support pod-name/ server-ip (#116)
Browse files Browse the repository at this point in the history
added support to use server-ip or pod-name/host-name according to the environment mode running
in kubernetes return host-name/pod-name as service label-value
in vm / docker return ipaddress as service label-value
  • Loading branch information
mphanias authored Apr 8, 2024
1 parent c692acd commit 990dcfc
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 35 deletions.
26 changes: 26 additions & 0 deletions internal/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type Config struct {
OtelPushInterval uint8 `toml:"push_interval"`
OtelServerStatFetchInterval uint8 `toml:"server_stat_fetch_interval"`
} `toml:"OpenTelemetry"`

IsKubernetes bool
KubernetesPodName string
} `toml:"Agent"`

Aerospike struct {
Expand Down Expand Up @@ -200,6 +203,27 @@ func (c *Config) FetchCloudInfo(md toml.MetaData) {
}
}

func (c *Config) FetchKubernetesInfo(md toml.MetaData) {
// use kubectl to fetch required Kubernetes context and find the required Kubenetes environment variables
envKubeServiceHost := os.Getenv("KUBERNETES_SERVICE_HOST")

Cfg.Agent.IsKubernetes = false

if envKubeServiceHost != "" && len(strings.TrimSpace(envKubeServiceHost)) > 0 {
Cfg.Agent.IsKubernetes = true
log.Info("Exporter is running in Kubernetes")

// get host-name
var err error
Cfg.Agent.KubernetesPodName, err = os.Hostname()
if err != nil {
log.Errorln(err)
return
}

}
}

// Initialize exporter configuration
func InitConfig(configFile string) {
// to print everything out regarding reading the config in app init
Expand All @@ -225,6 +249,8 @@ func InitConfig(configFile string) {

Cfg.ValidateAndUpdate(md)
Cfg.FetchCloudInfo(md)

Cfg.FetchKubernetesInfo(md)
}

// Set log file path
Expand Down
4 changes: 4 additions & 0 deletions internal/pkg/executors/otel_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ func sendNodeUp(meter metric.Meter, ctx context.Context, commonLabels []attribut
metric.WithDescription("Aerospike node active status"),
)

if config.Cfg.Agent.IsKubernetes {
statprocessors.Service = config.Cfg.Agent.KubernetesPodName
}

labels := []attribute.KeyValue{
attribute.String("cluster_name", statprocessors.ClusterName),
attribute.String("service", statprocessors.Service),
Expand Down
4 changes: 4 additions & 0 deletions internal/pkg/executors/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (o *PrometheusImpl) Collect(ch chan<- prometheus.Metric) {
return
}

// if kubernetes then send host-name/pod-name else send server-ip as-isnh
if config.Cfg.Agent.IsKubernetes {
statprocessors.Service = config.Cfg.Agent.KubernetesPodName
}
ch <- prometheus.MustNewConstMetric(nodeActiveDesc, prometheus.GaugeValue, 1.0, statprocessors.ClusterName, statprocessors.Service, statprocessors.Build)

for _, wm := range refreshed_metrics {
Expand Down
6 changes: 2 additions & 4 deletions internal/pkg/statprocessors/sp_host_systeminfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ func getNetworkInfo() []AerospikeStat {
arrReceiveStats, arrTransferStats := dataprovider.GetSystemProvider().GetNetDevStats()

// netdev receive
clusterName := ClusterName
service := Service
for _, stats := range arrReceiveStats {
deviceName := stats["device_name"]
statName := "receive_bytes_total"
Expand All @@ -132,7 +130,7 @@ func getNetworkInfo() []AerospikeStat {
continue
}

labelValues := []string{clusterName, service, deviceName}
labelValues := []string{ClusterName, Service, deviceName}

allowed := isMetricAllowed(commons.CTX_SYSINFO_NETWORK_STATS, statName)
sysMetric := NewAerospikeStat(commons.CTX_SYSINFO_NETWORK_STATS, statName, allowed)
Expand All @@ -154,7 +152,7 @@ func getNetworkInfo() []AerospikeStat {
continue
}

labelValues := []string{clusterName, service, deviceName}
labelValues := []string{ClusterName, Service, deviceName}
allowed := isMetricAllowed(commons.CTX_SYSINFO_NETWORK_STATS, statName)
sysMetric := NewAerospikeStat(commons.CTX_SYSINFO_NETWORK_STATS, statName, allowed)
sysMetric.Labels = networkLabels
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/statprocessors/sp_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"strings"

commons "github.com/aerospike/aerospike-prometheus-exporter/internal/pkg/commons"
"github.com/aerospike/aerospike-prometheus-exporter/internal/pkg/config"
config "github.com/aerospike/aerospike-prometheus-exporter/internal/pkg/config"

log "github.com/sirupsen/logrus"
)
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/statprocessors/sp_namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (nw *NamespaceStatsProcessor) refreshIndexPressure(singleInfoKey string, in
nsName := values[0]

labels := []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE, commons.METRIC_LABEL_NS}
labelValues := []string{rawMetrics[Infokey_ClusterName], rawMetrics[Infokey_Service], nsName}
labelValues := []string{ClusterName, Service, nsName}

// Server index-pressure output: test:0:0;bar_device:0:0;materials:0:0
// ignore first element - namespace
Expand Down Expand Up @@ -182,7 +182,7 @@ func (nw *NamespaceStatsProcessor) refreshNamespaceStats(singleInfoKey string, i
// default: aerospike_namespace_<stat-name>
constructedStatname = stat
labels = []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE, commons.METRIC_LABEL_NS}
labelValues = []string{rawMetrics[Infokey_ClusterName], rawMetrics[Infokey_Service], nsName}
labelValues = []string{ClusterName, Service, nsName}

if isArrayType {
constructedStatname, labels, labelValues = nw.handleArrayStats(nsName, stat, pv, stats, deviceType, rawMetrics)
Expand Down Expand Up @@ -287,7 +287,7 @@ func (nw *NamespaceStatsProcessor) handleArrayStats(nsName string, statToProcess
compositeStatName := deviceType + "_" + statType + "_" + statName
deviceOrFileName := allNamespaceStats[deviceType+"."+statType+"["+statIndex+"]"]
labels := []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE, commons.METRIC_LABEL_NS, statType + "_index", statType}
labelValues := []string{rawMetrics[Infokey_ClusterName], rawMetrics[Infokey_Service], nsName, statIndex, deviceOrFileName}
labelValues := []string{ClusterName, Service, nsName, statIndex, deviceOrFileName}

return compositeStatName, labels, labelValues

Expand Down
11 changes: 4 additions & 7 deletions internal/pkg/statprocessors/sp_node_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,14 @@ func (sw *NodeStatsProcessor) Refresh(infoKeys []string, rawMetrics map[string]s
log.Tracef("node-configs:%s", nodeConfigs)
log.Tracef("node-stats:%s", nodeStats)

clusterName := rawMetrics[Infokey_ClusterName]
service := rawMetrics[Infokey_Service]

// we are sending configs and stats in same refresh call, as both are being sent to prom, instead of doing prom-push in 2 functions
// handle configs
var allMetricsToSend = []AerospikeStat{}

lCfgMetricsToSend := sw.handleRefresh(nodeConfigs, clusterName, service)
lCfgMetricsToSend := sw.handleRefresh(nodeConfigs)

// handle stats
lStatMetricsToSend := sw.handleRefresh(nodeStats, clusterName, service)
lStatMetricsToSend := sw.handleRefresh(nodeStats)

// merge both array into single
allMetricsToSend = append(allMetricsToSend, lCfgMetricsToSend...)
Expand All @@ -61,7 +58,7 @@ func (sw *NodeStatsProcessor) Refresh(infoKeys []string, rawMetrics map[string]s
return allMetricsToSend, nil
}

func (sw *NodeStatsProcessor) handleRefresh(nodeRawMetrics string, clusterName string, service string) []AerospikeStat {
func (sw *NodeStatsProcessor) handleRefresh(nodeRawMetrics string) []AerospikeStat {

stats := commons.ParseStats(nodeRawMetrics, ";")

Expand All @@ -81,7 +78,7 @@ func (sw *NodeStatsProcessor) handleRefresh(nodeRawMetrics string, clusterName s
}

labels := []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE}
labelValues := []string{clusterName, service}
labelValues := []string{ClusterName, Service}

// pushToPrometheus(asMetric, pv, labels, labelsValues)
asMetric.updateValues(pv, labels, labelValues)
Expand Down
5 changes: 1 addition & 4 deletions internal/pkg/statprocessors/sp_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ func (sw *SetsStatsProcessor) Refresh(infoKeys []string, rawMetrics map[string]s
var allMetricsToSend = []AerospikeStat{}

for i := range setStats {
clusterName := rawMetrics[Infokey_ClusterName]
service := rawMetrics[Infokey_Service]

stats := commons.ParseStats(setStats[i], ":")
for stat, value := range stats {
pv, err := commons.TryConvert(value)
Expand All @@ -59,7 +56,7 @@ func (sw *SetsStatsProcessor) Refresh(infoKeys []string, rawMetrics map[string]s
}

labels := []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE, commons.METRIC_LABEL_NS, commons.METRIC_LABEL_SET}
labelValues := []string{clusterName, service, stats["ns"], stats["set"]}
labelValues := []string{ClusterName, Service, stats["ns"], stats["set"]}

// pushToPrometheus(asMetric, pv, labels, labelsValues, ch)
asMetric.updateValues(pv, labels, labelValues)
Expand Down
5 changes: 1 addition & 4 deletions internal/pkg/statprocessors/sp_sindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ func (siw *SindexStatsProcessor) Refresh(infoKeys []string, rawMetrics map[strin
sindexName := sindexInfoKeySplit[1]
log.Tracef("sindex-stats:%s:%s:%s", nsName, sindexName, rawMetrics[sindex])

clusterName := rawMetrics[Infokey_ClusterName]
service := rawMetrics[Infokey_Service]

stats := commons.ParseStats(rawMetrics[sindex], ";")
for stat, value := range stats {
pv, err := commons.TryConvert(value)
Expand All @@ -90,7 +87,7 @@ func (siw *SindexStatsProcessor) Refresh(infoKeys []string, rawMetrics map[strin
}

labels := []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE, commons.METRIC_LABEL_NS, commons.METRIC_LABEL_SINDEX}
labelValues := []string{clusterName, service, nsName, sindexName}
labelValues := []string{ClusterName, Service, nsName, sindexName}

asMetric.updateValues(pv, labels, labelValues)
allMetricsToSend = append(allMetricsToSend, asMetric)
Expand Down
10 changes: 5 additions & 5 deletions internal/pkg/statprocessors/sp_users.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,21 @@ func (uw *UserStatsProcessor) refreshUserStats(infoKeys []string, rawMetrics map
readInfoStats := []string{"read_quota", "read_single_record_tps", "read_scan_query_rps", "limitless_read_scan_query"}
writeInfoStats := []string{"write_quota", "write_single_record_tps", "write_scan_query_rps", "limitless_write_scan_query"}

asMetric, labels, labelValues := internalCreateLocalAerospikeStat(rawMetrics, "conns_in_use", user.User)
asMetric, labels, labelValues := internalCreateLocalAerospikeStat("conns_in_use", user.User)
asMetric.updateValues(float64(user.ConnsInUse), labels, labelValues)
allMetricsToSend = append(allMetricsToSend, asMetric)

if len(user.ReadInfo) >= 4 && len(user.WriteInfo) >= 4 {

for idxReadinfo := 0; idxReadinfo < len(user.ReadInfo); idxReadinfo++ {
riAeroMetric, riLabels, riLabelValues := internalCreateLocalAerospikeStat(rawMetrics, readInfoStats[idxReadinfo], user.User)
riAeroMetric, riLabels, riLabelValues := internalCreateLocalAerospikeStat(readInfoStats[idxReadinfo], user.User)
riAeroMetric.updateValues(float64(user.ReadInfo[idxReadinfo]), riLabels, riLabelValues)

allMetricsToSend = append(allMetricsToSend, riAeroMetric)

}
for idxWriteinfo := 0; idxWriteinfo < len(user.WriteInfo); idxWriteinfo++ {
wiAeroMetric, wiLabels, wiLabelValues := internalCreateLocalAerospikeStat(rawMetrics, writeInfoStats[idxWriteinfo], user.User)
wiAeroMetric, wiLabels, wiLabelValues := internalCreateLocalAerospikeStat(writeInfoStats[idxWriteinfo], user.User)
wiAeroMetric.updateValues(float64(user.WriteInfo[idxWriteinfo]), wiLabels, wiLabelValues)
allMetricsToSend = append(allMetricsToSend, wiAeroMetric)

Expand All @@ -147,9 +147,9 @@ func (uw *UserStatsProcessor) refreshUserStats(infoKeys []string, rawMetrics map
return allMetricsToSend, nil
}

func internalCreateLocalAerospikeStat(rawMetrics map[string]string, pStatName string, username string) (AerospikeStat, []string, []string) {
func internalCreateLocalAerospikeStat(pStatName string, username string) (AerospikeStat, []string, []string) {
labels := []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE, commons.METRIC_LABEL_USER}
labelValues := []string{rawMetrics[Infokey_ClusterName], rawMetrics[Infokey_Service], username}
labelValues := []string{ClusterName, Service, username}
allowed := isMetricAllowed(commons.CTX_USERS, pStatName)
asMetric := NewAerospikeStat(commons.CTX_USERS, pStatName, allowed)

Expand Down
11 changes: 4 additions & 7 deletions internal/pkg/statprocessors/sp_xdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,14 @@ func (xw *XdrStatsProcessor) Refresh(infoKeys []string, rawMetrics map[string]st
xw.xdrMetrics = make(map[string]AerospikeStat)
}

clusterName := rawMetrics[Infokey_ClusterName]
service := rawMetrics[Infokey_Service]

var allMetricsToSend = []AerospikeStat{}

for _, key := range infoKeys {

xdrRawMetrics := rawMetrics[key]
// find and construct metric name
dcName, ns, metricPrefix := xw.constructMetricNamePrefix(key)
tmpXdrMetricsToSend := xw.handleRefresh(key, xdrRawMetrics, clusterName, service, dcName, ns, metricPrefix)
tmpXdrMetricsToSend := xw.handleRefresh(key, xdrRawMetrics, dcName, ns, metricPrefix)

allMetricsToSend = append(allMetricsToSend, tmpXdrMetricsToSend...)
}
Expand Down Expand Up @@ -108,7 +105,7 @@ func (xw *XdrStatsProcessor) constructMetricNamePrefix(infoKeyToProcess string)
}

func (xw *XdrStatsProcessor) handleRefresh(infoKeyToProcess string, xdrRawMetrics string,
clusterName string, service string, dcName string, ns string, metricPrefix string) []AerospikeStat {
dcName string, ns string, metricPrefix string) []AerospikeStat {
log.Tracef("xdr-%s:%s", infoKeyToProcess, xdrRawMetrics)

stats := commons.ParseStats(xdrRawMetrics, ";")
Expand All @@ -129,12 +126,12 @@ func (xw *XdrStatsProcessor) handleRefresh(infoKeyToProcess string, xdrRawMetric
}

labels := []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE, commons.METRIC_LABEL_DC_NAME}
labelValues := []string{clusterName, service, dcName}
labelValues := []string{ClusterName, Service, dcName}

// if namespace exists, add it to the label and label-values array
if len(ns) > 0 {
labels = []string{commons.METRIC_LABEL_CLUSTER_NAME, commons.METRIC_LABEL_SERVICE, commons.METRIC_LABEL_DC_NAME, commons.METRIC_LABEL_NS}
labelValues = []string{clusterName, service, dcName, ns}
labelValues = []string{ClusterName, Service, dcName, ns}
}

// pushToPrometheus(asMetric, pv, labels, labelsValues, ch)
Expand Down
4 changes: 4 additions & 0 deletions internal/pkg/statprocessors/statsrefresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package statprocessors

import (
commons "github.com/aerospike/aerospike-prometheus-exporter/internal/pkg/commons"
"github.com/aerospike/aerospike-prometheus-exporter/internal/pkg/config"
"github.com/aerospike/aerospike-prometheus-exporter/internal/pkg/dataprovider"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -68,6 +69,9 @@ func Refresh() ([]AerospikeStat, error) {

// set global values
ClusterName, Service, Build = rawMetrics[Infokey_ClusterName], rawMetrics[Infokey_Service], rawMetrics[Infokey_Build]
if config.Cfg.Agent.IsKubernetes {
Service = config.Cfg.Agent.KubernetesPodName
}

// sanitize the utf8 strings before sending them to watchers
for k, v := range rawMetrics {
Expand Down

0 comments on commit 990dcfc

Please sign in to comment.