Skip to content
This repository has been archived by the owner on Oct 25, 2023. It is now read-only.

Commit

Permalink
feat: add kube job entropy model (#69)
Browse files Browse the repository at this point in the history
* feat: add kube job entropy model

* feat(dlq): update kubejob entropy model

* feat: add dlq to entropy job mapper (WIP)

* feat(dlq): update mapper

* feat(dlq): updated entropy job config

* feat: added new fields for mapping DlqJob

* feat: added functionality for mapping

* chore(dlq): access app config for mapping

* feat: mapping

* feat(dlq): finalise mapper

* feat: modify swagger

* chore: fix error handling

* chore: generate swagger

* chore: fix mapping env vars

* chore: update swagger generated models

---------

Co-authored-by: Lifosmin Simon <[email protected]>
Co-authored-by: Rahmat Hidayat <[email protected]>
  • Loading branch information
3 people authored Oct 12, 2023
1 parent 39da93c commit 75d6205
Show file tree
Hide file tree
Showing 19 changed files with 478 additions and 47 deletions.
6 changes: 6 additions & 0 deletions cli/server/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type serverConfig struct {
Compass compassConfig `mapstructure:"compass"`
Optimus optimusConfig `mapstructure:"optimus"`
StencilAddr string `mapstructure:"stencil_addr"`
Dlq dlqConfig `mapstructure:"dlq"`
}

type odinConfig struct {
Expand All @@ -49,6 +50,11 @@ type optimusConfig struct {
Addr string `mapstructure:"addr"`
}

type dlqConfig struct {
DlqJobImage string `mapstructure:"dlq_job_image"`
PrometheusHost string `mapstructure:"prometheus_host"`
}

type serveConfig struct {
Host string `mapstructure:"host" default:""`
Port int `mapstructure:"port" default:"8080"`
Expand Down
9 changes: 9 additions & 0 deletions cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/goto/dex/internal/server"
"github.com/goto/dex/internal/server/gcs"
"github.com/goto/dex/internal/server/v1/dlq"
"github.com/goto/dex/internal/server/v1/optimus"
"github.com/goto/dex/pkg/logger"
"github.com/goto/dex/pkg/telemetry"
Expand Down Expand Up @@ -100,6 +101,13 @@ func runServer(baseCtx context.Context, nrApp *newrelic.Application, zapLog *zap
if err != nil {
return err
}

dlqConfig := &dlq.DlqJobConfig{
// TODO: map cfg.Dlq\
DlqJobImage: cfg.Dlq.DlqJobImage,
PrometheusHost: cfg.Dlq.PrometheusHost,
}

return server.Serve(ctx, cfg.Service.Addr(), nrApp, zapLog,
shieldv1beta1.NewShieldServiceClient(shieldConn),
&optimus.ClientBuilder{},
Expand All @@ -109,5 +117,6 @@ func runServer(baseCtx context.Context, nrApp *newrelic.Application, zapLog *zap
&gcs.Client{StorageClient: gcsClient},
cfg.Odin.Addr,
cfg.StencilAddr,
dlqConfig,
)
}
6 changes: 6 additions & 0 deletions entropy/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package entropy

type UsageSpec struct {
CPU string `json:"cpu,omitempty" validate:"required"`
Memory string `json:"memory,omitempty" validate:"required"`
}
37 changes: 16 additions & 21 deletions entropy/entropy.go → entropy/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package entropy

import "time"

type Config struct {
type FirehoseConfig struct {
// Stopped flag when set forces the firehose to be stopped on next sync.
Stopped bool `json:"stopped"`

Expand All @@ -27,19 +27,14 @@ type Config struct {
// ResetOffset represents the value to which kafka consumer offset was set to
ResetOffset string `json:"reset_offset,omitempty"`

Limits UsageSpec `json:"limits,omitempty"`
Requests UsageSpec `json:"requests,omitempty"`
Telegraf *Telegraf `json:"telegraf,omitempty"`
ChartValues *ChartValues `json:"chart_values,omitempty"`
InitContainer InitContainer `json:"init_container,omitempty"`
Limits UsageSpec `json:"limits,omitempty"`
Requests UsageSpec `json:"requests,omitempty"`
Telegraf *FirehoseTelegraf `json:"telegraf,omitempty"`
ChartValues *FirehoseChartValues `json:"chart_values,omitempty"`
InitContainer FirehoseInitContainer `json:"init_container,omitempty"`
}

type UsageSpec struct {
CPU string `json:"cpu,omitempty" validate:"required"`
Memory string `json:"memory,omitempty" validate:"required"`
}

type InitContainer struct {
type FirehoseInitContainer struct {
Enabled bool `json:"enabled"`

Args []string `json:"args"`
Expand All @@ -50,31 +45,31 @@ type InitContainer struct {
PullPolicy string `json:"pull_policy"`
}

type Telegraf struct {
Enabled bool `json:"enabled,omitempty"`
Image map[string]any `json:"image,omitempty"`
Config TelegrafConf `json:"config,omitempty"`
type FirehoseTelegraf struct {
Enabled bool `json:"enabled,omitempty"`
Image map[string]any `json:"image,omitempty"`
Config FirehoseTelegrafConf `json:"config,omitempty"`
}

type TelegrafConf struct {
type FirehoseTelegrafConf struct {
Output map[string]any `json:"output"`
AdditionalGlobalTags map[string]string `json:"additional_global_tags"`
}

type ChartValues struct {
type FirehoseChartValues struct {
ImageTag string `json:"image_tag" validate:"required"`
ChartVersion string `json:"chart_version" validate:"required"`
ImagePullPolicy string `json:"image_pull_policy" validate:"required"`
}

type ScaleParams struct {
type FirehoseScaleParams struct {
Replicas int `json:"replicas"`
}

type StartParams struct {
type FirehoseStartParams struct {
StopTime *time.Time `json:"stop_time"`
}

type ResetParams struct {
type FirehoseResetParams struct {
To string `json:"to"`
}
42 changes: 42 additions & 0 deletions entropy/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package entropy

type JobConfig struct {
Replicas int32 `json:"replicas"`
Namespace string `json:"namespace"`
Name string `json:"name,omitempty"`
Containers []JobContainer `json:"containers,omitempty"`
JobLabels map[string]string `json:"job_labels,omitempty"`
Volumes []JobVolume `json:"volumes,omitempty"`
TTLSeconds *int32 `json:"ttl_seconds,omitempty"`
}

type JobVolume struct {
Name string
Kind string // secret or config-map. secret is for gcs/bq credential
}

type JobContainer struct {
Name string `json:"name"`
Image string `json:"image"`
ImagePullPolicy string `json:"image_pull_policy,omitempty"`
Command []string `json:"command,omitempty"`
Args []string `json:"args,omitempty"`
SecretsVolumes []JobSecret `json:"secrets_volumes,omitempty"`
ConfigMapsVolumes []JobConfigMap `json:"config_maps_volumes,omitempty"`
Limits UsageSpec `json:"limits,omitempty"`
Requests UsageSpec `json:"requests,omitempty"`
EnvConfigMaps []string `json:"env_config_maps,omitempty"`
EnvVariables map[string]string `json:"env_variables,omitempty"`
PreStopCmd []string `json:"pre_stop_cmd,omitempty"`
PostStartCmd []string `json:"post_start_cmd,omitempty"`
}

type JobSecret struct {
Name string `json:"name"`
Mount string `json:"mount"`
}

type JobConfigMap struct {
Name string `json:"name"`
Mount string `json:"mount"`
}
6 changes: 6 additions & 0 deletions entropy/kind.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package entropy

const (
ResourceKindFirehose = "firehose"
ResourceKindJob = "job"
)
71 changes: 68 additions & 3 deletions generated/models/dlq_job.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func Serve(ctx context.Context, addr string,
gcsClient gcs.BlobStorageClient,
odinAddr string,
stencilAddr string,
dlqConfig *dlqv1.DlqJobConfig,
) error {
alertSvc := alertsv1.NewService(sirenClient)

Expand All @@ -62,7 +63,7 @@ func Serve(ctx context.Context, addr string,
r.Route("/alerts", alertsv1.AlertRoutes(sirenClient, shieldClient))
r.Route("/optimus", optimusv1.Routes(shieldClient, optimusClient))
r.Route("/projects", projectsv1.Routes(shieldClient))
r.Route("/dlq", dlqv1.Routes(entropyClient, gcsClient))
r.Route("/dlq", dlqv1.Routes(entropyClient, gcsClient, dlqConfig))
r.Route("/firehoses", firehosev1.Routes(entropyClient, shieldClient, alertSvc, compassClient, odinAddr, stencilAddr))
r.Route("/kubernetes", kubernetesv1.Routes(entropyClient))
})
Expand Down
8 changes: 8 additions & 0 deletions internal/server/v1/dlq/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package dlq

import "errors"

var (
ErrFirehoseNamespaceNotFound = errors.New("could not find firehose namespace from resource output")
ErrFirehoseNamespaceInvalid = errors.New("invalid firehose namespace from resource output")
)
5 changes: 4 additions & 1 deletion internal/server/v1/dlq/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (h *Handler) ListFirehoseDLQ(w http.ResponseWriter, r *http.Request) {
log.Println(err)
return
}
conf := &entropy.Config{}
conf := &entropy.FirehoseConfig{}
err = utils.ProtoStructToGoVal(resp.GetResource().GetSpec().GetConfigs(), conf)
if err != nil {
utils.WriteErr(w, err)
Expand Down Expand Up @@ -62,6 +62,9 @@ func (*Handler) listDlqJobs(w http.ResponseWriter, _ *http.Request) {
}

func (*Handler) createDlqJob(w http.ResponseWriter, _ *http.Request) {
// transform request body into DlqJob (validation?)
// call service.CreateDLQJob

utils.WriteJSON(w, http.StatusOK, map[string]interface{}{
"dlq_job": nil,
})
Expand Down
14 changes: 7 additions & 7 deletions internal/server/v1/dlq/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ func (*testHTTPWriter) WriteHeader(int) {
func TestListTopicDates(t *testing.T) {
eService := &mocks.ResourceServiceClient{}
gClient := &mocks.BlobStorageClient{}
handler := dlq.NewHandler(dlq.NewService(eService, gClient))
handler := dlq.NewHandler(dlq.NewService(eService, gClient, &dlq.DlqJobConfig{}))
httpWriter := &testHTTPWriter{}
httpRequest := &http.Request{}
config := &entropy.Config{
config := &entropy.FirehoseConfig{
Stopped: false,
StopTime: nil,
Replicas: 0,
Expand All @@ -58,7 +58,7 @@ func TestListTopicDates(t *testing.T) {
Requests: entropy.UsageSpec{},
Telegraf: nil,
ChartValues: nil,
InitContainer: entropy.InitContainer{},
InitContainer: entropy.FirehoseInitContainer{},
}
configProto, _ := utils.GoValToProtoStruct(config)
eService.On(
Expand Down Expand Up @@ -115,10 +115,10 @@ func TestListTopicDates(t *testing.T) {
func TestErrorFromGCSClient(t *testing.T) {
eService := &mocks.ResourceServiceClient{}
gClient := &mocks.BlobStorageClient{}
handler := dlq.NewHandler(dlq.NewService(eService, gClient))
handler := dlq.NewHandler(dlq.NewService(eService, gClient, &dlq.DlqJobConfig{}))
httpWriter := &testHTTPWriter{}
httpRequest := &http.Request{}
config := &entropy.Config{
config := &entropy.FirehoseConfig{
Stopped: false,
StopTime: nil,
Replicas: 0,
Expand All @@ -133,7 +133,7 @@ func TestErrorFromGCSClient(t *testing.T) {
Requests: entropy.UsageSpec{},
Telegraf: nil,
ChartValues: nil,
InitContainer: entropy.InitContainer{},
InitContainer: entropy.FirehoseInitContainer{},
}
configProto, _ := utils.GoValToProtoStruct(config)
eService.On(
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestErrorFromGCSClient(t *testing.T) {
func TestErrorFromFirehoseResource(t *testing.T) {
eService := &mocks.ResourceServiceClient{}
gClient := &mocks.BlobStorageClient{}
handler := dlq.NewHandler(dlq.NewService(eService, gClient))
handler := dlq.NewHandler(dlq.NewService(eService, gClient, &dlq.DlqJobConfig{}))
httpWriter := &testHTTPWriter{}
httpRequest := &http.Request{}
eService.On(
Expand Down
Loading

0 comments on commit 75d6205

Please sign in to comment.