diff --git a/.clang-format b/.clang-format index 4f3186cf1..2d0ce67d7 100644 --- a/.clang-format +++ b/.clang-format @@ -44,7 +44,7 @@ AllowShortIfStatementsOnASingleLine: false # 允许短的循环保持在同一行 AllowShortLoopsOnASingleLine: false -# 总是在返回类型后换行: None, All, TopLevel(顶级函数,不包括在类中的函数), +# 总是在返回类型后换行: None, All, TopLevel(顶级函数,不包括在类中的函数), # AllDefinitions(所有的定义,不包括声明), TopLevelDefinitions(所有的顶级函数的定义) AlwaysBreakAfterReturnType: None @@ -82,6 +82,8 @@ BraceWrapping: BeforeCatch: false # else之前 BeforeElse: false + # elseif之前 + BeforeElseif: false # 缩进大括号 IndentBraces: false # 分离空函数 @@ -94,8 +96,8 @@ BraceWrapping: # 在二元运算符前换行: None(在操作符后换行), NonAssignment(在非赋值的操作符前换行), All(在操作符前换行) BreakBeforeBinaryOperators: NonAssignment -# 在大括号前换行: Attach(始终将大括号附加到周围的上下文), Linux(除函数、命名空间和类定义,与Attach类似), -# Mozilla(除枚举、函数、记录定义,与Attach类似), Stroustrup(除函数定义、catch、else,与Attach类似), +# 在大括号前换行: Attach(始终将大括号附加到周围的上下文), Linux(除函数、命名空间和类定义,与Attach类似), +# Mozilla(除枚举、函数、记录定义,与Attach类似), Stroustrup(除函数定义、catch、else,与Attach类似), # Allman(总是在大括号前换行), GNU(总是在大括号前换行,并对于控制语句的大括号增加额外的缩进), WebKit(在函数前换行), Custom # 注:这里认为语句块也属于函数 BreakBeforeBraces: Custom diff --git a/Makefile b/Makefile index 894b6a42e..a8393d06a 100644 --- a/Makefile +++ b/Makefile @@ -12,13 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Author: LemmyHuang -# Create: 2021-12-08 VERSION ?= 1.0-dev GIT_COMMIT_HASH ?= $(shell git rev-parse HEAD) GIT_TREESTATE=$(shell if [ -n "$(git status --porcelain)" ]; then echo "dirty"; else echo "clean"; fi) BUILD_DATE = $(shell date -u +'%Y-%m-%dT%H:%M:%SZ') ROOT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST)))) + # Get the currently used golang install path (in GOPATH/bin, unless GOBIN is set) ifeq (,$(shell go env GOBIN)) GOBIN=$(shell go env GOPATH)/bin @@ -143,6 +142,7 @@ uninstall: $(call printlog, UNINSTALL, $(INSTALL_BIN)/$(APPS3)) $(QUIET) rm -rf $(INSTALL_BIN)/$(APPS3) +.PHONY: build build: ./kmesh_compile.sh @@ -152,6 +152,15 @@ docker: build format: ./hack/format.sh +.PHONY: test +ifeq ($(RUN_IN_CONTAINER),1) +test: + ./hack/run-ut.sh --docker +else +test: + ./hack/run-ut.sh --local +endif + clean: $(QUIET) rm -rf ./out $(QUIET) rm -rf ./config/linux-bpf.h diff --git a/README.md b/README.md index 3c93edb47..4b4c3be1f 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,7 @@ The Kmesh user space components are licensed under the [Apache License, Version 2.0](./LICENSE). The BPF code templates, ko(kernel module) and mesh data accelerate are dual-licensed under the [General Public License, Version 2.0 (only)](./bpf/LICENSE.GPL-2.0) -and the [2-Clause BSD License](./bpf/LICENSE.GPL-2.0) +and the [2-Clause BSD License](./bpf/LICENSE.BSD-2-Clause) (you can use the terms of either license, at your option). 
## Credit diff --git a/bpf/include/bpf_common.h b/bpf/include/bpf_common.h index 801763ad2..348f7bee2 100644 --- a/bpf/include/bpf_common.h +++ b/bpf/include/bpf_common.h @@ -1,7 +1,12 @@ /* SPDX-License-Identifier: (GPL-2.0-only OR BSD-2-Clause) */ /* Copyright Authors of Kmesh */ -#define map_of_manager kmesh_manage +#ifndef __KMESH_BPF_COMMON_H__ +#define __KMESH_BPF_COMMON_H__ + +#include "common.h" + +#define map_of_manager kmesh_manage #define MAP_SIZE_OF_MANAGER 8192 /*0x3a1(929) is the specific port handled by the cni to enable kmesh*/ #define ENABLE_KMESH_PORT 0x3a1 @@ -16,6 +21,13 @@ #define AUTH_FORBID 1 #define AUTH_PROCESSING 2 +struct manager_key { + union { + __u64 netns_cookie; + struct ip_addr addr; + }; +}; + typedef struct { __u32 is_bypassed; } manager_value_t; @@ -32,49 +44,69 @@ typedef struct { */ struct { __uint(type, BPF_MAP_TYPE_HASH); - __type(key, __u64); + __type(key, struct manager_key); __type(value, manager_value_t); __uint(max_entries, MAP_SIZE_OF_MANAGER); __uint(map_flags, 0); } map_of_manager SEC(".maps"); -static inline void record_manager_netns_cookie(struct bpf_map *map, struct bpf_sock_addr *ctx) +/* + * From v5.4, bpf_get_netns_cookie can be called for bpf cgroup hooks, from v5.15, it can be called for bpf sockops + * hook. Therefore, ensure that function is correctly used. + */ +static inline void record_manager_netns_cookie(struct bpf_sock_addr *ctx) { int err; + struct manager_key key = {0}; + key.netns_cookie = bpf_get_netns_cookie(ctx); manager_value_t value = { .is_bypassed = 0, }; - __u64 cookie = bpf_get_netns_cookie(ctx); - err = bpf_map_update_elem(map, &cookie, &value, BPF_NOEXIST); + err = bpf_map_update_elem(&map_of_manager, &key, &value, BPF_NOEXIST); if (err) BPF_LOG(ERR, KMESH, "record netcookie failed!, err is %d\n", err); } -static inline void set_netns_bypass_value(struct bpf_sock_addr *sock_addr, int new_bypass_value) +/* + * From v5.4, bpf_get_netns_cookie can be called for bpf cgroup hooks, from v5.15, it can be called for bpf sockops + * hook. Therefore, ensure that function is correctly used. + */ +static inline void set_netns_bypass_value(struct bpf_sock_addr *ctx, int new_bypass_value) { - __u64 cookie = bpf_get_netns_cookie(sock_addr); - manager_value_t *current_value = bpf_map_lookup_elem(&map_of_manager, &cookie); + struct manager_key key = {0}; + key.netns_cookie = bpf_get_netns_cookie(ctx); + manager_value_t *current_value = bpf_map_lookup_elem(&map_of_manager, &key); if (!current_value || current_value->is_bypassed == new_bypass_value) return; current_value->is_bypassed = new_bypass_value; - int err = bpf_map_update_elem(&map_of_manager, &cookie, current_value, BPF_EXIST); + int err = bpf_map_update_elem(&map_of_manager, &key, current_value, BPF_EXIST); if (err) BPF_LOG(ERR, KMESH, "set netcookie failed!, err is %d\n", err); } +/* + * From v5.4, bpf_get_netns_cookie can be called for bpf cgroup hooks, from v5.15, it can be called for bpf sockops + * hook. Therefore, ensure that function is correctly used. + */ static inline bool is_kmesh_enabled(struct bpf_sock_addr *ctx) { - __u64 cookie = bpf_get_netns_cookie(ctx); - return bpf_map_lookup_elem(&map_of_manager, &cookie); + struct manager_key key = {0}; + key.netns_cookie = bpf_get_netns_cookie(ctx); + return bpf_map_lookup_elem(&map_of_manager, &key); } +/* + * From v5.4, bpf_get_netns_cookie can be called for bpf cgroup hooks, from v5.15, it can be called for bpf sockops + * hook. Therefore, ensure that function is correctly used. 
+ */ static inline bool is_bypass_enabled(struct bpf_sock_addr *ctx) { - __u64 cookie = bpf_get_netns_cookie(ctx); - manager_value_t *value = bpf_map_lookup_elem(&map_of_manager, &cookie); + struct manager_key key = {0}; + key.netns_cookie = bpf_get_netns_cookie(ctx); + manager_value_t *value = bpf_map_lookup_elem(&map_of_manager, &key); if (!value) return false; @@ -82,11 +114,17 @@ static inline bool is_bypass_enabled(struct bpf_sock_addr *ctx) return value->is_bypassed; } -static inline void remove_manager_netns_cookie(struct bpf_map *map, struct bpf_sock_addr *ctx) +/* + * From v5.4, bpf_get_netns_cookie can be called for bpf cgroup hooks, from v5.15, it can be called for bpf sockops + * hook. Therefore, ensure that function is correctly used. + */ +static inline void remove_manager_netns_cookie(struct bpf_sock_addr *ctx) { int err; - __u64 cookie = bpf_get_netns_cookie(ctx); - err = bpf_map_delete_elem(map, &cookie); + struct manager_key key = {0}; + key.netns_cookie = bpf_get_netns_cookie(ctx); + + err = bpf_map_delete_elem(&map_of_manager, &key); if (err && err != -ENOENT) BPF_LOG(ERR, KMESH, "remove netcookie failed!, err is %d\n", err); } @@ -126,14 +164,14 @@ static inline bool conn_from_cni_sim_delete(struct bpf_sock_addr *ctx) static inline bool handle_kmesh_manage_process(struct bpf_sock_addr *ctx) { if (conn_from_cni_sim_add(ctx)) { - record_manager_netns_cookie(&map_of_manager, ctx); + record_manager_netns_cookie(ctx); // return failed, cni sim connect 0.0.0.1:929(0x3a1) // A normal program will not connect to this IP address return true; } if (conn_from_cni_sim_delete(ctx)) { - remove_manager_netns_cookie(&map_of_manager, ctx); + remove_manager_netns_cookie(ctx); return true; } return false; @@ -155,4 +193,6 @@ static inline bool handle_bypass_process(struct bpf_sock_addr *ctx) return true; } return false; -} \ No newline at end of file +} + +#endif \ No newline at end of file diff --git a/bpf/include/common.h b/bpf/include/common.h index 0bbab1188..3706001e8 100644 --- a/bpf/include/common.h +++ b/bpf/include/common.h @@ -36,6 +36,32 @@ #endif #define SEC_TAIL(ID, KEY) SEC(__stringify(ID) "/" __stringify(KEY)) +struct ip_addr { + union { + __u32 ip4; + __u32 ip6[4]; + }; +}; +#define IPV6_ADDR_LEN 16 + +/* +eBPF verifier verifies the eBPF PROG, including the read and write permissions of the CTX parameters. In the V4 and V6 +scenarios, the governance logic is similar except for the address information. However, the eBPF verifier strictly +checks the read and write operations of the ctx members. For example, v6-related variables cannot be read or written in +the v4 context. Therefore, to reuse the governance logic, kmesh_context defined to cache the input and output +information related to the address. 
+*/ +struct kmesh_context { + // input + struct bpf_sock_addr *ctx; + struct ip_addr orig_dst_addr; + + // output + struct ip_addr dnat_ip; + __u32 dnat_port; + bool via_waypoint; +}; + static inline void *kmesh_map_lookup_elem(void *map, const void *key) { return bpf_map_lookup_elem(map, key); @@ -53,7 +79,7 @@ static inline int kmesh_map_update_elem(void *map, const void *key, const void * } #if OE_23_03 -#define bpf__strncmp bpf_strncmp +#define bpf__strncmp bpf_strncmp #define GET_SKOPS_REMOTE_PORT(sk_ops) (__u16)((sk_ops)->remote_port) #else #define GET_SKOPS_REMOTE_PORT(sk_ops) (__u16)((sk_ops)->remote_port >> 16) @@ -61,4 +87,4 @@ static inline int kmesh_map_update_elem(void *map, const void *key, const void * #define GET_SKOPS_LOCAL_PORT(sk_ops) (__u16)((sk_ops)->local_port) -#endif // _COMMON_H_ +#endif // _COMMON_H_ diff --git a/bpf/kmesh/workload/cgroup_sock.c b/bpf/kmesh/workload/cgroup_sock.c index 741fa76d8..d456bf53b 100644 --- a/bpf/kmesh/workload/cgroup_sock.c +++ b/bpf/kmesh/workload/cgroup_sock.c @@ -4,20 +4,22 @@ #include #include #include +#include #include "bpf_log.h" #include "ctx/sock_addr.h" #include "frontend.h" #include "bpf_common.h" -static inline int sock4_traffic_control(struct bpf_sock_addr *ctx) +static inline int sock4_traffic_control(struct kmesh_context *kmesh_ctx) { int ret; frontend_value *frontend_v = NULL; + struct bpf_sock_addr *ctx = kmesh_ctx->ctx; if (ctx->protocol != IPPROTO_TCP) return 0; - DECLARE_FRONTEND_KEY(ctx, frontend_k); + DECLARE_FRONTEND_KEY(ctx, &kmesh_ctx->orig_dst_addr, frontend_k); DECLARE_VAR_IPV4(ctx->user_ip4, ip); BPF_LOG(DEBUG, KMESH, "origin addr=[%pI4h:%u]\n", &ip, bpf_ntohs(ctx->user_port)); @@ -27,7 +29,7 @@ static inline int sock4_traffic_control(struct bpf_sock_addr *ctx) } BPF_LOG(DEBUG, KMESH, "bpf find frontend addr=[%pI4h:%u]\n", &ip, bpf_ntohs(ctx->user_port)); - ret = frontend_manager(ctx, frontend_v); + ret = frontend_manager(kmesh_ctx, frontend_v); if (ret != 0) { if (ret != -ENOENT) BPF_LOG(ERR, KMESH, "frontend_manager failed, ret:%d\n", ret); @@ -40,6 +42,12 @@ static inline int sock4_traffic_control(struct bpf_sock_addr *ctx) SEC("cgroup/connect4") int cgroup_connect4_prog(struct bpf_sock_addr *ctx) { + struct kmesh_context kmesh_ctx = {0}; + kmesh_ctx.ctx = ctx; + kmesh_ctx.orig_dst_addr.ip4 = ctx->user_ip4; + kmesh_ctx.dnat_ip.ip4 = ctx->user_ip4; + kmesh_ctx.dnat_port = ctx->user_port; + if (handle_kmesh_manage_process(ctx) || !is_kmesh_enabled(ctx)) { return CGROUP_SOCK_OK; } @@ -48,8 +56,19 @@ int cgroup_connect4_prog(struct bpf_sock_addr *ctx) return CGROUP_SOCK_OK; } - int ret = sock4_traffic_control(ctx); + int ret = sock4_traffic_control(&kmesh_ctx); + if (ret) { + BPF_LOG(ERR, KMESH, "sock_traffic_control failed: %d\n", ret); + return CGROUP_SOCK_OK; + } + SET_CTX_ADDRESS4(ctx, &kmesh_ctx.dnat_ip, kmesh_ctx.dnat_port); + if (kmesh_ctx.via_waypoint) { + kmesh_workload_tail_call(ctx, TAIL_CALL_CONNECT4_INDEX); + + // if tail call failed will run this code + BPF_LOG(ERR, KMESH, "workload tail call failed, err is %d\n", ret); + } return CGROUP_SOCK_OK; } diff --git a/bpf/kmesh/workload/include/backend.h b/bpf/kmesh/workload/include/backend.h index 9e95d98cc..9b850c763 100644 --- a/bpf/kmesh/workload/include/backend.h +++ b/bpf/kmesh/workload/include/backend.h @@ -15,49 +15,58 @@ static inline backend_value *map_lookup_backend(const backend_key *key) return kmesh_map_lookup_elem(&map_of_backend, key); } -static inline int waypoint_manager(ctx_buff_t *ctx, __u32 addr, __u32 port) +static inline int 
waypoint_manager(struct kmesh_context *kmesh_ctx, struct ip_addr *wp_addr, __u32 port) { int ret; address_t target_addr; + ctx_buff_t *ctx = (ctx_buff_t *)kmesh_ctx->ctx; __u64 *sk = (__u64 *)ctx->sk; struct bpf_sock_tuple value_tuple = {0}; - value_tuple.ipv4.daddr = ctx->user_ip4; - value_tuple.ipv4.dport = ctx->user_port; - - ret = bpf_map_update_elem(&map_of_dst_info, &sk, &value_tuple, BPF_NOEXIST); + if (ctx->family == AF_INET) { + value_tuple.ipv4.daddr = kmesh_ctx->orig_dst_addr.ip4; + value_tuple.ipv4.dport = ctx->user_port; + } else if (ctx->family == AF_INET6) { + bpf_memcpy(value_tuple.ipv6.daddr, kmesh_ctx->orig_dst_addr.ip6, IPV6_ADDR_LEN); + value_tuple.ipv6.dport = ctx->user_port; + } else { + BPF_LOG(ERR, BACKEND, "invalid ctx family: %u\n", ctx->family); + return -1; + } + ret = bpf_map_update_elem(&map_of_dst_info, &(sk), &value_tuple, BPF_NOEXIST); if (ret) { BPF_LOG(ERR, BACKEND, "record metadata origin address and port failed, ret is %d\n", ret); return ret; } - target_addr.ipv4 = addr; - target_addr.port = port; - SET_CTX_ADDRESS(ctx, target_addr); - kmesh_workload_tail_call(ctx, TAIL_CALL_CONNECT4_INDEX); - // if tail call failed will run this code - BPF_LOG(ERR, BACKEND, "workload tail call failed, err is %d\n", ret); - return -ENOEXEC; + if (ctx->user_family == AF_INET) + kmesh_ctx->dnat_ip.ip4 = wp_addr->ip4; + else + bpf_memcpy(kmesh_ctx->dnat_ip.ip6, wp_addr->ip6, IPV6_ADDR_LEN); + kmesh_ctx->dnat_port = port; + kmesh_ctx->via_waypoint = true; + return 0; } -static inline int backend_manager(ctx_buff_t *ctx, backend_value *backend_v, __u32 service_id, service_value *service_v) +static inline int +backend_manager(struct kmesh_context *kmesh_ctx, backend_value *backend_v, __u32 service_id, service_value *service_v) { int ret; - address_t target_addr; + ctx_buff_t *ctx = (ctx_buff_t *)kmesh_ctx->ctx; __u32 user_port = ctx->user_port; - if (backend_v->waypoint_addr != 0 && backend_v->waypoint_port != 0) { + if (backend_v->waypoint_port != 0) { BPF_LOG( DEBUG, BACKEND, "find waypoint addr=[%pI4h:%u]", - &backend_v->waypoint_addr, + &backend_v->wp_addr.ip4, bpf_ntohs(backend_v->waypoint_port)); - ret = waypoint_manager(ctx, backend_v->waypoint_addr, backend_v->waypoint_port); - if (ret == -ENOEXEC) { - BPF_LOG(ERR, BACKEND, "waypoint_manager failed, ret:%d\n", ret); - return ret; + ret = waypoint_manager(kmesh_ctx, &backend_v->wp_addr, backend_v->waypoint_port); + if (ret != 0) { + BPF_LOG(ERR, BACKEND, "waypoint_manager failed, ret: %d\n", ret); } + return ret; } #pragma unroll @@ -71,15 +80,18 @@ static inline int backend_manager(ctx_buff_t *ctx, backend_value *backend_v, __u #pragma unroll for (__u32 j = 0; j < MAX_PORT_COUNT; j++) { if (user_port == service_v->service_port[j]) { - target_addr.ipv4 = backend_v->ipv4; - target_addr.port = service_v->target_port[j]; - SET_CTX_ADDRESS(ctx, target_addr); + if (ctx->user_family == AF_INET) + kmesh_ctx->dnat_ip.ip4 = backend_v->addr.ip4; + else + bpf_memcpy(kmesh_ctx->dnat_ip.ip6, backend_v->addr.ip6, IPV6_ADDR_LEN); + kmesh_ctx->dnat_port = service_v->target_port[j]; + kmesh_ctx->via_waypoint = false; BPF_LOG( DEBUG, BACKEND, "get the backend addr=[%pI4h:%u]", - &target_addr.ipv4, - bpf_ntohs(target_addr.port)); + &kmesh_ctx->dnat_ip.ip4, + bpf_ntohs(kmesh_ctx->dnat_port)); return 0; } } diff --git a/bpf/kmesh/workload/include/ctx/sock_addr.h b/bpf/kmesh/workload/include/ctx/sock_addr.h index 714be0f1c..f784f36bc 100644 --- a/bpf/kmesh/workload/include/ctx/sock_addr.h +++ b/bpf/kmesh/workload/include/ctx/sock_addr.h @@ 
-11,12 +11,26 @@ typedef enum { typedef struct bpf_sock_addr ctx_buff_t; -#define DECLARE_FRONTEND_KEY(ctx, key) \ +#define DECLARE_FRONTEND_KEY(ctx, ctx_vip, key) \ frontend_key key = {0}; \ - key.ipv4 = (ctx)->user_ip4 + if (ctx->user_family == AF_INET) \ + key.addr.ip4 = (ctx_vip)->ip4; \ + else \ + bpf_memcpy(key.addr.ip6, (ctx_vip)->ip6, IPV6_ADDR_LEN) -#define SET_CTX_ADDRESS(ctx, address) \ - (ctx)->user_ip4 = (address).ipv4; \ - (ctx)->user_port = (address).port +#define SET_CTX_ADDRESS4(ctx, addr, port) \ + do { \ + if (ctx->user_family == AF_INET) { \ + (ctx)->user_ip4 = (addr)->ip4; \ + (ctx)->user_port = port; \ + } \ + } while (0) +#define SET_CTX_ADDRESS6(ctx, addr, port) \ + do { \ + if (ctx->user_family == AF_INET6) { \ + bpf_memcpy((ctx)->user_ip6, (addr)->ip6, IPV6_ADDR_LEN); \ + (ctx)->user_port = port; \ + } \ + } while (0) #endif //__BPF_CTX_SOCK_ADDR_H diff --git a/bpf/kmesh/workload/include/encoder.h b/bpf/kmesh/workload/include/encoder.h index 30ed3fff2..879d64401 100644 --- a/bpf/kmesh/workload/include/encoder.h +++ b/bpf/kmesh/workload/include/encoder.h @@ -1,12 +1,12 @@ /* SPDX-License-Identifier: (GPL-2.0-only OR BSD-2-Clause) */ /* Copyright Authors of Kmesh */ -#include "config.h" -#include "common.h" - #ifndef __ENCODER_H__ #define __ENCODER_H__ +#include "config.h" +#include "common.h" + struct { __uint(type, BPF_MAP_TYPE_HASH); __type(key, __u64); diff --git a/bpf/kmesh/workload/include/endpoint.h b/bpf/kmesh/workload/include/endpoint.h index f0cb089d9..f5139a10f 100644 --- a/bpf/kmesh/workload/include/endpoint.h +++ b/bpf/kmesh/workload/include/endpoint.h @@ -12,8 +12,8 @@ static inline endpoint_value *map_lookup_endpoint(const endpoint_key *key) return kmesh_map_lookup_elem(&map_of_endpoint, key); } -static inline int -endpoint_manager(ctx_buff_t *ctx, endpoint_value *endpoint_v, __u32 service_id, service_value *service_v) +static inline int endpoint_manager( + struct kmesh_context *kmesh_ctx, endpoint_value *endpoint_v, __u32 service_id, service_value *service_v) { int ret = 0; backend_key backend_k = {0}; @@ -26,7 +26,7 @@ endpoint_manager(ctx_buff_t *ctx, endpoint_value *endpoint_v, __u32 service_id, return -ENOENT; } - ret = backend_manager(ctx, backend_v, service_id, service_v); + ret = backend_manager(kmesh_ctx, backend_v, service_id, service_v); if (ret != 0) { if (ret != -ENOENT) BPF_LOG(ERR, ENDPOINT, "backend_manager failed, ret:%d\n", ret); diff --git a/bpf/kmesh/workload/include/frontend.h b/bpf/kmesh/workload/include/frontend.h index 291d37e2c..45dd69201 100644 --- a/bpf/kmesh/workload/include/frontend.h +++ b/bpf/kmesh/workload/include/frontend.h @@ -13,7 +13,7 @@ static inline frontend_value *map_lookup_frontend(const frontend_key *key) return kmesh_map_lookup_elem(&map_of_frontend, key); } -static inline int frontend_manager(ctx_buff_t *ctx, frontend_value *frontend_v) +static inline int frontend_manager(struct kmesh_context *kmesh_ctx, frontend_value *frontend_v) { int ret = 0; service_key service_k = {0}; @@ -36,21 +36,21 @@ static inline int frontend_manager(ctx_buff_t *ctx, frontend_value *frontend_v) if (direct_backend) { // For pod direct access, if a pod has watpoint captured, we will redirect to waypoint, otherwise we do nothing. 
- if (backend_v->waypoint_addr != 0 && backend_v->waypoint_port != 0) { + if (backend_v->waypoint_port != 0) { BPF_LOG( DEBUG, FRONTEND, "find waypoint addr=[%pI4h:%u]", - &backend_v->waypoint_addr, + &backend_v->wp_addr.ip4, bpf_ntohs(backend_v->waypoint_port)); - ret = waypoint_manager(ctx, backend_v->waypoint_addr, backend_v->waypoint_port); - if (ret == -ENOEXEC) { + ret = waypoint_manager(kmesh_ctx, &backend_v->wp_addr, backend_v->waypoint_port); + if (ret != 0) { BPF_LOG(ERR, BACKEND, "waypoint_manager failed, ret:%d\n", ret); - return ret; } + return ret; } } else { - ret = service_manager(ctx, frontend_v->upstream_id, service_v); + ret = service_manager(kmesh_ctx, frontend_v->upstream_id, service_v); if (ret != 0) { if (ret != -ENOENT) BPF_LOG(ERR, FRONTEND, "service_manager failed, ret:%d\n", ret); diff --git a/bpf/kmesh/workload/include/service.h b/bpf/kmesh/workload/include/service.h index 0dee9b951..023a7aca2 100644 --- a/bpf/kmesh/workload/include/service.h +++ b/bpf/kmesh/workload/include/service.h @@ -12,7 +12,7 @@ static inline service_value *map_lookup_service(const service_key *key) return kmesh_map_lookup_elem(&map_of_service, key); } -static inline int lb_random_handle(ctx_buff_t *ctx, __u32 service_id, service_value *service_v) +static inline int lb_random_handle(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v) { int ret = 0; endpoint_key endpoint_k = {0}; @@ -27,7 +27,7 @@ static inline int lb_random_handle(ctx_buff_t *ctx, __u32 service_id, service_va return -ENOENT; } - ret = endpoint_manager(ctx, endpoint_v, service_id, service_v); + ret = endpoint_manager(kmesh_ctx, endpoint_v, service_id, service_v); if (ret != 0) { if (ret != -ENOENT) BPF_LOG(ERR, SERVICE, "endpoint_manager failed, ret:%d\n", ret); @@ -37,28 +37,28 @@ static inline int lb_random_handle(ctx_buff_t *ctx, __u32 service_id, service_va return 0; } -static inline int service_manager(ctx_buff_t *ctx, __u32 service_id, service_value *service_v) +static inline int service_manager(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v) { int ret = 0; - if (service_v->waypoint_addr != 0 && service_v->waypoint_port != 0) { + if (service_v->wp_addr.ip4 != 0 && service_v->waypoint_port != 0) { BPF_LOG( DEBUG, SERVICE, "find waypoint addr=[%pI4h:%u]", - &service_v->waypoint_addr, + &service_v->wp_addr.ip4, bpf_ntohs(service_v->waypoint_port)); - ret = waypoint_manager(ctx, service_v->waypoint_addr, service_v->waypoint_port); - if (ret == -ENOEXEC) { + ret = waypoint_manager(kmesh_ctx, &service_v->wp_addr, service_v->waypoint_port); + if (ret != 0) { BPF_LOG(ERR, BACKEND, "waypoint_manager failed, ret:%d\n", ret); - return ret; } + return ret; } BPF_LOG(DEBUG, SERVICE, "load balance type:%u", service_v->lb_policy); switch (service_v->lb_policy) { case LB_POLICY_RANDOM: - ret = lb_random_handle(ctx, service_id, service_v); + ret = lb_random_handle(kmesh_ctx, service_id, service_v); break; defalut: BPF_LOG(ERR, SERVICE, "unsupport load balance type:%u\n", service_v->lb_policy); diff --git a/bpf/kmesh/workload/include/workload.h b/bpf/kmesh/workload/include/workload.h index 2c18fab13..68c0e7617 100644 --- a/bpf/kmesh/workload/include/workload.h +++ b/bpf/kmesh/workload/include/workload.h @@ -12,7 +12,7 @@ // frontend map typedef struct { - __u32 ipv4; // Service ip or Pod ip + struct ip_addr addr; // Service ip or Pod ip } __attribute__((packed)) frontend_key; typedef struct { @@ -30,7 +30,7 @@ typedef struct { __u32 service_port[MAX_PORT_COUNT]; // service_port[i] 
and target_port[i] are a pair, i starts from 0 and max value // is MAX_PORT_COUNT-1 __u32 target_port[MAX_PORT_COUNT]; - __u32 waypoint_addr; + struct ip_addr wp_addr; __u32 waypoint_port; } __attribute__((packed)) service_value; @@ -50,10 +50,10 @@ typedef struct { } __attribute__((packed)) backend_key; typedef struct { - __u32 ipv4; // backend ip + struct ip_addr addr; __u32 service_count; __u32 service[MAX_SERVICE_COUNT]; - __u32 waypoint_addr; + struct ip_addr wp_addr; __u32 waypoint_port; } __attribute__((packed)) backend_value; diff --git a/bpf/kmesh/workload/sendmsg.c b/bpf/kmesh/workload/sendmsg.c index a62f07767..583d1297a 100644 --- a/bpf/kmesh/workload/sendmsg.c +++ b/bpf/kmesh/workload/sendmsg.c @@ -2,6 +2,7 @@ /* Copyright Authors of Kmesh */ #include +#include #include #include #include "bpf_log.h" @@ -30,14 +31,43 @@ #define TLV_TYPE_SIZE 1 #define TLV_LENGTH_SIZE 4 -#define TLV_DST_LENGTH 6 -#define TLV_DST_SIZE 11 -#define TLV_END_SIZE 5 +#define TLV_IP4_LENGTH 4 +#define TLV_IP6_LENGTH 16 +#define TLV_PORT_LENGTH 2 -#define FORMAT_IP_LENGTH 16 +/* +[dst_ip4] - 4 bytes +[dst_port] - 2 bytes +*/ +#define TLV_ORG_DST_ADDR4_LENGTH (TLV_IP4_LENGTH + TLV_PORT_LENGTH) +#define TLV_ORG_DST_ADDR6_LENGTH (TLV_IP6_LENGTH + TLV_PORT_LENGTH) + +/* +[TYPE] - TLV_TYPE_SIZE byte +[length] - TLV_LENGTH_SIZE bytes +[dst_ip4] - TLV_IP4_LENGTH bytes +[dst_port] - TLV_PORT_LENGTH bytes +*/ +#define TLV_ORG_DST_ADDR4_SIZE (TLV_TYPE_SIZE + TLV_LENGTH_SIZE + TLV_ORG_DST_ADDR4_LENGTH) + +#define TLV_ORG_DST_ADDR6_SIZE (TLV_TYPE_SIZE + TLV_LENGTH_SIZE + TLV_ORG_DST_ADDR6_LENGTH) +/* +An empty TLV block indicates the end, e.g: +[type=0xfe] +[length = 0] +*/ +#define TLV_END_SIZE (TLV_TYPE_SIZE + TLV_LENGTH_SIZE) +#define FORMAT_IP_LENGTH 16 +/* +tlv struct +[TYPE] - 1 byte, define tlv block type +[length] - 4 bytes, size of value +[value] - 'length' bytes, payload +*/ enum TLV_TYPE { - TLV_DST_INFO = 0x01, + TLV_ORG_DST_ADDR4 = 0x01, + TLV_ORG_DST_ADDR6 = 0x02, TLV_PAYLOAD = 0xfe, }; @@ -50,33 +80,7 @@ static inline int check_overflow(struct sk_msg_md *msg, __u8 *begin, __u32 lengt return 0; } -static inline int _encode_tlv_type(struct sk_msg_md *msg, enum TLV_TYPE type, __u32 off) -{ - __u8 *begin = (__u8 *)(msg->data) + off; - if (check_overflow(msg, begin, 1)) - return off; - *begin = (__u8)type; - - return off + TLV_TYPE_SIZE; // cost 1 byte -} - -static inline int _encode_tlv_length(struct sk_msg_md *msg, __u32 length, __u32 off) -{ - __u32 *begin = (__u32 *)((__u8 *)(msg->data) + off); - if (check_overflow(msg, (__u8 *)begin, 4)) - return off; - *begin = bpf_htonl(length); - return off + TLV_LENGTH_SIZE; // cost 4 byte -} - -static inline int encode_metadata_end(struct sk_msg_md *msg, __u32 off) -{ - off = _encode_tlv_type(msg, TLV_PAYLOAD, off); - off = _encode_tlv_length(msg, 0, off); - return off; -} - -static inline int get_origin_dst(struct sk_msg_md *msg, __u32 *dst_ip, __u16 *dst_port) +static inline int get_origin_dst(struct sk_msg_md *msg, struct ip_addr *dst_ip, __u16 *dst_port) { __u64 *current_sk = (__u64 *)msg->sk; struct bpf_sock_tuple *dst; @@ -84,8 +88,15 @@ static inline int get_origin_dst(struct sk_msg_md *msg, __u32 *dst_ip, __u16 *ds dst = bpf_map_lookup_elem(&map_of_dst_info, ¤t_sk); if (!dst) return -ENOENT; - *dst_ip = dst->ipv4.daddr; - *dst_port = dst->ipv4.dport; + + if (msg->family == AF_INET) { + dst_ip->ip4 = dst->ipv4.daddr; + *dst_port = dst->ipv4.dport; + } else { + bpf_memcpy(dst_ip->ip6, dst->ipv6.daddr, IPV6_ADDR_LEN); + *dst_port = dst->ipv6.dport; + } + 
bpf_map_delete_elem(&map_of_dst_info, ¤t_sk); return 0; } @@ -101,55 +112,73 @@ static inline int alloc_dst_length(struct sk_msg_md *msg, __u32 length) return 0; } -static inline void encode_metadata_dst(struct sk_msg_md *msg, __u32 off) +static inline void sk_msg_write_buf(struct sk_msg_md *msg, __u32 *off, __u8 *data, __u32 len) +{ + __u8 *begin = (__u8 *)(msg->data) + *off; + if (check_overflow(msg, begin, len)) { + BPF_LOG(ERR, SENDMSG, "sk msg write buf overflow, off: %u, len: %u\n", *off, len); + return; + } + + bpf_memcpy(begin, data, len); + *off += len; + return; +} + +static inline void encode_metadata_end(struct sk_msg_md *msg, __u32 *off) { - __u32 dst_ip; + __u8 type = TLV_PAYLOAD; + __u32 size = 0; + + sk_msg_write_buf(msg, off, &type, TLV_TYPE_SIZE); + sk_msg_write_buf(msg, off, &size, TLV_LENGTH_SIZE); + return; +} + +static inline void encode_metadata_org_dst_addr(struct sk_msg_md *msg, __u32 *off, bool v4) +{ + struct ip_addr dst_ip = {0}; __u16 dst_port; - __u32 *msg_dst_ip_loc; - __u16 *msg_dst_port_loc; + __u8 type = (v4 ? TLV_ORG_DST_ADDR4 : TLV_ORG_DST_ADDR6); + __u32 tlv_size = (v4 ? TLV_ORG_DST_ADDR4_SIZE : TLV_ORG_DST_ADDR6_SIZE); + __u32 addr_size = (v4 ? TLV_ORG_DST_ADDR4_LENGTH : TLV_ORG_DST_ADDR6_LENGTH); if (get_origin_dst(msg, &dst_ip, &dst_port)) return; - if (alloc_dst_length(msg, TLV_DST_SIZE + TLV_END_SIZE)) + if (alloc_dst_length(msg, tlv_size + TLV_END_SIZE)) return; BPF_LOG(DEBUG, SENDMSG, "get valid dst, do encoding...\n"); - off = _encode_tlv_type(msg, TLV_DST_INFO, off); - off = _encode_tlv_length(msg, TLV_DST_LENGTH, off); + // write T + sk_msg_write_buf(msg, off, &type, TLV_TYPE_SIZE); - msg_dst_ip_loc = (__u32 *)((__u8 *)msg->data + off); - if (check_overflow(msg, (__u8 *)msg_dst_ip_loc, 4)) - return; - *msg_dst_ip_loc = dst_ip; - off += 4; + // write L + addr_size = bpf_htonl(addr_size); + sk_msg_write_buf(msg, off, &addr_size, TLV_LENGTH_SIZE); - msg_dst_port_loc = (__u16 *)((__u8 *)msg->data + off); - if (check_overflow(msg, (__u8 *)msg_dst_port_loc, 2)) - return; - *msg_dst_port_loc = dst_port; - off += 2; + // write V + if (v4) + sk_msg_write_buf(msg, off, (__u8 *)&dst_ip.ip4, TLV_IP4_LENGTH); + else + sk_msg_write_buf(msg, off, (__u8 *)dst_ip.ip6, TLV_IP6_LENGTH); + sk_msg_write_buf(msg, off, &dst_port, TLV_PORT_LENGTH); + // write END encode_metadata_end(msg, off); -} - -static inline void encode_metadata(struct sk_msg_md *msg, enum TLV_TYPE type, __u32 off) -{ - switch (type) { - case TLV_DST_INFO: { - encode_metadata_dst(msg, off); - break; - } - default: - break; - } + return; } SEC("sk_msg") -int sendmsg(struct sk_msg_md *msg) +int sendmsg_prog(struct sk_msg_md *msg) { - encode_metadata(msg, TLV_DST_INFO, 0); + __u32 off = 0; + if (msg->family != AF_INET && msg->family != AF_INET6) + return SK_PASS; + + // encode org dst addr + encode_metadata_org_dst_addr(msg, &off, (msg->family == AF_INET)); return SK_PASS; } diff --git a/bpf/kmesh/workload/sockops.c b/bpf/kmesh/workload/sockops.c index 946341085..c88b59303 100644 --- a/bpf/kmesh/workload/sockops.c +++ b/bpf/kmesh/workload/sockops.c @@ -32,37 +32,56 @@ struct { __uint(map_flags, 0); } map_of_kmesh_socket SEC(".maps"); -static inline bool is_managed_by_kmesh(__u32 ip) +static inline bool is_managed_by_kmesh(__u32 family, __u32 ip4, __u32 *ip6) { - __u64 key = ip; - int *value = bpf_map_lookup_elem(&map_of_manager, &key); + struct manager_key key = {0}; + if (family == AF_INET) + key.addr.ip4 = ip4; + if (family == AF_INET6 && ip6) + bpf_memcpy(key.addr.ip6, ip6, IPV6_ADDR_LEN); + int 
*value = bpf_map_lookup_elem(&map_of_manager, &key); if (!value) return false; - return (*value == 0); } static inline void extract_skops_to_tuple(struct bpf_sock_ops *skops, struct bpf_sock_tuple *tuple_key) { - tuple_key->ipv4.saddr = skops->local_ip4; - tuple_key->ipv4.daddr = skops->remote_ip4; - // local_port is host byteorder - tuple_key->ipv4.sport = bpf_htons(GET_SKOPS_LOCAL_PORT(skops)); - // remote_port is network byteorder - // openEuler 2303 convert remote port different than other linux vendor - - tuple_key->ipv4.dport = GET_SKOPS_REMOTE_PORT(skops); + if (skops->family == AF_INET) { + tuple_key->ipv4.saddr = skops->local_ip4; + tuple_key->ipv4.daddr = skops->remote_ip4; + // local_port is host byteorder, need to htons + tuple_key->ipv4.sport = bpf_htons(GET_SKOPS_LOCAL_PORT(skops)); + // remote_port is network byteorder + tuple_key->ipv4.dport = GET_SKOPS_REMOTE_PORT(skops); + } else { + bpf_memcpy(tuple_key->ipv6.saddr, skops->local_ip6, IPV6_ADDR_LEN); + bpf_memcpy(tuple_key->ipv6.daddr, skops->remote_ip6, IPV6_ADDR_LEN); + // local_port is host byteorder, need to htons + tuple_key->ipv6.sport = bpf_htons(GET_SKOPS_LOCAL_PORT(skops)); + // remote_port is network byteorder + tuple_key->ipv6.dport = GET_SKOPS_REMOTE_PORT(skops); + } } static inline void extract_skops_to_tuple_reverse(struct bpf_sock_ops *skops, struct bpf_sock_tuple *tuple_key) { - tuple_key->ipv4.saddr = skops->remote_ip4; - tuple_key->ipv4.daddr = skops->local_ip4; - // remote_port is network byteorder - tuple_key->ipv4.sport = GET_SKOPS_REMOTE_PORT(skops); - // local_port is host byteorder - tuple_key->ipv4.dport = bpf_htons(GET_SKOPS_LOCAL_PORT(skops)); + if (skops->family == AF_INET) { + tuple_key->ipv4.saddr = skops->remote_ip4; + tuple_key->ipv4.daddr = skops->local_ip4; + // remote_port is network byteorder + tuple_key->ipv4.sport = GET_SKOPS_REMOTE_PORT(skops); + // local_port is host byteorder + tuple_key->ipv4.dport = bpf_htons(GET_SKOPS_LOCAL_PORT(skops)); + } else { + bpf_memcpy(tuple_key->ipv6.saddr, skops->remote_ip6, IPV6_ADDR_LEN); + bpf_memcpy(tuple_key->ipv6.daddr, skops->local_ip6, IPV6_ADDR_LEN); + // remote_port is network byteorder + tuple_key->ipv6.sport = GET_SKOPS_REMOTE_PORT(skops); + // local_port is host byteorder + tuple_key->ipv6.dport = bpf_htons(GET_SKOPS_LOCAL_PORT(skops)); + } } // clean map_of_auth @@ -112,7 +131,7 @@ static inline void auth_ip_tuple(struct bpf_sock_ops *skops) // the server info when we transmitted to the kmesh auth info. // In this way, auth can be performed normally. extract_skops_to_tuple_reverse(skops, &(*msg).tuple); - (*msg).type = (__u32)IPV4; + (*msg).type = (skops->family == AF_INET) ? 
IPV4 : IPV6; record_auth_processing(skops); bpf_ringbuf_submit(msg, 0); } @@ -128,22 +147,31 @@ static inline void enable_encoding_metadata(struct bpf_sock_ops *skops) BPF_LOG(ERR, SOCKOPS, "enable encoding metadta failed!, err is %d", err); } -static inline void record_kmesh_managed_ip(__u32 ip) +static inline void record_kmesh_managed_ip(__u32 family, __u32 ip4, __u32 *ip6) { int err; manager_value_t value = { .is_bypassed = 0, }; + struct manager_key key = {0}; + if (family == AF_INET) + key.addr.ip4 = ip4; + if (family == AF_INET6 && ip6) + bpf_memcpy(key.addr.ip6, ip6, IPV6_ADDR_LEN); - __u64 key = ip; err = bpf_map_update_elem(&map_of_manager, &key, &value, BPF_NOEXIST); if (err) BPF_LOG(ERR, KMESH, "record ip failed!, err is %d\n", err); } -static inline void remove_kmesh_managed_ip(__u32 ip) +static inline void remove_kmesh_managed_ip(__u32 family, __u32 ip4, __u32 *ip6) { - __u64 key = ip; + struct manager_key key = {0}; + if (family == AF_INET) + key.addr.ip4 = ip4; + if (family == AF_INET6 && ip6) + bpf_memcpy(key.addr.ip6, ip6, IPV6_ADDR_LEN); + int err = bpf_map_delete_elem(&map_of_manager, &key); if (err && err != -ENOENT) BPF_LOG(ERR, KMESH, "remove ip failed!, err is %d\n", err); @@ -185,9 +213,14 @@ static inline bool skops_conn_from_bypass_sim_delete(struct bpf_sock_ops *skops) return conn_from_sim(skops, 1, DISABLE_BYPASS_PORT); } -static inline void set_bypass_value(__u32 ip, int new_bypass_value) +static inline void set_bypass_value(__u32 family, __u32 ip4, __u32 *ip6, int new_bypass_value) { - __u64 key = ip; + struct manager_key key = {0}; + if (family == AF_INET) + key.addr.ip4 = ip4; + if (family == AF_INET6 && ip6) + bpf_memcpy(key.addr.ip6, ip6, IPV6_ADDR_LEN); + manager_value_t *current_value = bpf_map_lookup_elem(&map_of_manager, &key); if (!current_value || current_value->is_bypassed == new_bypass_value) return; @@ -207,17 +240,17 @@ static inline bool ipv4_mapped_addr(__u32 ip6[4]) static inline void skops_handle_kmesh_managed_process(struct bpf_sock_ops *skops) { if (skops_conn_from_cni_sim_add(skops)) - record_kmesh_managed_ip(skops->local_ip4); + record_kmesh_managed_ip(skops->family, skops->local_ip4, skops->local_ip6); if (skops_conn_from_cni_sim_delete(skops)) - remove_kmesh_managed_ip(skops->local_ip4); + remove_kmesh_managed_ip(skops->family, skops->local_ip4, skops->local_ip6); } static inline void skops_handle_bypass_process(struct bpf_sock_ops *skops) { if (skops_conn_from_bypass_sim_add(skops)) - set_bypass_value(skops->local_ip4, 1); + set_bypass_value(skops->family, skops->local_ip4, skops->local_ip6, 1); if (skops_conn_from_bypass_sim_delete(skops)) - set_bypass_value(skops->local_ip4, 0); + set_bypass_value(skops->family, skops->local_ip4, skops->local_ip6, 0); } SEC("sockops") @@ -231,7 +264,7 @@ int sockops_prog(struct bpf_sock_ops *skops) skops_handle_bypass_process(skops); break; case BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB: - if (!is_managed_by_kmesh(skops->local_ip4)) // local ip4 is client ip + if (!is_managed_by_kmesh(skops->family, skops->local_ip4, NULL)) // local ip4 is client ip break; if (bpf_sock_ops_cb_flags_set(skops, BPF_SOCK_OPS_STATE_CB_FLAG) != 0) BPF_LOG(ERR, SOCKOPS, "set sockops cb failed!\n"); @@ -241,7 +274,7 @@ int sockops_prog(struct bpf_sock_ops *skops) enable_encoding_metadata(skops); break; case BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB: - if (!is_managed_by_kmesh(skops->local_ip4)) // local ip4 is server ip + if (!is_managed_by_kmesh(skops->family, skops->local_ip4, NULL)) // local ip4 is server ip break; if 
(bpf_sock_ops_cb_flags_set(skops, BPF_SOCK_OPS_STATE_CB_FLAG) != 0) BPF_LOG(ERR, SOCKOPS, "set sockops cb failed!\n"); diff --git a/bpf/kmesh/workload/xdp.c b/bpf/kmesh/workload/xdp.c index 549c83557..f0baba628 100644 --- a/bpf/kmesh/workload/xdp.c +++ b/bpf/kmesh/workload/xdp.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include "config.h" @@ -18,7 +19,10 @@ struct xdp_info { struct ethhdr *ethh; - struct iphdr *iph; + union { + struct iphdr *iph; + struct ipv6hdr *ip6h; + }; struct tcphdr *tcph; }; @@ -48,10 +52,17 @@ static inline int get_hdr_ptr(struct xdp_md *ctx, struct ethhdr **ethh, struct i static inline void parser_tuple(struct xdp_info *info, struct bpf_sock_tuple *tuple_info) { - tuple_info->ipv4.saddr = info->iph->saddr; - tuple_info->ipv4.daddr = info->iph->daddr; - tuple_info->ipv4.sport = info->tcph->source; - tuple_info->ipv4.dport = info->tcph->dest; + if (info->iph->version == 4) { + tuple_info->ipv4.saddr = info->iph->saddr; + tuple_info->ipv4.daddr = info->iph->daddr; + tuple_info->ipv4.sport = info->tcph->source; + tuple_info->ipv4.dport = info->tcph->dest; + } else { + bpf_memcpy((__u8 *)tuple_info->ipv6.saddr, info->ip6h->saddr.in6_u.u6_addr8, IPV6_ADDR_LEN); + bpf_memcpy((__u8 *)tuple_info->ipv6.daddr, info->ip6h->daddr.in6_u.u6_addr8, IPV6_ADDR_LEN); + tuple_info->ipv6.sport = info->tcph->source; + tuple_info->ipv6.dport = info->tcph->dest; + } } static inline void shutdown_tuple(struct xdp_info *info) @@ -94,6 +105,9 @@ int xdp_shutdown(struct xdp_md *ctx) if (parser_xdp_info(ctx, &info) == PARSER_FAILED) return XDP_PASS; + if (info.iph->version != 4 && info.iph->version != 6) + return XDP_PASS; + // never failed parser_tuple(&info, &tuple_info); ret = check_auth(&tuple_info); diff --git a/daemon/manager/manager.go b/daemon/manager/manager.go index 07839e6eb..70e46c4ac 100644 --- a/daemon/manager/manager.go +++ b/daemon/manager/manager.go @@ -76,7 +76,7 @@ func Execute(configs *options.BootstrapConfigs) error { log.Info("bpf Start successful") defer bpfLoader.Stop() - c := controller.NewController(configs.BpfConfig.Mode, configs.ByPassConfig.EnableByPass, bpfLoader.GetBpfKmeshWorkload()) + c := controller.NewController(configs, bpfLoader.GetBpfKmeshWorkload()) if err := c.Start(); err != nil { return err } diff --git a/daemon/options/options.go b/daemon/options/options.go index ed5cc341f..0c7724d8b 100644 --- a/daemon/options/options.go +++ b/daemon/options/options.go @@ -25,16 +25,18 @@ import ( ) type BootstrapConfigs struct { - BpfConfig *BpfConfig - CniConfig *cniConfig - ByPassConfig *byPassConfig + BpfConfig *BpfConfig + CniConfig *cniConfig + ByPassConfig *byPassConfig + SecretManagerConfig *secretConfig } func NewBootstrapConfigs() *BootstrapConfigs { return &BootstrapConfigs{ - BpfConfig: &BpfConfig{}, - CniConfig: &cniConfig{}, - ByPassConfig: &byPassConfig{}, + BpfConfig: &BpfConfig{}, + CniConfig: &cniConfig{}, + ByPassConfig: &byPassConfig{}, + SecretManagerConfig: &secretConfig{}, } } @@ -51,6 +53,7 @@ func (c *BootstrapConfigs) AttachFlags(cmd *cobra.Command) { c.BpfConfig.AttachFlags(cmd) c.CniConfig.AttachFlags(cmd) c.ByPassConfig.AttachFlags(cmd) + c.SecretManagerConfig.AttachFlags(cmd) } func (c *BootstrapConfigs) ParseConfigs() error { diff --git a/daemon/options/secret_manager.go b/daemon/options/secret_manager.go new file mode 100644 index 000000000..d33c112ec --- /dev/null +++ b/daemon/options/secret_manager.go @@ -0,0 +1,28 @@ +/* Copyright 2024 The Kmesh Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package options + +import ( + "github.com/spf13/cobra" +) + +type secretConfig struct { + Enable bool +} + +func (c *secretConfig) AttachFlags(cmd *cobra.Command) { + cmd.PersistentFlags().BoolVar(&c.Enable, "enable-secret-manager", false, "whether to start secret manager or not, default to false") +} diff --git a/deploy/helm/templates/daemonset.yaml b/deploy/helm/templates/daemonset.yaml index 2d1c0d066..c50319f83 100644 --- a/deploy/helm/templates/daemonset.yaml +++ b/deploy/helm/templates/daemonset.yaml @@ -68,6 +68,9 @@ spec: name: cni - mountPath: /opt/cni/bin name: kmesh-cni-install-path + - mountPath: /host/proc + name: host-procfs + readOnly: true - mountPath: /var/run/secrets/istio name: istiod-ca-cert - mountPath: /var/run/secrets/tokens @@ -90,6 +93,10 @@ spec: - hostPath: path: /opt/cni/bin name: kmesh-cni-install-path + - name: host-procfs + hostPath: + path: /proc + type: Directory - configMap: defaultMode: 420 name: istio-ca-root-cert @@ -102,4 +109,3 @@ spec: audience: istio-ca expirationSeconds: 43200 path: istio-token - hostPID: true diff --git a/deploy/yaml/kmesh.yaml b/deploy/yaml/kmesh.yaml index d38de2f9c..d9368f9d8 100644 --- a/deploy/yaml/kmesh.yaml +++ b/deploy/yaml/kmesh.yaml @@ -40,6 +40,10 @@ spec: - name: kmesh-cni-install-path hostPath: path: /opt/cni/bin + - name: host-procfs + hostPath: + path: /proc + type: Directory - name: istiod-ca-cert configMap: defaultMode: 420 @@ -105,6 +109,9 @@ spec: - name: kmesh-cni-install-path mountPath: /opt/cni/bin readOnly: false + - mountPath: /host/proc + name: host-procfs + readOnly: true - name: istiod-ca-cert mountPath: /var/run/secrets/istio - name: istio-token @@ -116,4 +123,3 @@ spec: cpu: "1" priorityClassName: system-node-critical serviceAccountName: kmesh - hostPID: true diff --git a/docs/proposal/observability.md b/docs/proposal/observability.md new file mode 100644 index 000000000..b5453df01 --- /dev/null +++ b/docs/proposal/observability.md @@ -0,0 +1,264 @@ +--- +title: proposal of Kmesh observability +authors: +- "@LiZhencheng9527" # Authors' GitHub accounts here. +reviewers: +- "" +- TBD +approvers: +- "" +- TBD + +creation-date: 2024-05-16 + +--- + +## proposal of Kmesh observability + + + +### Summary + + + +The importance of observability of service mesh as the basis for manageable, reliable and sustainable grid systems cannot be ignored.In istio, accesslog, metric, and tracing are provided at layers l4 and l7 to meet the user's need for observability. + +In this proposal, I will analyse the observability metrics of istio. And propose that Kmesh implement observability features to support these metrics. So that users can seamlessly use Kmesh. 
+
+### Motivation
+
+
+#### Accesslog
+
+In [istio ztunnel](https://github.com/istio/ztunnel?tab=readme-ov-file#logging), the Layer 4 access log contains the following fields:
+
+source.addr
+source.workload
+source.namespace
+source.identity
+
+destination.addr
+destination.hbone_addr
+destination.service
+destination.workload
+destination.namespace
+destination.identity
+
+direction
+
+bytes_sent
+bytes_recv
+duration
+
+An example of the resulting access log is shown below:
+
+```console
+2024-05-30T12:18:10.172761Z info access connection complete
+ src.addr=10.244.0.10:47667 src.workload=sleep-7656cf8794-9v2gv src.namespace=ambient-demo src.identity="spiffe://cluster.local/ns/ambient-demo/sa/sleep"
+ dst.addr=10.244.0.7:8080 dst.hbone_addr=10.244.0.7:8080 dst.service=httpbin.ambient-demo.svc.cluster.local dst.workload=httpbin-86b8ffc5ff-bhvxx dst.namespace=ambient-demo
+ dst.identity="spiffe://cluster.local/ns/ambient-demo/sa/httpbin"
+ direction="inbound" bytes_sent=239 bytes_recv=76 duration="2ms"
+```
+
+The access log needs to contain the identities (address/workload/namespace/identity) of the source and destination. In addition, it records the number of bytes sent (bytes_sent), the number of bytes received (bytes_recv), and the duration of the connection.
+
+For users to be able to use Kmesh smoothly, Kmesh needs to support this access log.
+
+#### Metrics
+
+To monitor service behavior, Istio generates metrics for all service traffic into, out of, and within an Istio service mesh. These metrics provide information on traffic behavior.
+
+Referring to [istio ztunnel metric](https://github.com/istio/ztunnel/blob/6532c553946856b4acc326f3b9ca6cc6abc718d0/src/proxy/metrics.rs#L369), the required Layer 4 metrics are:
+
+```console
+connection_opens: The total number of TCP connections opened
+connection_close: The total number of TCP connections closed
+received_bytes: The size of total bytes received during request in case of a TCP connection
+sent_bytes: The size of total bytes sent during response in case of a TCP connection
+on_demand_dns: The total number of requests that used on-demand DNS (unstable)
+on_demand_dns_cache_misses: The total number of cache misses for requests on-demand DNS (unstable)
+```
+
+As for the DNS-related metrics above: Kmesh does not yet support DNS, so we will consider them after the Kmesh DNS functionality is implemented.
+
+Therefore, Kmesh first needs to support `connection_opens`, `connection_close`, `received_bytes`, `sent_bytes`.
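+
+As a rough sketch only (the metric names, label subset and package layout are illustrative, not the final Kmesh implementation), these four counters could be declared in the kmesh-daemon with the Prometheus Go client:
+
+```go
+package telemetry
+
+import "github.com/prometheus/client_golang/prometheus"
+
+// A small subset of labels for illustration; the full istio-style label set is listed below.
+var tcpLabels = []string{"reporter", "source_workload", "destination_service", "connection_security_policy"}
+
+var (
+    tcpConnectionsOpened = prometheus.NewCounterVec(
+        prometheus.CounterOpts{Name: "kmesh_tcp_connections_opened_total", Help: "Total TCP connections opened"}, tcpLabels)
+    tcpConnectionsClosed = prometheus.NewCounterVec(
+        prometheus.CounterOpts{Name: "kmesh_tcp_connections_closed_total", Help: "Total TCP connections closed"}, tcpLabels)
+    tcpReceivedBytes = prometheus.NewCounterVec(
+        prometheus.CounterOpts{Name: "kmesh_tcp_received_bytes_total", Help: "Total bytes received on TCP connections"}, tcpLabels)
+    tcpSentBytes = prometheus.NewCounterVec(
+        prometheus.CounterOpts{Name: "kmesh_tcp_sent_bytes_total", Help: "Total bytes sent on TCP connections"}, tcpLabels)
+)
+
+func init() {
+    // Register with the default registry so the counters can be scraped later.
+    prometheus.MustRegister(tcpConnectionsOpened, tcpConnectionsClosed, tcpReceivedBytes, tcpSentBytes)
+}
+```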
+
+The above metrics also include the labels shown below:
+
+```console
+reporter
+
+source_workload
+source_canonical_service
+source_canonical_revision
+source_workload_namespace
+source_principal
+source_app
+source_version
+source_cluster
+
+destination_service
+destination_service_namespace
+destination_service_name
+
+destination_workload
+destination_canonical_service
+destination_canonical_revision
+destination_workload_namespace
+destination_principal
+destination_app
+destination_version
+destination_cluster
+
+request_protocol
+response_flag
+connection_security_policy
+
+istio_tcp_sent_bytes_total{
+    reporter="destination",
+
+    source_workload="sleep",source_canonical_service="sleep",source_canonical_revision="latest",source_workload_namespace="ambient-demo",
+    source_principal="spiffe://cluster.local/ns/ambient-demo/sa/sleep",source_app="sleep",source_version="latest",source_cluster="Kubernetes",
+
+    destination_service="tcp-echo.ambient-demo.svc.cluster.local",destination_service_namespace="ambient-demo",destination_service_name="tcp-echo",destination_workload="tcp-echo",destination_canonical_service="tcp-echo",destination_canonical_revision="v1",destination_workload_namespace="ambient-demo",
+    destination_principal="spiffe://cluster.local/ns/ambient-demo/sa/default",destination_app="tcp-echo",destination_version="v1",destination_cluster="Kubernetes",
+
+    request_protocol="tcp",response_flags="-",connection_security_policy="mutual_tls"} 16
+```
+
+`reporter` indicates whether the metric was reported on the sender or the receiver side. The source and destination labels carry identity information similar to the fields in the access log.
+
+The remaining labels are `request_protocol`, `response_flag` and `connection_security_policy`; the values of `connection_security_policy` are mutual_tls and unknown.
+
+In addition to the metrics already available in istio, Kmesh is able to get [richer metrics](https://gitee.com/openeuler/gala-docs/blob/master/gopher_tech.md#tcp%E6%8C%87%E6%A0%87) from the kernel. This will be an advantage for Kmesh.
+
+#### Goals
+
+
+It is now clear that, in order to enhance the observability of Kmesh, we need to:
+
+- Get the required metrics from eBPF.
+- Generate the access log from the acquired data.
+- Support querying metrics through Prometheus.
+
+#### Non-Goals
+
+
+- DNS-related metrics.
+- Metrics for the L7 layer.
+
+### Proposal
+
+
+Kmesh needs to collect metrics in the kernel and pass them to user space. In user space, the access log is generated from the metrics, and metrics can be queried from Kmesh at localhost:15020.
+
+### Design Details
+
+
+Because Kmesh needs to get metrics from the kernel and send them to user space, we need a bpf map to record the metrics and act as the transfer vehicle.
+
+So we need to define a bpf map that contains all the required metrics:
+
+```c
+struct conn_value {
+    u64 connection_opens;
+    u64 connection_closes;
+    u64 received_bytes;
+    u64 sent_bytes;
+    u64 duration;
+
+    __u32 destination;
+    __u32 source;
+};
+```
+
+The destination and source above are bpf map references that contain the workload identity information.
+
+#### Access log
+
+When a TCP connection terminates, eBPF sends the data of this connection to the kmesh-daemon through the bpf map.
+
+The access log is generated from this data and then printed by the Kmesh log.
+
+#### Metrics
+
+Metrics are obtained in the same way as the access log.
+
+After obtaining the metrics through the bpf map, we also have to support Prometheus queries.
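+
+As a rough illustration of the kernel-to-daemon handoff described above (assuming the records are published through a BPF ring buffer; the map handle, Go struct and function names below are hypothetical and simply mirror the conn_value sketch), the daemon side could look like:
+
+```go
+package telemetry
+
+import (
+    "bytes"
+    "encoding/binary"
+    "log"
+
+    "github.com/cilium/ebpf"
+    "github.com/cilium/ebpf/ringbuf"
+)
+
+// connValue mirrors the conn_value layout sketched in the bpf map above.
+type connValue struct {
+    ConnectionOpens  uint64
+    ConnectionCloses uint64
+    ReceivedBytes    uint64
+    SentBytes        uint64
+    Duration         uint64
+    Destination      uint32
+    Source           uint32
+}
+
+// runMetricReader drains per-connection records emitted by the kernel side
+// when a connection closes.
+func runMetricReader(connMap *ebpf.Map) error {
+    reader, err := ringbuf.NewReader(connMap)
+    if err != nil {
+        return err
+    }
+    defer reader.Close()
+
+    for {
+        record, err := reader.Read()
+        if err != nil {
+            return err
+        }
+        var v connValue
+        if err := binary.Read(bytes.NewReader(record.RawSample), binary.LittleEndian, &v); err != nil {
+            continue
+        }
+        // A real implementation would resolve the workload identities here,
+        // update the Prometheus counters and print the access log entry.
+        log.Printf("connection closed: sent=%d recv=%d", v.SentBytes, v.ReceivedBytes)
+    }
+}
+```
+
+Supporting the Prometheus query then involves the following steps: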
+
+1. Expose metrics to the Prometheus registry.
+2. Enable an HTTP listening interface.
+3. Update the metrics regularly: refresh them every time a connection is closed.
+
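+
+A minimal sketch of steps 1 and 2, assuming the counters are registered with the default Prometheus registry (only the 15020 port comes from this proposal; the handler path and function name are illustrative):
+
+```go
+package telemetry
+
+import (
+    "net/http"
+
+    "github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+// ServeMetrics exposes the registered metrics for Prometheus to scrape.
+func ServeMetrics() error {
+    mux := http.NewServeMux()
+    mux.Handle("/metrics", promhttp.Handler())
+    return http.ListenAndServe("localhost:15020", mux)
+}
+```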
+
+![Kmesh observability architecture](pics/observability.svg)
+
+Observability should be achieved in both ads mode and workload mode.
+
+For now, we only consider implementing Layer 4 observability.
+
+For the metrics feature, port 15020 is provided for Prometheus queries.
+
+#### Test Plan
+
+
+### Alternatives
+
+
\ No newline at end of file
diff --git a/docs/proposal/pics/observability.svg b/docs/proposal/pics/observability.svg
new file mode 100644
index 000000000..4b2ea0a88
--- /dev/null
+++ b/docs/proposal/pics/observability.svg
@@ -0,0 +1,4 @@
[observability.svg: drawio architecture diagram; labels include istiod, Ads/Workload Controller, xDS, Telemetry Controller, extension filter, Workloads, Metric, access log, Access log format, Store, log file, Get, Kmesh daemon, Metric Port, Prometheus]
\ No newline at end of file diff --git a/hack/run-ut.sh b/hack/run-ut.sh new file mode 100755 index 000000000..bc370bcea --- /dev/null +++ b/hack/run-ut.sh @@ -0,0 +1,52 @@ +#!/bin/bash + +ROOT_DIR=$(git rev-parse --show-toplevel) + +. $ROOT_DIR/hack/utils.sh + +function docker_run_go_ut() { + local container_id=$1 + docker exec $container_id go test -v -vet=off ./pkg/... +} + +function run_go_ut_local() { + bash $ROOT_DIR/build.sh + export PKG_CONFIG_PATH=$ROOT_DIR/mk + export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$ROOT_DIR/api/v2-c:$ROOT_DIR/bpf/deserialization_to_bpf_map + go test -v -vet=off ./pkg/... +} + +function run_go_ut_in_docker() { + prepare + container_id=$(run_docker_container) + build_kmesh $container_id + docker_run_go_ut $container_id + clean_container $container_id +} + +function clean() { + make clean $ROOT_DIR +} + +# Running go ut with docker by default +if [ -z "$1" -o "$1" == "-d" -o "$1" == "--docker" ]; then + run_go_ut_in_docker + exit +fi + +if [ "$1" == "-l" -o "$1" == "--local" ]; then + run_go_ut_local + exit +fi + +if [ "$1" == "-h" -o "$1" == "--help" ]; then + echo run-ut.sh -h/--help : Help. + echo run-ut.sh -d/--docker: run go unit test in docker. + echo run-ut.sh -l/--local: run go unit test locally. + exit +fi + +if [ "$1" == "-c" -o "$1" == "--clean" ]; then + clean + exit +fi \ No newline at end of file diff --git a/hack/utils.sh b/hack/utils.sh new file mode 100644 index 000000000..cf83c7f11 --- /dev/null +++ b/hack/utils.sh @@ -0,0 +1,73 @@ +#!/bin/bash + +function prepare() { + local arch + arch=$(get_arch) + docker pull "ghcr.io/kmesh-net/kmesh-build-${arch}:latest" +} + +function run_docker_container() { + local arch + arch=$(get_arch) + local container_id + container_id=$(docker run -itd --privileged=true \ + -v /usr/src:/usr/src \ + -v /usr/include/linux/bpf.h:/kmesh/config/linux-bpf.h \ + -v /etc/cni/net.d:/etc/cni/net.d \ + -v /opt/cni/bin:/opt/cni/bin \ + -v /mnt:/mnt \ + -v /sys/fs/bpf:/sys/fs/bpf \ + -v /lib/modules:/lib/modules \ + -v "$(pwd)":/kmesh \ + -v "$(go env GOCACHE)":/root/.cache/go-build \ + -v "$(go env GOMODCACHE)":/go/pkg/mod \ + -e PKG_CONFIG_PATH=/kmesh/mk \ + --name kmesh-build "ghcr.io/kmesh-net/kmesh-build-${arch}:latest") + + echo "$container_id" +} + +function get_arch() { + if [ "$(arch)" == "x86_64" ]; then + echo "x86" + else + echo "arm" + fi +} + +function build_kmesh() { + local container_id=$1 + docker exec $container_id git config --global --add safe.directory /kmesh + docker exec $container_id sh /kmesh/build.sh + docker exec $container_id sh /kmesh/build.sh -i + docker exec $container_id sh -c "$(declare -f copy_to_host); copy_to_host" +} + +function copy_to_host() { + local arch="" + if [ "$(arch)" == "x86_64" ]; then + arch="amd64" + else + arch="aarch64" + fi + + mkdir -p "./out/$arch" + mkdir -p "./out/$arch/ko" + + cp /usr/lib64/libkmesh_api_v2_c.so out/$arch + cp /usr/lib64/libkmesh_deserial.so out/$arch + cp /usr/lib64/libboundscheck.so out/$arch + find /usr/lib64 -name 'libbpf.so*' -exec cp {} out/$arch \; + find /usr/lib64 -name 'libprotobuf-c.so*' -exec cp {} out/$arch \; + cp /usr/bin/kmesh-daemon out/$arch + cp /usr/bin/kmesh-cni out/$arch + cp /usr/bin/mdacore out/$arch + if [ -f "/lib/modules/kmesh/kmesh.ko" ]; then + cp /lib/modules/kmesh/kmesh.ko out/$arch/ko + fi +} + +function clean_container() { + local container_id=$1 + docker rm -f $container_id +} diff --git a/kmesh_compile.sh b/kmesh_compile.sh index 37a196227..021395e31 100755 --- a/kmesh_compile.sh +++ b/kmesh_compile.sh @@ -1,75 +1,8 @@ 
#!/bin/bash -function prepare() { - local arch - arch=$(get_arch) - docker pull "ghcr.io/kmesh-net/kmesh-build-${arch}:latest" -} +ROOT_DIR=$(git rev-parse --show-toplevel) -function run_docker_container() { - local arch - arch=$(get_arch) - local container_id - container_id=$(docker run -itd --privileged=true \ - -v /usr/src:/usr/src \ - -v /usr/include/linux/bpf.h:/kmesh/config/linux-bpf.h \ - -v /etc/cni/net.d:/etc/cni/net.d \ - -v /opt/cni/bin:/opt/cni/bin \ - -v /mnt:/mnt \ - -v /sys/fs/bpf:/sys/fs/bpf \ - -v /lib/modules:/lib/modules \ - -v "$(pwd)":/kmesh \ - -v "$(go env GOCACHE)":/root/.cache/go-build \ - -v "$(go env GOMODCACHE)":/go/pkg/mod \ - --name kmesh-build "ghcr.io/kmesh-net/kmesh-build-${arch}:latest") - - echo "$container_id" -} - -function get_arch() { - if [ "$(arch)" == "x86_64" ]; then - echo "x86" - else - echo "arm" - fi -} - -function build_kmesh() { - local container_id=$1 - docker exec $container_id git config --global --add safe.directory /kmesh - docker exec $container_id sh /kmesh/build.sh - docker exec $container_id sh /kmesh/build.sh -i - docker exec $container_id sh -c "$(declare -f copy_to_host); copy_to_host" -} - -function copy_to_host() { - local arch="" - if [ "$(arch)" == "x86_64" ]; then - arch="amd64" - else - arch="aarch64" - fi - - mkdir -p "./out/$arch" - mkdir -p "./out/$arch/ko" - - cp /usr/lib64/libkmesh_api_v2_c.so out/$arch - cp /usr/lib64/libkmesh_deserial.so out/$arch - cp /usr/lib64/libboundscheck.so out/$arch - find /usr/lib64 -name 'libbpf.so*' -exec cp {} out/$arch \; - find /usr/lib64 -name 'libprotobuf-c.so*' -exec cp {} out/$arch \; - cp /usr/bin/kmesh-daemon out/$arch - cp /usr/bin/kmesh-cni out/$arch - cp /usr/bin/mdacore out/$arch - if [ -f "/lib/modules/kmesh/kmesh.ko" ]; then - cp /lib/modules/kmesh/kmesh.ko out/$arch/ko - fi -} - -function clean_container() { - local container_id=$1 - docker rm -f $container_id -} +. 
$ROOT_DIR/hack/utils.sh prepare container_id=$(run_docker_container) diff --git a/pkg/auth/rbac.go b/pkg/auth/rbac.go index aef47bc3c..6348d7022 100644 --- a/pkg/auth/rbac.go +++ b/pkg/auth/rbac.go @@ -22,6 +22,7 @@ import ( "encoding/binary" "fmt" "net" + "net/netip" "strings" "github.com/cilium/ebpf/ringbuf" @@ -31,7 +32,6 @@ import ( "kmesh.net/kmesh/pkg/bpf" "kmesh.net/kmesh/pkg/controller/workload/cache" "kmesh.net/kmesh/pkg/logger" - "kmesh.net/kmesh/pkg/nets" ) const ( @@ -178,10 +178,10 @@ func (r *Rbac) RemovePolicy(policyKey string) { func (r *Rbac) doRbac(conn *rbacConnection) bool { var dstWorkload *workloadapi.Workload if len(conn.dstIp) > 0 { - dstWorkload = r.workloadCache.GetWorkloadByAddr(cache.NetworkAddress{ - Network: conn.dstNetwork, - Address: nets.ConvertIpByteToUint32(conn.dstIp), - }) + var networkAddress cache.NetworkAddress + networkAddress.Network = conn.dstNetwork + networkAddress.Address, _ = netip.AddrFromSlice(conn.dstIp) + dstWorkload = r.workloadCache.GetWorkloadByAddr(networkAddress) } allowPolicies, denyPolicies := r.aggregate(dstWorkload) @@ -483,9 +483,9 @@ func isEmptyMatch(m *security.Match) bool { // todo : get identity form tls connection func (r *Rbac) getIdentityByIp(ip []byte) Identity { - workload := r.workloadCache.GetWorkloadByAddr(cache.NetworkAddress{ - Address: nets.ConvertIpByteToUint32(ip), - }) + var networkAddress cache.NetworkAddress + networkAddress.Address, _ = netip.AddrFromSlice(ip) + workload := r.workloadCache.GetWorkloadByAddr(networkAddress) if workload == nil { log.Warnf("get worload from ip %v failed", ip) return Identity{} diff --git a/pkg/bpf/bpf_kmesh_workload.go b/pkg/bpf/bpf_kmesh_workload.go index d204153d5..777b62700 100644 --- a/pkg/bpf/bpf_kmesh_workload.go +++ b/pkg/bpf/bpf_kmesh_workload.go @@ -341,7 +341,7 @@ func (sm *BpfSendMsgWorkload) LoadSendMsg() error { return err } - prog := spec.Programs["sendmsg"] + prog := spec.Programs["sendmsg_prog"] sm.Info.Type = prog.Type sm.Info.AttachType = prog.AttachType return nil @@ -349,7 +349,7 @@ func (sm *BpfSendMsgWorkload) LoadSendMsg() error { func (sm *BpfSendMsgWorkload) Attach() error { // Use a program handle that cannot be closed by the caller - clone, err := sm.KmeshSendmsgObjects.KmeshSendmsgPrograms.Sendmsg.Clone() + clone, err := sm.KmeshSendmsgObjects.KmeshSendmsgPrograms.SendmsgProg.Clone() if err != nil { return err } @@ -372,7 +372,7 @@ func (sm *BpfSendMsgWorkload) Detach() error { if sm.AttachFD > 0 { args := link.RawDetachProgramOptions{ Target: sm.AttachFD, - Program: sm.KmeshSendmsgObjects.KmeshSendmsgPrograms.Sendmsg, + Program: sm.KmeshSendmsgObjects.KmeshSendmsgPrograms.SendmsgProg, Attach: ebpf.AttachSkMsgVerdict, } diff --git a/pkg/cni/plugin/plugin.go b/pkg/cni/plugin/plugin.go index f5282f744..db540fc68 100644 --- a/pkg/cni/plugin/plugin.go +++ b/pkg/cni/plugin/plugin.go @@ -112,14 +112,15 @@ func checkKmesh(client kubernetes.Interface, pod *v1.Pod) (bool, error) { return false, nil } + // Exclude istio managed gateway if gateway, ok := pod.Labels["gateway.istio.io/managed"]; ok { if strings.EqualFold(gateway, "istio.io-mesh-controller") { return false, nil } } - mode := namespace.Labels["istio.io/dataplane-mode"] - if strings.EqualFold(mode, "Kmesh") { + mode := namespace.Labels[constants.DataPlaneModeLabel] + if strings.EqualFold(mode, constants.DataPlaneModeKmesh) { return true, nil } diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 01af08819..75712396a 100644 --- a/pkg/constants/constants.go +++ 
b/pkg/constants/constants.go @@ -20,6 +20,11 @@ const ( AdsMode = "ads" WorkloadMode = "workload" + // DataPlaneModeLabel is the label used to indicate the data plane mode + DataPlaneModeLabel = "istio.io/dataplane-mode" + // DataPlaneModeKmesh is the value of the label to indicate the data plane mode is kmesh + DataPlaneModeKmesh = "kmesh" + XDP_PROG_NAME = "xdp_shutdown" RootCertPath = "/var/run/secrets/istio/root-cert.pem" diff --git a/pkg/controller/ads/ads_controller_test.go b/pkg/controller/ads/ads_controller_test.go index d0e8d25b5..1badf456c 100644 --- a/pkg/controller/ads/ads_controller_test.go +++ b/pkg/controller/ads/ads_controller_test.go @@ -19,47 +19,33 @@ package ads import ( "context" "errors" - "net" "reflect" "testing" - "time" "github.com/agiledragon/gomonkey/v2" config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" resource_v3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/types/known/anypb" "kmesh.net/kmesh/pkg/controller/xdstest" ) func TestAdsStreamAdsStreamCreateAndSend(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - adsStream := Controller{ - Processor: nil, - } - // create a fake grpc service client - mockDiscovery := xdstest.NewMockServer(t) - conn, err := grpc.Dial("buffcon", - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return mockDiscovery.Listener.Dial() - })) + mockDiscovery := xdstest.NewXdsServer(t) + + client, err := xdstest.NewClient(mockDiscovery) if err != nil { t.Errorf("grpc connection client create failed, %s", err) } - defer conn.Close() - client := discoveryv3.NewAggregatedDiscoveryServiceClient(conn) - stream, streamErr := client.StreamAggregatedResources(ctx) - if streamErr != nil { - t.Errorf("create stream failed, %s", streamErr) + defer client.Cleanup() + + adsStream := Controller{ + Stream: client.AdsClient, + Processor: nil, } - adsStream.Stream = stream patches1 := gomonkey.NewPatches() patches2 := gomonkey.NewPatches() @@ -72,9 +58,9 @@ func TestAdsStreamAdsStreamCreateAndSend(t *testing.T) { { name: "test1: send request failed, should return error", beforeFunc: func() { - patches1.ApplyMethod(reflect.TypeOf(client), "StreamAggregatedResources", + patches1.ApplyMethod(reflect.TypeOf(client.Client), "StreamAggregatedResources", func(_ discoveryv3.AggregatedDiscoveryServiceClient, ctx context.Context, opts ...grpc.CallOption) (discoveryv3.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error) { - return stream, nil + return client.AdsClient, nil }) patches2.ApplyMethod(reflect.TypeOf(adsStream.Stream), "Send", func(_ discoveryv3.AggregatedDiscoveryService_StreamAggregatedResourcesClient, req *discoveryv3.DiscoveryRequest) error { @@ -88,7 +74,7 @@ func TestAdsStreamAdsStreamCreateAndSend(t *testing.T) { wantErr: true, }, { - name: "test2: no movk, send request successful, should return nil", + name: "test2: send request successful, should return nil", beforeFunc: func() {}, afterFunc: func() {}, wantErr: false, @@ -96,7 +82,7 @@ func TestAdsStreamAdsStreamCreateAndSend(t *testing.T) { { name: "test3: fail to create adsstream, should return error", beforeFunc: func() { - patches1.ApplyMethod(reflect.TypeOf(client), "StreamAggregatedResources", 
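The plugin and constants changes above replace the hard-coded "istio.io/dataplane-mode" check with named constants. A small illustrative sketch of the same namespace gate (constant values copied from the diff; the helper name is made up for the example):

package main

import (
	"fmt"
	"strings"
)

// Values copied from pkg/constants in the diff.
const (
	DataPlaneModeLabel = "istio.io/dataplane-mode"
	DataPlaneModeKmesh = "kmesh"
)

// isKmeshManaged is a hypothetical helper: a namespace opts in to Kmesh by carrying
// the dataplane-mode label with the value "kmesh", compared case-insensitively.
func isKmeshManaged(nsLabels map[string]string) bool {
	return strings.EqualFold(nsLabels[DataPlaneModeLabel], DataPlaneModeKmesh)
}

func main() {
	fmt.Println(isKmeshManaged(map[string]string{DataPlaneModeLabel: "Kmesh"})) // true
	fmt.Println(isKmeshManaged(map[string]string{}))                            // false
}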
+ patches1.ApplyMethod(reflect.TypeOf(client.Client), "StreamAggregatedResources", func(_ discoveryv3.AggregatedDiscoveryServiceClient, ctx context.Context, opts ...grpc.CallOption) (discoveryv3.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error) { return nil, errors.New("fail to create adsstream") }) @@ -110,7 +96,7 @@ func TestAdsStreamAdsStreamCreateAndSend(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.beforeFunc() - err := adsStream.AdsStreamCreateAndSend(client, context.TODO()) + err := adsStream.AdsStreamCreateAndSend(client.Client, context.TODO()) if (err != nil) != tt.wantErr { t.Errorf("adsStream.AdsStreamCreateAndSend() error = %v, wantErr %v", err, tt.wantErr) return @@ -120,29 +106,17 @@ func TestAdsStreamAdsStreamCreateAndSend(t *testing.T) { } } -func TestAdsStream_AdsStreamProcess(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - adsStream := NewController() - +func TestHandleAdsStream(t *testing.T) { // create a fake grpc service client - mockDiscovery := xdstest.NewMockServer(t) - conn, err := grpc.Dial("buffcon", - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return mockDiscovery.Listener.Dial() - })) + mockDiscovery := xdstest.NewXdsServer(t) + fakeClient, err := xdstest.NewClient(mockDiscovery) if err != nil { t.Errorf("grpc connection client create failed, %s", err) } - defer conn.Close() - client := discoveryv3.NewAggregatedDiscoveryServiceClient(conn) - stream, streamErr := client.StreamAggregatedResources(ctx) - if streamErr != nil { - t.Errorf("create stream failed, %s", streamErr) - } - adsStream.Stream = stream + defer fakeClient.Cleanup() + + adsStream := NewController() + adsStream.Stream = fakeClient.AdsClient patches1 := gomonkey.NewPatches() patches2 := gomonkey.NewPatches() diff --git a/pkg/controller/bypass/bypass_controller.go b/pkg/controller/bypass/bypass_controller.go index 61e157ee3..1c7ea4ee0 100644 --- a/pkg/controller/bypass/bypass_controller.go +++ b/pkg/controller/bypass/bypass_controller.go @@ -274,7 +274,7 @@ func getnspath(pod *corev1.Pod) (string, error) { if err != nil { return "", err } - res = path.Join("/proc", res) + res = path.Join("/host/proc", res) return res, nil } @@ -287,7 +287,7 @@ func BuiltinOrDir(dir string) fs.FS { func FindNetnsForPod(pod *corev1.Pod) (string, error) { netnsObserved := sets.New[uint64]() - fd := BuiltinOrDir("/proc") + fd := BuiltinOrDir("/host/proc") entries, err := fs.ReadDir(fd, ".") if err != nil { diff --git a/pkg/controller/client_test.go b/pkg/controller/client_test.go index 39338d266..50d1cdd83 100644 --- a/pkg/controller/client_test.go +++ b/pkg/controller/client_test.go @@ -50,7 +50,7 @@ func TestRecoverConnection(t *testing.T) { return nil, errors.New("failed to create grpc connect") } else { // returns a fake grpc connection - mockDiscovery := xdstest.NewMockServer(t) + mockDiscovery := xdstest.NewXdsServer(t) return grpc.Dial("buffcon", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), @@ -69,7 +69,7 @@ func TestClientResponseProcess(t *testing.T) { netPatches := gomonkey.NewPatches() defer netPatches.Reset() netPatches.ApplyFunc(nets.GrpcConnect, func(addr string) (*grpc.ClientConn, error) { - mockDiscovery := xdstest.NewMockServer(t) + mockDiscovery := xdstest.NewXdsServer(t) return grpc.Dial("buffcon", 
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), @@ -116,7 +116,7 @@ func TestClientResponseProcess(t *testing.T) { netPatches := gomonkey.NewPatches() defer netPatches.Reset() netPatches.ApplyFunc(nets.GrpcConnect, func(addr string) (*grpc.ClientConn, error) { - mockDiscovery := xdstest.NewMockServer(t) + mockDiscovery := xdstest.NewXdsServer(t) return grpc.Dial("buffcon", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index c5323ad4e..a311dd9eb 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -19,6 +19,7 @@ package controller import ( "fmt" + "kmesh.net/kmesh/daemon/options" "kmesh.net/kmesh/pkg/bpf" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller/bypass" @@ -33,17 +34,19 @@ var ( ) type Controller struct { - mode string - bpfWorkloadObj *bpf.BpfKmeshWorkload - client *XdsClient - enableByPass bool + mode string + bpfWorkloadObj *bpf.BpfKmeshWorkload + client *XdsClient + enableByPass bool + enableSecretManager bool } -func NewController(mode string, enableByPass bool, bpfWorkloadObj *bpf.BpfKmeshWorkload) *Controller { +func NewController(opts *options.BootstrapConfigs, bpfWorkloadObj *bpf.BpfKmeshWorkload) *Controller { return &Controller{ - mode: mode, - enableByPass: enableByPass, - bpfWorkloadObj: bpfWorkloadObj, + mode: opts.BpfConfig.Mode, + enableByPass: opts.ByPassConfig.EnableByPass, + bpfWorkloadObj: bpfWorkloadObj, + enableSecretManager: opts.SecretManagerConfig.Enable, } } @@ -66,15 +69,16 @@ func (c *Controller) Start() error { return nil } - secertManager, err := security.NewSecretManager() - if err != nil { - return fmt.Errorf("secretManager create failed: %v", err) - } - go secertManager.Run(stopCh) - c.client = NewXdsClient(c.mode, c.bpfWorkloadObj) if c.client.WorkloadController != nil { - c.client.WorkloadController.Processor.Sm = secertManager + if c.enableSecretManager { + secertManager, err := security.NewSecretManager() + if err != nil { + return fmt.Errorf("secretManager create failed: %v", err) + } + go secertManager.Run(stopCh) + c.client.WorkloadController.Processor.SecretManager = secertManager + } } return c.client.Run(stopCh) diff --git a/pkg/controller/security/caclient.go b/pkg/controller/security/caclient.go index 0023dc27b..9b4b64496 100644 --- a/pkg/controller/security/caclient.go +++ b/pkg/controller/security/caclient.go @@ -24,6 +24,7 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" "google.golang.org/protobuf/types/known/structpb" pb "istio.io/api/security/v1alpha1" "istio.io/istio/pkg/security" @@ -57,7 +58,7 @@ func newCaClient(opts *security.Options, tlsOpts *tlsOptions) (*caClient, error) opts: opts, } - conn, err := nets.GrpcConnect(CSRSignAddress) + conn, err := nets.GrpcConnect(caAddress) if err != nil { return nil, fmt.Errorf("failed to create grpcconnect : %v", err) } @@ -84,20 +85,13 @@ func (c *caClient) csrSend(csrPEM []byte, certValidsec int64, identity string) ( Metadata: crMeta, } - ctx := context.Background() - + // TODO: support customize clusterID, which is needed for multicluster mesh + ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("ClusterID", "Kubernetes")) // To handle potential grpc connection disconnection and retry once // when certificate acquisition fails. If it still fails, return an error. 
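The caclient change above attaches a ClusterID to the CSR call through gRPC metadata rather than using a bare context. A self-contained sketch of that mechanism, with the key/value taken from the diff and no real connection made:

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"
)

func main() {
	// Attach the ClusterID as outgoing gRPC metadata; the CA server can read it on its side.
	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("ClusterID", "Kubernetes"))

	// Any unary or streaming call made with ctx would carry this metadata; here we just read it back.
	if md, ok := metadata.FromOutgoingContext(ctx); ok {
		fmt.Println(md.Get("ClusterID")) // [Kubernetes]
	}
}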
resp, err := c.client.CreateCertificate(ctx, req) if err != nil { - log.Errorf("create certificate: %v reconnect...", err) - if err := c.reconnect(); err != nil { - return nil, fmt.Errorf("reconnect error: %v", err) - } - resp, err = c.client.CreateCertificate(ctx, req) - if err != nil { - return nil, fmt.Errorf("create certificate: %v", err) - } + return nil, fmt.Errorf("create certificate failed: %v", err) } if len(resp.CertChain) <= 1 { @@ -140,7 +134,7 @@ func (c *caClient) fetchCert(identity string) (*security.SecretItem, error) { } certChainPEM, err := c.csrSend(csrPEM, int64(c.opts.SecretTTL.Seconds()), identity) if err != nil { - return nil, fmt.Errorf("failed to get certChainPEM") + return nil, err } certChain := standardCerts(certChainPEM) @@ -153,7 +147,7 @@ func (c *caClient) fetchCert(identity string) (*security.SecretItem, error) { rootCertPEM = []byte(certChainPEM[len(certChainPEM)-1]) - log.Debugf("cert for %v ExpireTime :%v", identity, expireTime) + log.Debugf("cert for %v expireTime :%v", identity, expireTime) return &security.SecretItem{ CertificateChain: certChain, PrivateKey: keyPEM, @@ -164,20 +158,6 @@ func (c *caClient) fetchCert(identity string) (*security.SecretItem, error) { }, nil } -func (c *caClient) reconnect() error { - if err := c.conn.Close(); err != nil { - return fmt.Errorf("failed to close connection: %v", err) - } - - conn, err := nets.GrpcConnect(CSRSignAddress) - if err != nil { - return err - } - c.conn = conn - c.client = pb.NewIstioCertificateServiceClient(conn) - return nil -} - func (c *caClient) close() error { return c.conn.Close() } diff --git a/pkg/controller/security/manager.go b/pkg/controller/security/manager.go index 7d3811c49..9a6a71cee 100644 --- a/pkg/controller/security/manager.go +++ b/pkg/controller/security/manager.go @@ -80,11 +80,13 @@ func (s *SecretManager) handleCertRequests(stop <-chan struct{}) { continue } // sign cert if only no cert exists for this identity - go s.addCert(identity) + go s.fetchCert(identity) + case RETRY: + s.retryFetchCert(identity) case DELETE: s.deleteCert(identity) case Rotate: - go s.rotateCert(identity) + s.rotateCert(identity) } } } @@ -180,12 +182,14 @@ func (s *SecretManager) rotateCerts() { } // addCert signs a cert for the identity and cache it. 
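The SecretManager changes above replace the inline re-send on failure with a dedicated RETRY operation that is re-queued after a delay. A condensed, self-contained sketch of that request-channel pattern (operation values and the certRequest shape follow the diff; the CSR failure is only simulated):

package main

import (
	"fmt"
	"time"
)

const (
	ADD = iota
	DELETE
	RETRY
)

type certRequest struct {
	Identity  string
	Operation int
}

func main() {
	requests := make(chan certRequest, 8)
	requests <- certRequest{Identity: "spiffe://cluster.local/ns/default/sa/foo", Operation: ADD}

	go func() {
		for req := range requests {
			switch req.Operation {
			case ADD, RETRY:
				fmt.Printf("fetch cert for %s (op=%d)\n", req.Identity, req.Operation)
				if req.Operation == ADD {
					// Simulate a failed CSR: schedule a delayed RETRY instead of retrying inline.
					id := req.Identity
					time.AfterFunc(50*time.Millisecond, func() {
						requests <- certRequest{Identity: id, Operation: RETRY}
					})
				}
			case DELETE:
				fmt.Printf("delete cert for %s\n", req.Identity)
			}
		}
	}()

	time.Sleep(200 * time.Millisecond)
}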
-func (s *SecretManager) addCert(identity string) { +func (s *SecretManager) fetchCert(identity string) { newCert, err := s.caClient.fetchCert(identity) if err != nil { - log.Errorf("fetcheCert %v error: %v", identity, err) - // in case fetchCert failed, retry - s.certRequestChan <- certRequest{Identity: identity, Operation: ADD} + log.Errorf("fetchCert for [%v] error: %v", identity, err) + // TODO: backoff retry + time.AfterFunc(time.Second, func() { + s.SendCertRequest(identity, RETRY) + }) return } @@ -225,5 +229,18 @@ func (s *SecretManager) rotateCert(identity string) { log.Debugf("cert %s expire at %T, skip rotate now", identity, certificate.cert.ExpireTime) } - s.addCert(identity) + go s.fetchCert(identity) +} + +func (s *SecretManager) retryFetchCert(identity string) { + s.certsCache.mu.RLock() + certificate := s.certsCache.certs[identity] + if certificate == nil { + s.certsCache.mu.RUnlock() + log.Debugf("identity: %v cert has been deleted", identity) + return + } + s.certsCache.mu.RUnlock() + + go s.fetchCert(identity) } diff --git a/pkg/controller/security/option.go b/pkg/controller/security/option.go index 95c484421..8cf9b2ff3 100644 --- a/pkg/controller/security/option.go +++ b/pkg/controller/security/option.go @@ -26,6 +26,7 @@ import ( const ( ADD = iota DELETE + RETRY Rotate maxConcurrentCSR = 128 // max concurrent CSR @@ -42,8 +43,8 @@ func NewSecurityOptions() *security.Options { } var ( - CSRSignAddress = env.Register("CA_CONTROLLER", "istiod.istio-system.svc:15012", "").Get() - secretTTLEnv = env.Register("SECRET_TTL", 24*time.Hour, + caAddress = env.Register("CA_ADDRESS", "istiod.istio-system.svc:15012", "").Get() + secretTTLEnv = env.Register("SECRET_TTL", 24*time.Hour, "The cert lifetime requested by kmesh CA agent").Get() workloadRSAKeySizeEnv = env.Register("WORKLOAD_RSA_KEY_SIZE", 2048, diff --git a/pkg/controller/workload/bpfcache/backend.go b/pkg/controller/workload/bpfcache/backend.go index b459e3802..b44069f69 100644 --- a/pkg/controller/workload/bpfcache/backend.go +++ b/pkg/controller/workload/bpfcache/backend.go @@ -32,10 +32,10 @@ type BackendKey struct { type ServiceList [MaxServiceNum]uint32 type BackendValue struct { - IPv4 uint32 // backend ip + Ip [16]byte ServiceCount uint32 Services ServiceList - WaypointAddr uint32 + WaypointAddr [16]byte WaypointPort uint32 } diff --git a/pkg/controller/workload/bpfcache/fake_map.go b/pkg/controller/workload/bpfcache/fake_map.go new file mode 100644 index 000000000..0d1b4248b --- /dev/null +++ b/pkg/controller/workload/bpfcache/fake_map.go @@ -0,0 +1,88 @@ +/* + * Copyright 2024 The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package bpfcache + +import ( + "testing" + "unsafe" + + "github.com/cilium/ebpf" + + "kmesh.net/kmesh/bpf/kmesh/bpf2go" +) + +func NewFakeWorkloadMap(t *testing.T) bpf2go.KmeshCgroupSockWorkloadMaps { + backEndMap, err := ebpf.NewMap(&ebpf.MapSpec{ + Name: "kmesh_backend", + Type: ebpf.Hash, + KeySize: uint32(unsafe.Sizeof(BackendKey{})), + ValueSize: uint32(unsafe.Sizeof(BackendValue{})), + MaxEntries: 1024, + }) + if err != nil { + t.Fatalf("create backEndMap map failed, err is %v", err) + } + + endpointMap, err := ebpf.NewMap(&ebpf.MapSpec{ + Name: "kmesh_endpoint", + Type: ebpf.Hash, + KeySize: uint32(unsafe.Sizeof(EndpointKey{})), + ValueSize: uint32(unsafe.Sizeof(EndpointValue{})), + MaxEntries: 1024, + }) + if err != nil { + t.Fatalf("create endpointMap map failed, err is %v", err) + } + + frontendMap, err := ebpf.NewMap(&ebpf.MapSpec{ + Name: "kmesh_frontend", + Type: ebpf.Hash, + KeySize: uint32(unsafe.Sizeof(FrontendKey{})), + ValueSize: uint32(unsafe.Sizeof(FrontendValue{})), + MaxEntries: 1024, + }) + if err != nil { + t.Fatalf("create frontendMap map failed, err is %v", err) + } + + serviceMap, err := ebpf.NewMap(&ebpf.MapSpec{ + Name: "kmesh_service", + Type: ebpf.Hash, + KeySize: uint32(unsafe.Sizeof(ServiceKey{})), + ValueSize: uint32(unsafe.Sizeof(ServiceValue{})), + MaxEntries: 1024, + }) + if err != nil { + t.Fatalf("create serviceMap map failed, err is %v", err) + } + + // TODO: add other maps when needed + + return bpf2go.KmeshCgroupSockWorkloadMaps{ + KmeshBackend: backEndMap, + KmeshEndpoint: endpointMap, + KmeshFrontend: frontendMap, + KmeshService: serviceMap, + } +} + +func CleanupFakeWorkloadMap(maps bpf2go.KmeshCgroupSockWorkloadMaps) { + maps.KmeshBackend.Close() + maps.KmeshEndpoint.Close() + maps.KmeshFrontend.Close() + maps.KmeshService.Close() +} diff --git a/pkg/controller/workload/bpfcache/frontend.go b/pkg/controller/workload/bpfcache/frontend.go index 6869289fa..804afc304 100644 --- a/pkg/controller/workload/bpfcache/frontend.go +++ b/pkg/controller/workload/bpfcache/frontend.go @@ -21,7 +21,7 @@ import ( ) type FrontendKey struct { - IPv4 uint32 // Service ip or Pod ip + Ip [16]byte // Service ip or Pod ip } type FrontendValue struct { diff --git a/pkg/controller/workload/bpfcache/service.go b/pkg/controller/workload/bpfcache/service.go index 17bed9a48..833f83752 100644 --- a/pkg/controller/workload/bpfcache/service.go +++ b/pkg/controller/workload/bpfcache/service.go @@ -36,7 +36,7 @@ type ServiceValue struct { LbPolicy uint32 // load balancing algorithm, currently only supports random algorithm ServicePort ServicePorts // ServicePort[i] and TargetPort[i] are a pair, i starts from 0 and max value is MaxPortNum-1 TargetPort TargetPorts - WaypointAddr uint32 + WaypointAddr [16]byte WaypointPort uint32 } diff --git a/pkg/controller/workload/cache/workload_cache.go b/pkg/controller/workload/cache/workload_cache.go index f63ebeac9..82eef734a 100644 --- a/pkg/controller/workload/cache/workload_cache.go +++ b/pkg/controller/workload/cache/workload_cache.go @@ -17,12 +17,12 @@ package cache import ( + "net/netip" "sync" "google.golang.org/protobuf/proto" "kmesh.net/kmesh/api/v2/workloadapi" - "kmesh.net/kmesh/pkg/nets" ) type WorkloadCache interface { @@ -35,7 +35,7 @@ type WorkloadCache interface { type NetworkAddress struct { Network string - Address uint32 + Address netip.Addr } type cache struct { @@ -63,7 +63,7 @@ func (w *cache) GetWorkloadByAddr(networkAddress NetworkAddress) *workloadapi.Wo return w.byAddr[networkAddress] } -func 
composeNetworkAddress(network string, addr uint32) NetworkAddress { +func composeNetworkAddress(network string, addr netip.Addr) NetworkAddress { networkAddress := NetworkAddress{ Network: network, Address: addr, @@ -85,7 +85,7 @@ func (w *cache) AddWorkload(workload *workloadapi.Workload) { } // remove same uid but old address workload, avoid leak worklaod by address. for _, ip := range workloadByUid.Addresses { - addr := nets.ConvertIpByteToUint32(ip) + addr, _ := netip.AddrFromSlice(ip) networkAddress := composeNetworkAddress(workloadByUid.Network, addr) delete(w.byAddr, networkAddress) } @@ -93,7 +93,7 @@ func (w *cache) AddWorkload(workload *workloadapi.Workload) { w.byUid[uid] = workload for _, ip := range workload.Addresses { - addr := nets.ConvertIpByteToUint32(ip) + addr, _ := netip.AddrFromSlice(ip) networkAddress := composeNetworkAddress(workload.Network, addr) w.byAddr[networkAddress] = workload } @@ -106,7 +106,7 @@ func (w *cache) DeleteWorkload(uid string) { workload, exist := w.byUid[uid] if exist { for _, ip := range workload.Addresses { - addr := nets.ConvertIpByteToUint32(ip) + addr, _ := netip.AddrFromSlice(ip) networkAddress := composeNetworkAddress(workload.Network, addr) delete(w.byAddr, networkAddress) } diff --git a/pkg/controller/workload/cache/workload_cache_test.go b/pkg/controller/workload/cache/workload_cache_test.go index 1332809db..4691320d6 100644 --- a/pkg/controller/workload/cache/workload_cache_test.go +++ b/pkg/controller/workload/cache/workload_cache_test.go @@ -17,12 +17,12 @@ package cache import ( + "net/netip" "testing" "github.com/stretchr/testify/assert" "kmesh.net/kmesh/api/v2/workloadapi" - "kmesh.net/kmesh/pkg/nets" ) func TestAddWorkload(t *testing.T) { @@ -39,8 +39,8 @@ func TestAddWorkload(t *testing.T) { } w.AddWorkload(workload) assert.Equal(t, workload, w.byUid["123456"]) - addr1 := nets.ConvertIpByteToUint32([]byte("192.168.224.22")) - addr2 := nets.ConvertIpByteToUint32([]byte("1.2.3.4")) + addr1, _ := netip.AddrFromSlice([]byte("192.168.224.22")) + addr2, _ := netip.AddrFromSlice([]byte("1.2.3.4")) assert.Equal(t, workload, w.byAddr[NetworkAddress{Network: workload.Network, Address: addr1}]) assert.Equal(t, workload, w.byAddr[NetworkAddress{Network: workload.Network, Address: addr2}]) }) @@ -58,8 +58,8 @@ func TestAddWorkload(t *testing.T) { } w.AddWorkload(workload) assert.Equal(t, workload, w.byUid["123456"]) - addr1 := nets.ConvertIpByteToUint32([]byte("192.168.224.22")) - addr2 := nets.ConvertIpByteToUint32([]byte("1.2.3.4")) + addr1, _ := netip.AddrFromSlice([]byte("192.168.224.22")) + addr2, _ := netip.AddrFromSlice([]byte("1.2.3.4")) assert.Equal(t, workload, w.byAddr[NetworkAddress{Network: workload.Network, Address: addr1}]) assert.Equal(t, workload, w.byAddr[NetworkAddress{Network: workload.Network, Address: addr2}]) newWorkload := &workloadapi.Workload{ @@ -73,8 +73,8 @@ func TestAddWorkload(t *testing.T) { } w.AddWorkload(newWorkload) assert.Equal(t, newWorkload, w.byUid["123456"]) - addr3 := nets.ConvertIpByteToUint32([]byte("192.168.10.25")) - addr4 := nets.ConvertIpByteToUint32([]byte("2.3.4.5")) + addr3, _ := netip.AddrFromSlice([]byte("192.168.10.25")) + addr4, _ := netip.AddrFromSlice([]byte("2.3.4.5")) assert.Equal(t, newWorkload, w.byAddr[NetworkAddress{Network: newWorkload.Network, Address: addr3}]) assert.Equal(t, newWorkload, w.byAddr[NetworkAddress{Network: newWorkload.Network, Address: addr4}]) assert.Equal(t, (*workloadapi.Workload)(nil), w.byAddr[NetworkAddress{Network: workload.Network, Address: addr1}]) @@ 
-93,7 +93,7 @@ func TestAddWorkload(t *testing.T) { } w.AddWorkload(workload) assert.Equal(t, workload, w.byUid["123456"]) - addr := nets.ConvertIpByteToUint32([]byte("192.168.224.22")) + addr, _ := netip.AddrFromSlice([]byte("192.168.224.22")) assert.Equal(t, workload, w.byAddr[NetworkAddress{Network: workload.Network, Address: addr}]) newWorkload := &workloadapi.Workload{ Name: "ut-workload", @@ -106,8 +106,8 @@ func TestAddWorkload(t *testing.T) { } w.AddWorkload(newWorkload) assert.Equal(t, newWorkload, w.byUid["123456"]) - addr1 := nets.ConvertIpByteToUint32([]byte("192.168.224.22")) - addr2 := nets.ConvertIpByteToUint32([]byte("2.3.4.5")) + addr1, _ := netip.AddrFromSlice([]byte("192.168.224.22")) + addr2, _ := netip.AddrFromSlice([]byte("2.3.4.5")) assert.Equal(t, newWorkload, w.byAddr[NetworkAddress{Network: newWorkload.Network, Address: addr1}]) assert.Equal(t, newWorkload, w.byAddr[NetworkAddress{Network: newWorkload.Network, Address: addr2}]) assert.Equal(t, (*workloadapi.Workload)(nil), w.byAddr[NetworkAddress{Network: workload.Network, Address: addr}]) @@ -130,8 +130,8 @@ func TestDeleteWorkload(t *testing.T) { assert.Equal(t, workload, w.byUid["123456"]) w.DeleteWorkload("123456") assert.Equal(t, (*workloadapi.Workload)(nil), w.byUid["123456"]) - addr1 := nets.ConvertIpByteToUint32([]byte("hello")) - addr2 := nets.ConvertIpByteToUint32([]byte("world")) + addr1, _ := netip.AddrFromSlice([]byte("hello")) + addr2, _ := netip.AddrFromSlice([]byte("world")) assert.Equal(t, (*workloadapi.Workload)(nil), w.byAddr[NetworkAddress{Network: "ut-net", Address: addr1}]) assert.Equal(t, (*workloadapi.Workload)(nil), w.byAddr[NetworkAddress{Network: "ut-net", Address: addr2}]) }) diff --git a/pkg/controller/workload/workload_controller.go b/pkg/controller/workload/workload_controller.go index 931f96800..6efc19f06 100644 --- a/pkg/controller/workload/workload_controller.go +++ b/pkg/controller/workload/workload_controller.go @@ -46,14 +46,34 @@ func NewController(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Controller { } func (ws *Controller) WorkloadStreamCreateAndSend(client discoveryv3.AggregatedDiscoveryServiceClient, ctx context.Context) error { - var err error + var ( + err error + initialResourceVersions map[string]string + ) ws.Stream, err = client.DeltaAggregatedResources(ctx) if err != nil { return fmt.Errorf("DeltaAggregatedResources failed, %s", err) } - if err := ws.Stream.Send(newWorkloadRequest(AddressType, nil)); err != nil { + if ws.Processor != nil { + cachedServices := ws.Processor.ServiceCache.List() + cachedWorkloads := ws.Processor.WorkloadCache.List() + initialResourceVersions = make(map[string]string, len(cachedServices)+len(cachedWorkloads)) + + // add cached resource names + for _, service := range cachedServices { + initialResourceVersions[service.ResourceName()] = "" + } + + for _, workload := range cachedWorkloads { + initialResourceVersions[workload.ResourceName()] = "" + } + } + + log.Debugf("Initial address resources: %v", initialResourceVersions) + + if err := ws.Stream.Send(newInitialWorkloadRequest(AddressType, nil, initialResourceVersions)); err != nil { return fmt.Errorf("send request failed, %s", err) } diff --git a/pkg/controller/workload/workload_controller_test.go b/pkg/controller/workload/workload_controller_test.go index dbf2232a3..8cf86fed3 100644 --- a/pkg/controller/workload/workload_controller_test.go +++ b/pkg/controller/workload/workload_controller_test.go @@ -19,16 +19,13 @@ package workload import ( "context" "errors" - "net" "reflect" 
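The WorkloadStreamCreateAndSend change above seeds the first delta request with InitialResourceVersions built from the local caches, so a reconnecting client tells the server which resources it already holds. A minimal sketch of building such a request (the type URL and resource names here are placeholders, not values taken from the running code):

package main

import (
	"fmt"

	discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)

func main() {
	// Pretend these names were collected from the local service/workload caches on restart.
	cached := []string{"default/testsvc.default.svc.cluster.local", "cluster0/workload-abc"}

	versions := make(map[string]string, len(cached))
	for _, name := range cached {
		versions[name] = "" // the local version is unknown, so an empty string is sent
	}

	req := &discoveryv3.DeltaDiscoveryRequest{
		TypeUrl:                 "type.googleapis.com/istio.workload.Address", // placeholder type URL
		InitialResourceVersions: versions,
	}
	fmt.Println(req.GetTypeUrl(), len(req.GetInitialResourceVersions()))
}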
"testing" - "time" "github.com/agiledragon/gomonkey/v2" config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/types/known/anypb" cluster_v2 "kmesh.net/kmesh/api/v2/cluster" @@ -37,31 +34,18 @@ import ( "kmesh.net/kmesh/pkg/controller/xdstest" ) -func TestWorklaodStreamCreateAndSend(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - workloadStream := Controller{ - Processor: nil, - } - +func TestWorkloadStreamCreateAndSend(t *testing.T) { // create a fake grpc service client - mockDiscovery := xdstest.NewMockServer(t) - conn, err := grpc.Dial("buffcon", - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return mockDiscovery.Listener.Dial() - })) + mockDiscovery := xdstest.NewXdsServer(t) + fakeClient, err := xdstest.NewClient(mockDiscovery) if err != nil { - t.Errorf("grpc connection client create failed, %s", err) + t.Errorf("create stream failed, %s", err) } - defer conn.Close() - client := discoveryv3.NewAggregatedDiscoveryServiceClient(conn) - stream, streamErr := client.DeltaAggregatedResources(ctx) - if streamErr != nil { - t.Errorf("create stream failed, %s", streamErr) + defer fakeClient.Cleanup() + workloadStream := Controller{ + Processor: nil, + Stream: fakeClient.DeltaClient, } - workloadStream.Stream = stream patches1 := gomonkey.NewPatches() patches2 := gomonkey.NewPatches() @@ -74,9 +58,9 @@ func TestWorklaodStreamCreateAndSend(t *testing.T) { { name: "test1: send request failed, should return error", beforeFunc: func() { - patches1.ApplyMethod(reflect.TypeOf(client), "DeltaAggregatedResources", + patches1.ApplyMethod(reflect.TypeOf(fakeClient.Client), "DeltaAggregatedResources", func(_ discoveryv3.AggregatedDiscoveryServiceClient, ctx context.Context, opts ...grpc.CallOption) (discoveryv3.AggregatedDiscoveryService_DeltaAggregatedResourcesClient, error) { - return stream, nil + return fakeClient.DeltaClient, nil }) patches2.ApplyMethod(reflect.TypeOf(workloadStream.Stream), "Send", func(_ discoveryv3.AggregatedDiscoveryService_DeltaAggregatedResourcesClient, req *discoveryv3.DeltaDiscoveryRequest) error { @@ -116,7 +100,7 @@ func TestWorklaodStreamCreateAndSend(t *testing.T) { { name: "test3: fail to create workloadStream, should return error", beforeFunc: func() { - patches1.ApplyMethod(reflect.TypeOf(client), "DeltaAggregatedResources", + patches1.ApplyMethod(reflect.TypeOf(fakeClient.Client), "DeltaAggregatedResources", func(_ discoveryv3.AggregatedDiscoveryServiceClient, ctx context.Context, opts ...grpc.CallOption) (discoveryv3.AggregatedDiscoveryService_DeltaAggregatedResourcesClient, error) { return nil, errors.New("fail to create adsstream") }) @@ -130,7 +114,7 @@ func TestWorklaodStreamCreateAndSend(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.beforeFunc() - err := workloadStream.WorkloadStreamCreateAndSend(client, context.TODO()) + err := workloadStream.WorkloadStreamCreateAndSend(fakeClient.Client, context.TODO()) if (err != nil) != tt.wantErr { t.Errorf("worklaodStream.WorklaodStreamCreateAndSend() error = %v, wantErr %v", err, tt.wantErr) return @@ -141,9 +125,6 @@ func TestWorklaodStreamCreateAndSend(t *testing.T) { } func TestAdsStream_AdsStreamProcess(t 
*testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - workloadStream := Controller{ Processor: &Processor{ ack: &discoveryv3.DeltaDiscoveryRequest{}, @@ -151,23 +132,13 @@ func TestAdsStream_AdsStreamProcess(t *testing.T) { } // create a fake grpc service client - mockDiscovery := xdstest.NewMockServer(t) - conn, err := grpc.Dial("buffcon", - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return mockDiscovery.Listener.Dial() - })) + mockDiscovery := xdstest.NewXdsServer(t) + fakeClient, err := xdstest.NewClient(mockDiscovery) if err != nil { - t.Errorf("grpc connection client create failed, %s", err) - } - defer conn.Close() - client := discoveryv3.NewAggregatedDiscoveryServiceClient(conn) - stream, streamErr := client.DeltaAggregatedResources(ctx) - if streamErr != nil { - t.Errorf("create stream failed, %s", streamErr) + t.Errorf("create stream failed, %s", err) } - workloadStream.Stream = stream + defer fakeClient.Cleanup() + workloadStream.Stream = fakeClient.DeltaClient patches1 := gomonkey.NewPatches() patches2 := gomonkey.NewPatches() diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index f79d1e625..18e6d9364 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -17,7 +17,6 @@ package workload import ( - "encoding/binary" "fmt" "os" "strings" @@ -52,7 +51,7 @@ type Processor struct { // workloads indexer, svc key -> workload id endpointsByService map[string]map[string]struct{} bpf *bpf.Cache - Sm *kmeshsecurity.SecretManager + SecretManager *kmeshsecurity.SecretManager nodeName string WorkloadCache cache.WorkloadCache ServiceCache cache.ServiceCache @@ -69,6 +68,17 @@ func newProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor { } } +func newInitialWorkloadRequest(typeUrl string, names []string, initialResourceVersions map[string]string) *service_discovery_v3.DeltaDiscoveryRequest { + return &service_discovery_v3.DeltaDiscoveryRequest{ + TypeUrl: typeUrl, + ResourceNamesSubscribe: names, + InitialResourceVersions: initialResourceVersions, + ResponseNonce: "", + ErrorDetail: nil, + Node: config.GetConfig(constants.WorkloadMode).GetNode(), + } +} + func newWorkloadRequest(typeUrl string, names []string) *service_discovery_v3.DeltaDiscoveryRequest { return &service_discovery_v3.DeltaDiscoveryRequest{ TypeUrl: typeUrl, @@ -139,7 +149,7 @@ func (p *Processor) deletePodFrontendData(uid uint32) error { bk.BackendUid = uid if err := p.bpf.BackendLookup(&bk, &bv); err == nil { log.Debugf("Find BackendValue: [%#v]", bv) - fk.IPv4 = bv.IPv4 + fk.Ip = bv.Ip if err = p.bpf.FrontendDelete(&fk); err != nil { log.Errorf("FrontendDelete failed: %s", err) return err @@ -155,7 +165,8 @@ func (p *Processor) storePodFrontendData(uid uint32, ip []byte) error { fv = bpf.FrontendValue{} ) - fk.IPv4 = binary.LittleEndian.Uint32(ip) + nets.CopyIpByteFromSlice(&fk.Ip, &ip) + fv.UpstreamId = uid if err := p.bpf.FrontendUpdate(&fk, &fv); err != nil { log.Errorf("Update frontend map failed, err:%s", err) @@ -176,13 +187,15 @@ func (p *Processor) removeWorkloadResource(removedResources []string) error { ) for _, uid := range removedResources { - exist := p.WorkloadCache.GetWorkloadByUid(uid) - if exist != nil && p.isManagedWorkload(exist) { - Identity := p.getIdentityByUid(uid) - p.Sm.SendCertRequest(Identity, 
kmeshsecurity.DELETE) + if p.SecretManager != nil { + exist := p.WorkloadCache.GetWorkloadByUid(uid) + if exist != nil && p.isManagedWorkload(exist) { + Identity := p.getIdentityByUid(uid) + p.SecretManager.SendCertRequest(Identity, kmeshsecurity.DELETE) + } } - p.WorkloadCache.DeleteWorkload(uid) + p.WorkloadCache.DeleteWorkload(uid) backendUid := p.hashName.StrToNum(uid) // for Pod to Pod access, Pod info stored in frontend map, when Pod offline, we need delete the related records if err = p.deletePodFrontendData(backendUid); err != nil { @@ -344,7 +357,7 @@ func (p *Processor) storeBackendData(uid uint32, ip []byte, waypoint *workloadap ) bk.BackendUid = uid - bv.IPv4 = nets.ConvertIpByteToUint32(ip) + nets.CopyIpByteFromSlice(&bv.Ip, &ip) bv.ServiceCount = 0 for serviceName := range portList { bv.Services[bv.ServiceCount] = p.hashName.StrToNum(serviceName) @@ -356,8 +369,11 @@ func (p *Processor) storeBackendData(uid uint32, ip []byte, waypoint *workloadap } if waypoint != nil { - bv.WaypointAddr = nets.ConvertIpByteToUint32(waypoint.GetAddress().Address) + nets.CopyIpByteFromSlice(&bv.WaypointAddr, &waypoint.GetAddress().Address) bv.WaypointPort = nets.ConvertPortToBigEndian(waypoint.GetHboneMtlsPort()) + + fmt.Printf("addr %x", bv.WaypointAddr) + fmt.Printf("port %x", bv.WaypointPort) } if err := p.bpf.BackendUpdate(&bk, &bv); err != nil { @@ -427,17 +443,18 @@ func (p *Processor) handleDataWithoutService(workload *workloadapi.Workload) err bk = bpf.BackendKey{} bv = bpf.BackendValue{} ) + uid := p.hashName.StrToNum(workload.GetUid()) ips := workload.GetAddresses() for _, ip := range ips { if waypoint := workload.GetWaypoint(); waypoint != nil { - addr := waypoint.GetAddress().Address - bv.WaypointAddr = nets.ConvertIpByteToUint32(addr) + nets.CopyIpByteFromSlice(&bv.WaypointAddr, &waypoint.GetAddress().Address) bv.WaypointPort = nets.ConvertPortToBigEndian(waypoint.GetHboneMtlsPort()) } bk.BackendUid = uid - bv.IPv4 = nets.ConvertIpByteToUint32(ip) + + nets.CopyIpByteFromSlice(&bv.Ip, &ip) if err = p.bpf.BackendUpdate(&bk, &bv); err != nil { log.Errorf("Update backend map failed, err:%s", err) return err @@ -453,16 +470,18 @@ func (p *Processor) handleDataWithoutService(workload *workloadapi.Workload) err func (p *Processor) handleWorkload(workload *workloadapi.Workload) error { log.Debugf("handle workload: %s", workload.Uid) - if p.isManagedWorkload(workload) { - oldIdentity := p.getIdentityByUid(workload.Uid) - if oldIdentity == "" { - newIdentity := spiffe.Identity{ + if p.SecretManager != nil && p.isManagedWorkload(workload) { + wl := p.WorkloadCache.GetWorkloadByUid(workload.Uid) + // only send cert request for a workload once + // because workload identity can be updated + if wl == nil { + identity := spiffe.Identity{ TrustDomain: workload.TrustDomain, Namespace: workload.Namespace, ServiceAccount: workload.ServiceAccount, }.String() // This is the case workload added first time - p.Sm.SendCertRequest(newIdentity, kmeshsecurity.ADD) + p.SecretManager.SendCertRequest(identity, kmeshsecurity.ADD) } } @@ -493,8 +512,7 @@ func (p *Processor) storeServiceFrontendData(serviceId uint32, service *workload fv.UpstreamId = serviceId for _, networkAddress := range service.GetAddresses() { - address := networkAddress.Address - fk.IPv4 = nets.ConvertIpByteToUint32(address) + nets.CopyIpByteFromSlice(&fk.Ip, &networkAddress.Address) if err = p.bpf.FrontendUpdate(&fk, &fv); err != nil { log.Errorf("Update Frontend failed, err:%s", err) return err @@ -517,7 +535,7 @@ func (p *Processor) 
storeServiceData(serviceName string, waypoint *workloadapi.G newValue := bpf.ServiceValue{} newValue.LbPolicy = LbPolicyRandom if waypoint != nil { - newValue.WaypointAddr = nets.ConvertIpByteToUint32(waypoint.GetAddress().Address) + nets.CopyIpByteFromSlice(&newValue.WaypointAddr, &waypoint.GetAddress().Address) newValue.WaypointPort = nets.ConvertPortToBigEndian(waypoint.GetHboneMtlsPort()) } diff --git a/pkg/controller/workload/workload_processor_test.go b/pkg/controller/workload/workload_processor_test.go index dcc107276..7660337da 100644 --- a/pkg/controller/workload/workload_processor_test.go +++ b/pkg/controller/workload/workload_processor_test.go @@ -26,9 +26,106 @@ import ( "kmesh.net/kmesh/api/v2/workloadapi" "kmesh.net/kmesh/daemon/options" "kmesh.net/kmesh/pkg/constants" + "kmesh.net/kmesh/pkg/controller/workload/bpfcache" + "kmesh.net/kmesh/pkg/nets" "kmesh.net/kmesh/pkg/utils/test" ) +func Test_handleWorkload(t *testing.T) { + workloadMap := bpfcache.NewFakeWorkloadMap(t) + defer bpfcache.CleanupFakeWorkloadMap(workloadMap) + + p := newProcessor(workloadMap) + + // 1. handle workload with service, but service not handled yet + // In this case, only frontend map and backend map should be updated. + wl := createTestWorkloadWithService() + _ = p.handleDataWithService(createTestWorkloadWithService()) + var ( + ek bpfcache.EndpointKey + ev bpfcache.EndpointValue + ) + + workloadID := checkFrontEndMap(t, wl.Addresses[0], p) + checkBackendMap(t, p, workloadID, wl) + + epKeys := p.bpf.EndpointIterFindKey(workloadID) + assert.Equal(t, len(epKeys), 0) + for svcName := range wl.Services { + endpoints := p.endpointsByService[svcName] + assert.Len(t, endpoints, 1) + if _, ok := endpoints[wl.Uid]; ok { + assert.True(t, ok) + } + } + + // 2. add related service + fakeSvc := createFakeService("testsvc", "10.240.10.1", "10.240.10.2") + _ = p.handleService(fakeSvc) + + // 2.1 check front end map contains service + svcID := checkFrontEndMap(t, fakeSvc.Addresses[0].Address, p) + + // 2.2 check service map contains service + checkServiceMap(t, p, svcID, fakeSvc, 1) + + // 2.3 check endpoint map now contains the workloads + ek.BackendIndex = 1 + ek.ServiceId = svcID + err := p.bpf.EndpointLookup(&ek, &ev) + assert.NoError(t, err) + assert.Equal(t, ev.BackendUid, workloadID) + + // 3. 
add another workload with service + workload2 := createFakeWorkload("1.2.3.5") + _ = p.handleDataWithService(workload2) + + // 3.1 check endpoint map now contains the new workloads + workload2ID := checkFrontEndMap(t, workload2.Addresses[0], p) + ek.BackendIndex = 2 + ek.ServiceId = svcID + err = p.bpf.EndpointLookup(&ek, &ev) + assert.NoError(t, err) + assert.Equal(t, ev.BackendUid, workload2ID) + + // 3.2 check service map contains service + checkServiceMap(t, p, svcID, fakeSvc, 2) +} + +func checkServiceMap(t *testing.T, p *Processor, svcId uint32, fakeSvc *workloadapi.Service, endpointCount uint32) { + var sv bpfcache.ServiceValue + err := p.bpf.ServiceLookup(&bpfcache.ServiceKey{ServiceId: svcId}, &sv) + assert.NoError(t, err) + assert.Equal(t, sv.EndpointCount, endpointCount) + waypointAddr := fakeSvc.GetWaypoint().GetAddress().GetAddress() + if waypointAddr != nil { + assert.Equal(t, test.EqualIp(sv.WaypointAddr, waypointAddr), true) + } + assert.Equal(t, sv.WaypointPort, nets.ConvertPortToBigEndian(15008)) +} + +func checkBackendMap(t *testing.T, p *Processor, workloadID uint32, wl *workloadapi.Workload) { + var bv bpfcache.BackendValue + err := p.bpf.BackendLookup(&bpfcache.BackendKey{BackendUid: workloadID}, &bv) + assert.NoError(t, err) + assert.Equal(t, test.EqualIp(bv.Ip, wl.Addresses[0]), true) + waypointAddr := wl.GetWaypoint().GetAddress().GetAddress() + if waypointAddr != nil { + assert.Equal(t, test.EqualIp(bv.WaypointAddr, waypointAddr), true) + } + assert.Equal(t, bv.WaypointPort, nets.ConvertPortToBigEndian(wl.GetWaypoint().GetHboneMtlsPort())) +} + +func checkFrontEndMap(t *testing.T, ip []byte, p *Processor) (upstreamId uint32) { + var fk bpfcache.FrontendKey + var fv bpfcache.FrontendValue + nets.CopyIpByteFromSlice(&fk.Ip, &ip) + err := p.bpf.FrontendLookup(&fk, &fv) + assert.NoError(t, err) + upstreamId = fv.UpstreamId + return +} + func BenchmarkHandleDataWithService(b *testing.B) { t := &testing.T{} config := options.BpfConfig{ @@ -44,13 +141,13 @@ func BenchmarkHandleDataWithService(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - workload := createTestWorkload() + workload := createTestWorkloadWithService() err := workloadController.Processor.handleDataWithService(workload) assert.NoError(t, err) } } -func createTestWorkload() *workloadapi.Workload { +func createTestWorkloadWithService() *workloadapi.Workload { workload := workloadapi.Workload{ Namespace: "ns", Name: "name", @@ -63,7 +160,42 @@ func createTestWorkload() *workloadapi.Workload { Status: workloadapi.WorkloadStatus_HEALTHY, ClusterId: "cluster0", Services: map[string]*workloadapi.PortList{ - "ns/hostname": { + "default/testsvc.default.svc.cluster.local": { + Ports: []*workloadapi.Port{ + { + ServicePort: 80, + TargetPort: 8080, + }, + { + ServicePort: 81, + TargetPort: 8180, + }, + { + ServicePort: 82, + TargetPort: 82, + }, + }, + }, + }, + } + workload.Uid = "cluster0/" + rand.String(6) + return &workload +} + +func createFakeWorkload(ip string) *workloadapi.Workload { + workload := workloadapi.Workload{ + Namespace: "ns", + Name: "name", + Addresses: [][]byte{netip.MustParseAddr(ip).AsSlice()}, + Network: "testnetwork", + CanonicalName: "foo", + CanonicalRevision: "latest", + WorkloadType: workloadapi.WorkloadType_POD, + WorkloadName: "name", + Status: workloadapi.WorkloadStatus_HEALTHY, + ClusterId: "cluster0", + Services: map[string]*workloadapi.PortList{ + "default/testsvc.default.svc.cluster.local": { Ports: []*workloadapi.Port{ { ServicePort: 80, @@ -84,3 +216,38 @@ func 
createTestWorkload() *workloadapi.Workload { workload.Uid = "cluster0/" + rand.String(6) return &workload } + +func createFakeService(name, ip, waypoint string) *workloadapi.Service { + return &workloadapi.Service{ + Name: name, + Namespace: "default", + Hostname: "testsvc.default.svc.cluster.local", + Addresses: []*workloadapi.NetworkAddress{ + { + Address: netip.MustParseAddr(ip).AsSlice(), + }, + }, + Ports: []*workloadapi.Port{ + { + ServicePort: 80, + TargetPort: 8080, + }, + { + ServicePort: 81, + TargetPort: 8180, + }, + { + ServicePort: 82, + TargetPort: 82, + }, + }, + Waypoint: &workloadapi.GatewayAddress{ + Destination: &workloadapi.GatewayAddress_Address{ + Address: &workloadapi.NetworkAddress{ + Address: netip.MustParseAddr(waypoint).AsSlice(), + }, + }, + HboneMtlsPort: 15008, + }, + } +} diff --git a/pkg/controller/xdstest/fake_xdsclient.go b/pkg/controller/xdstest/fake_xdsclient.go new file mode 100644 index 000000000..489338128 --- /dev/null +++ b/pkg/controller/xdstest/fake_xdsclient.go @@ -0,0 +1,72 @@ +/* + * Copyright 2024 The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xdstest + +import ( + "context" + "fmt" + "net" + + discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type XDSClient struct { + Client discoveryv3.AggregatedDiscoveryServiceClient + AdsClient discoveryv3.AggregatedDiscoveryService_StreamAggregatedResourcesClient + DeltaClient discoveryv3.AggregatedDiscoveryService_DeltaAggregatedResourcesClient + conn *grpc.ClientConn +} + +func NewClient(xdsServer *XDSServer) (*XDSClient, error) { + conn, err := grpc.Dial("buffcon", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return xdsServer.Listener.Dial() + })) + if err != nil { + return nil, fmt.Errorf("grpc connection client create failed, %s", err) + } + client := discoveryv3.NewAggregatedDiscoveryServiceClient(conn) + deltaClient, err := client.DeltaAggregatedResources(context.Background()) + if err != nil { + return nil, fmt.Errorf("DeltaAggregatedResources failed, %s", err) + } + adsClient, err := client.StreamAggregatedResources(context.Background()) + if err != nil { + return nil, fmt.Errorf("StreamAggregatedResources failed, %s", err) + } + + return &XDSClient{ + Client: client, + AdsClient: adsClient, + DeltaClient: deltaClient, + conn: conn, + }, nil +} + +func (c *XDSClient) Cleanup() { + if c.AdsClient != nil { + _ = c.AdsClient.CloseSend() + } + if c.DeltaClient != nil { + _ = c.DeltaClient.CloseSend() + } + c.conn.Close() +} diff --git a/pkg/controller/xdstest/fake_xdsserver.go b/pkg/controller/xdstest/fake_xdsserver.go index 4d6f99240..8a73b619f 100644 --- a/pkg/controller/xdstest/fake_xdsserver.go +++ b/pkg/controller/xdstest/fake_xdsserver.go @@ -25,15 +25,15 @@ import ( "istio.io/pkg/log" ) -type MockDiscovery struct { +type XDSServer 
struct { Listener *bufconn.Listener responses chan *discoveryv3.DiscoveryResponse DeltaResponses chan *discoveryv3.DeltaDiscoveryResponse close chan struct{} } -func NewMockServer(t *testing.T) *MockDiscovery { - s := &MockDiscovery{ +func NewXdsServer(t *testing.T) *XDSServer { + s := &XDSServer{ close: make(chan struct{}), responses: make(chan *discoveryv3.DiscoveryResponse), DeltaResponses: make(chan *discoveryv3.DeltaDiscoveryResponse), @@ -56,7 +56,7 @@ func NewMockServer(t *testing.T) *MockDiscovery { return s } -func (f *MockDiscovery) StreamAggregatedResources(server discoveryv3.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { +func (f *XDSServer) StreamAggregatedResources(server discoveryv3.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { numberOfSends := 0 for { select { @@ -72,7 +72,7 @@ func (f *MockDiscovery) StreamAggregatedResources(server discoveryv3.AggregatedD } } -func (f *MockDiscovery) DeltaAggregatedResources(server discoveryv3.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error { +func (f *XDSServer) DeltaAggregatedResources(server discoveryv3.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error { numberOfSends := 0 for { select { diff --git a/pkg/nets/nets.go b/pkg/nets/nets.go index 3e21473c5..575816745 100644 --- a/pkg/nets/nets.go +++ b/pkg/nets/nets.go @@ -58,5 +58,19 @@ func ConvertPortToBigEndian(little uint32) uint32 { // ConvertIpByteToUint32 converts ip to little-endian uint32 format func ConvertIpByteToUint32(ip []byte) uint32 { + if len(ip) != 4 { + return 0 + } return binary.LittleEndian.Uint32(ip) } + +func CopyIpByteFromSlice(dst *[16]byte, src *[]byte) { + len := len(*src) + if len != 4 && len != 16 { + return + } + + for i := 0; i < len; i++ { + (*dst)[i] = (*src)[i] + } +} diff --git a/pkg/utils/test/bpf_map.go b/pkg/utils/test/bpf_map.go index 4c3b362d0..e82ba621a 100644 --- a/pkg/utils/test/bpf_map.go +++ b/pkg/utils/test/bpf_map.go @@ -85,3 +85,23 @@ func CleanupBpfMap() { fmt.Println("remove /mnt/kmesh_cgroup2 error: ", err) } } + +func EqualIp(src [16]byte, dst []byte) bool { + if dst == nil { + return false + } + size := len(dst) + if size == 0 { + return false + } + if size != 4 && size != 16 { + return false + } + + for i := 0; i < size; i++ { + if src[i] != dst[i] { + return false + } + } + return true +}
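The nets.go and test helpers above move IPs into fixed [16]byte fields so the BPF map values can hold either IPv4 or IPv6. A stand-alone sketch of the same copy/compare idea, assuming only the standard library (the helper names here are illustrative, not the ones from the diff):

package main

import (
	"bytes"
	"fmt"
	"net/netip"
)

// copyIP writes a 4- or 16-byte address into a fixed 16-byte field; an IPv4 address
// occupies the first 4 bytes and the rest stays zero, mirroring CopyIpByteFromSlice.
func copyIP(dst *[16]byte, src []byte) {
	if len(src) != 4 && len(src) != 16 {
		return
	}
	copy(dst[:], src)
}

// equalIP compares the significant prefix of the fixed field with a raw address slice.
func equalIP(stored [16]byte, raw []byte) bool {
	if len(raw) != 4 && len(raw) != 16 {
		return false
	}
	return bytes.Equal(stored[:len(raw)], raw)
}

func main() {
	var field [16]byte
	v4 := netip.MustParseAddr("192.168.224.22").AsSlice()
	copyIP(&field, v4)
	fmt.Println(equalIP(field, v4)) // true

	v6 := netip.MustParseAddr("fd00::2").AsSlice()
	copyIP(&field, v6)
	fmt.Println(equalIP(field, v6)) // true
}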