Skip to content


Merge pull request kubeedge#2727 from Iceber/optimize-leaderlection
Browse files Browse the repository at this point in the history
cloud/leaderelection: fix and optimize leaderelection
  • Loading branch information
kubeedge-bot authored Mar 29, 2021
2 parents d610464 + debd988 commit 46b6e52
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 78 deletions.
36 changes: 19 additions & 17 deletions cloud/pkg/common/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,31 @@ import (
cloudcoreConfig ""

var keClient *kubeEdgeClient
var once sync.Once
var (
initOnce sync.Once
keClient *kubeEdgeClient

func InitKubeEdgeClient(config *cloudcoreConfig.KubeAPIConfig) {
kubeConfig, err := clientcmd.BuildConfigFromFlags(config.Master,
if err != nil {
klog.Errorf("Failed to build config, err: %v", err)
kubeConfig.QPS = float32(config.QPS)
kubeConfig.Burst = int(config.Burst)
initOnce.Do(func() {
kubeConfig, err := clientcmd.BuildConfigFromFlags(config.Master, config.KubeConfig)
if err != nil {
klog.Errorf("Failed to build config, err: %v", err)

kubeConfig.QPS = float32(config.QPS)
kubeConfig.Burst = int(config.Burst)

dynamicClient := dynamic.NewForConfigOrDie(kubeConfig)
dynamicClient := dynamic.NewForConfigOrDie(kubeConfig)

kubeConfig.ContentType = runtime.ContentTypeProtobuf
kubeClient := kubernetes.NewForConfigOrDie(kubeConfig)
kubeConfig.ContentType = runtime.ContentTypeProtobuf
kubeClient := kubernetes.NewForConfigOrDie(kubeConfig)

crdKubeConfig := rest.CopyConfig(kubeConfig)
crdKubeConfig.ContentType = runtime.ContentTypeJSON
crdClient := crdClientset.NewForConfigOrDie(crdKubeConfig)
crdKubeConfig := rest.CopyConfig(kubeConfig)
crdKubeConfig.ContentType = runtime.ContentTypeJSON
crdClient := crdClientset.NewForConfigOrDie(crdKubeConfig)

once.Do(func() {
keClient = &kubeEdgeClient{
kubeClient: kubeClient,
crdClient: crdClient,
Expand Down
131 changes: 70 additions & 61 deletions cloud/pkg/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,47 +45,50 @@ func Run(cfg *config.CloudCoreConfig, readyzAdaptor *ReadyzAdaptor) {
klog.Warningf("Create Namespace kubeedge failed with error: %s", err)

coreRecorder := coreBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "CloudCore"})
leaderElectionConfig, err := makeLeaderElectionConfig(*cfg.LeaderElection, cli, coreRecorder)

if err != nil {
klog.Errorf("couldn't create leaderElectorConfig: %v", err)

leaderElectionConfig.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// Start all modules,

// Patch PodReadinessGate if program run in pod
err := TryToPatchPodReadinessGate(corev1.ConditionTrue)
if err != nil {
if err := TryToPatchPodReadinessGate(corev1.ConditionTrue); err != nil {
// Terminate the program gracefully
klog.Errorf("Error patching pod readinessGate: %v", err)
if err := TriggerGracefulShutdown(); err != nil {
klog.Fatalf("failed to gracefully terminate program: %v", err)
OnStoppedLeading: func() {
// TODO: is it necessary to terminate the program gracefully?
//klog.Fatalf("leaderelection lost, rudely terminate program")
klog.Errorf("leaderelection lost, gracefully terminate program")

// Reset PodReadinessGate to false if cloudcore stop
err := TryToPatchPodReadinessGate(corev1.ConditionFalse)
if err != nil {
if err := TryToPatchPodReadinessGate(corev1.ConditionFalse); err != nil {
klog.Errorf("Error reset pod readinessGate: %v", err)

// Trigger core.GracefulShutdown()
if err := TriggerGracefulShutdown(); err != nil {
klog.Fatalf("failed to gracefully terminate program: %v", err)

leaderElector, err := leaderelection.NewLeaderElector(*leaderElectionConfig)
// Set readyzAdaptor manually
if err != nil {
klog.Errorf("couldn't create leader elector: %v", err)

// Start leaderElection until becoming leader, terminate program if leader lost or context.cancel
go leaderElector.Run(beehiveContext.GetContext())
Expand Down Expand Up @@ -130,69 +133,75 @@ func makeLeaderElectionConfig(config componentbaseconfig.LeaderElectionConfigura
// Try to patch PodReadinessGate if program runs in pod
func TryToPatchPodReadinessGate(status corev1.ConditionStatus) error {
podname, isInPod := os.LookupEnv("CLOUDCORE_POD_NAME")
if isInPod {
namespace := os.Getenv("CLOUDCORE_POD_NAMESPACE")
klog.Infof("CloudCore is running in pod %v/%v, try to patch PodReadinessGate", namespace, podname)
//TODO: use specific clients
client := client.GetKubeClient()

//Creat patchBytes
getPod, err := client.CoreV1().Pods(namespace).Get(context.Background(), podname, metav1.GetOptions{})
originalJSON, err := json.Marshal(getPod)
if err != nil {
return fmt.Errorf("failed to marshal modified pod %q into JSON: %v", podname, err)
//Todo: Read PodReadinessGate from CloudCore configuration or env
condition := corev1.PodCondition{Type: "", Status: status}
podutil.UpdatePodCondition(&getPod.Status, &condition)
newJSON, err := json.Marshal(getPod)
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(originalJSON, newJSON, corev1.Pod{})
if err != nil {
return fmt.Errorf("failed to create two way merge patch: %v", err)
if !isInPod {
klog.Infoln("CloudCore is not running in pod")
return nil

var maxRetries = 3
var isPatchSuccess = false
for i := 1; i <= maxRetries; i++ {
_, err = client.CoreV1().Pods(namespace).Patch(context.Background(), podname, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err == nil {
isPatchSuccess = true
klog.Infof("Successfully patching podReadinessGate: to pod %q through apiserver", podname)
if errors.IsConflict(err) {
// If the patch failure is due to update conflict, the necessary retransmission is performed
if i >= maxRetries {
klog.Errorf("updateMaxRetries(%d) has reached, failed to patching podReadinessGate: because of update conflict", maxRetries)
namespace := os.Getenv("CLOUDCORE_POD_NAMESPACE")
klog.Infof("CloudCore is running in pod %s/%s, try to patch PodReadinessGate", namespace, podname)
client := client.GetKubeClient()

//Creat patchBytes
getPod, err := client.CoreV1().Pods(namespace).Get(context.Background(), podname, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get pod(%s/%s): %v", namespace, podname, err)
originalJSON, err := json.Marshal(getPod)
if err != nil {
return fmt.Errorf("failed to marshal original pod %q into JSON: %v", podname, err)

//Todo: Read PodReadinessGate from CloudCore configuration or env
condition := corev1.PodCondition{Type: "", Status: status}
podutil.UpdatePodCondition(&getPod.Status, &condition)
newJSON, err := json.Marshal(getPod)
if err != nil {
return fmt.Errorf("failed to marshal modified pod %q into JSON: %v", podname, err)
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(originalJSON, newJSON, corev1.Pod{})
if err != nil {
return fmt.Errorf("failed to create two way merge patch: %v", err)

var maxRetries = 3
for i := 1; i <= maxRetries; i++ {
_, err = client.CoreV1().Pods(namespace).Patch(context.Background(), podname, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err == nil {
klog.Infof("Successfully patching podReadinessGate: to pod %q through apiserver", podname)
return nil
if !isPatchSuccess {
if !errors.IsConflict(err) {
return err
} else {
klog.Infoln("CloudCore is not running in pod")

// If the patch failure is due to update conflict, the necessary retransmission is performed
if i >= maxRetries {
klog.Errorf("updateMaxRetries(%d) has reached, failed to patching podReadinessGate: because of update conflict", maxRetries)
return nil

return err

// Trigger core.GracefulShutdown()
func TriggerGracefulShutdown() {
// TriggerGracefulShutdown triggers core.GracefulShutdown()
func TriggerGracefulShutdown() error {
if beehiveContext.GetContext().Err() != nil {
klog.Errorln("Program is in gracefully shutdown")
klog.Infoln("Program is in gracefully shutdown")
return nil
klog.Errorln("Trigger graceful shutdown!")

klog.Infoln("Trigger graceful shutdown!")
p, err := os.FindProcess(syscall.Getpid())
if err != nil {
klog.Errorf("Failed to find self process: %v", err)
return fmt.Errorf("Failed to find self process: %v", err)
err = p.Signal(os.Interrupt)
if err != nil {
klog.Errorf("Failed to trigger graceful shutdown: %v", err)

if err := p.Signal(os.Interrupt); err != nil {
return fmt.Errorf("Failed to trigger graceful shutdown: %v", err)
return nil

func CreateNamespaceIfNeeded(cli clientset.Interface, ns string) error {
Expand Down

0 comments on commit 46b6e52

Please sign in to comment.