diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 340b1b1..7a7653a 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -1,12 +1,9 @@
 name: Build
 on:
-  push:
-    branches:
-      - main
-  pull_request:
-    branches:
-      - main
+  push: # Triggers on push events to any branch
+  pull_request: # Triggers on pull request events targeting any branch
+  workflow_dispatch: # Allows manual triggering of the workflow

 jobs:
   build:
@@ -17,9 +14,9 @@ jobs:
       - name: Checkout
         uses: actions/checkout@v2
         with:
-          ref: ${{ github.event.pull_request.head.sha }}
-
-      - name: Get merge request latest commit
+          ref: ${{ github.event.pull_request.head.sha || github.ref }}
+
+      - name: Get commit message
         id: parse-commit
         if: ${{ github.event_name == 'pull_request' }}
         run: |
@@ -27,7 +24,7 @@ jobs:
           echo "head_commit_message=${msg}" >> $GITHUB_ENV

       - name: Setup Go 1.21
-        uses: actions/setup-go@v2
+        uses: actions/setup-go@v3
         with:
           go-version: "1.21"

@@ -67,10 +64,8 @@
           CGO_ENABLED: 0

      - name: Run golangci-lint
-        # You may pin to the exact commit or the version.
-        # uses: golangci/golangci-lint-action@537aa1903e5d359d0b27dbc19ddd22c5087f3fbc
-        if: ${{ github.event_name == 'pull_request' && !contains(env.head_commit_message, '#skip-lint') }}
-        uses: golangci/golangci-lint-action@v3.2.0
+        if: ${{ (github.event_name == 'pull_request' || github.event_name == 'push') && !contains(env.head_commit_message, '#skip-lint') }}
+        uses: golangci/golangci-lint-action@v3
         with:
           args: --timeout=5m
           skip-pkg-cache: true
@@ -78,7 +73,7 @@
           version: v1.58.2

      - name: Test
-        if: ${{ github.event_name == 'pull_request' && !contains(env.head_commit_message, '#skip-test') }}
+        if: ${{ (github.event_name == 'pull_request' || github.event_name == 'push') && !contains(env.head_commit_message, '#skip-test') }}
         run: go test -race ./...

      - name: Set up QEMU
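Note on the gating above: `#skip-lint` and `#skip-test` are plain substring checks against the head commit message captured by the parse-commit step, and GitHub's contains() compares case-insensitively. A minimal Go sketch of the same check, useful for local tooling; the function name and sample message are illustrative, not part of this repo:

package main

import (
	"fmt"
	"strings"
)

// shouldRun mirrors the workflow expression
// !contains(env.head_commit_message, flag). GitHub's contains() is
// case-insensitive, so normalize both sides before comparing.
func shouldRun(commitMessage, flag string) bool {
	return !strings.Contains(strings.ToLower(commitMessage), strings.ToLower(flag))
}

func main() {
	msg := "fix: tune conntrack cache #skip-test"
	fmt.Println(shouldRun(msg, "#skip-lint")) // true: lint still runs
	fmt.Println(shouldRun(msg, "#skip-test")) // false: tests are skipped
}

One consequence of this diff: on plain pushes the parse-commit step does not run, so head_commit_message stays empty, the contains() checks are false, and lint and tests always run.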
@@ -88,68 +83,48 @@
         uses: docker/setup-buildx-action@v2

      - name: Login to GitHub Container Registry
-        uses: docker/login-action@v1
+        uses: docker/login-action@v2
         with:
           registry: ghcr.io
           username: ${{ github.actor }}
           password: ${{ secrets.GITHUB_TOKEN }}

-      - name: Build and push pr (egressd collector)
-        if: ${{ github.event_name == 'pull_request' }}
-        uses: docker/build-push-action@v3
-        with:
-          context: .
-          platforms: linux/arm64,linux/amd64
-          file: ./Dockerfile
-          push: ${{ github.event_name == 'pull_request' }}
-          tags: ghcr.io/castai/egressd/egressd:${{ github.sha }}
-
-      - name: Build and push pr (egressd exporter)
-        if: ${{ github.event_name == 'pull_request' }}
-        uses: docker/build-push-action@v3
-        with:
-          context: .
-          platforms: linux/arm64,linux/amd64
-          file: ./Dockerfile.exporter
-          push: ${{ github.event_name == 'pull_request' }}
-          tags: ghcr.io/castai/egressd/egressd-exporter:${{ github.sha }}
-
-      - name: Build and push main (egressd collector)
-        if: ${{ github.event_name != 'pull_request' && github.event_name != 'release' }}
+      - name: Build and push (egressd collector)
+        if: ${{ github.event_name != 'release' }}
         uses: docker/build-push-action@v3
         with:
           context: .
           platforms: linux/arm64,linux/amd64
           file: ./Dockerfile
-          push: ${{ github.event_name != 'pull_request' }}
-          tags: ghcr.io/castai/egressd/egressd:${{ github.sha }}
+          push: true
+          tags: ghcr.io/hany-mhajna-payu-gpo/egressd/egressd:${{ github.sha }}

-      - name: Build and push main (egressd exporter)
-        if: ${{ github.event_name != 'pull_request' && github.event_name != 'release' }}
+      - name: Build and push (egressd exporter)
+        if: ${{ github.event_name != 'release' }}
         uses: docker/build-push-action@v3
         with:
           context: .
           platforms: linux/arm64,linux/amd64
           file: ./Dockerfile.exporter
-          push: ${{ github.event_name != 'pull_request' }}
-          tags: ghcr.io/castai/egressd/egressd-exporter:${{ github.sha }}
+          push: true
+          tags: ghcr.io/hany-mhajna-payu-gpo/egressd/egressd-exporter:${{ github.sha }}

   e2e:
     name: E2E
     runs-on: ubuntu-22.04
-    if: ${{ github.event_name == 'pull_request' }}
+    if: ${{ github.event_name == 'pull_request' || github.event_name == 'workflow_dispatch' || github.event_name == 'push' }}
     needs: build
     steps:
       - name: Checkout
         uses: actions/checkout@v2

      - name: Setup Go 1.21
-        uses: actions/setup-go@v2
+        uses: actions/setup-go@v3
         with:
           go-version: "1.21"

      - name: Login to GitHub Container Registry
-        uses: docker/login-action@v1
+        uses: docker/login-action@v2
         with:
           registry: ghcr.io
           username: ${{ github.actor }}
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
index b3e1a5f..7931e3c 100644
--- a/.github/workflows/release.yaml
+++ b/.github/workflows/release.yaml
@@ -1,9 +1,13 @@
 name: Release
 on:
+  push: # Trigger on push events
+    branches:
+      - '**' # '**' matches all branches
   release:
     types:
       - published
+  workflow_dispatch: # Allows manual triggering of the workflow

 env:
   CR_CONFIGFILE: "${{ github.workspace }}/cr.yaml"
@@ -21,7 +25,7 @@
         uses: actions/checkout@v2

      - name: Setup Go 1.21
-        uses: actions/setup-go@v2
+        uses: actions/setup-go@v3
         with:
           go-version: "1.21"
@@ -32,37 +36,42 @@
           key: ${{ runner.os }}-build-${{ hashFiles('**/go.sum') }}
           restore-keys: ${{ runner.os }}-build-

-      - name: Get release tag
-        run: echo "RELEASE_TAG=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV
+      - name: Get release tag or branch name
+        run: |
+          if [ "${{ github.event_name }}" == "release" ]; then
+            echo "RELEASE_TAG=${GITHUB_REF#refs/tags/}" >> $GITHUB_ENV
+          else
+            echo "RELEASE_TAG=${GITHUB_REF#refs/heads/}" >> $GITHUB_ENV
+          fi

      - name: Build go binary amd64
-        run: go build -ldflags "-s -w -X main.GitCommit=$GITHUB_SHA -X main.GitRef=$GITHUB_REF -X main.Version=${RELEASE_TAG:-commit-$GITHUB_SHA}" -o bin/egressd-amd64 ./cmd/collector
+        run: go build -ldflags "-s -w -X main.GitCommit=${{ github.sha }} -X main.GitRef=${{ github.ref }} -X main.Version=${{ env.RELEASE_TAG }}" -o bin/egressd-amd64 ./cmd/collector
         env:
           GOOS: linux
           GOARCH: amd64
           CGO_ENABLED: 0

      - name: Build go binary arm64
-        run: go build -ldflags "-s -w -X main.GitCommit=$GITHUB_SHA -X main.GitRef=$GITHUB_REF -X main.Version=${RELEASE_TAG:-commit-$GITHUB_SHA}" -o bin/egressd-arm64 ./cmd/collector
+        run: go build -ldflags "-s -w -X main.GitCommit=${{ github.sha }} -X main.GitRef=${{ github.ref }} -X main.Version=${{ env.RELEASE_TAG }}" -o bin/egressd-arm64 ./cmd/collector
         env:
           GOOS: linux
           GOARCH: arm64
           CGO_ENABLED: 0

      - name: Build egressd exporter go binary amd64
-        run: go build -ldflags "-s -w -X main.GitCommit=$GITHUB_SHA -X main.GitRef=$GITHUB_REF -X main.Version=${RELEASE_TAG:-commit-$GITHUB_SHA}" -o bin/egressd-exporter-amd64 ./cmd/exporter
+        run: go build -ldflags "-s -w -X main.GitCommit=${{ github.sha }} -X main.GitRef=${{ github.ref }} -X main.Version=${{ env.RELEASE_TAG }}" -o bin/egressd-exporter-amd64 ./cmd/exporter
         env:
           GOOS: linux
           GOARCH: amd64
           CGO_ENABLED: 0

      - name: Build egressd exporter go binary arm64
-        run: go build -ldflags "-s -w -X main.GitCommit=$GITHUB_SHA -X main.GitRef=$GITHUB_REF -X main.Version=${RELEASE_TAG:-commit-$GITHUB_SHA}" -o bin/egressd-exporter-arm64 ./cmd/exporter
+        run: go build -ldflags "-s -w -X main.GitCommit=${{ github.sha }} -X main.GitRef=${{ github.ref }} -X main.Version=${{ env.RELEASE_TAG }}" -o bin/egressd-exporter-arm64 ./cmd/exporter
         env:
           GOOS: linux
           GOARCH: arm64
           CGO_ENABLED: 0
-
+
      - name: Set up QEMU
         uses: docker/setup-qemu-action@v2
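The four build steps above rely on Go's linker to stamp version metadata via -ldflags "-X ...". That only works if ./cmd/collector and ./cmd/exporter declare matching package-level string variables; a minimal sketch of the shape these flags assume, with illustrative defaults:

package main

import "fmt"

// Overwritten at link time via
// -ldflags "-s -w -X main.GitCommit=... -X main.GitRef=... -X main.Version=...".
// Only package-level string variables (not constants) can be set with -X.
var (
	GitCommit = "undefined"
	GitRef    = "undefined"
	Version   = "local"
)

func main() {
	fmt.Printf("egressd %s (commit %s, ref %s)\n", Version, GitCommit, GitRef)
}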
name=tagname::${name}-${version}" - fi - echo "::set-output name=packagename::${name}-${version}" - - - name: Create helm package - run: | - "${CR_TOOL_PATH}/cr" package "${{ steps.parse-chart.outputs.chartpath }}" --config "${CR_CONFIGFILE}" --package-path "${CR_PACKAGE_PATH}" - echo "Result of chart package:" - ls -l "${CR_PACKAGE_PATH}" - git status - - - - name: Make helm charts github release - uses: softprops/action-gh-release@v1 - with: - body: | - ${{ steps.parse-chart.outputs.desc }} - Source commit: https://github.com/${{ github.repository }}/commit/${{ github.sha }} - files: | - ${{ env.CR_PACKAGE_PATH }}/${{ steps.parse-chart.outputs.packagename }}.tgz - ${{ env.CR_PACKAGE_PATH }}/${{ steps.parse-chart.outputs.packagename }}.tgz.prov - repository: castai/helm-charts - tag_name: ${{ steps.parse-chart.outputs.tagname }} - token: ${{ secrets.HELM_CHARTS_REPO_TOKEN }} - - - name: Update helm repo index.yaml - run: | - cd helm-charts - "${CR_TOOL_PATH}/cr" index --config "${CR_CONFIGFILE}" --token "${{ secrets.HELM_CHARTS_REPO_TOKEN }}" --index-path "${CR_INDEX_PATH}" --package-path "${CR_PACKAGE_PATH}" --push - - - name: Commit Chart.yaml changes - run: | - git status - git config user.name "$GITHUB_ACTOR" - git config user.email "$GITHUB_ACTOR@users.noreply.github.com" - git checkout main - git add ${CHART_PATH}/Chart.yaml - git commit -m "[Release] Update Chart.yaml" - git push - - - name: Sync chart with helm-charts github - run: | - cd helm-charts - git config user.name "$GITHUB_ACTOR" - git config user.email "$GITHUB_ACTOR@users.noreply.github.com" - git checkout main - cp -r ${CHART_PATH}/* ./charts/egressd - git add charts/egressd - git commit -m "Update egressd chart to ${{env.RELEASE_TAG}}" - git push + # ... rest of your steps for releasing the Helm chart ... diff --git a/collector/collector.go b/collector/collector.go index 823f5c1..6bc1559 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -1,395 +1,421 @@ package collector import ( - "compress/gzip" - "context" - "encoding/binary" - "fmt" - "hash/maphash" - "net/http" - "strings" - "sync" - "time" - - "github.com/samber/lo" - "github.com/sirupsen/logrus" - "google.golang.org/protobuf/proto" - "inet.af/netaddr" - corev1 "k8s.io/api/core/v1" - - "github.com/castai/egressd/conntrack" - "github.com/castai/egressd/dns" - "github.com/castai/egressd/metrics" - "github.com/castai/egressd/pb" + "compress/gzip" + "context" + "encoding/binary" + "fmt" + "hash/maphash" + "net/http" + "strings" + "sync" + "time" + + "github.com/samber/lo" + "github.com/sirupsen/logrus" + "google.golang.org/protobuf/proto" + "inet.af/netaddr" + corev1 "k8s.io/api/core/v1" + + "github.com/castai/egressd/conntrack" + "github.com/castai/egressd/dns" + "github.com/castai/egressd/metrics" + "github.com/castai/egressd/pb" ) var ( - acceptEncoding = http.CanonicalHeaderKey("Accept-Encoding") - contentEncoding = http.CanonicalHeaderKey("Content-Encoding") + acceptEncoding = http.CanonicalHeaderKey("Accept-Encoding") + contentEncoding = http.CanonicalHeaderKey("Content-Encoding") ) func CurrentTimeGetter() func() time.Time { - return func() time.Time { - return time.Now() - } + return func() time.Time { + return time.Now() + } } type Config struct { - // ReadInterval used for conntrack records scrape. - ReadInterval time.Duration - // CleanupInterval used to remove expired conntrack and pod metrics records. - CleanupInterval time.Duration - // NodeName is current node name on which egressd is running. 
diff --git a/collector/collector.go b/collector/collector.go
index 823f5c1..6bc1559 100644
--- a/collector/collector.go
+++ b/collector/collector.go
@@ -1,395 +1,421 @@
 package collector

 import (
-	"compress/gzip"
-	"context"
-	"encoding/binary"
-	"fmt"
-	"hash/maphash"
-	"net/http"
-	"strings"
-	"sync"
-	"time"
-
-	"github.com/samber/lo"
-	"github.com/sirupsen/logrus"
-	"google.golang.org/protobuf/proto"
-	"inet.af/netaddr"
-	corev1 "k8s.io/api/core/v1"
-
-	"github.com/castai/egressd/conntrack"
-	"github.com/castai/egressd/dns"
-	"github.com/castai/egressd/metrics"
-	"github.com/castai/egressd/pb"
+	"compress/gzip"
+	"context"
+	"encoding/binary"
+	"fmt"
+	"hash/maphash"
+	"net/http"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/samber/lo"
+	"github.com/sirupsen/logrus"
+	"google.golang.org/protobuf/proto"
+	"inet.af/netaddr"
+	corev1 "k8s.io/api/core/v1"
+
+	"github.com/castai/egressd/conntrack"
+	"github.com/castai/egressd/dns"
+	"github.com/castai/egressd/metrics"
+	"github.com/castai/egressd/pb"
 )

 var (
-	acceptEncoding  = http.CanonicalHeaderKey("Accept-Encoding")
-	contentEncoding = http.CanonicalHeaderKey("Content-Encoding")
+	acceptEncoding  = http.CanonicalHeaderKey("Accept-Encoding")
+	contentEncoding = http.CanonicalHeaderKey("Content-Encoding")
 )

 func CurrentTimeGetter() func() time.Time {
-	return func() time.Time {
-		return time.Now()
-	}
+	return func() time.Time {
+		return time.Now()
+	}
 }

 type Config struct {
-	// ReadInterval used for conntrack records scrape.
-	ReadInterval time.Duration
-	// CleanupInterval used to remove expired conntrack and pod metrics records.
-	CleanupInterval time.Duration
-	// NodeName is current node name on which egressd is running.
-	NodeName string
-	// ExcludeNamespaces allows to exclude namespaces. Input is comma separated string.
-	ExcludeNamespaces string
-	// GroupPublicIPs will group all public destinations under single 0.0.0.0 IP.
-	GroupPublicIPs bool
-	// SendTrafficDelta used to determines if traffic should be sent as delta of 2 consecutive conntrack entries
-	// or as the constantly growing counter value
-	SendTrafficDelta bool
-	LogEntries bool
+	// ReadInterval used for conntrack records scrape.
+	ReadInterval time.Duration
+	// CleanupInterval used to remove expired conntrack and pod metrics records.
+	CleanupInterval time.Duration
+	// NodeName is the name of the node on which egressd is running.
+	NodeName string
+	// ExcludeNamespaces allows excluding namespaces. Input is a comma-separated string.
+	ExcludeNamespaces string
+	// GroupPublicIPs will group all public destinations under a single 0.0.0.0 IP.
+	GroupPublicIPs bool
+	// SendTrafficDelta determines whether traffic is sent as the delta of two
+	// consecutive conntrack entries or as the constantly growing counter value.
+	SendTrafficDelta bool
+	LogEntries bool
+	// CustomPrivateCIDRs accepts additional CIDR ranges, as strings, that are treated as private.
+	CustomPrivateCIDRs []string
 }

 type podsWatcher interface {
-	Get() ([]*corev1.Pod, error)
+	Get() ([]*corev1.Pod, error)
 }

 type rawNetworkMetric struct {
-	*pb.RawNetworkMetric
-	lifetime time.Time
+	*pb.RawNetworkMetric
+	lifetime time.Time
 }

 type dnsRecorder interface{ Records() []*pb.IP2Domain }
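CustomPrivateCIDRs takes plain CIDR strings; New (below) parses them with netaddr.ParseIPPrefix and skips invalid entries with an error log. A standalone sketch of that parsing behavior; the CGNAT range and the bogus entry are example values:

package main

import (
	"fmt"

	"inet.af/netaddr"
)

func main() {
	// Same parsing New performs for Config.CustomPrivateCIDRs:
	// valid prefixes are kept, invalid ones are skipped.
	for _, s := range []string{"100.64.0.0/10", "not-a-cidr"} {
		prefix, err := netaddr.ParseIPPrefix(s)
		if err != nil {
			fmt.Printf("skipping %q: %v\n", s, err)
			continue
		}
		fmt.Println(prefix, prefix.Contains(netaddr.MustParseIP("100.92.3.4")))
	}
}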
&http.Client{Timeout: 10 * time.Second}, + } } type Collector struct { - cfg Config - log logrus.FieldLogger - podsWatcher podsWatcher - conntracker conntrack.Client - ip2dns dnsRecorder - entriesCache map[uint64]*conntrack.Entry - podMetrics map[uint64]*rawNetworkMetric - excludeNsMap map[string]struct{} - currentTimeGetter func() time.Time - exporterClient *http.Client - mu sync.Mutex - - firstCollectDone bool + cfg Config + log logrus.FieldLogger + podsWatcher podsWatcher + conntracker conntrack.Client + ip2dns dnsRecorder + entriesCache map[uint64]*conntrack.Entry + podMetrics map[uint64]*rawNetworkMetric + excludeNsMap map[string]struct{} + currentTimeGetter func() time.Time + exporterClient *http.Client + customPrivateCIDRs []netaddr.IPPrefix + mu sync.Mutex + + firstCollectDone bool } func (c *Collector) Start(ctx context.Context) error { - readTicker := time.NewTicker(c.cfg.ReadInterval) - cleanupTicker := time.NewTicker(c.cfg.CleanupInterval) - - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-readTicker.C: - if err := c.collect(); err != nil { - c.log.Errorf("collecting: %v", err) - } - case <-cleanupTicker.C: - c.cleanup() - } - } + readTicker := time.NewTicker(c.cfg.ReadInterval) + cleanupTicker := time.NewTicker(c.cfg.CleanupInterval) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-readTicker.C: + if err := c.collect(); err != nil { + c.log.Errorf("collecting: %v", err) + } + case <-cleanupTicker.C: + c.cleanup() + } + } } func (c *Collector) GetRawNetworkMetricsHandler(w http.ResponseWriter, req *http.Request) { - c.mu.Lock() - defer c.mu.Unlock() - - items := make([]*pb.RawNetworkMetric, 0, len(c.podMetrics)) - for _, m := range c.podMetrics { - items = append(items, m.RawNetworkMetric) - } - - batch := &pb.RawNetworkMetricBatch{Items: items, Ip2Domain: c.ip2dns.Records()} - batchBytes, err := proto.Marshal(batch) - if err != nil { - c.log.Errorf("marshal batch: %v", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - enc := req.Header.Get(acceptEncoding) - if strings.Contains(strings.ToLower(enc), "gzip") { - if err := c.writeGzipBody(w, batchBytes); err != nil { - c.log.Errorf("write batch %v", err) - return - } - } else { - if err := c.writePlainBody(w, batchBytes); err != nil { - c.log.Errorf("write batch %v", err) - return - } - } - - if c.cfg.SendTrafficDelta { - // reset metric tx/rx values, so only delta numbers will be sent with the next batch - for _, m := range c.podMetrics { - m.RawNetworkMetric.TxBytes = 0 - m.RawNetworkMetric.RxBytes = 0 - m.RawNetworkMetric.TxPackets = 0 - m.RawNetworkMetric.RxPackets = 0 - - newLifetime := time.Now().Add(2 * time.Minute) - // reset lifetime only if current lifetime is longer than 2 minutes from now - if m.lifetime.After(newLifetime) { - m.lifetime = newLifetime - } - } - } + c.mu.Lock() + defer c.mu.Unlock() + + items := make([]*pb.RawNetworkMetric, 0, len(c.podMetrics)) + for _, m := range c.podMetrics { + items = append(items, m.RawNetworkMetric) + } + + batch := &pb.RawNetworkMetricBatch{Items: items, Ip2Domain: c.ip2dns.Records()} + batchBytes, err := proto.Marshal(batch) + if err != nil { + c.log.Errorf("marshal batch: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + enc := req.Header.Get(acceptEncoding) + if strings.Contains(strings.ToLower(enc), "gzip") { + if err := c.writeGzipBody(w, batchBytes); err != nil { + c.log.Errorf("write batch %v", err) + return + } + } else { + if err := c.writePlainBody(w, batchBytes); err != nil { + 
c.log.Errorf("write batch %v", err) + return + } + } + + if c.cfg.SendTrafficDelta { + // reset metric tx/rx values, so only delta numbers will be sent with the next batch + for _, m := range c.podMetrics { + m.RawNetworkMetric.TxBytes = 0 + m.RawNetworkMetric.RxBytes = 0 + m.RawNetworkMetric.TxPackets = 0 + m.RawNetworkMetric.RxPackets = 0 + + newLifetime := time.Now().Add(2 * time.Minute) + // reset lifetime only if current lifetime is longer than 2 minutes from now + if m.lifetime.After(newLifetime) { + m.lifetime = newLifetime + } + } + } } func (c *Collector) writeGzipBody(w http.ResponseWriter, body []byte) error { - writer, err := gzip.NewWriterLevel(w, gzip.BestCompression) - if err != nil { - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return fmt.Errorf("cannot create gzip writer: %w", err) - } - defer writer.Close() + writer, err := gzip.NewWriterLevel(w, gzip.BestCompression) + if err != nil { + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return fmt.Errorf("cannot create gzip writer: %w", err) + } + defer writer.Close() - w.Header().Add(contentEncoding, "gzip") + w.Header().Add(contentEncoding, "gzip") - if _, err := writer.Write(body); err != nil { - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return err - } + if _, err := writer.Write(body); err != nil { + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return err + } - return nil + return nil } func (c *Collector) writePlainBody(w http.ResponseWriter, body []byte) error { - if _, err := w.Write(body); err != nil { - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return err - } - return nil + if _, err := w.Write(body); err != nil { + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return err + } + return nil } -// collect aggregates conntract records into reduced pod metrics. +// collect aggregates conntrack records into reduced pod metrics. 

-// collect aggregates conntract records into reduced pod metrics.
+// collect aggregates conntrack records into reduced pod metrics.
 func (c *Collector) collect() error {
-	start := time.Now()
-	pods, err := c.getNodePods()
-	if err != nil {
-		return fmt.Errorf("getting node pods: %w", err)
-	}
-	conns, err := c.conntracker.ListEntries(conntrack.FilterBySrcIP(getPodIPs(pods)))
-	if err != nil {
-		return fmt.Errorf("listing conntrack entries: %w", err)
-	}
-	metrics.SetConntrackActiveEntriesCount(float64(len(conns)))
-
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
-	for _, conn := range conns {
-		connKey := conntrackEntryKey(conn)
-		txBytes := conn.TxBytes
-		txPackets := conn.TxPackets
-		rxBytes := conn.RxBytes
-		rxPackets := conn.RxPackets
-
-		if cachedConn, found := c.entriesCache[connKey]; found {
-			// NOTE: REP-243: there is known issue that current tx/rx bytes could be lower than previously scrapped values,
-			// so treat it as 0 delta to avoid random values for uint64
-			txBytes = lo.Ternary(txBytes < cachedConn.TxBytes, 0, txBytes-cachedConn.TxBytes)
-			rxBytes = lo.Ternary(rxBytes < cachedConn.RxBytes, 0, rxBytes-cachedConn.RxBytes)
-			txPackets = lo.Ternary(txPackets < cachedConn.TxPackets, 0, txPackets-cachedConn.TxPackets)
-			rxPackets = lo.Ternary(rxPackets < cachedConn.RxPackets, 0, rxPackets-cachedConn.RxPackets)
-		}
-		c.entriesCache[connKey] = conn
-
-		if c.cfg.LogEntries && (rxBytes > 0 || txBytes > 0) {
-			c.log.WithFields(map[string]any{
-				"src_ip":   conn.Src.IP().String(),
-				"src_port": conn.Src.Port(),
-				"dst_ip":   conn.Dst.IP().String(),
-				"dst_port": conn.Dst.Port(),
-				"tx_bytes": txBytes,
-				"rx_bytes": rxBytes,
-				"proto":    conn.Proto,
-			}).Debug("ct")
-		}
-
-		// In delta mode we need to have initial conntrack connections so next collect can calculate only new deltas.
-		if c.cfg.SendTrafficDelta && !c.firstCollectDone {
-			continue
-		}
-
-		if c.cfg.GroupPublicIPs && !isPrivateNetwork(conn.Dst.IP()) {
-			conn.Dst = netaddr.IPPortFrom(netaddr.IPv4(0, 0, 0, 0), 0)
-		}
-
-		groupKey := entryGroupKey(conn)
-		if pm, found := c.podMetrics[groupKey]; found {
-			pm.TxBytes += int64(txBytes)
-			pm.TxPackets += int64(txPackets)
-			pm.RxBytes += int64(rxBytes)
-			pm.RxPackets += int64(rxPackets)
-			if conn.Lifetime.After(pm.lifetime) {
-				pm.lifetime = conn.Lifetime
-			}
-		} else {
-			c.podMetrics[groupKey] = &rawNetworkMetric{
-				RawNetworkMetric: &pb.RawNetworkMetric{
-					SrcIp:     dns.ToIPint32(conn.Src.IP()),
-					DstIp:     dns.ToIPint32(conn.Dst.IP()),
-					TxBytes:   int64(txBytes),
-					TxPackets: int64(txPackets),
-					RxBytes:   int64(rxBytes),
-					RxPackets: int64(rxPackets),
-					Proto:     int32(conn.Proto),
-				},
-				lifetime: conn.Lifetime,
-			}
-		}
-	}
-
-	if !c.firstCollectDone {
-		c.firstCollectDone = true
-	}
-
-	c.log.Debugf("collection done in %s, pods=%d, conntrack=%d, conntrack_cache=%d", time.Since(start), len(pods), len(conns), len(c.entriesCache))
-	return nil
+	start := time.Now()
+	pods, err := c.getNodePods()
+	if err != nil {
+		return fmt.Errorf("getting node pods: %w", err)
+	}
+	conns, err := c.conntracker.ListEntries(conntrack.FilterBySrcIP(getPodIPs(pods)))
+	if err != nil {
+		return fmt.Errorf("listing conntrack entries: %w", err)
+	}
+	metrics.SetConntrackActiveEntriesCount(float64(len(conns)))
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for _, conn := range conns {
+		connKey := conntrackEntryKey(conn)
+		txBytes := conn.TxBytes
+		txPackets := conn.TxPackets
+		rxBytes := conn.RxBytes
+		rxPackets := conn.RxPackets
+
+		if cachedConn, found := c.entriesCache[connKey]; found {
+			// NOTE: REP-243: there is a known issue where current tx/rx bytes can be lower than previously scraped values,
+			// so treat it as 0 delta to avoid random values for uint64
+			txBytes = lo.Ternary(txBytes < cachedConn.TxBytes, 0, txBytes-cachedConn.TxBytes)
+			rxBytes = lo.Ternary(rxBytes < cachedConn.RxBytes, 0, rxBytes-cachedConn.RxBytes)
+			txPackets = lo.Ternary(txPackets < cachedConn.TxPackets, 0, txPackets-cachedConn.TxPackets)
+			rxPackets = lo.Ternary(rxPackets < cachedConn.RxPackets, 0, rxPackets-cachedConn.RxPackets)
+		}
+		c.entriesCache[connKey] = conn
+
+		if c.cfg.LogEntries && (rxBytes > 0 || txBytes > 0) {
+			c.log.WithFields(map[string]any{
+				"src_ip":   conn.Src.IP().String(),
+				"src_port": conn.Src.Port(),
+				"dst_ip":   conn.Dst.IP().String(),
+				"dst_port": conn.Dst.Port(),
+				"tx_bytes": txBytes,
+				"rx_bytes": rxBytes,
+				"proto":    conn.Proto,
+			}).Debug("ct")
+		}
+
+		// In delta mode we need to have initial conntrack connections so next collect can calculate only new deltas.
+		if c.cfg.SendTrafficDelta && !c.firstCollectDone {
+			continue
+		}
+
+		if c.cfg.GroupPublicIPs && !c.isPrivateNetwork(conn.Dst.IP()) {
+			conn.Dst = netaddr.IPPortFrom(netaddr.IPv4(0, 0, 0, 0), 0)
+		}
+
+		groupKey := entryGroupKey(conn)
+		if pm, found := c.podMetrics[groupKey]; found {
+			pm.TxBytes += int64(txBytes)
+			pm.TxPackets += int64(txPackets)
+			pm.RxBytes += int64(rxBytes)
+			pm.RxPackets += int64(rxPackets)
+			if conn.Lifetime.After(pm.lifetime) {
+				pm.lifetime = conn.Lifetime
+			}
+		} else {
+			c.podMetrics[groupKey] = &rawNetworkMetric{
+				RawNetworkMetric: &pb.RawNetworkMetric{
+					SrcIp:     dns.ToIPint32(conn.Src.IP()),
+					DstIp:     dns.ToIPint32(conn.Dst.IP()),
+					TxBytes:   int64(txBytes),
+					TxPackets: int64(txPackets),
+					RxBytes:   int64(rxBytes),
+					RxPackets: int64(rxPackets),
+					Proto:     int32(conn.Proto),
+				},
+				lifetime: conn.Lifetime,
+			}
+		}
+	}
+
+	if !c.firstCollectDone {
+		c.firstCollectDone = true
+	}
+
+	c.log.Debugf("collection done in %s, pods=%d, conntrack=%d, conntrack_cache=%d", time.Since(start), len(pods), len(conns), len(c.entriesCache))
+	return nil
 }
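The REP-243 guard in collect() clamps deltas to zero when a conntrack counter appears to move backwards; without it, current minus previous would wrap around uint64. A standalone sketch of the same pattern; the values are illustrative:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

// counterDelta mirrors the lo.Ternary guards in collect(). Both branch
// values are evaluated eagerly, but the wrapped subtraction result is
// simply discarded whenever the condition is true.
func counterDelta(current, previous uint64) uint64 {
	return lo.Ternary(current < previous, 0, current-previous)
}

func main() {
	fmt.Println(counterDelta(1500, 1000)) // 500
	fmt.Println(counterDelta(900, 1000))  // 0, not a huge wrapped value
}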

 func (c *Collector) cleanup() {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
-	start := c.currentTimeGetter().UTC()
-	now := start
-	deletedEntriesCount := 0
-	deletedPodMetricsCount := 0
-
-	for key, e := range c.entriesCache {
-		if now.After(e.Lifetime) {
-			delete(c.entriesCache, key)
-			deletedEntriesCount++
-		}
-	}
-
-	for key, m := range c.podMetrics {
-		if now.After(m.lifetime) {
-			delete(c.podMetrics, key)
-			deletedPodMetricsCount++
-		}
-	}
-
-	c.log.Infof("cleanup done in %s, deleted_conntrack=%d, deleted_pod_metrics=%d", time.Since(start), deletedEntriesCount, deletedPodMetricsCount)
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	start := c.currentTimeGetter().UTC()
+	now := start
+	deletedEntriesCount := 0
+	deletedPodMetricsCount := 0
+
+	for key, e := range c.entriesCache {
+		if now.After(e.Lifetime) {
+			delete(c.entriesCache, key)
+			deletedEntriesCount++
+		}
+	}
+
+	for key, m := range c.podMetrics {
+		if now.After(m.lifetime) {
+			delete(c.podMetrics, key)
+			deletedPodMetricsCount++
+		}
+	}
+
+	c.log.Infof("cleanup done in %s, deleted_conntrack=%d, deleted_pod_metrics=%d", time.Since(start), deletedEntriesCount, deletedPodMetricsCount)
 }

 func (c *Collector) getNodePods() ([]*corev1.Pod, error) {
-	pods, err := c.podsWatcher.Get()
-	if err != nil {
-		return nil, err
-	}
-	filtered := pods[:0]
-	for _, pod := range pods {
-		podIP := pod.Status.PodIP
-		if podIP == "" {
-			continue
-		}
-		// Don't track host network pods since we don't have enough info in conntrack.
-		if pod.Spec.HostNetwork {
-			continue
-		}
-		if _, found := c.excludeNsMap[pod.Namespace]; found {
-			continue
-		}
-		filtered = append(filtered, pod)
-	}
-	return filtered, nil
+	pods, err := c.podsWatcher.Get()
+	if err != nil {
+		return nil, err
+	}
+	filtered := pods[:0]
+	for _, pod := range pods {
+		podIP := pod.Status.PodIP
+		if podIP == "" {
+			continue
+		}
+		// Don't track host network pods since we don't have enough info in conntrack.
+		if pod.Spec.HostNetwork {
+			continue
+		}
+		if _, found := c.excludeNsMap[pod.Namespace]; found {
+			continue
+		}
+		filtered = append(filtered, pod)
+	}
+	return filtered, nil
 }

 func getPodIPs(pods []*corev1.Pod) map[netaddr.IP]struct{} {
-	ips := make(map[netaddr.IP]struct{}, len(pods))
-	for _, pod := range pods {
-		ips[netaddr.MustParseIP(pod.Status.PodIP)] = struct{}{}
-	}
-	return ips
+	ips := make(map[netaddr.IP]struct{}, len(pods))
+	for _, pod := range pods {
+		ips[netaddr.MustParseIP(pod.Status.PodIP)] = struct{}{}
+	}
+	return ips
 }

 var entryGroupHash maphash.Hash

 // entryGroupKey groups by src, dst and port.
 func entryGroupKey(conn *conntrack.Entry) uint64 {
-	srcIP := conn.Src.IP().As4()
-	_, _ = entryGroupHash.Write(srcIP[:])
-	dstIP := conn.Dst.IP().As4()
-	_, _ = entryGroupHash.Write(dstIP[:])
-	_ = entryGroupHash.WriteByte(conn.Proto)
-	res := entryGroupHash.Sum64()
-	entryGroupHash.Reset()
-	return res
+	srcIP := conn.Src.IP().As4()
+	_, _ = entryGroupHash.Write(srcIP[:])
+	dstIP := conn.Dst.IP().As4()
+	_, _ = entryGroupHash.Write(dstIP[:])
+	_ = entryGroupHash.WriteByte(conn.Proto)
+	res := entryGroupHash.Sum64()
+	entryGroupHash.Reset()
+	return res
 }

 var conntrackEntryHash maphash.Hash

 func conntrackEntryKey(conn *conntrack.Entry) uint64 {
-	srcIP := conn.Src.IP().As4()
-	_, _ = conntrackEntryHash.Write(srcIP[:])
-	var srcPort [2]byte
-	binary.LittleEndian.PutUint16(srcPort[:], conn.Src.Port())
-	_, _ = conntrackEntryHash.Write(srcPort[:])
-
-	dstIP := conn.Dst.IP().As4()
-	_, _ = conntrackEntryHash.Write(dstIP[:])
-	var dstPort [2]byte
-	binary.LittleEndian.PutUint16(dstPort[:], conn.Dst.Port())
-	_, _ = conntrackEntryHash.Write(dstPort[:])
-
-	_ = conntrackEntryHash.WriteByte(conn.Proto)
-	res := conntrackEntryHash.Sum64()
-
-	conntrackEntryHash.Reset()
-	return res
+	srcIP := conn.Src.IP().As4()
+	_, _ = conntrackEntryHash.Write(srcIP[:])
+	var srcPort [2]byte
+	binary.LittleEndian.PutUint16(srcPort[:], conn.Src.Port())
+	_, _ = conntrackEntryHash.Write(srcPort[:])
+
+	dstIP := conn.Dst.IP().As4()
+	_, _ = conntrackEntryHash.Write(dstIP[:])
+	var dstPort [2]byte
+	binary.LittleEndian.PutUint16(dstPort[:], conn.Dst.Port())
+	_, _ = conntrackEntryHash.Write(dstPort[:])
+
+	_ = conntrackEntryHash.WriteByte(conn.Proto)
+	res := conntrackEntryHash.Sum64()
+
+	conntrackEntryHash.Reset()
+	return res
 }

-func isPrivateNetwork(ip netaddr.IP) bool {
-	return ip.IsPrivate() ||
-		ip.IsLoopback() ||
-		ip.IsMulticast() ||
-		ip.IsLinkLocalUnicast() ||
-		ip.IsLinkLocalMulticast() ||
-		ip.IsInterfaceLocalMulticast()
+func (c *Collector) isPrivateNetwork(ip netaddr.IP) bool {
+	if ip.IsPrivate() ||
+		ip.IsLoopback() ||
+		ip.IsMulticast() ||
+		ip.IsLinkLocalUnicast() ||
+		ip.IsLinkLocalMulticast() ||
+		ip.IsInterfaceLocalMulticast() {
+		return true
+	}
+
+	// Check custom CIDR ranges
+	for _, cidr := range c.customPrivateCIDRs {
+		if cidr.Contains(ip) {
+			return true
+		}
+	}
+
+	return false
 }
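Since isPrivateNetwork is now a method reading Collector.customPrivateCIDRs, the custom ranges are easiest to exercise from a package-internal test. A sketch, assuming a test file alongside collector.go in package collector; the test name and ranges are illustrative:

package collector

import (
	"testing"

	"inet.af/netaddr"
)

func TestIsPrivateNetworkCustomCIDRs(t *testing.T) {
	c := &Collector{
		customPrivateCIDRs: []netaddr.IPPrefix{
			// Treat carrier-grade NAT space as private for this test.
			netaddr.MustParseIPPrefix("100.64.0.0/10"),
		},
	}

	if !c.isPrivateNetwork(netaddr.MustParseIP("10.0.0.1")) {
		t.Error("RFC 1918 address should be private")
	}
	if !c.isPrivateNetwork(netaddr.MustParseIP("100.92.3.4")) {
		t.Error("address inside a custom CIDR should be private")
	}
	if c.isPrivateNetwork(netaddr.MustParseIP("8.8.8.8")) {
		t.Error("public address should not be private")
	}
}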