Skip to content

Commit

Permalink
Feature: Logs and Describe overlay (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
dzsak authored Jan 13, 2024
1 parent e5126fe commit 56b48eb
Show file tree
Hide file tree
Showing 17 changed files with 551 additions and 129 deletions.
5 changes: 4 additions & 1 deletion cmd/capacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/gimlet-io/capacitor/pkg/api"
"github.com/gimlet-io/capacitor/pkg/controllers"
"github.com/gimlet-io/capacitor/pkg/logs"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -38,6 +39,8 @@ func main() {
panic(err.Error())
}

runningLogStreams := logs.NewRunningLogStreams()

stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -49,7 +52,7 @@ func main() {
kustomizationController := controllers.KustomizeController(dynamicClient, clientHub)
go kustomizationController.Run(1, stopCh)

r := api.SetupRouter(client, dynamicClient, config, clientHub)
r := api.SetupRouter(client, dynamicClient, config, clientHub, runningLogStreams)
go func() {
err = http.ListenAndServe(":9000", r)
if err != nil {
Expand Down
56 changes: 56 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"net/http"

"github.com/gimlet-io/capacitor/pkg/flux"
"github.com/gimlet-io/capacitor/pkg/logs"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
apps_v1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -96,3 +99,56 @@ func describeSecret(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(output))
}

func describeDeployment(w http.ResponseWriter, r *http.Request) {
config, _ := r.Context().Value("config").(*rest.Config)
namespace := r.URL.Query().Get("namespace")
name := r.URL.Query().Get("name")

describer, ok := describe.DescriberFor(schema.GroupKind{Group: apps_v1.GroupName, Kind: "Deployment"}, config)
if !ok {
logrus.Errorf("could not get describer for deployment")
return
}

output, err := describer.Describe(namespace, name, describe.DescriberSettings{ShowEvents: true, ChunkSize: 500})
if err != nil {
logrus.Errorf("could not get output of describer: %s", err)
return
}

w.WriteHeader(http.StatusOK)
w.Write([]byte(output))
}

func streamLogs(w http.ResponseWriter, r *http.Request) {
namespace := r.URL.Query().Get("namespace")
svc := r.URL.Query().Get("serviceName")
runningLogStreams, _ := r.Context().Value("runningLogStreams").(*logs.RunningLogStreams)
dynamicClient, _ := r.Context().Value("dynamicClient").(*dynamic.DynamicClient)
client, _ := r.Context().Value("client").(*kubernetes.Clientset)
clientHub, _ := r.Context().Value("clientHub").(*streaming.ClientHub)

go logs.Logs(
client,
dynamicClient,
namespace,
svc,
clientHub,
runningLogStreams,
)

w.WriteHeader(http.StatusOK)
w.Write([]byte("{}"))
}

func stopLogs(w http.ResponseWriter, r *http.Request) {
namespace := r.URL.Query().Get("namespace")
svc := r.URL.Query().Get("serviceName")
runningLogStreams, _ := r.Context().Value("runningLogStreams").(*logs.RunningLogStreams)

runningLogStreams.Stop(namespace, svc)

w.WriteHeader(http.StatusOK)
w.Write([]byte("{}"))
}
7 changes: 7 additions & 0 deletions pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net/http"
"strings"

"github.com/gimlet-io/capacitor/pkg/logs"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
Expand All @@ -17,11 +18,14 @@ func SetupRouter(
dynamicClient *dynamic.DynamicClient,
config *rest.Config,
clientHub *streaming.ClientHub,
runningLogStreams *logs.RunningLogStreams,
) *chi.Mux {
r := chi.NewRouter()
r.Use(middleware.WithValue("dynamicClient", dynamicClient))
r.Use(middleware.WithValue("client", client))
r.Use(middleware.WithValue("config", config))
r.Use(middleware.WithValue("runningLogStreams", runningLogStreams))
r.Use(middleware.WithValue("clientHub", clientHub))

r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
Expand All @@ -30,6 +34,9 @@ func SetupRouter(
r.Get("/api/services", servicesHandler)
r.Get("/api/describeConfigmap", describeConfigmap)
r.Get("/api/describeSecret", describeSecret)
r.Get("/api/describeDeployment", describeDeployment)
r.Get("/api/logs", streamLogs)
r.Get("/api/stopLogs", stopLogs)
r.Get("/ws/", func(w http.ResponseWriter, r *http.Request) {
streaming.ServeWs(clientHub, w, r)
})
Expand Down
47 changes: 3 additions & 44 deletions pkg/flux/flux.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

kustomizationv1 "github.com/fluxcd/kustomize-controller/api/v1"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
"github.com/gimlet-io/capacitor/pkg/k8s"
apps_v1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -64,7 +65,7 @@ func Services(c *kubernetes.Clientset, dc *dynamic.DynamicClient) ([]Service, er

for idx, service := range services {
for _, deployment := range deploymentsInNamespaces[service.Svc.Namespace] {
if selectorsMatch(deployment.Spec.Selector.MatchLabels, service.Svc.Spec.Selector) {
if k8s.SelectorsMatch(deployment.Spec.Selector.MatchLabels, service.Svc.Spec.Selector) {
services[idx].Deployment = &deployment
}
}
Expand All @@ -77,7 +78,7 @@ func Services(c *kubernetes.Clientset, dc *dynamic.DynamicClient) ([]Service, er
for idx, service := range services {
services[idx].Pods = []v1.Pod{}
for _, pod := range pods.Items {
if labelsMatchSelectors(pod.ObjectMeta.Labels, service.Svc.Spec.Selector) {
if k8s.LabelsMatchSelectors(pod.ObjectMeta.Labels, service.Svc.Spec.Selector) {
services[idx].Pods = append(services[idx].Pods, pod)
}
}
Expand All @@ -86,48 +87,6 @@ func Services(c *kubernetes.Clientset, dc *dynamic.DynamicClient) ([]Service, er
return services, nil
}

func selectorsMatch(first map[string]string, second map[string]string) bool {
if len(first) != len(second) {
return false
}

for k, v := range first {
if v2, ok := second[k]; ok {
if v != v2 {
return false
}
} else {
return false
}
}

for k2, v2 := range second {
if v, ok := first[k2]; ok {
if v2 != v {
return false
}
} else {
return false
}
}

return true
}

func labelsMatchSelectors(labels map[string]string, selectors map[string]string) bool {
for k2, v2 := range selectors {
if v, ok := labels[k2]; ok {
if v2 != v {
return false
}
} else {
return false
}
}

return true
}

func inventory(dc *dynamic.DynamicClient) ([]object.ObjMetadata, error) {
inventory := []object.ObjMetadata{}

Expand Down
43 changes: 43 additions & 0 deletions pkg/k8s/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package k8s

func SelectorsMatch(first map[string]string, second map[string]string) bool {
if len(first) != len(second) {
return false
}

for k, v := range first {
if v2, ok := second[k]; ok {
if v != v2 {
return false
}
} else {
return false
}
}

for k2, v2 := range second {
if v, ok := first[k2]; ok {
if v2 != v {
return false
}
} else {
return false
}
}

return true
}

func LabelsMatchSelectors(labels map[string]string, selectors map[string]string) bool {
for k2, v2 := range selectors {
if v, ok := labels[k2]; ok {
if v2 != v {
return false
}
} else {
return false
}
}

return true
}
140 changes: 140 additions & 0 deletions pkg/logs/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package logs

import (
"bufio"
"context"
"encoding/json"
"strings"

"github.com/gimlet-io/capacitor/pkg/k8s"
"github.com/gimlet-io/capacitor/pkg/streaming"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
)

func Logs(
client *kubernetes.Clientset,
dynamicClient *dynamic.DynamicClient,
namespace string,
serviceName string,
clientHub *streaming.ClientHub,
runningLogStreams *RunningLogStreams,
) {
pods, err := pods(client, namespace, serviceName)
if err != nil {
logrus.Warnf("could not get pods to stream logs: %v", err)
return
}

for _, pod := range pods {
containers := podContainers(pod.Spec)
for _, container := range containers {
go streamLogs(client, namespace, pod.Name, container.Name, serviceName, clientHub, runningLogStreams)
}
}
}

func pods(client *kubernetes.Clientset, namespace string, serviceName string) ([]v1.Pod, error) {
svc, err := client.CoreV1().Services(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
if err != nil {
return nil, err
}

podsInNamespace, err := client.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}

pods := []v1.Pod{}
for _, pod := range podsInNamespace.Items {
if k8s.LabelsMatchSelectors(pod.ObjectMeta.Labels, svc.Spec.Selector) {
pods = append(pods, pod)
}
}

return pods, nil
}

func podContainers(podSpec v1.PodSpec) (containers []v1.Container) {
containers = append(containers, podSpec.InitContainers...)
containers = append(containers, podSpec.Containers...)

return containers
}

func streamLogs(
client *kubernetes.Clientset,
namespace string,
pod string,
containerName string,
serviceName string,
clientHub *streaming.ClientHub,
runningLogStreams *RunningLogStreams,
) {
count := int64(100)
podLogOpts := v1.PodLogOptions{
Container: containerName,
TailLines: &count,
Follow: true,
Timestamps: true,
}
logsReq := client.CoreV1().Pods(namespace).GetLogs(pod, &podLogOpts)

podLogs, err := logsReq.Stream(context.Background())
if err != nil {
logrus.Errorf("could not stream pod logs: %v", err)
return
}
defer podLogs.Close()

stopCh := runningLogStreams.register(namespace, serviceName)

go func() {
<-stopCh
podLogs.Close()
}()

sc := bufio.NewScanner(podLogs)
for sc.Scan() {
text := sc.Text()
chunks := chunks(text, 1000)
for _, chunk := range chunks {
timestamp, message := parseMessage(chunk)
payload := streaming.PodLogMessage{
Timestamp: timestamp,
Container: containerName,
Pod: pod,
Svc: namespace + "/" + serviceName,
Message: message,
}

msgBytes, err := json.Marshal(streaming.Envelope{
Type: streaming.POD_LOGS_RECEIVED,
Payload: payload,
})

if err != nil {
logrus.Error("cannot serialize message", err)
}

clientHub.Broadcast <- msgBytes
}
}
}

func chunks(str string, size int) []string {
if len(str) <= size {
return []string{str}
}
return append([]string{string(str[0:size])}, chunks(str[size:], size)...)
}

func parseMessage(chunk string) (string, string) {
parts := strings.SplitN(chunk, " ", 2)

return parts[0], parts[1]
}
Loading

0 comments on commit 56b48eb

Please sign in to comment.