Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add K8s version query function through Tongyi Lingma analysis #1135

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/ingress/kube/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,16 @@ var (
"Total invalid ingresses known to pilot.",
monitoring.WithLabels(clusterTag, invalidType),
)

queryK8sVersionFail = monitoring.NewSum(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个指标没有意义

"pilot_query_k8s_version_fail",
"query k8s version of remote cluster fail number")
)

func init() {
monitoring.MustRegister(totalIngresses)
monitoring.MustRegister(totalInvalidIngress)
monitoring.MustRegister(queryK8sVersionFail)
}

func RecordIngressNumber(cluster string, number int) {
Expand Down
121 changes: 121 additions & 0 deletions pkg/ingress/kube/common/tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package common
import (
"crypto/md5"
"encoding/hex"
"k8s.io/apimachinery/pkg/util/wait"
"net"
"sort"
"strings"
"time"

networking "istio.io/api/networking/v1alpha3"
"istio.io/istio/pilot/pkg/model"
Expand All @@ -32,6 +34,30 @@ import (
. "github.com/alibaba/higress/pkg/ingress/log"
)

const (
defaultInterval = 3 * time.Second
defaultTimeout = 1 * time.Minute
)

type retry struct {
interval time.Duration
timeout time.Duration
}

type RetryOption func(o *retry)

func WithInterval(interval time.Duration) RetryOption {
return func(r *retry) {
r.interval = interval
}
}

func WithTimeout(timeout time.Duration) RetryOption {
return func(r *retry) {
r.timeout = timeout
}
}

func ValidateBackendResource(resource *v1.TypedLocalObjectReference) bool {
if resource == nil || resource.APIGroup == nil ||
*resource.APIGroup != netv1.SchemeGroupVersion.Group ||
Expand Down Expand Up @@ -62,6 +88,62 @@ func V1Available(client kube.Client) bool {
return runningVersion.AtLeast(version119)
}

func V1Available(client kube.Client, retryOptions ...RetryOption) bool {
retry := &retry{
interval: defaultInterval,
timeout: defaultTimeout,
}

for _, option := range retryOptions {
option(retry)
}

// most case is greater than 1.18
supportV1 := true
err := wait.PollImmediate(retry.interval, retry.timeout, func() (done bool, err error) {
available, err := v1Available(client)
if err != nil {
IngressLog.Errorf("check v1 available error: %v", err)
// retry
return false, nil
}
supportV1 = available
// we have done.
return true, nil
})

if err != nil {
IngressLog.Errorf("check v1 available finally error: %v", err)
}

return supportV1
}

func IsRunningVersionAtLeast(atLeastVersionStr string, client kube.Client) (bool, error) {
atLeastVersion, _ := version.ParseGeneric(atLeastVersionStr)

serverVersion, err := client.GetKubernetesVersion()
if err != nil {
// Consider the new ingress package is available as default
return true
queryK8sVersionFail.Increment()
return false, err
}

runningVersion, err := version.ParseGeneric(serverVersion.String())
if err != nil {
// Consider the new ingress package is available as default
IngressLog.Errorf("unexpected error parsing running Kubernetes version: %v", err)
return true
queryK8sVersionFail.Increment()
return false, err
}

return runningVersion.AtLeast(version119)
return runningVersion.AtLeast(atLeastVersion), nil
}


// NetworkingIngressAvailable check if the "networking" group Ingress is available.
func NetworkingIngressAvailable(client kube.Client) bool {
// check kubernetes version to use new ingress package or not
Expand All @@ -81,6 +163,45 @@ func NetworkingIngressAvailable(client kube.Client) bool {
return runningVersion.AtLeast(version118)
}

func NetworkingIngressAvailable(client kube.Client, retryOptions ...RetryOption) bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这么做的好处在哪里?原本机制有什么问题?

retry := &retry{
interval: defaultInterval,
timeout: defaultTimeout,
}

serverVersion, err := client.GetKubernetesVersion()
if err != nil {
return false
for _, option := range retryOptions {
option(retry)
}

runningVersion, err := version.ParseGeneric(serverVersion.String())
// most case is greater than or equal 1.18.
supportNetworking := true

err := wait.PollImmediate(retry.interval, retry.timeout, func() (done bool, err error) {
available, err := networkingIngressAvailable(client)
if err != nil {
IngressLog.Errorf("check networking available error: %v", err)
// retry
return false, nil
}
supportNetworking = available
// we have done.
return true, nil
})

if err != nil {
IngressLog.Errorf("unexpected error parsing running Kubernetes version: %v", err)
return false
IngressLog.Errorf("check networking available finally error: %v", err)
}

return runningVersion.AtLeast(version118)
return supportNetworking
}

// SortIngressByCreationTime sorts the list of config objects in ascending order by their creation time (if available).
func SortIngressByCreationTime(configs []config.Config) {
sort.Slice(configs, func(i, j int) bool {
Expand Down
104 changes: 104 additions & 0 deletions pkg/ingress/kube/common/tool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,3 +556,107 @@ func TestSortHTTPRoutesWithMoreRules(t *testing.T) {
}
}
}

type supportV1Client struct {
kube.Client
}

func (s *supportV1Client) GetKubernetesVersion() (*kubeVersion.Info, error) {
return &kubeVersion.Info{
GitVersion: "v1.28.3-aliyun.1",
}, nil
}

type unSupportV1Client struct {
kube.Client
}

func (u *unSupportV1Client) GetKubernetesVersion() (*kubeVersion.Info, error) {
return &kubeVersion.Info{
GitVersion: "v1.18.0",
}, nil
}

type supportNetworkingClient struct {
kube.Client
}

func (s *supportNetworkingClient) GetKubernetesVersion() (*kubeVersion.Info, error) {
return &kubeVersion.Info{
GitVersion: "v1.18.0-aliyun.1",
}, nil
}

type unSupportNetworkingClient struct {
kube.Client
}

func (u *unSupportNetworkingClient) GetKubernetesVersion() (*kubeVersion.Info, error) {
return &kubeVersion.Info{
GitVersion: "v1.17.0-aliyun.1",
}, nil
}

type errorClient struct {
kube.Client
}

func (e *errorClient) GetKubernetesVersion() (*kubeVersion.Info, error) {
return &kubeVersion.Info{
GitVersion: "error",
}, nil
}

func TestV1Available(t *testing.T) {
fakeClient := kube.NewFakeClient()

v1Client := &supportV1Client{
fakeClient,
}

if !V1Available(v1Client) {
t.Fatal("should support v1")
}

v1Beta1Client := &unSupportV1Client{
fakeClient,
}
if V1Available(v1Beta1Client) {
t.Fatal("should not support v1")
}

errorClient := &errorClient{
fakeClient,
}
// will fallback to v1
if !V1Available(errorClient, WithInterval(1*time.Second), WithTimeout(3*time.Second)) {
t.Fatal("should fallback to v1")
}
}

func TestNetworkingIngressAvailable(t *testing.T) {
fakeClient := kube.NewFakeClient()

networkingClient := &supportNetworkingClient{
fakeClient,
}

if !NetworkingIngressAvailable(networkingClient) {
t.Fatal("should support networking")
}

notNetworkingClient := &unSupportNetworkingClient{
fakeClient,
}
if NetworkingIngressAvailable(notNetworkingClient) {
t.Fatal("should not support networking")
}

errorClient := &errorClient{
fakeClient,
}
// will fallback to networking
if !NetworkingIngressAvailable(errorClient, WithInterval(1*time.Second), WithTimeout(3*time.Second)) {
t.Fatal("should fallback to networking")
}
}
78 changes: 57 additions & 21 deletions plugins/wasm-cpp/common/http_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,32 +158,68 @@ inline std::string subspan(absl::string_view source, size_t start, size_t end) {
return {source.data() + start, end - start};
}

QueryParams parseParameters(absl::string_view data, size_t start,
bool decode_params) {
QueryParams params;
class QueryParams {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

修改这里是要干什么?

public:
using ParamMap = std::unordered_map<std::string, std::string>;
ParamMap params;

while (start < data.size()) {
size_t end = data.find('&', start);
if (end == std::string::npos) {
end = data.size();
void emplace(const std::string& name, const std::string& value) {
params.emplace(name, value);
}
absl::string_view param(data.data() + start, end - start);

const size_t equal = param.find('=');
if (equal != std::string::npos) {
const auto param_name = subspan(data, start, start + equal);
const auto param_value = subspan(data, start + equal + 1, end);
params.emplace(
decode_params ? PercentEncoding::decode(param_name) : param_name,
decode_params ? PercentEncoding::decode(param_value) : param_value);
} else {
params.emplace(subspan(data, start, end), "");
};

QueryParams parseParameters(absl::string_view data, size_t start,
bool decode_params) {
if (start > data.size()) {
throw std::out_of_range("Start index is out of range.");
}

start = end + 1;
}
QueryParams params;

return params;
while (start < data.size()) {
size_t end = data.find('&', start);
if (end == std::string::npos) {
end = data.size();
}

absl::string_view param(data.data() + start, end - start);

const size_t equal = param.find('=');
if (equal != std::string::npos) {
auto param_name = subspan(data, start, start + equal);
auto param_value = subspan(data, start + equal + 1, end);

try {
if (decode_params) {
param_name = PercentEncoding::decode(param_name);
param_value = PercentEncoding::decode(param_value);
}
} catch (const std::exception& e) {
// Handle decoding exceptions if necessary, or rethrow.
throw;
}

params.emplace(param_name, param_value);
} else {
// Handle parameters without '=' (i.e., no value).
// Optionally decode according to `decode_params`.
try {
if (decode_params) {
auto decoded_param = PercentEncoding::decode(param);
params.emplace(decoded_param, "");
} else {
params.emplace(param, "");
}
} catch (const std::exception& e) {
// Handle decoding exceptions if necessary, or rethrow.
throw;
}
}

start = end + 1;
}

return params;
}

std::vector<std::string> getAllOfHeader(std::string_view key) {
Expand Down