Skip to content

Commit

Permalink
Merge pull request #138 from equinor/main
Browse files Browse the repository at this point in the history
fix-create-job-delay (#137)
  • Loading branch information
satr authored Aug 30, 2024
2 parents 8e41eb7 + 1134b28 commit 070bd56
Show file tree
Hide file tree
Showing 36 changed files with 746 additions and 819 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ mocks: bootstrap
mockgen -source ./api/v2/handler.go -destination ./api/v2/mock/handler_mock.go -package mock
mockgen -source ./api/v1/jobs/job_handler.go -destination ./api/v1/jobs/mock/job_mock.go -package mock
mockgen -source ./api/v1/batches/batch_handler.go -destination ./api/v1/batches/mock/batch_mock.go -package mock
mockgen -source ./models/notifications/notifier.go -destination ./models/notifications/notifier_mock.go -package notifications
mockgen -source ./pkg/notifications/notifier.go -destination ./pkg/notifications/notifier_mock.go -package notifications
mockgen -source ./pkg/batch/history.go -destination ./pkg/batch/history_mock.go -package batch

.PHONY: generate
generate: swagger mocks
Expand Down
19 changes: 10 additions & 9 deletions api/v1/batches/batch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package batchesv1
import (
"context"

"github.com/equinor/radix-common/utils"
"github.com/equinor/radix-common/utils/slice"
apiv1 "github.com/equinor/radix-job-scheduler/api/v1"
apiv2 "github.com/equinor/radix-job-scheduler/api/v2"
"github.com/equinor/radix-job-scheduler/internal"
"github.com/equinor/radix-job-scheduler/models"
"github.com/equinor/radix-job-scheduler/models/common"
modelsv1 "github.com/equinor/radix-job-scheduler/models/v1"
modelsv2 "github.com/equinor/radix-job-scheduler/models/v2"
"github.com/equinor/radix-job-scheduler/pkg/batch"
"github.com/equinor/radix-job-scheduler/utils/radix/jobs"
"github.com/equinor/radix-operator/pkg/apis/kube"
radixv1 "github.com/equinor/radix-operator/pkg/apis/radix/v1"
Expand All @@ -32,8 +35,6 @@ type BatchHandler interface {
CreateBatch(ctx context.Context, batchScheduleDescription *common.BatchScheduleDescription) (*modelsv1.BatchStatus, error)
// CopyBatch creates a copy of an existing batch with deploymentName as value for radixDeploymentJobRef.name
CopyBatch(ctx context.Context, batchName string, deploymentName string) (*modelsv1.BatchStatus, error)
// MaintainHistoryLimit Delete outdated batches
MaintainHistoryLimit(ctx context.Context) error
// DeleteBatch Delete a batch
DeleteBatch(ctx context.Context, batchName string) error
// StopBatch Stop a batch
Expand Down Expand Up @@ -130,11 +131,11 @@ func (handler *batchHandler) CopyBatch(ctx context.Context, batchName string, de
func (handler *batchHandler) DeleteBatch(ctx context.Context, batchName string) error {
logger := log.Ctx(ctx)
logger.Debug().Msgf("delete batch %s for namespace: %s", batchName, handler.common.Env.RadixDeploymentNamespace)
err := handler.common.HandlerApiV2.DeleteRadixBatch(ctx, batchName)
err := batch.DeleteRadixBatchByName(ctx, handler.common.Kube.RadixClient(), handler.common.Env.RadixDeploymentNamespace, batchName)
if err != nil {
return err
}
return handler.common.HandlerApiV2.GarbageCollectPayloadSecrets(ctx)
return internal.GarbageCollectPayloadSecrets(ctx, handler.common.Kube, handler.common.Env.RadixDeploymentNamespace, handler.common.Env.RadixComponentName)
}

// StopBatch Stop a batch
Expand All @@ -151,11 +152,6 @@ func (handler *batchHandler) StopBatchJob(ctx context.Context, batchName string,
return apiv1.StopJob(ctx, handler.common.HandlerApiV2, jobName)
}

// MaintainHistoryLimit Delete outdated batches
func (handler *batchHandler) MaintainHistoryLimit(ctx context.Context) error {
return handler.common.HandlerApiV2.MaintainHistoryLimit(ctx)
}

func setBatchJobEventMessages(radixBatchStatus *modelsv1.BatchStatus, batchJobPodsMap map[string]corev1.Pod, eventMessageForPods map[string]string) {
for i := 0; i < len(radixBatchStatus.JobStatuses); i++ {
apiv1.SetBatchJobEventMessageToBatchJobStatus(&radixBatchStatus.JobStatuses[i], batchJobPodsMap, eventMessageForPods)
Expand All @@ -166,6 +162,7 @@ func (handler *batchHandler) getBatchStatusFromRadixBatch(radixBatch *modelsv2.R
return &modelsv1.BatchStatus{
JobStatus: modelsv1.JobStatus{
Name: radixBatch.Name,
BatchId: getBatchId(radixBatch),
Created: radixBatch.CreationTime,
Started: radixBatch.Started,
Ended: radixBatch.Ended,
Expand All @@ -192,3 +189,7 @@ func (handler *batchHandler) getBatchStatus(radixBatch *modelsv2.RadixBatch) rad
})
return jobs.GetStatusFromStatusRules(jobStatusPhases, handler.common.RadixDeployJobComponent, radixBatch.Status)
}

func getBatchId(radixBatch *modelsv2.RadixBatch) string {
return utils.TernaryString(radixBatch.BatchType == string(kube.RadixBatchTypeJob), "", radixBatch.BatchId)
}
14 changes: 0 additions & 14 deletions api/v1/batches/mock/batch_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions api/v1/controllers/batches/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ func (controller *batchController) CreateBatch(c *gin.Context) {
return
}
logger.Info().Msgf("Batch %s has been created", batchState.Name)
err = controller.handler.MaintainHistoryLimit(c.Request.Context())
if err != nil {
logger.Warn().Err(err).Msg("failed to maintain batch history")
}

c.JSON(http.StatusOK, batchState)
}
Expand Down
74 changes: 0 additions & 74 deletions api/v1/controllers/batches/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,6 @@ func TestCreateBatch(t *testing.T) {
CreateBatch(test.RequestContextMatcher{}, &batchScheduleDescription).
Return(&createdBatch, nil).
Times(1)
batchHandler.
EXPECT().
MaintainHistoryLimit(test.RequestContextMatcher{}).
Return(nil).
Times(1)
controllerTestUtils := setupTest(batchHandler)
responseChannel := controllerTestUtils.ExecuteRequestWithBody(ctx, http.MethodPost, "/api/v1/batches", nil)
response := <-responseChannel
Expand Down Expand Up @@ -283,58 +278,6 @@ func TestCreateBatch(t *testing.T) {
CreateBatch(test.RequestContextMatcher{}, &batchScheduleDescription).
Return(&createdBatch, nil).
Times(1)
batchHandler.
EXPECT().
MaintainHistoryLimit(test.RequestContextMatcher{}).
Return(nil).
Times(1)
controllerTestUtils := setupTest(batchHandler)
responseChannel := controllerTestUtils.ExecuteRequestWithBody(ctx, http.MethodPost, "/api/v1/batches", batchScheduleDescription)
response := <-responseChannel
assert.NotNil(t, response)

if response != nil {
assert.Equal(t, http.StatusOK, response.StatusCode)
var returnedBatch modelsV1.BatchStatus
err := test.GetResponseBody(response, &returnedBatch)
require.NoError(t, err)
assert.Equal(t, createdBatch.Name, returnedBatch.Name)
assert.Equal(t, createdBatch.Started, returnedBatch.Started)
assert.Equal(t, createdBatch.Ended, returnedBatch.Ended)
assert.Equal(t, createdBatch.Status, returnedBatch.Status)
}
})

t.Run("valid payload body - error from MaintainHistoryLimit should not fail request", func(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
batchScheduleDescription := models.BatchScheduleDescription{
JobScheduleDescriptions: []models.JobScheduleDescription{
{Payload: "a_payload"},
},
}
createdBatch := modelsV1.BatchStatus{
JobStatus: modelsV1.JobStatus{
Name: "newbatch",
Started: commonUtils.FormatTimestamp(time.Now()),
Ended: commonUtils.FormatTimestamp(time.Now().Add(1 * time.Minute)),
Status: "batchstatus",
},
BatchType: string(kube.RadixBatchTypeBatch),
}
batchHandler := mock.NewMockBatchHandler(ctrl)
ctx := context.Background()
batchHandler.
EXPECT().
CreateBatch(test.RequestContextMatcher{}, &batchScheduleDescription).
Return(&createdBatch, nil).
Times(1)
batchHandler.
EXPECT().
MaintainHistoryLimit(test.RequestContextMatcher{}).
Return(errors.New("an error")).
Times(1)
controllerTestUtils := setupTest(batchHandler)
responseChannel := controllerTestUtils.ExecuteRequestWithBody(ctx, http.MethodPost, "/api/v1/batches", batchScheduleDescription)
response := <-responseChannel
Expand Down Expand Up @@ -363,10 +306,6 @@ func TestCreateBatch(t *testing.T) {
EXPECT().
CreateBatch(test.RequestContextMatcher{}, gomock.Any()).
Times(0)
batchHandler.
EXPECT().
MaintainHistoryLimit(test.RequestContextMatcher{}).
Times(0)
controllerTestUtils := setupTest(batchHandler)
responseChannel := controllerTestUtils.ExecuteRequestWithBody(ctx, http.MethodPost, "/api/v1/batches", struct{ JobScheduleDescriptions interface{} }{JobScheduleDescriptions: struct{}{}})
response := <-responseChannel
Expand Down Expand Up @@ -397,10 +336,6 @@ func TestCreateBatch(t *testing.T) {
CreateBatch(test.RequestContextMatcher{}, &batchScheduleDescription).
Return(nil, apiErrors.NewNotFound(anyKind, anyName)).
Times(1)
batchHandler.
EXPECT().
MaintainHistoryLimit(test.RequestContextMatcher{}).
Times(0)
controllerTestUtils := setupTest(batchHandler)
responseChannel := controllerTestUtils.ExecuteRequest(ctx, http.MethodPost, "/api/v1/batches")
response := <-responseChannel
Expand Down Expand Up @@ -430,10 +365,6 @@ func TestCreateBatch(t *testing.T) {
CreateBatch(test.RequestContextMatcher{}, &batchScheduleDescription).
Return(nil, errors.New("any error")).
Times(1)
batchHandler.
EXPECT().
MaintainHistoryLimit(test.RequestContextMatcher{}).
Times(0)
controllerTestUtils := setupTest(batchHandler)
responseChannel := controllerTestUtils.ExecuteRequest(ctx, http.MethodPost, "/api/v1/batches")
response := <-responseChannel
Expand Down Expand Up @@ -639,11 +570,6 @@ func TestStopBatchJob(t *testing.T) {
StopBatchJob(test.RequestContextMatcher{}, batchName, jobName).
Return(nil).
Times(1)
batchHandler.
EXPECT().
MaintainHistoryLimit(test.RequestContextMatcher{}).
Return(nil).
AnyTimes()
controllerTestUtils := setupTest(batchHandler)
responseChannel := controllerTestUtils.ExecuteRequest(ctx, http.MethodPost, fmt.Sprintf("/api/v1/batches/%s/jobs/%s/stop", batchName, jobName))
response := <-responseChannel
Expand Down
4 changes: 0 additions & 4 deletions api/v1/controllers/jobs/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,6 @@ func (controller *jobController) CreateJob(c *gin.Context) {
}

logger.Info().Msgf("Job %s has been created", jobState.Name)
err = controller.handler.MaintainHistoryLimit(c.Request.Context())
if err != nil {
logger.Warn().Err(err).Msg("failed to maintain job history")
}

c.JSON(http.StatusOK, jobState)
}
Expand Down
65 changes: 0 additions & 65 deletions api/v1/controllers/jobs/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,6 @@ func TestCreateJob(t *testing.T) {
CreateJob(test.RequestContextMatcher{}, &jobScheduleDescription).
Return(&createdJob, nil).
Times(1)
jobHandler.
EXPECT().
MaintainHistoryLimit(test.RequestContextMatcher{}).
Return(nil).
Times(1)
controllerTestUtils := setupTest(jobHandler)
responseChannel := controllerTestUtils.ExecuteRequestWithBody(ctx, http.MethodPost, "/api/v1/jobs", nil)
response := <-responseChannel
Expand Down Expand Up @@ -269,54 +264,6 @@ func TestCreateJob(t *testing.T) {
CreateJob(test.RequestContextMatcher{}, &jobScheduleDescription).
Return(&createdJob, nil).
Times(1)
jobHandler.
EXPECT().
MaintainHistoryLimit(test.RequestContextMatcher{}).
Return(nil).
Times(1)
controllerTestUtils := setupTest(jobHandler)
responseChannel := controllerTestUtils.ExecuteRequestWithBody(ctx, http.MethodPost, "/api/v1/jobs", jobScheduleDescription)
response := <-responseChannel
assert.NotNil(t, response)

if response != nil {
assert.Equal(t, http.StatusOK, response.StatusCode)
var returnedJob modelsV1.JobStatus
err := test.GetResponseBody(response, &returnedJob)
require.NoError(t, err)
assert.Equal(t, createdJob.Name, returnedJob.Name)
assert.Equal(t, "", returnedJob.BatchName)
assert.Equal(t, createdJob.Started, returnedJob.Started)
assert.Equal(t, createdJob.Ended, returnedJob.Ended)
assert.Equal(t, createdJob.Status, returnedJob.Status)
}
})

t.Run("valid payload body - error from MaintainHistoryLimit should not fail request", func(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
jobScheduleDescription := models.JobScheduleDescription{
Payload: "a_payload",
}
createdJob := modelsV1.JobStatus{
Name: "newjob",
Started: utils.FormatTimestamp(time.Now()),
Ended: utils.FormatTimestamp(time.Now().Add(1 * time.Minute)),
Status: "jobstatus",
}
jobHandler := mock.NewMockJobHandler(ctrl)
ctx := context.Background()
jobHandler.
EXPECT().
CreateJob(test.RequestContextMatcher{}, &jobScheduleDescription).
Return(&createdJob, nil).
Times(1)
jobHandler.
EXPECT().
MaintainHistoryLimit(test.RequestContextMatcher{}).
Return(errors.New("an error")).
Times(1)
controllerTestUtils := setupTest(jobHandler)
responseChannel := controllerTestUtils.ExecuteRequestWithBody(ctx, http.MethodPost, "/api/v1/jobs", jobScheduleDescription)
response := <-responseChannel
Expand Down Expand Up @@ -346,10 +293,6 @@ func TestCreateJob(t *testing.T) {
EXPECT().
CreateJob(test.RequestContextMatcher{}, gomock.Any()).
Times(0)
jobHandler.
EXPECT().
MaintainHistoryLimit(test.RequestContextMatcher{}).
Times(0)
controllerTestUtils := setupTest(jobHandler)
responseChannel := controllerTestUtils.ExecuteRequestWithBody(ctx, http.MethodPost, "/api/v1/jobs", struct{ Payload interface{} }{Payload: struct{}{}})
response := <-responseChannel
Expand Down Expand Up @@ -380,10 +323,6 @@ func TestCreateJob(t *testing.T) {
CreateJob(test.RequestContextMatcher{}, &jobScheduleDescription).
Return(nil, apiErrors.NewNotFound(anyKind, anyName)).
Times(1)
jobHandler.
EXPECT().
MaintainHistoryLimit(test.RequestContextMatcher{}).
Times(0)
controllerTestUtils := setupTest(jobHandler)
responseChannel := controllerTestUtils.ExecuteRequest(ctx, http.MethodPost, "/api/v1/jobs")
response := <-responseChannel
Expand Down Expand Up @@ -413,10 +352,6 @@ func TestCreateJob(t *testing.T) {
CreateJob(test.RequestContextMatcher{}, &jobScheduleDescription).
Return(nil, errors.New("any error")).
Times(1)
jobHandler.
EXPECT().
MaintainHistoryLimit(test.RequestContextMatcher{}).
Times(0)
controllerTestUtils := setupTest(jobHandler)
responseChannel := controllerTestUtils.ExecuteRequest(ctx, http.MethodPost, "/api/v1/jobs")
response := <-responseChannel
Expand Down
8 changes: 6 additions & 2 deletions api/v1/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ import (

"github.com/equinor/radix-common/utils/slice"
modelsv1 "github.com/equinor/radix-job-scheduler/models/v1"
defaultsv1 "github.com/equinor/radix-job-scheduler/models/v1/defaults"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// k8sJobNameLabel A label that k8s automatically adds to a Pod created by a Job
k8sJobNameLabel = "job-name"
)

// GetLastEventMessageForPods returns the last event message for pods
func (handler *Handler) GetLastEventMessageForPods(ctx context.Context, pods []corev1.Pod) (map[string]string, error) {
podNamesMap := slice.Reduce(pods, make(map[string]struct{}), func(acc map[string]struct{}, pod corev1.Pod) map[string]struct{} {
Expand Down Expand Up @@ -59,7 +63,7 @@ func (handler *Handler) GetRadixBatchJobMessagesAndPodMaps(ctx context.Context,
return nil, nil, err
}
batchJobPodsMap := slice.Reduce(radixBatchesPods, make(map[string]corev1.Pod), func(acc map[string]corev1.Pod, pod corev1.Pod) map[string]corev1.Pod {
if batchJobName, ok := pod.GetLabels()[defaultsv1.K8sJobNameLabel]; ok {
if batchJobName, ok := pod.GetLabels()[k8sJobNameLabel]; ok {
acc[batchJobName] = pod
}
return acc
Expand Down
6 changes: 3 additions & 3 deletions api/v1/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func GetJobStatusFromRadixBatchJobsStatus(batchName string, jobStatus modelsv2.R
func GetJobStatusFromRadixBatchJobsStatuses(radixBatches ...modelsv2.RadixBatch) []modelsv1.JobStatus {
jobStatuses := make([]modelsv1.JobStatus, 0, len(radixBatches))
for _, radixBatch := range radixBatches {
jobStatusBatchName := getJobStatusBatchName(&radixBatch)
jobStatusBatchName := getBatchName(&radixBatch)
for _, jobStatus := range radixBatch.JobStatuses {
jobStatuses = append(jobStatuses, GetJobStatusFromRadixBatchJobsStatus(jobStatusBatchName, jobStatus))
}
Expand All @@ -62,7 +62,7 @@ func GetBatchJob(ctx context.Context, handlerApiV2 apiv2.Handler, batchName, job
if err != nil {
return nil, err
}
jobStatusBatchName := getJobStatusBatchName(radixBatch)
jobStatusBatchName := getBatchName(radixBatch)
for _, jobStatus := range radixBatch.JobStatuses {
if !strings.EqualFold(jobStatus.Name, jobName) {
continue
Expand Down Expand Up @@ -94,6 +94,6 @@ func GetPodStatus(podStatuses []modelsv2.RadixBatchJobPodStatus) []modelsv1.PodS
})
}

func getJobStatusBatchName(radixBatch *modelsv2.RadixBatch) string {
func getBatchName(radixBatch *modelsv2.RadixBatch) string {
return utils.TernaryString(radixBatch.BatchType == string(kube.RadixBatchTypeJob), "", radixBatch.Name)
}
Loading

0 comments on commit 070bd56

Please sign in to comment.