Skip to content

Commit

Permalink
Implement reach command
Browse files Browse the repository at this point in the history
Signed-off-by: Daichi Sakaue <[email protected]>
  • Loading branch information
yokaze committed Dec 9, 2024
1 parent ddc0e55 commit 03d40f7
Show file tree
Hide file tree
Showing 14 changed files with 324 additions and 114 deletions.
3 changes: 3 additions & 0 deletions cmd/npv/app/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ const (
directionEgress = "Egress"
directionIngress = "Ingress"

partFrom = "From"
partTo = "To"

policyAllow = "Allow"
policyDeny = "Deny"
)
Expand Down
42 changes: 0 additions & 42 deletions cmd/npv/app/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,48 +45,6 @@ func createK8sClients() (*kubernetes.Clientset, *dynamic.DynamicClient, error) {
return clientset, dynamicClient, nil
}

func createCiliumClient(ctx context.Context, clientset *kubernetes.Clientset, namespace, name string) (*client.Client, error) {
endpoint, err := getProxyEndpoint(ctx, clientset, namespace, name)
if err != nil {
return nil, err
}

if cached, ok := cachedCiliumClients[endpoint]; ok {
return cached, nil
}

ciliumClient, err := client.NewClient(endpoint)
if err != nil {
return nil, err
}
cachedCiliumClients[endpoint] = ciliumClient

return ciliumClient, err
}

func getProxyEndpoint(ctx context.Context, c *kubernetes.Clientset, namespace, name string) (string, error) {
targetPod, err := c.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return "", err
}
targetNode := targetPod.Spec.NodeName

pods, err := c.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{
FieldSelector: "spec.nodeName=" + targetNode,
LabelSelector: rootOptions.proxySelector,
})
if err != nil {
return "", err
}
if num := len(pods.Items); num != 1 {
err := fmt.Errorf("failed to find cilium-agent-proxy. found %d pods", num)
return "", err
}

podIP := pods.Items[0].Status.PodIP
return fmt.Sprintf("http://%s:%d", podIP, rootOptions.proxyPort), nil
}

func getPodEndpointID(ctx context.Context, d *dynamic.DynamicClient, namespace, name string) (int64, error) {
ep, err := d.Resource(gvrEndpoint).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
Expand Down
123 changes: 123 additions & 0 deletions cmd/npv/app/helper_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package app

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"

"github.com/cilium/cilium/pkg/client"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
)

func getProxyEndpoint(ctx context.Context, c *kubernetes.Clientset, namespace, name string) (string, error) {
targetPod, err := c.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return "", err
}
targetNode := targetPod.Spec.NodeName

pods, err := c.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{
FieldSelector: "spec.nodeName=" + targetNode,
LabelSelector: rootOptions.proxySelector,
})
if err != nil {
return "", err
}
if num := len(pods.Items); num != 1 {
err := fmt.Errorf("failed to find cilium-agent-proxy. found %d pods", num)
return "", err
}

podIP := pods.Items[0].Status.PodIP
return fmt.Sprintf("http://%s:%d", podIP, rootOptions.proxyPort), nil
}

func createCiliumClient(ctx context.Context, clientset *kubernetes.Clientset, namespace, name string) (*client.Client, error) {
endpoint, err := getProxyEndpoint(ctx, clientset, namespace, name)
if err != nil {
return nil, err
}

if cached, ok := cachedCiliumClients[endpoint]; ok {
return cached, nil
}

ciliumClient, err := client.NewClient(endpoint)
if err != nil {
return nil, err
}
cachedCiliumClients[endpoint] = ciliumClient

return ciliumClient, err
}

type policyEntryKey struct {
Identity int `json:"Identity"`
Direction int `json:"TrafficDirection"`
Protocol int `json:"Nexthdr"`
BigPort int `json:"DestPortNetwork"` // big endian
}

func (p policyEntryKey) Port() int {
return ((p.BigPort & 0xFF) << 8) + ((p.BigPort & 0xFF00) >> 8)
}

// For the meanings of the flags, see:
// https://github.com/cilium/cilium/blob/v1.16.3/bpf/lib/common.h#L394
type policyEntry struct {
Flags int `json:"Flags"`
Packets int `json:"Packets"`
Bytes int `json:"Bytes"`
Key policyEntryKey `json:"Key"`
}

func (p policyEntry) IsDenyRule() bool {
return (p.Flags & 1) > 0
}

func (p policyEntry) IsEgressRule() bool {
return p.Key.Direction > 0
}

func (p policyEntry) IsWildcardProtocol() bool {
return (p.Flags & 2) > 0
}

func (p policyEntry) IsWildcardPort() bool {
return (p.Flags & 4) > 0
}

