feat(restartStuckPod): Restart stuck pods #413

Closed · wants to merge 14 commits
9 changes: 9 additions & 0 deletions api/v1/self_healing_types.go
@@ -26,6 +26,10 @@ type SelfHealSpec struct {
//
// +optional
HeightDriftMitigation *HeightDriftMitigationSpec `json:"heightDriftMitigation"`
// Take action when a pod is stuck.
//
// +optional
StuckPodMitigation *StuckPodMitigationSpec `json:"stuckPodMitigation"`
}

type PVCAutoScaleSpec struct {
@@ -63,6 +67,11 @@ type HeightDriftMitigationSpec struct {
Threshold uint32 `json:"threshold"`
}

type StuckPodMitigationSpec struct {
// If a pod is stuck in a non-running state for this duration, the pod is deleted.
Threshold uint32 `json:"threshold"`
}

type SelfHealingStatus struct {
// PVC auto-scaling status.
// +optional
20 changes: 20 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

11 changes: 11 additions & 0 deletions config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml
@@ -5669,6 +5669,17 @@ spec:
- increaseQuantity
- usedSpacePercentage
type: object
stuckPodMitigation:
description: Take action when a pod is stuck.
properties:
threshold:
description: If a pod is stuck in a non-running state for
this duration, the pod is deleted.
format: int32
type: integer
required:
- threshold
type: object
type: object
service:
description: |-
7 changes: 7 additions & 0 deletions config/rbac/role.yaml
@@ -43,9 +43,16 @@ rules:
resources:
- pods
verbs:
- delete
- get
- list
- watch
- apiGroups:
- ""
resources:
- pods/log
verbs:
- get
- apiGroups:
- ""
resources:
37 changes: 30 additions & 7 deletions controllers/selfhealing_controller.go
@@ -28,6 +28,7 @@ import (
"github.com/strangelove-ventures/cosmos-operator/internal/fullnode"
"github.com/strangelove-ventures/cosmos-operator/internal/healthcheck"
"github.com/strangelove-ventures/cosmos-operator/internal/kube"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -42,6 +43,7 @@ type SelfHealingReconciler struct {
driftDetector fullnode.DriftDetection
pvcAutoScaler *fullnode.PVCAutoScaler
recorder record.EventRecorder
stuckDetector *fullnode.StuckPodDetection
}

func NewSelfHealing(
@@ -61,6 +63,9 @@ }
}
}

//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;delete
//+kubebuilder:rbac:groups="",resources=pods/log,verbs=get

// Reconcile reconciles only the self-healing spec in CosmosFullNode. If changes needed, this controller
// updates a CosmosFullNode status subresource thus triggering another reconcile loop. The CosmosFullNode
// uses the status object to reconcile its state.
@@ -85,6 +90,7 @@ func (r *SelfHealingReconciler) Reconcile(ctx context.Context, req ctrl.Request)

r.pvcAutoScale(ctx, reporter, crd)
r.mitigateHeightDrift(ctx, reporter, crd)
r.mitigateStuckPods(ctx, reporter, crd)

return ctrl.Result{RequeueAfter: 60 * time.Second}, nil
}
@@ -120,21 +126,38 @@ func (r *SelfHealingReconciler) mitigateHeightDrift(ctx context.Context, reporte
}

pods := r.driftDetector.LaggingPods(ctx, crd)
deleted := r.DeletePods(pods, "HeightDriftMitigationDeletePod", reporter, ctx)
if deleted > 0 {
msg := fmt.Sprintf("Height lagged behind by %d or more blocks; deleted pod(s)", crd.Spec.SelfHeal.HeightDriftMitigation.Threshold)
reporter.RecordInfo("HeightDriftMitigation", msg)
}
}

func (r *SelfHealingReconciler) mitigateStuckPods(ctx context.Context, reporter kube.Reporter, crd *cosmosv1.CosmosFullNode) {
if crd.Spec.SelfHeal.StuckPodMitigation == nil {
return
}

pods := r.stuckDetector.StuckPods(ctx, crd)
deleted := r.DeletePods(pods, "StuckPodMitigationDeletePod", reporter, ctx)
if deleted > 0 {
msg := fmt.Sprintf("Stuck for %d seconds; deleted pod(s)", crd.Spec.SelfHeal.StuckPodMitigation.Threshold)
reporter.RecordInfo("StuckPodMitigation", msg)
}
}

func (r *SelfHealingReconciler) DeletePods(pods []*v1.Pod, reason string, reporter kube.Reporter, ctx context.Context) int {
var deleted int
for _, pod := range pods {
// CosmosFullNodeController will detect missing pod and re-create it.
if err := r.Delete(ctx, pod); kube.IgnoreNotFound(err) != nil {
reporter.Error(err, "Failed to delete pod", "pod", pod.Name)
reporter.RecordError("HeightDriftMitigationDeletePod", err)
reporter.RecordError(reason, err)
continue
}
reporter.Info("Deleted pod for meeting height drift threshold", "pod", pod.Name)
reporter.Info("Deleted pod for ", reason, " pod:", pod.Name)
deleted++
}
if deleted > 0 {
msg := fmt.Sprintf("Height lagged behind by %d or more blocks; deleted pod(s)", crd.Spec.SelfHeal.HeightDriftMitigation.Threshold)
reporter.RecordInfo("HeightDriftMitigation", msg)
}
return deleted
}

// SetupWithManager sets up the controller with the Manager.
121 changes: 121 additions & 0 deletions internal/fullnode/stuck_detection.go
@@ -0,0 +1,121 @@
package fullnode

import (
"context"
"fmt"
"io"
"strings"
"time"

cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
"github.com/strangelove-ventures/cosmos-operator/internal/kube"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type StuckPodDetection struct {
available func(pods []*corev1.Pod, minReady time.Duration, now time.Time) []*corev1.Pod
collector StatusCollector
computeRollout func(maxUnavail *intstr.IntOrString, desired, ready int) int
}

func NewStuckDetection(collector StatusCollector) StuckPodDetection {
return StuckPodDetection{
available: kube.AvailablePods,
collector: collector,
computeRollout: kube.ComputeRollout,
}
}

// StuckPods returns pods that are stuck on a block height due to a cometbft issue that manifests on sentries using horcrux.
func (d StuckPodDetection) StuckPods(ctx context.Context, crd *cosmosv1.CosmosFullNode) []*corev1.Pod {
pods := d.collector.Collect(ctx, client.ObjectKeyFromObject(crd)).Synced().Pods()

for i, pod := range pods {
config, err := rest.InClusterConfig()
if err != nil {
panic(err.Error())
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}

receivedString := getPodLogsLastLine(clientset, pod)
fmt.Println(receivedString)
podIsStuck := isPodStuck(receivedString)

//MORE TODO HERE
if podIsStuck {
pods = removeElement(pods, i)
}
}
return pods
}

func isPodStuck(receivedString string) bool {
if strings.Contains(receivedString, "SignerListener: Connected") {
timeInLog, err := extractTimeFromLog(receivedString)
if err != nil {
fmt.Println("Error parsing time from log:", err)
return true
Review comment from pharr117 (Contributor), Apr 17, 2024:
We may not want to return true here; if the time parse fails, this will return true from isPodStuck, which I assume would kill the pod inadvertently.
Can we change this function to return (bool, error) so we can track the errors better?
(A sketch of this suggestion follows after the function below.)

}

currentTime := time.Now().UTC()

logTimeToday := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(),
timeInLog.Hour(), timeInLog.Minute(), timeInLog.Second(), timeInLog.Nanosecond(), currentTime.Location())

timeDiff := currentTime.Sub(logTimeToday)

if timeDiff >= time.Minute {
return true
}
}

return false
}
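
Following up on the review comment above, a minimal sketch of an isPodStuck variant that returns (bool, error), reusing this file's existing imports; the name isPodStuckSafe and the choice to report false alongside the error on a parse failure are assumptions for illustration, not part of this PR:

// isPodStuckSafe surfaces the timestamp parse error instead of treating it as
// "stuck", so the caller can decide how to handle it (e.g. skip the pod and report).
func isPodStuckSafe(lastLogLine string) (bool, error) {
	if !strings.Contains(lastLogLine, "SignerListener: Connected") {
		return false, nil
	}

	timeInLog, err := extractTimeFromLog(lastLogLine)
	if err != nil {
		// Do not assume the pod is stuck just because the timestamp was unparsable.
		return false, fmt.Errorf("parse time from log: %w", err)
	}

	now := time.Now().UTC()
	logTimeToday := time.Date(now.Year(), now.Month(), now.Day(),
		timeInLog.Hour(), timeInLog.Minute(), timeInLog.Second(), timeInLog.Nanosecond(), now.Location())

	return now.Sub(logTimeToday) >= time.Minute, nil
}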

func extractTimeFromLog(log string) (time.Time, error) {
parts := strings.Fields(log)

const timeLayout = "3:04PM"
Review comment (Contributor):
We may need to run the time parse on a few layout strings; for example, I am looking at a running Axelar Testnet sentry node and it has this time string:
2024-04-17T17:37:18Z
(A sketch of this suggestion follows after the function below.)

parsedTime, err := time.Parse(timeLayout, parts[0])
if err != nil {
return time.Time{}, err
}

return parsedTime, nil
}
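
A minimal sketch of the multi-layout parsing suggested in the review comment above, reusing this file's imports; the helper name and the exact layout list (the existing "3:04PM" layout plus RFC3339 variants) are assumptions:

// extractTimeFromLogMulti tries several timestamp layouts, since log formats
// differ between chains (e.g. RFC3339 timestamps like 2024-04-17T17:37:18Z).
func extractTimeFromLogMulti(log string) (time.Time, error) {
	parts := strings.Fields(log)
	if len(parts) == 0 {
		return time.Time{}, fmt.Errorf("empty log line")
	}

	layouts := []string{"3:04PM", time.RFC3339, time.RFC3339Nano}
	for _, layout := range layouts {
		if t, err := time.Parse(layout, parts[0]); err == nil {
			return t, nil
		}
	}
	return time.Time{}, fmt.Errorf("no known layout matches %q", parts[0])
}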

func getPodLogsLastLine(clientset *kubernetes.Clientset, pod *corev1.Pod) string {
Review comment (Contributor):
We may also want to change this to return (string, error) so that we can track the errors better.
(A sketch of this suggestion follows after the function below.)

podLogOpts := corev1.PodLogOptions{}
logRequest := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts)

logStream, err := logRequest.Stream(context.Background())
if err != nil {
fmt.Printf("Error getting logs for pod %s: %v\n", pod.Name, err)
return ""
}
defer logStream.Close()

logBytes, err := io.ReadAll(logStream)
if err != nil {
fmt.Printf("Error reading logs for pod %s: %v\n", pod.Name, err)
return ""
}

logLines := strings.Split(strings.TrimRight(string(logBytes), "\n"), "\n")
if len(logLines) > 0 {
return logLines[len(logLines)-1]
}
return ""
}
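
A minimal sketch of a getPodLogsLastLine variant that returns (string, error), as suggested in the review comment above; the name, the explicit context parameter, and the wrapped error messages are assumptions, and it reuses this file's imports:

// getPodLogsLastLineErr propagates errors instead of printing them, letting the
// caller record them via the reporter.
func getPodLogsLastLineErr(ctx context.Context, clientset *kubernetes.Clientset, pod *corev1.Pod) (string, error) {
	req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{})
	stream, err := req.Stream(ctx)
	if err != nil {
		return "", fmt.Errorf("get logs for pod %s: %w", pod.Name, err)
	}
	defer stream.Close()

	logBytes, err := io.ReadAll(stream)
	if err != nil {
		return "", fmt.Errorf("read logs for pod %s: %w", pod.Name, err)
	}

	lines := strings.Split(strings.TrimRight(string(logBytes), "\n"), "\n")
	if len(lines) == 0 {
		return "", nil
	}
	return lines[len(lines)-1], nil
}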

func removeElement(slice []*corev1.Pod, index int) []*corev1.Pod {
return append(slice[:index], slice[index+1:]...)
}