From 92398111a014a5755533c7d2b2bb2b10630303aa Mon Sep 17 00:00:00 2001
From: hany-mhajna-payu-gpo
Date: Thu, 28 Nov 2024 19:14:19 +0200
Subject: [PATCH 1/6] enable-custom-range

---
 collector/collector.go | 42 ++++++++++++++++++++++++++++++++++--------
 1 file changed, 34 insertions(+), 8 deletions(-)

diff --git a/collector/collector.go b/collector/collector.go
index 823f5c1..eecd0a6 100644
--- a/collector/collector.go
+++ b/collector/collector.go
@@ -49,6 +49,9 @@ type Config struct {
 	// or as the constantly growing counter value
 	SendTrafficDelta bool
 	LogEntries       bool
+
+	// New field to accept custom CIDR ranges as strings
+	CustomPrivateCIDRs []string
 }
 
 type podsWatcher interface {
@@ -84,6 +87,16 @@ func New(
 		panic("cleanup interval not set")
 	}
 
+	customPrivateCIDRs := make([]netaddr.IPPrefix, 0, len(cfg.CustomPrivateCIDRs))
+	for _, cidr := range cfg.CustomPrivateCIDRs {
+		prefix, err := netaddr.ParseIPPrefix(cidr)
+		if err != nil {
+			log.Errorf("invalid CIDR: %s, error: %v", cidr, err)
+			continue
+		}
+		customPrivateCIDRs = append(customPrivateCIDRs, prefix)
+	}
+
 	return &Collector{
 		cfg:               cfg,
 		log:               log,
@@ -94,6 +107,7 @@ func New(
 		podMetrics:        map[uint64]*rawNetworkMetric{},
 		excludeNsMap:      excludeNsMap,
 		currentTimeGetter: currentTimeGetter,
+		customPrivateCIDRs: customPrivateCIDRs,
 		exporterClient:    &http.Client{Timeout: 10 * time.Second},
 	}
 }
@@ -109,6 +123,7 @@ type Collector struct {
 	excludeNsMap      map[string]struct{}
 	currentTimeGetter func() time.Time
 	exporterClient    *http.Client
+	customPrivateCIDRs []netaddr.IPPrefix
 	mu sync.Mutex
 
 	firstCollectDone bool
@@ -385,11 +400,22 @@ func conntrackEntryKey(conn *conntrack.Entry) uint64 {
 	return res
 }
 
-func isPrivateNetwork(ip netaddr.IP) bool {
-	return ip.IsPrivate() ||
-		ip.IsLoopback() ||
-		ip.IsMulticast() ||
-		ip.IsLinkLocalUnicast() ||
-		ip.IsLinkLocalMulticast() ||
-		ip.IsInterfaceLocalMulticast()
-}
+func (c *Collector) isPrivateNetwork(ip netaddr.IP) bool {
+	if ip.IsPrivate() ||
+		ip.IsLoopback() ||
+		ip.IsMulticast() ||
+		ip.IsLinkLocalUnicast() ||
+		ip.IsLinkLocalMulticast() ||
+		ip.IsInterfaceLocalMulticast() {
+		return true
+	}
+
+	// Check custom CIDR ranges
+	for _, cidr := range c.customPrivateCIDRs {
+		if cidr.Contains(ip) {
+			return true
+		}
+	}
+
+	return false
+}
\ No newline at end of file
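A standalone sketch of what patch 1 wires in, using the same inet.af/netaddr calls as the patch. The 100.64.0.0/10 carrier-grade NAT range, the sample IP, and the parseCIDRs helper name are illustrative assumptions, not values taken from this series:

package main

import (
	"fmt"

	"inet.af/netaddr"
)

// parseCIDRs mirrors the constructor logic in the patch: invalid entries
// are logged and skipped instead of failing the whole configuration.
func parseCIDRs(raw []string) []netaddr.IPPrefix {
	out := make([]netaddr.IPPrefix, 0, len(raw))
	for _, s := range raw {
		p, err := netaddr.ParseIPPrefix(s)
		if err != nil {
			fmt.Printf("invalid CIDR: %s, error: %v\n", s, err)
			continue
		}
		out = append(out, p)
	}
	return out
}

func main() {
	// 100.64.0.0/10 (carrier-grade NAT) is not matched by ip.IsPrivate(),
	// which is exactly the kind of range this option exists for.
	prefixes := parseCIDRs([]string{"100.64.0.0/10", "not-a-cidr"})

	ip := netaddr.MustParseIP("100.70.1.2")
	for _, p := range prefixes {
		if p.Contains(ip) {
			fmt.Printf("%s is treated as private via custom range %s\n", ip, p)
		}
	}
}

Since isPrivateNetwork consults the custom prefixes only after the built-in loopback/multicast/link-local checks, the option can widen, but never narrow, what counts as private.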
"github.com/castai/egressd/metrics" + "github.com/castai/egressd/pb" ) var ( - acceptEncoding = http.CanonicalHeaderKey("Accept-Encoding") - contentEncoding = http.CanonicalHeaderKey("Content-Encoding") + acceptEncoding = http.CanonicalHeaderKey("Accept-Encoding") + contentEncoding = http.CanonicalHeaderKey("Content-Encoding") ) func CurrentTimeGetter() func() time.Time { - return func() time.Time { - return time.Now() - } + return func() time.Time { + return time.Now() + } } type Config struct { - // ReadInterval used for conntrack records scrape. - ReadInterval time.Duration - // CleanupInterval used to remove expired conntrack and pod metrics records. - CleanupInterval time.Duration - // NodeName is current node name on which egressd is running. - NodeName string - // ExcludeNamespaces allows to exclude namespaces. Input is comma separated string. - ExcludeNamespaces string - // GroupPublicIPs will group all public destinations under single 0.0.0.0 IP. - GroupPublicIPs bool - // SendTrafficDelta used to determines if traffic should be sent as delta of 2 consecutive conntrack entries - // or as the constantly growing counter value - SendTrafficDelta bool - LogEntries bool - - // New field to accept custom CIDR ranges as strings - CustomPrivateCIDRs []string + // ReadInterval used for conntrack records scrape. + ReadInterval time.Duration + // CleanupInterval used to remove expired conntrack and pod metrics records. + CleanupInterval time.Duration + // NodeName is current node name on which egressd is running. + NodeName string + // ExcludeNamespaces allows to exclude namespaces. Input is comma separated string. + ExcludeNamespaces string + // GroupPublicIPs will group all public destinations under single 0.0.0.0 IP. + GroupPublicIPs bool + // SendTrafficDelta used to determines if traffic should be sent as delta of 2 consecutive conntrack entries + // or as the constantly growing counter value + SendTrafficDelta bool + LogEntries bool + // New field to accept custom CIDR ranges as strings + CustomPrivateCIDRs []string } type podsWatcher interface { - Get() ([]*corev1.Pod, error) + Get() ([]*corev1.Pod, error) } type rawNetworkMetric struct { - *pb.RawNetworkMetric - lifetime time.Time + *pb.RawNetworkMetric + lifetime time.Time } type dnsRecorder interface{ Records() []*pb.IP2Domain } func New( - cfg Config, - log logrus.FieldLogger, - podsWatcher podsWatcher, - conntracker conntrack.Client, - ip2dns dnsRecorder, - currentTimeGetter func() time.Time, + cfg Config, + log logrus.FieldLogger, + podsWatcher podsWatcher, + conntracker conntrack.Client, + ip2dns dnsRecorder, + currentTimeGetter func() time.Time, ) *Collector { - excludeNsMap := map[string]struct{}{} - if cfg.ExcludeNamespaces != "" { - nsList := strings.Split(cfg.ExcludeNamespaces, ",") - for _, ns := range nsList { - excludeNsMap[ns] = struct{}{} - } - } - if cfg.ReadInterval == 0 { - panic("read interval not set") - } - if cfg.CleanupInterval == 0 { - panic("cleanup interval not set") - } - - customPrivateCIDRs := make([]netaddr.IPPrefix, 0, len(cfg.CustomPrivateCIDRs)) + excludeNsMap := map[string]struct{}{} + if cfg.ExcludeNamespaces != "" { + nsList := strings.Split(cfg.ExcludeNamespaces, ",") + for _, ns := range nsList { + excludeNsMap[ns] = struct{}{} + } + } + if cfg.ReadInterval == 0 { + panic("read interval not set") + } + if cfg.CleanupInterval == 0 { + panic("cleanup interval not set") + } + + // Parse custom CIDRs + customPrivateCIDRs := make([]netaddr.IPPrefix, 0, len(cfg.CustomPrivateCIDRs)) for _, cidr := 
range cfg.CustomPrivateCIDRs { prefix, err := netaddr.ParseIPPrefix(cidr) if err != nil { @@ -97,307 +97,307 @@ func New( customPrivateCIDRs = append(customPrivateCIDRs, prefix) } - return &Collector{ - cfg: cfg, - log: log, - podsWatcher: podsWatcher, - conntracker: conntracker, - ip2dns: ip2dns, - entriesCache: make(map[uint64]*conntrack.Entry), - podMetrics: map[uint64]*rawNetworkMetric{}, - excludeNsMap: excludeNsMap, - currentTimeGetter: currentTimeGetter, - customPrivateCIDRs: customPrivateCIDRs, - exporterClient: &http.Client{Timeout: 10 * time.Second}, - } + return &Collector{ + cfg: cfg, + log: log, + podsWatcher: podsWatcher, + conntracker: conntracker, + ip2dns: ip2dns, + entriesCache: make(map[uint64]*conntrack.Entry), + podMetrics: map[uint64]*rawNetworkMetric{}, + excludeNsMap: excludeNsMap, + currentTimeGetter: currentTimeGetter, + customPrivateCIDRs: customPrivateCIDRs, + exporterClient: &http.Client{Timeout: 10 * time.Second}, + } } type Collector struct { - cfg Config - log logrus.FieldLogger - podsWatcher podsWatcher - conntracker conntrack.Client - ip2dns dnsRecorder - entriesCache map[uint64]*conntrack.Entry - podMetrics map[uint64]*rawNetworkMetric - excludeNsMap map[string]struct{} - currentTimeGetter func() time.Time - exporterClient *http.Client - customPrivateCIDRs []netaddr.IPPrefix - mu sync.Mutex - - firstCollectDone bool + cfg Config + log logrus.FieldLogger + podsWatcher podsWatcher + conntracker conntrack.Client + ip2dns dnsRecorder + entriesCache map[uint64]*conntrack.Entry + podMetrics map[uint64]*rawNetworkMetric + excludeNsMap map[string]struct{} + currentTimeGetter func() time.Time + exporterClient *http.Client + customPrivateCIDRs []netaddr.IPPrefix + mu sync.Mutex + + firstCollectDone bool } func (c *Collector) Start(ctx context.Context) error { - readTicker := time.NewTicker(c.cfg.ReadInterval) - cleanupTicker := time.NewTicker(c.cfg.CleanupInterval) - - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-readTicker.C: - if err := c.collect(); err != nil { - c.log.Errorf("collecting: %v", err) - } - case <-cleanupTicker.C: - c.cleanup() - } - } + readTicker := time.NewTicker(c.cfg.ReadInterval) + cleanupTicker := time.NewTicker(c.cfg.CleanupInterval) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-readTicker.C: + if err := c.collect(); err != nil { + c.log.Errorf("collecting: %v", err) + } + case <-cleanupTicker.C: + c.cleanup() + } + } } func (c *Collector) GetRawNetworkMetricsHandler(w http.ResponseWriter, req *http.Request) { - c.mu.Lock() - defer c.mu.Unlock() - - items := make([]*pb.RawNetworkMetric, 0, len(c.podMetrics)) - for _, m := range c.podMetrics { - items = append(items, m.RawNetworkMetric) - } - - batch := &pb.RawNetworkMetricBatch{Items: items, Ip2Domain: c.ip2dns.Records()} - batchBytes, err := proto.Marshal(batch) - if err != nil { - c.log.Errorf("marshal batch: %v", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - enc := req.Header.Get(acceptEncoding) - if strings.Contains(strings.ToLower(enc), "gzip") { - if err := c.writeGzipBody(w, batchBytes); err != nil { - c.log.Errorf("write batch %v", err) - return - } - } else { - if err := c.writePlainBody(w, batchBytes); err != nil { - c.log.Errorf("write batch %v", err) - return - } - } - - if c.cfg.SendTrafficDelta { - // reset metric tx/rx values, so only delta numbers will be sent with the next batch - for _, m := range c.podMetrics { - m.RawNetworkMetric.TxBytes = 0 - m.RawNetworkMetric.RxBytes = 0 - 
m.RawNetworkMetric.TxPackets = 0 - m.RawNetworkMetric.RxPackets = 0 - - newLifetime := time.Now().Add(2 * time.Minute) - // reset lifetime only if current lifetime is longer than 2 minutes from now - if m.lifetime.After(newLifetime) { - m.lifetime = newLifetime - } - } - } + c.mu.Lock() + defer c.mu.Unlock() + + items := make([]*pb.RawNetworkMetric, 0, len(c.podMetrics)) + for _, m := range c.podMetrics { + items = append(items, m.RawNetworkMetric) + } + + batch := &pb.RawNetworkMetricBatch{Items: items, Ip2Domain: c.ip2dns.Records()} + batchBytes, err := proto.Marshal(batch) + if err != nil { + c.log.Errorf("marshal batch: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + enc := req.Header.Get(acceptEncoding) + if strings.Contains(strings.ToLower(enc), "gzip") { + if err := c.writeGzipBody(w, batchBytes); err != nil { + c.log.Errorf("write batch %v", err) + return + } + } else { + if err := c.writePlainBody(w, batchBytes); err != nil { + c.log.Errorf("write batch %v", err) + return + } + } + + if c.cfg.SendTrafficDelta { + // reset metric tx/rx values, so only delta numbers will be sent with the next batch + for _, m := range c.podMetrics { + m.RawNetworkMetric.TxBytes = 0 + m.RawNetworkMetric.RxBytes = 0 + m.RawNetworkMetric.TxPackets = 0 + m.RawNetworkMetric.RxPackets = 0 + + newLifetime := time.Now().Add(2 * time.Minute) + // reset lifetime only if current lifetime is longer than 2 minutes from now + if m.lifetime.After(newLifetime) { + m.lifetime = newLifetime + } + } + } } func (c *Collector) writeGzipBody(w http.ResponseWriter, body []byte) error { - writer, err := gzip.NewWriterLevel(w, gzip.BestCompression) - if err != nil { - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return fmt.Errorf("cannot create gzip writer: %w", err) - } - defer writer.Close() + writer, err := gzip.NewWriterLevel(w, gzip.BestCompression) + if err != nil { + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return fmt.Errorf("cannot create gzip writer: %w", err) + } + defer writer.Close() - w.Header().Add(contentEncoding, "gzip") + w.Header().Add(contentEncoding, "gzip") - if _, err := writer.Write(body); err != nil { - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return err - } + if _, err := writer.Write(body); err != nil { + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return err + } - return nil + return nil } func (c *Collector) writePlainBody(w http.ResponseWriter, body []byte) error { - if _, err := w.Write(body); err != nil { - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return err - } - return nil + if _, err := w.Write(body); err != nil { + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return err + } + return nil } -// collect aggregates conntract records into reduced pod metrics. +// collect aggregates conntrack records into reduced pod metrics. 
func (c *Collector) collect() error { - start := time.Now() - pods, err := c.getNodePods() - if err != nil { - return fmt.Errorf("getting node pods: %w", err) - } - conns, err := c.conntracker.ListEntries(conntrack.FilterBySrcIP(getPodIPs(pods))) - if err != nil { - return fmt.Errorf("listing conntrack entries: %w", err) - } - metrics.SetConntrackActiveEntriesCount(float64(len(conns))) - - c.mu.Lock() - defer c.mu.Unlock() - - for _, conn := range conns { - connKey := conntrackEntryKey(conn) - txBytes := conn.TxBytes - txPackets := conn.TxPackets - rxBytes := conn.RxBytes - rxPackets := conn.RxPackets - - if cachedConn, found := c.entriesCache[connKey]; found { - // NOTE: REP-243: there is known issue that current tx/rx bytes could be lower than previously scrapped values, - // so treat it as 0 delta to avoid random values for uint64 - txBytes = lo.Ternary(txBytes < cachedConn.TxBytes, 0, txBytes-cachedConn.TxBytes) - rxBytes = lo.Ternary(rxBytes < cachedConn.RxBytes, 0, rxBytes-cachedConn.RxBytes) - txPackets = lo.Ternary(txPackets < cachedConn.TxPackets, 0, txPackets-cachedConn.TxPackets) - rxPackets = lo.Ternary(rxPackets < cachedConn.RxPackets, 0, rxPackets-cachedConn.RxPackets) - } - c.entriesCache[connKey] = conn - - if c.cfg.LogEntries && (rxBytes > 0 || txBytes > 0) { - c.log.WithFields(map[string]any{ - "src_ip": conn.Src.IP().String(), - "src_port": conn.Src.Port(), - "dst_ip": conn.Dst.IP().String(), - "dst_port": conn.Dst.Port(), - "tx_bytes": txBytes, - "rx_bytes": rxBytes, - "proto": conn.Proto, - }).Debug("ct") - } - - // In delta mode we need to have initial conntrack connections so next collect can calculate only new deltas. - if c.cfg.SendTrafficDelta && !c.firstCollectDone { - continue - } - - if c.cfg.GroupPublicIPs && !isPrivateNetwork(conn.Dst.IP()) { - conn.Dst = netaddr.IPPortFrom(netaddr.IPv4(0, 0, 0, 0), 0) - } - - groupKey := entryGroupKey(conn) - if pm, found := c.podMetrics[groupKey]; found { - pm.TxBytes += int64(txBytes) - pm.TxPackets += int64(txPackets) - pm.RxBytes += int64(rxBytes) - pm.RxPackets += int64(rxPackets) - if conn.Lifetime.After(pm.lifetime) { - pm.lifetime = conn.Lifetime - } - } else { - c.podMetrics[groupKey] = &rawNetworkMetric{ - RawNetworkMetric: &pb.RawNetworkMetric{ - SrcIp: dns.ToIPint32(conn.Src.IP()), - DstIp: dns.ToIPint32(conn.Dst.IP()), - TxBytes: int64(txBytes), - TxPackets: int64(txPackets), - RxBytes: int64(rxBytes), - RxPackets: int64(rxPackets), - Proto: int32(conn.Proto), - }, - lifetime: conn.Lifetime, - } - } - } - - if !c.firstCollectDone { - c.firstCollectDone = true - } - - c.log.Debugf("collection done in %s, pods=%d, conntrack=%d, conntrack_cache=%d", time.Since(start), len(pods), len(conns), len(c.entriesCache)) - return nil + start := time.Now() + pods, err := c.getNodePods() + if err != nil { + return fmt.Errorf("getting node pods: %w", err) + } + conns, err := c.conntracker.ListEntries(conntrack.FilterBySrcIP(getPodIPs(pods))) + if err != nil { + return fmt.Errorf("listing conntrack entries: %w", err) + } + metrics.SetConntrackActiveEntriesCount(float64(len(conns))) + + c.mu.Lock() + defer c.mu.Unlock() + + for _, conn := range conns { + connKey := conntrackEntryKey(conn) + txBytes := conn.TxBytes + txPackets := conn.TxPackets + rxBytes := conn.RxBytes + rxPackets := conn.RxPackets + + if cachedConn, found := c.entriesCache[connKey]; found { + // NOTE: REP-243: there is known issue that current tx/rx bytes could be lower than previously scrapped values, + // so treat it as 0 delta to avoid random values for 
uint64 + txBytes = lo.Ternary(txBytes < cachedConn.TxBytes, 0, txBytes-cachedConn.TxBytes) + rxBytes = lo.Ternary(rxBytes < cachedConn.RxBytes, 0, rxBytes-cachedConn.RxBytes) + txPackets = lo.Ternary(txPackets < cachedConn.TxPackets, 0, txPackets-cachedConn.TxPackets) + rxPackets = lo.Ternary(rxPackets < cachedConn.RxPackets, 0, rxPackets-cachedConn.RxPackets) + } + c.entriesCache[connKey] = conn + + if c.cfg.LogEntries && (rxBytes > 0 || txBytes > 0) { + c.log.WithFields(map[string]any{ + "src_ip": conn.Src.IP().String(), + "src_port": conn.Src.Port(), + "dst_ip": conn.Dst.IP().String(), + "dst_port": conn.Dst.Port(), + "tx_bytes": txBytes, + "rx_bytes": rxBytes, + "proto": conn.Proto, + }).Debug("ct") + } + + // In delta mode we need to have initial conntrack connections so next collect can calculate only new deltas. + if c.cfg.SendTrafficDelta && !c.firstCollectDone { + continue + } + + if c.cfg.GroupPublicIPs && !c.isPrivateNetwork(conn.Dst.IP()) { + conn.Dst = netaddr.IPPortFrom(netaddr.IPv4(0, 0, 0, 0), 0) + } + + groupKey := entryGroupKey(conn) + if pm, found := c.podMetrics[groupKey]; found { + pm.TxBytes += int64(txBytes) + pm.TxPackets += int64(txPackets) + pm.RxBytes += int64(rxBytes) + pm.RxPackets += int64(rxPackets) + if conn.Lifetime.After(pm.lifetime) { + pm.lifetime = conn.Lifetime + } + } else { + c.podMetrics[groupKey] = &rawNetworkMetric{ + RawNetworkMetric: &pb.RawNetworkMetric{ + SrcIp: dns.ToIPint32(conn.Src.IP()), + DstIp: dns.ToIPint32(conn.Dst.IP()), + TxBytes: int64(txBytes), + TxPackets: int64(txPackets), + RxBytes: int64(rxBytes), + RxPackets: int64(rxPackets), + Proto: int32(conn.Proto), + }, + lifetime: conn.Lifetime, + } + } + } + + if !c.firstCollectDone { + c.firstCollectDone = true + } + + c.log.Debugf("collection done in %s, pods=%d, conntrack=%d, conntrack_cache=%d", time.Since(start), len(pods), len(conns), len(c.entriesCache)) + return nil } func (c *Collector) cleanup() { - c.mu.Lock() - defer c.mu.Unlock() - - start := c.currentTimeGetter().UTC() - now := start - deletedEntriesCount := 0 - deletedPodMetricsCount := 0 - - for key, e := range c.entriesCache { - if now.After(e.Lifetime) { - delete(c.entriesCache, key) - deletedEntriesCount++ - } - } - - for key, m := range c.podMetrics { - if now.After(m.lifetime) { - delete(c.podMetrics, key) - deletedPodMetricsCount++ - } - } - - c.log.Infof("cleanup done in %s, deleted_conntrack=%d, deleted_pod_metrics=%d", time.Since(start), deletedEntriesCount, deletedPodMetricsCount) + c.mu.Lock() + defer c.mu.Unlock() + + start := c.currentTimeGetter().UTC() + now := start + deletedEntriesCount := 0 + deletedPodMetricsCount := 0 + + for key, e := range c.entriesCache { + if now.After(e.Lifetime) { + delete(c.entriesCache, key) + deletedEntriesCount++ + } + } + + for key, m := range c.podMetrics { + if now.After(m.lifetime) { + delete(c.podMetrics, key) + deletedPodMetricsCount++ + } + } + + c.log.Infof("cleanup done in %s, deleted_conntrack=%d, deleted_pod_metrics=%d", time.Since(start), deletedEntriesCount, deletedPodMetricsCount) } func (c *Collector) getNodePods() ([]*corev1.Pod, error) { - pods, err := c.podsWatcher.Get() - if err != nil { - return nil, err - } - filtered := pods[:0] - for _, pod := range pods { - podIP := pod.Status.PodIP - if podIP == "" { - continue - } - // Don't track host network pods since we don't have enough info in conntrack. 
- if pod.Spec.HostNetwork { - continue - } - if _, found := c.excludeNsMap[pod.Namespace]; found { - continue - } - filtered = append(filtered, pod) - } - return filtered, nil + pods, err := c.podsWatcher.Get() + if err != nil { + return nil, err + } + filtered := pods[:0] + for _, pod := range pods { + podIP := pod.Status.PodIP + if podIP == "" { + continue + } + // Don't track host network pods since we don't have enough info in conntrack. + if pod.Spec.HostNetwork { + continue + } + if _, found := c.excludeNsMap[pod.Namespace]; found { + continue + } + filtered = append(filtered, pod) + } + return filtered, nil } func getPodIPs(pods []*corev1.Pod) map[netaddr.IP]struct{} { - ips := make(map[netaddr.IP]struct{}, len(pods)) - for _, pod := range pods { - ips[netaddr.MustParseIP(pod.Status.PodIP)] = struct{}{} - } - return ips + ips := make(map[netaddr.IP]struct{}, len(pods)) + for _, pod := range pods { + ips[netaddr.MustParseIP(pod.Status.PodIP)] = struct{}{} + } + return ips } var entryGroupHash maphash.Hash // entryGroupKey groups by src, dst and port. func entryGroupKey(conn *conntrack.Entry) uint64 { - srcIP := conn.Src.IP().As4() - _, _ = entryGroupHash.Write(srcIP[:]) - dstIP := conn.Dst.IP().As4() - _, _ = entryGroupHash.Write(dstIP[:]) - _ = entryGroupHash.WriteByte(conn.Proto) - res := entryGroupHash.Sum64() - entryGroupHash.Reset() - return res + srcIP := conn.Src.IP().As4() + _, _ = entryGroupHash.Write(srcIP[:]) + dstIP := conn.Dst.IP().As4() + _, _ = entryGroupHash.Write(dstIP[:]) + _ = entryGroupHash.WriteByte(conn.Proto) + res := entryGroupHash.Sum64() + entryGroupHash.Reset() + return res } var conntrackEntryHash maphash.Hash func conntrackEntryKey(conn *conntrack.Entry) uint64 { - srcIP := conn.Src.IP().As4() - _, _ = conntrackEntryHash.Write(srcIP[:]) - var srcPort [2]byte - binary.LittleEndian.PutUint16(srcPort[:], conn.Src.Port()) - _, _ = conntrackEntryHash.Write(srcPort[:]) - - dstIP := conn.Dst.IP().As4() - _, _ = conntrackEntryHash.Write(dstIP[:]) - var dstPort [2]byte - binary.LittleEndian.PutUint16(dstPort[:], conn.Dst.Port()) - _, _ = conntrackEntryHash.Write(dstPort[:]) - - _ = conntrackEntryHash.WriteByte(conn.Proto) - res := conntrackEntryHash.Sum64() - - conntrackEntryHash.Reset() - return res + srcIP := conn.Src.IP().As4() + _, _ = conntrackEntryHash.Write(srcIP[:]) + var srcPort [2]byte + binary.LittleEndian.PutUint16(srcPort[:], conn.Src.Port()) + _, _ = conntrackEntryHash.Write(srcPort[:]) + + dstIP := conn.Dst.IP().As4() + _, _ = conntrackEntryHash.Write(dstIP[:]) + var dstPort [2]byte + binary.LittleEndian.PutUint16(dstPort[:], conn.Dst.Port()) + _, _ = conntrackEntryHash.Write(dstPort[:]) + + _ = conntrackEntryHash.WriteByte(conn.Proto) + res := conntrackEntryHash.Sum64() + + conntrackEntryHash.Reset() + return res } func (c *Collector) isPrivateNetwork(ip netaddr.IP) bool { @@ -418,4 +418,4 @@ func (c *Collector) isPrivateNetwork(ip netaddr.IP) bool { } return false -} \ No newline at end of file +} From 2ecd8572ae270c3d224b597175ac916ce70b9ce0 Mon Sep 17 00:00:00 2001 From: hany-mhajna-payu-gpo Date: Thu, 28 Nov 2024 19:36:35 +0200 Subject: [PATCH 3/6] Update build.yaml --- .github/workflows/build.yaml | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 340b1b1..8b01630 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -7,6 +7,7 @@ on: pull_request: branches: - main + workflow_dispatch: # Allows manual 
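Before the workflow changes, one detail of the collector worth a sketch: the REP-243 guard in collect() clamps deltas to zero whenever a conntrack counter appears to move backwards, because subtracting a larger uint64 from a smaller one would wrap around to a huge value. A minimal illustration, with assumed sample numbers:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

// delta mirrors the lo.Ternary guard used in collect(): a counter that
// went backwards is reported as 0 rather than as a wrapped-around uint64.
func delta(cur, prev uint64) uint64 {
	return lo.Ternary(cur < prev, 0, cur-prev)
}

func main() {
	fmt.Println(delta(1500, 1000)) // normal growth: 500
	fmt.Println(delta(900, 1000))  // counter reset or re-scrape glitch: 0
}

Note that lo.Ternary evaluates both branches, so cur-prev still wraps when cur < prev; the wrapped value is simply discarded, which matches the behavior of the patched code.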
From 2ecd8572ae270c3d224b597175ac916ce70b9ce0 Mon Sep 17 00:00:00 2001
From: hany-mhajna-payu-gpo
Date: Thu, 28 Nov 2024 19:36:35 +0200
Subject: [PATCH 3/6] Update build.yaml

---
 .github/workflows/build.yaml | 27 +++++++++++++--------------
 1 file changed, 13 insertions(+), 14 deletions(-)

diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 340b1b1..8b01630 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -7,6 +7,7 @@ on:
   pull_request:
     branches:
       - main
+  workflow_dispatch: # Allows manual triggering of the workflow
 
 jobs:
   build:
@@ -17,9 +18,9 @@
       - name: Checkout
        uses: actions/checkout@v2
        with:
-          ref: ${{ github.event.pull_request.head.sha }}
+          ref: ${{ github.event.pull_request.head.sha || github.sha }}
 
-      - name: Get merge request latest commit
+      - name: Get commit message
        id: parse-commit
        if: ${{ github.event_name == 'pull_request' }}
        run: |
@@ -27,7 +28,7 @@
          echo "head_commit_message=${msg}" >> $GITHUB_ENV
 
      - name: Setup Go 1.21
-        uses: actions/setup-go@v2
+        uses: actions/setup-go@v3
        with:
          go-version: "1.21"
@@ -67,10 +68,8 @@
        CGO_ENABLED: 0
 
      - name: Run golangci-lint
-        # You may pin to the exact commit or the version.
-        # uses: golangci/golangci-lint-action@537aa1903e5d359d0b27dbc19ddd22c5087f3fbc
        if: ${{ github.event_name == 'pull_request' && !contains(env.head_commit_message, '#skip-lint') }}
-        uses: golangci/golangci-lint-action@v3.2.0
+        uses: golangci/golangci-lint-action@v3
        with:
          args: --timeout=5m
          skip-pkg-cache: true
@@ -88,13 +87,13 @@
        uses: docker/setup-buildx-action@v2
 
      - name: Login to GitHub Container Registry
-        uses: docker/login-action@v1
+        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
 
-      - name: Build and push pr (egressd collector)
+      - name: Build and push PR (egressd collector)
        if: ${{ github.event_name == 'pull_request' }}
        uses: docker/build-push-action@v3
        with:
@@ -104,7 +103,7 @@
          push: ${{ github.event_name == 'pull_request' }}
          tags: ghcr.io/castai/egressd/egressd:${{ github.sha }}
 
-      - name: Build and push pr (egressd exporter)
+      - name: Build and push PR (egressd exporter)
        if: ${{ github.event_name == 'pull_request' }}
        uses: docker/build-push-action@v3
        with:
@@ -121,7 +120,7 @@
          context: .
          platforms: linux/arm64,linux/amd64
          file: ./Dockerfile
-          push: ${{ github.event_name != 'pull_request' }}
+          push: true
          tags: ghcr.io/castai/egressd/egressd:${{ github.sha }}
 
      - name: Build and push main (egressd exporter)
@@ -131,25 +130,25 @@
          context: .
          platforms: linux/arm64,linux/amd64
          file: ./Dockerfile.exporter
-          push: ${{ github.event_name != 'pull_request' }}
+          push: true
          tags: ghcr.io/castai/egressd/egressd-exporter:${{ github.sha }}
 
  e2e:
    name: E2E
    runs-on: ubuntu-22.04
-    if: ${{ github.event_name == 'pull_request' }}
+    if: ${{ github.event_name == 'pull_request' || github.event_name == 'workflow_dispatch' }}
    needs: build
    steps:
      - name: Checkout
        uses: actions/checkout@v2
 
      - name: Setup Go 1.21
-        uses: actions/setup-go@v2
+        uses: actions/setup-go@v3
        with:
          go-version: "1.21"
 
      - name: Login to GitHub Container Registry
-        uses: docker/login-action@v1
+        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
From 0db39c2117dea4d5fa81367a34243e250837c33f Mon Sep 17 00:00:00 2001
From: hany-mhajna-payu-gpo
Date: Thu, 28 Nov 2024 19:39:37 +0200
Subject: [PATCH 4/6] Update build.yaml

---
 .github/workflows/build.yaml | 46 +++++++++---------------------------
 1 file changed, 11 insertions(+), 35 deletions(-)

diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 8b01630..aeea70c 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -1,12 +1,8 @@
 name: Build
 
 on:
-  push:
-    branches:
-      - main
-  pull_request:
-    branches:
-      - main
+  push: # Triggers on push events to any branch
+  pull_request: # Triggers on pull request events targeting any branch
   workflow_dispatch: # Allows manual triggering of the workflow
 
 jobs:
@@ -18,8 +14,8 @@
      - name: Checkout
        uses: actions/checkout@v2
        with:
-          ref: ${{ github.event.pull_request.head.sha || github.sha }}
-
+          ref: ${{ github.event.pull_request.head.sha || github.ref }}
+
      - name: Get commit message
        id: parse-commit
        if: ${{ github.event_name == 'pull_request' }}
@@ -68,7 +64,7 @@
        CGO_ENABLED: 0
 
      - name: Run golangci-lint
-        if: ${{ github.event_name == 'pull_request' && !contains(env.head_commit_message, '#skip-lint') }}
+        if: ${{ (github.event_name == 'pull_request' || github.event_name == 'push') && !contains(env.head_commit_message, '#skip-lint') }}
        uses: golangci/golangci-lint-action@v3
        with:
          args: --timeout=5m
@@ -77,7 +73,7 @@
          version: v1.58.2
 
      - name: Test
-        if: ${{ github.event_name == 'pull_request' && !contains(env.head_commit_message, '#skip-test') }}
+        if: ${{ (github.event_name == 'pull_request' || github.event_name == 'push') && !contains(env.head_commit_message, '#skip-test') }}
        run: go test -race ./...
 
      - name: Set up QEMU
@@ -93,28 +89,8 @@
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
 
-      - name: Build and push PR (egressd collector)
-        if: ${{ github.event_name == 'pull_request' }}
-        uses: docker/build-push-action@v3
-        with:
-          context: .
-          platforms: linux/arm64,linux/amd64
-          file: ./Dockerfile
-          push: ${{ github.event_name == 'pull_request' }}
-          tags: ghcr.io/castai/egressd/egressd:${{ github.sha }}
-
-      - name: Build and push PR (egressd exporter)
-        if: ${{ github.event_name == 'pull_request' }}
-        uses: docker/build-push-action@v3
-        with:
-          context: .
-          platforms: linux/arm64,linux/amd64
-          file: ./Dockerfile.exporter
-          push: ${{ github.event_name == 'pull_request' }}
-          tags: ghcr.io/castai/egressd/egressd-exporter:${{ github.sha }}
-
-      - name: Build and push main (egressd collector)
-        if: ${{ github.event_name != 'pull_request' && github.event_name != 'release' }}
+      - name: Build and push (egressd collector)
+        if: ${{ github.event_name != 'release' }}
        uses: docker/build-push-action@v3
        with:
          context: .
@@ -123,8 +99,8 @@
          push: true
          tags: ghcr.io/castai/egressd/egressd:${{ github.sha }}
 
-      - name: Build and push main (egressd exporter)
-        if: ${{ github.event_name != 'pull_request' && github.event_name != 'release' }}
+      - name: Build and push (egressd exporter)
+        if: ${{ github.event_name != 'release' }}
        uses: docker/build-push-action@v3
        with:
          context: .
@@ -136,7 +112,7 @@
  e2e:
    name: E2E
    runs-on: ubuntu-22.04
-    if: ${{ github.event_name == 'pull_request' || github.event_name == 'workflow_dispatch' }}
+    if: ${{ github.event_name == 'pull_request' || github.event_name == 'workflow_dispatch' || github.event_name == 'push' }}
    needs: build
    steps:
      - name: Checkout

From c00a9498237b58b689440125def23fcae5fbda86 Mon Sep 17 00:00:00 2001
From: hany-mhajna-payu-gpo
Date: Thu, 28 Nov 2024 19:47:55 +0200
Subject: [PATCH 5/6] Update build.yaml

---
 .github/workflows/build.yaml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index aeea70c..7a7653a 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -97,7 +97,7 @@ jobs:
          platforms: linux/arm64,linux/amd64
          file: ./Dockerfile
          push: true
-          tags: ghcr.io/castai/egressd/egressd:${{ github.sha }}
+          tags: ghcr.io/hany-mhajna-payu-gpo/egressd/egressd:${{ github.sha }}
 
      - name: Build and push (egressd exporter)
        if: ${{ github.event_name != 'release' }}
@@ -107,7 +107,7 @@ jobs:
          platforms: linux/arm64,linux/amd64
          file: ./Dockerfile.exporter
          push: true
-          tags: ghcr.io/castai/egressd/egressd-exporter:${{ github.sha }}
+          tags: ghcr.io/hany-mhajna-payu-gpo/egressd/egressd-exporter:${{ github.sha }}
 
  e2e:
    name: E2E

From 5e851058776dd1d26594e83b60b206ecf480f9ea Mon Sep 17 00:00:00 2001
From: hany-mhajna-payu-gpo
Date: Thu, 28 Nov 2024 20:05:37 +0200
Subject: [PATCH 6/6] Update release.yaml

---
 .github/workflows/release.yaml | 136 +++++++--------------------------
 1 file changed, 26 insertions(+), 110 deletions(-)

diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
index b3e1a5f..7931e3c 100644
--- a/.github/workflows/release.yaml
+++ b/.github/workflows/release.yaml
@@ -1,9 +1,13 @@
 name: Release
 
 on:
+  push: # Trigger on push events
+    branches:
+      - '**' # Specify branches or use '**' for all branches
  release:
    types:
      - published
+  workflow_dispatch: # Allows manual triggering of the workflow
 
 env:
  CR_CONFIGFILE: "${{ github.workspace }}/cr.yaml"
@@ -21,7 +25,7 @@
        uses: actions/checkout@v2
 
      - name: Setup Go 1.21
-        uses: actions/setup-go@v2
+        uses: actions/setup-go@v3
        with:
          go-version: "1.21"
@@ -32,37 +36,42 @@
          key: ${{ runner.os }}-build-${{ hashFiles('**/go.sum') }}
          restore-keys: ${{ runner.os }}-build-
 
-      - name: Get release tag
-        run: echo "RELEASE_TAG=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV
+      - name: Get release tag or branch name
+        run: |
+          if [ "${{ github.event_name }}" == "release" ]; then
+            echo "RELEASE_TAG=${GITHUB_REF#refs/tags/}" >> $GITHUB_ENV
+          else
+            echo "RELEASE_TAG=${GITHUB_REF#refs/heads/}" >> $GITHUB_ENV
+          fi
 
      - name: Build go binary amd64
-        run: go build -ldflags "-s -w -X main.GitCommit=$GITHUB_SHA -X main.GitRef=$GITHUB_REF -X main.Version=${RELEASE_TAG:-commit-$GITHUB_SHA}" -o bin/egressd-amd64 ./cmd/collector
+        run: go build -ldflags "-s -w -X main.GitCommit=${{ github.sha }} -X main.GitRef=${{ github.ref }} -X main.Version=${{ env.RELEASE_TAG }}" -o bin/egressd-amd64 ./cmd/collector
        env:
          GOOS: linux
          GOARCH: amd64
          CGO_ENABLED: 0
 
      - name: Build go binary arm64
-        run: go build -ldflags "-s -w -X main.GitCommit=$GITHUB_SHA -X main.GitRef=$GITHUB_REF -X main.Version=${RELEASE_TAG:-commit-$GITHUB_SHA}" -o bin/egressd-arm64 ./cmd/collector
+        run: go build -ldflags "-s -w -X main.GitCommit=${{ github.sha }} -X main.GitRef=${{ github.ref }} -X main.Version=${{ env.RELEASE_TAG }}" -o bin/egressd-arm64 ./cmd/collector
        env:
          GOOS: linux
          GOARCH: arm64
          CGO_ENABLED: 0
 
      - name: Build egressd exporter go binary amd64
-        run: go build -ldflags "-s -w -X main.GitCommit=$GITHUB_SHA -X main.GitRef=$GITHUB_REF -X main.Version=${RELEASE_TAG:-commit-$GITHUB_SHA}" -o bin/egressd-exporter-amd64 ./cmd/exporter
+        run: go build -ldflags "-s -w -X main.GitCommit=${{ github.sha }} -X main.GitRef=${{ github.ref }} -X main.Version=${{ env.RELEASE_TAG }}" -o bin/egressd-exporter-amd64 ./cmd/exporter
        env:
          GOOS: linux
          GOARCH: amd64
          CGO_ENABLED: 0
 
      - name: Build egressd exporter go binary arm64
-        run: go build -ldflags "-s -w -X main.GitCommit=$GITHUB_SHA -X main.GitRef=$GITHUB_REF -X main.Version=${RELEASE_TAG:-commit-$GITHUB_SHA}" -o bin/egressd-exporter-arm64 ./cmd/exporter
+        run: go build -ldflags "-s -w -X main.GitCommit=${{ github.sha }} -X main.GitRef=${{ github.ref }} -X main.Version=${{ env.RELEASE_TAG }}" -o bin/egressd-exporter-arm64 ./cmd/exporter
        env:
          GOOS: linux
          GOARCH: arm64
          CGO_ENABLED: 0
-
+
      - name: Set up QEMU
        uses: docker/setup-qemu-action@v2
@@ -70,13 +79,13 @@
        uses: docker/setup-buildx-action@v2
 
      - name: Login to GitHub Container Registry
-        uses: docker/login-action@v1
+        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
 
-      - name: Build and push release (egressd collector)
+      - name: Build and push (egressd collector)
        uses: docker/build-push-action@v3
        with:
          context: .
@@ -84,10 +93,9 @@
          platforms: linux/arm64,linux/amd64
          file: ./Dockerfile
          tags: |
-            ghcr.io/castai/egressd/egressd:${{ env.RELEASE_TAG }}
-            ghcr.io/castai/egressd/egressd:latest
+            ghcr.io/hany-mhajna-payu-gpo/egressd:${{ env.RELEASE_TAG }}
 
-      - name: Build and push release (egressd exporter)
+      - name: Build and push (egressd exporter)
        uses: docker/build-push-action@v3
        with:
          context: .
@@ -95,13 +103,13 @@
          platforms: linux/arm64,linux/amd64
          file: ./Dockerfile.exporter
          tags: |
-            ghcr.io/castai/egressd/egressd-exporter:${{ env.RELEASE_TAG }}
-            ghcr.io/castai/egressd/egressd-exporter:latest
-
+            ghcr.io/hany-mhajna-payu-gpo/egressd-exporter:${{ env.RELEASE_TAG }}
+
  release_chart:
    name: Release Helm Chart
    runs-on: ubuntu-22.04
    needs: release_docker
+    if: ${{ github.event_name == 'release' }}
    steps:
      - name: Checkout
        uses: actions/checkout@v2
@@ -109,99 +117,7 @@
        with:
          fetch-depth: '0'
 
      - name: Get release tag
-        run: echo "RELEASE_TAG=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV
-
-      - name: Checkout helm-charts
-        # The cr tool only works if the target repository is already checked out
-        uses: actions/checkout@v2
-        with:
-          fetch-depth: 0
-          repository: castai/helm-charts
-          path: helm-charts
-          token: ${{ secrets.HELM_CHARTS_REPO_TOKEN }}
+        run: echo "RELEASE_TAG=${GITHUB_REF#refs/tags/}" >> $GITHUB_ENV
 
-      - name: Configure Git for helm-charts
-        run: |
-          cd helm-charts
-          git config user.name "$GITHUB_ACTOR"
-          git config user.email "$GITHUB_ACTOR@users.noreply.github.com"
-
-      - name: Install Helm
-        uses: azure/setup-helm@v1
-        with:
-          version: v3.5.2
-
-      - name: Install CR tool
-        run: |
-          mkdir "${CR_TOOL_PATH}"
-          mkdir "${CR_PACKAGE_PATH}"
-          mkdir "${CR_INDEX_PATH}"
-          curl -sSLo cr.tar.gz "https://github.com/helm/chart-releaser/releases/download/v1.4.0/chart-releaser_1.4.0_linux_amd64.tar.gz"
-          tar -xzf cr.tar.gz -C "${CR_TOOL_PATH}"
-          rm -f cr.tar.gz
-
-      - name: Bump chart version
-        run: |
-          python ./.github/workflows/bump_chart.py ${CHART_PATH}/Chart.yaml ${{env.RELEASE_TAG}}
-
-      - name: Parse Chart.yaml
-        id: parse-chart
-        run: |
-          description=$(yq ".description" < ${CHART_PATH}/Chart.yaml)
-          name=$(yq ".name" < ${CHART_PATH}/Chart.yaml)
-          version=$(yq ".version" < ${CHART_PATH}/Chart.yaml)
-          echo "::set-output name=chartpath::${CHART_PATH}"
-          echo "::set-output name=desc::${description}"
-          if [[ -n "${HELM_TAG_PREFIX}" ]]; then
-            echo "::set-output name=tagname::${name}-${version}"
-          else
-            echo "::set-output name=tagname::${name}-${version}"
-          fi
-          echo "::set-output name=packagename::${name}-${version}"
-
-      - name: Create helm package
-        run: |
-          "${CR_TOOL_PATH}/cr" package "${{ steps.parse-chart.outputs.chartpath }}" --config "${CR_CONFIGFILE}" --package-path "${CR_PACKAGE_PATH}"
-          echo "Result of chart package:"
-          ls -l "${CR_PACKAGE_PATH}"
-          git status
-
-      - name: Make helm charts github release
-        uses: softprops/action-gh-release@v1
-        with:
-          body: |
-            ${{ steps.parse-chart.outputs.desc }}
-            Source commit: https://github.com/${{ github.repository }}/commit/${{ github.sha }}
-          files: |
-            ${{ env.CR_PACKAGE_PATH }}/${{ steps.parse-chart.outputs.packagename }}.tgz
-            ${{ env.CR_PACKAGE_PATH }}/${{ steps.parse-chart.outputs.packagename }}.tgz.prov
-          repository: castai/helm-charts
-          tag_name: ${{ steps.parse-chart.outputs.tagname }}
-          token: ${{ secrets.HELM_CHARTS_REPO_TOKEN }}
-
-      - name: Update helm repo index.yaml
-        run: |
-          cd helm-charts
-          "${CR_TOOL_PATH}/cr" index --config "${CR_CONFIGFILE}" --token "${{ secrets.HELM_CHARTS_REPO_TOKEN }}" --index-path "${CR_INDEX_PATH}" --package-path "${CR_PACKAGE_PATH}" --push
-
-      - name: Commit Chart.yaml changes
-        run: |
-          git status
-          git config user.name "$GITHUB_ACTOR"
-          git config user.email "$GITHUB_ACTOR@users.noreply.github.com"
-          git checkout main
-          git add ${CHART_PATH}/Chart.yaml
-          git commit -m "[Release] Update Chart.yaml"
-          git push
-
-      - name: Sync chart with helm-charts github
-        run: |
-          cd helm-charts
-          git config user.name "$GITHUB_ACTOR"
-          git config user.email "$GITHUB_ACTOR@users.noreply.github.com"
-          git checkout main
-          cp -r ${CHART_PATH}/* ./charts/egressd
-          git add charts/egressd
-          git commit -m "Update egressd chart to ${{env.RELEASE_TAG}}"
-          git push
+      # ... rest of your steps for releasing the Helm chart ...