func queryPolicyMap(ctx context.Context, clientset *kubernetes.Clientset, dynamicClient *dynamic.DynamicClient, namespace, name string) ([]policyEntry, error) {
endpointID, err := getPodEndpointID(ctx, dynamicClient, namespace, name)
if err != nil {
return nil, fmt.Errorf("failed to get pod endpoint ID: %w", err)
}

url, err := getProxyEndpoint(ctx, clientset, namespace, name)
if err != nil {
return nil, fmt.Errorf("failed to get proxy endpoint: %w", err)
}

url = fmt.Sprintf("%s/policy/%d", url, endpointID)
resp, err := http.Get(url)
if err != nil {
return nil, fmt.Errorf("failed to request policy: %w", err)
}
defer resp.Body.Close()

data, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response: %w", err)
}

policies := make([]policyEntry, 0)
if err = json.Unmarshal(data, &policies); err != nil {
return nil, fmt.Errorf("failed to unmarshal response: %w", err)
}

return policies, nil
}
69 changes: 1 addition & 68 deletions cmd/npv/app/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package app

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"slices"
"strconv"
"strings"
Expand All @@ -16,8 +14,6 @@ import (
"github.com/spf13/cobra"
"golang.org/x/exp/rand"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
)

func init() {
Expand All @@ -36,38 +32,6 @@ var inspectCmd = &cobra.Command{
ValidArgsFunction: completePods,
}

type policyEntryKey struct {
Identity int `json:"Identity"`
Direction int `json:"TrafficDirection"`
Protocol int `json:"Nexthdr"`
BigPort int `json:"DestPortNetwork"` // big endian
}

// For the meanings of the flags, see:
// https://github.com/cilium/cilium/blob/v1.16.3/bpf/lib/common.h#L394
type policyEntry struct {
Flags int `json:"Flags"`
Packets int `json:"Packets"`
Bytes int `json:"Bytes"`
Key policyEntryKey `json:"Key"`
}

func (p policyEntry) IsDenyRule() bool {
return (p.Flags & 1) > 0
}

func (p policyEntry) IsEgressRule() bool {
return p.Key.Direction > 0
}

func (p policyEntry) IsWildcardProtocol() bool {
return (p.Flags & 2) > 0
}

func (p policyEntry) IsWildcardPort() bool {
return (p.Flags & 4) > 0
}

// This command aims to show the result of "cilium bpf policy get" from a remote pod.
// https://github.com/cilium/cilium/blob/v1.16.3/cilium-dbg/cmd/bpf_policy_get.go
type inspectEntry struct {
Expand All @@ -84,37 +48,6 @@ type inspectEntry struct {
Packets int `json:"packets"`
}

func queryPolicyMap(ctx context.Context, clientset *kubernetes.Clientset, dynamicClient *dynamic.DynamicClient, namespace, name string) ([]policyEntry, error) {
endpointID, err := getPodEndpointID(ctx, dynamicClient, namespace, name)
if err != nil {
return nil, fmt.Errorf("failed to get pod endpoint ID: %w", err)
}

url, err := getProxyEndpoint(ctx, clientset, namespace, name)
if err != nil {
return nil, fmt.Errorf("failed to get proxy endpoint: %w", err)
}

url = fmt.Sprintf("%s/policy/%d", url, endpointID)
resp, err := http.Get(url)
if err != nil {
return nil, fmt.Errorf("failed to request policy: %w", err)
}
defer resp.Body.Close()

data, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response: %w", err)
}

policies := make([]policyEntry, 0)
if err = json.Unmarshal(data, &policies); err != nil {
return nil, fmt.Errorf("failed to unmarshal response: %w", err)
}

return policies, nil
}

func runInspect(ctx context.Context, w io.Writer, name string) error {
clientset, dynamicClient, err := createK8sClients()
if err != nil {
Expand Down Expand Up @@ -204,7 +137,7 @@ func runInspect(ctx context.Context, w io.Writer, name string) error {
entry.WildcardProtocol = p.IsWildcardProtocol()
entry.WildcardPort = p.IsWildcardPort()
entry.Protocol = p.Key.Protocol
entry.Port = ((p.Key.BigPort & 0xFF) << 8) + ((p.Key.BigPort & 0xFF00) >> 8)
entry.Port = p.Key.Port()
entry.Bytes = p.Bytes
entry.Packets = p.Packets
arr[i] = entry
Expand Down
4 changes: 2 additions & 2 deletions cmd/npv/app/manifest_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ func runManifestRange(ctx context.Context, w io.Writer) error {

for _, ep := range idEndpoints[int(fromIdentity)] {
entry := manifestRangeEntry{
Part: "From",
Part: partFrom,
Namespace: ep.GetNamespace(),
Name: ep.GetName(),
}
arr = append(arr, entry)
}
for _, ep := range idEndpoints[int(toIdentity)] {
entry := manifestRangeEntry{
Part: "To",
Part: partTo,
Namespace: ep.GetNamespace(),
Name: ep.GetName(),
}
Expand Down
Loading

0 comments on commit 03d40f7

Please sign in to comment.