Skip to content

Commit

Permalink
filter webhook by job component name (#109)
Browse files Browse the repository at this point in the history
* filter webhook by job component name

* update tests

* update module versions

* remove deprecated functions

* remove statik.go from .gitignore

* rename field

* use RadixDeploymentJobRef for filtering job component name

* remove nil check
  • Loading branch information
nilsgstrabo authored Dec 29, 2023
1 parent 5d65a4f commit 74add41
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 82 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ __debug_bin
# Dependency directories (remove the comment below to include it)
# vendor/

swaggerui/statik.go
swaggerui_src/swagger.json

.idea
6 changes: 1 addition & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,7 @@ func runApiServer(kubeUtil *kube.Kube, env *models.Env) {
}

func getRadixBatchWatcher(kubeUtil *kube.Kube, radixDeployJobComponent *radixv1.RadixDeployJobComponent, env *models.Env) (*notifications.Watcher, error) {
notifier, err := notifications.NewWebhookNotifier(radixDeployJobComponent.Notifications)
if err != nil {
return notifications.NullRadixBatchWatcher(), err
}

notifier := notifications.NewWebhookNotifier(radixDeployJobComponent)
log.Infof("Created notifier: %s", notifier.String())
if !notifier.Enabled() {
log.Infoln("Notifiers are not enabled, RadixBatch event and changes watcher is skipped.")
Expand Down
34 changes: 17 additions & 17 deletions models/notifications/webhook_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,56 +4,56 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/equinor/radix-job-scheduler/models/v1/events"
"net/http"

"github.com/equinor/radix-job-scheduler/models/v1/events"

radixv1 "github.com/equinor/radix-operator/pkg/apis/radix/v1"
)

type webhookNotifier struct {
enabled bool
webhook string
webhookURL string
jobComponentName string
}

func NewWebhookNotifier(notifications *radixv1.Notifications) (Notifier, error) {
notifier := webhookNotifier{}
if notifications != nil && webhookIsNotEmpty(notifications.Webhook) {
notifier.enabled = true
notifier.webhook = *notifications.Webhook
func NewWebhookNotifier(jobComponent *radixv1.RadixDeployJobComponent) Notifier {
notifier := webhookNotifier{jobComponentName: jobComponent.Name}
if jobComponent.Notifications != nil && webhookIsNotEmpty(jobComponent.Notifications.Webhook) {
notifier.webhookURL = *jobComponent.Notifications.Webhook
}
return &notifier, nil
return &notifier
}

func (notifier *webhookNotifier) Enabled() bool {
return notifier.enabled
return len(notifier.webhookURL) > 0
}

func (notifier *webhookNotifier) String() string {
if notifier.enabled {
return fmt.Sprintf("Webhook notifier is enabled. Webhook: %s", notifier.webhook)
if notifier.Enabled() {
return fmt.Sprintf("Webhook notifier is enabled. Webhook: %s", notifier.webhookURL)
}
return "Webhook notifier is disabled"
}

func (notifier *webhookNotifier) Notify(event events.Event, newRadixBatch *radixv1.RadixBatch, updatedJobStatuses []radixv1.RadixBatchJobStatus, errChan chan error) (done chan struct{}) {
func (notifier *webhookNotifier) Notify(event events.Event, radixBatch *radixv1.RadixBatch, updatedJobStatuses []radixv1.RadixBatchJobStatus, errChan chan error) (done chan struct{}) {
done = make(chan struct{})
go func() {
if !notifier.Enabled() || len(notifier.webhook) == 0 {
if !notifier.Enabled() || len(notifier.webhookURL) == 0 || radixBatch.Spec.RadixDeploymentJobRef.Job != notifier.jobComponentName {
done <- struct{}{}
close(done)
return
}
// RadixBatch status and only changed job statuses
batchStatus := getRadixBatchEventFromRadixBatch(event, newRadixBatch, updatedJobStatuses)
batchStatus := getRadixBatchEventFromRadixBatch(event, radixBatch, updatedJobStatuses)
statusesJson, err := json.Marshal(batchStatus)
if err != nil {
errChan <- fmt.Errorf("failed serialize updated JobStatuses %v", err)
return
}
buf := bytes.NewReader(statusesJson)
_, err = http.Post(notifier.webhook, "application/json", buf)
_, err = http.Post(notifier.webhookURL, "application/json", buf)
if err != nil {
errChan <- fmt.Errorf("failed to notify on RadixBatch object create or change %s: %v", newRadixBatch.GetName(), err)
errChan <- fmt.Errorf("failed to notify on RadixBatch object create or change %s: %v", radixBatch.GetName(), err)
return
}
done <- struct{}{}
Expand Down
142 changes: 83 additions & 59 deletions models/notifications/webhook_notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package notifications
import (
"encoding/json"
"fmt"
"github.com/equinor/radix-job-scheduler/models/v1/events"
"io"
"net/http"
"testing"
"time"

"github.com/equinor/radix-job-scheduler/models/v1/events"

commonUtils "github.com/equinor/radix-common/utils"
"github.com/equinor/radix-common/utils/pointers"
modelsv1 "github.com/equinor/radix-job-scheduler/models/v1"
Expand All @@ -22,49 +23,42 @@ import (
func TestNewWebhookNotifier(t *testing.T) {
tests := []struct {
name string
notifications *radixv1.Notifications
jobComponent *radixv1.RadixDeployJobComponent
expectedEnabled bool
expectedWebhook string
expectedErr bool
}{
{
name: "No notification",
notifications: nil,
expectedEnabled: false, expectedWebhook: "", expectedErr: false,
jobComponent: &radixv1.RadixDeployJobComponent{},
expectedEnabled: false,
expectedWebhook: "",
},
{
name: "Empty notification",
notifications: &radixv1.Notifications{},
expectedEnabled: false, expectedWebhook: "", expectedErr: false,
jobComponent: &radixv1.RadixDeployJobComponent{Notifications: &radixv1.Notifications{}},
expectedEnabled: false,
expectedWebhook: "",
},
{
name: "Empty webhook in the notification",
notifications: &radixv1.Notifications{Webhook: pointers.Ptr("")},
expectedEnabled: false, expectedWebhook: "", expectedErr: false,
jobComponent: &radixv1.RadixDeployJobComponent{Notifications: &radixv1.Notifications{Webhook: pointers.Ptr("")}},
expectedEnabled: false,
expectedWebhook: "",
},
{
name: "Set webhook in the notification",
notifications: &radixv1.Notifications{Webhook: pointers.Ptr("http://job1:8080")},
expectedEnabled: true, expectedWebhook: "http://job1:8080", expectedErr: false,
jobComponent: &radixv1.RadixDeployJobComponent{Notifications: &radixv1.Notifications{Webhook: pointers.Ptr("http://job1:8080")}},
expectedEnabled: true,
expectedWebhook: "http://job1:8080",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotNotifier, err := NewWebhookNotifier(tt.notifications)
if err != nil && !tt.expectedErr {
t.Errorf("not expected error %v", err)
return
} else if err == nil && tt.expectedErr {
t.Errorf("missing expected error")
return
}
if err != nil {
return
}
gotNotifier := NewWebhookNotifier(tt.jobComponent)
notifier := gotNotifier.(*webhookNotifier)
assert.Equal(t, tt.expectedEnabled, notifier.enabled)
assert.Equal(t, tt.expectedWebhook, notifier.webhook)
assert.Equal(t, tt.expectedEnabled, notifier.Enabled())
assert.Equal(t, tt.expectedWebhook, notifier.webhookURL)
})
}
}
Expand All @@ -83,14 +77,14 @@ func (t *testTransport) RoundTrip(request *http.Request) (*http.Response, error)

func Test_webhookNotifier_Notify(t *testing.T) {
type fields struct {
enabled bool
webhook string
jobComponentName string
webhookURL string
expectedRequest bool
expectedError bool
expectedBatchNameInJobs string
}
type args struct {
newRadixBatch *radixv1.RadixBatch
radixBatch *radixv1.RadixBatch
updatedJobStatuses []radixv1.RadixBatchJobStatus
event events.Event
}
Expand All @@ -100,33 +94,62 @@ func Test_webhookNotifier_Notify(t *testing.T) {
startJobTime3 := metav1.NewTime(activeTime.Add(2 * time.Minute))
endJobTime1 := metav1.NewTime(activeTime.Add(10 * time.Minute))
endJobTime3 := metav1.NewTime(activeTime.Add(20 * time.Minute))
jobComponentName := "anyjobcomponent"

tests := []struct {
name string
fields fields
args args
}{
{name: "No request for not enabled notifier",
fields: fields{enabled: false, webhook: "http://job1:8080", expectedRequest: false, expectedError: false},
{name: "No request for notifier with empty webhook",
fields: fields{jobComponentName: jobComponentName, webhookURL: "", expectedRequest: false, expectedError: false},
args: args{
event: events.Update,
newRadixBatch: &radixv1.RadixBatch{ObjectMeta: metav1.ObjectMeta{Name: "batch1", Labels: labels.ForBatchType(kube.RadixBatchTypeBatch)},
Status: radixv1.RadixBatchStatus{Condition: radixv1.RadixBatchCondition{Type: radixv1.BatchConditionTypeWaiting}}},
radixBatch: &radixv1.RadixBatch{
ObjectMeta: metav1.ObjectMeta{Name: "batch1", Labels: labels.ForBatchType(kube.RadixBatchTypeBatch)},
Status: radixv1.RadixBatchStatus{Condition: radixv1.RadixBatchCondition{Type: radixv1.BatchConditionTypeWaiting}}},
updatedJobStatuses: []radixv1.RadixBatchJobStatus{{Name: "job1"}}},
},
{name: "No request for enabled notifier with empty webhook",
fields: fields{enabled: true, webhook: "", expectedRequest: false, expectedError: false},
{name: "No webhook request for other job component",
fields: fields{jobComponentName: jobComponentName, webhookURL: "http://job1:8080", expectedRequest: false, expectedError: false, expectedBatchNameInJobs: "batch1"},
args: args{
event: events.Update,
newRadixBatch: &radixv1.RadixBatch{ObjectMeta: metav1.ObjectMeta{Name: "batch1", Labels: labels.ForBatchType(kube.RadixBatchTypeBatch)},
Status: radixv1.RadixBatchStatus{Condition: radixv1.RadixBatchCondition{Type: radixv1.BatchConditionTypeWaiting}}},
updatedJobStatuses: []radixv1.RadixBatchJobStatus{{Name: "job1"}}},
radixBatch: &radixv1.RadixBatch{
ObjectMeta: metav1.ObjectMeta{Name: "batch1", Labels: labels.ForBatchType(kube.RadixBatchTypeBatch)},
Spec: radixv1.RadixBatchSpec{RadixDeploymentJobRef: radixv1.RadixDeploymentJobComponentSelector{Job: "otherjobcomponent"}},
Status: radixv1.RadixBatchStatus{
Condition: radixv1.RadixBatchCondition{
Type: radixv1.BatchConditionTypeWaiting,
Reason: "some reason",
Message: "some message",
},
},
},
updatedJobStatuses: []radixv1.RadixBatchJobStatus{}},
},
{name: "No webhook request when missing RadixDeploymentJobRef",
fields: fields{jobComponentName: jobComponentName, webhookURL: "http://job1:8080", expectedRequest: false, expectedError: false, expectedBatchNameInJobs: "batch1"},
args: args{
event: events.Update,
radixBatch: &radixv1.RadixBatch{
ObjectMeta: metav1.ObjectMeta{Name: "batch1", Labels: labels.ForBatchType(kube.RadixBatchTypeBatch)},
Status: radixv1.RadixBatchStatus{
Condition: radixv1.RadixBatchCondition{
Type: radixv1.BatchConditionTypeWaiting,
Reason: "some reason",
Message: "some message",
},
},
},
updatedJobStatuses: []radixv1.RadixBatchJobStatus{}},
},
{name: "Waiting batch, no jobs",
fields: fields{enabled: true, webhook: "http://job1:8080", expectedRequest: true, expectedError: false, expectedBatchNameInJobs: "batch1"},
fields: fields{jobComponentName: jobComponentName, webhookURL: "http://job1:8080", expectedRequest: true, expectedError: false, expectedBatchNameInJobs: "batch1"},
args: args{
event: events.Update,
newRadixBatch: &radixv1.RadixBatch{ObjectMeta: metav1.ObjectMeta{Name: "batch1", Labels: labels.ForBatchType(kube.RadixBatchTypeBatch)},
radixBatch: &radixv1.RadixBatch{
ObjectMeta: metav1.ObjectMeta{Name: "batch1", Labels: labels.ForBatchType(kube.RadixBatchTypeBatch)},
Spec: radixv1.RadixBatchSpec{RadixDeploymentJobRef: radixv1.RadixDeploymentJobComponentSelector{Job: jobComponentName}},
Status: radixv1.RadixBatchStatus{
Condition: radixv1.RadixBatchCondition{
Type: radixv1.BatchConditionTypeWaiting,
Expand All @@ -138,10 +161,12 @@ func Test_webhookNotifier_Notify(t *testing.T) {
updatedJobStatuses: []radixv1.RadixBatchJobStatus{}},
},
{name: "Active batch",
fields: fields{enabled: true, webhook: "http://job1:8080", expectedRequest: true, expectedError: false, expectedBatchNameInJobs: "batch1"},
fields: fields{jobComponentName: jobComponentName, webhookURL: "http://job1:8080", expectedRequest: true, expectedError: false, expectedBatchNameInJobs: "batch1"},
args: args{
event: events.Update,
newRadixBatch: &radixv1.RadixBatch{ObjectMeta: metav1.ObjectMeta{Name: "batch1", Labels: labels.ForBatchType(kube.RadixBatchTypeBatch)},
radixBatch: &radixv1.RadixBatch{
ObjectMeta: metav1.ObjectMeta{Name: "batch1", Labels: labels.ForBatchType(kube.RadixBatchTypeBatch)},
Spec: radixv1.RadixBatchSpec{RadixDeploymentJobRef: radixv1.RadixDeploymentJobComponentSelector{Job: jobComponentName}},
Status: radixv1.RadixBatchStatus{
Condition: radixv1.RadixBatchCondition{
Type: radixv1.BatchConditionTypeActive,
Expand All @@ -155,13 +180,14 @@ func Test_webhookNotifier_Notify(t *testing.T) {
updatedJobStatuses: []radixv1.RadixBatchJobStatus{}},
},
{name: "Completed Batch with multiple jobs",
fields: fields{enabled: true, webhook: "http://job1:8080", expectedRequest: true, expectedError: false, expectedBatchNameInJobs: "batch1"},
fields: fields{jobComponentName: jobComponentName, webhookURL: "http://job1:8080", expectedRequest: true, expectedError: false, expectedBatchNameInJobs: "batch1"},
args: args{
event: events.Update,
newRadixBatch: &radixv1.RadixBatch{
radixBatch: &radixv1.RadixBatch{
ObjectMeta: metav1.ObjectMeta{Name: "batch1", Labels: labels.ForBatchType(kube.RadixBatchTypeBatch)},
Spec: radixv1.RadixBatchSpec{
Jobs: []radixv1.RadixBatchJob{{Name: "job1"}, {Name: "job2"}, {Name: "job3"}},
RadixDeploymentJobRef: radixv1.RadixDeploymentJobComponentSelector{Job: jobComponentName},
Jobs: []radixv1.RadixBatchJob{{Name: "job1"}, {Name: "job2"}, {Name: "job3"}},
},
Status: radixv1.RadixBatchStatus{
Condition: radixv1.RadixBatchCondition{
Expand Down Expand Up @@ -193,13 +219,14 @@ func Test_webhookNotifier_Notify(t *testing.T) {
}},
},
{name: "Completed single job batch",
fields: fields{enabled: true, webhook: "http://job1:8080", expectedRequest: true, expectedError: false, expectedBatchNameInJobs: ""},
fields: fields{jobComponentName: jobComponentName, webhookURL: "http://job1:8080", expectedRequest: true, expectedError: false, expectedBatchNameInJobs: ""},
args: args{
event: events.Update,
newRadixBatch: &radixv1.RadixBatch{
radixBatch: &radixv1.RadixBatch{
ObjectMeta: metav1.ObjectMeta{Name: "batch1", Labels: labels.ForBatchType(kube.RadixBatchTypeJob)},
Spec: radixv1.RadixBatchSpec{
Jobs: []radixv1.RadixBatchJob{{Name: "job1"}},
RadixDeploymentJobRef: radixv1.RadixDeploymentJobComponentSelector{Job: jobComponentName},
Jobs: []radixv1.RadixBatchJob{{Name: "job1"}},
},
Status: radixv1.RadixBatchStatus{
Condition: radixv1.RadixBatchCondition{
Expand All @@ -225,11 +252,11 @@ func Test_webhookNotifier_Notify(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
notifier := &webhookNotifier{
enabled: tt.fields.enabled,
webhook: tt.fields.webhook,
jobComponent := &radixv1.RadixDeployJobComponent{
Name: tt.fields.jobComponentName,
Notifications: &radixv1.Notifications{Webhook: pointers.Ptr(tt.fields.webhookURL)},
}

notifier := NewWebhookNotifier(jobComponent)
var receivedRequest *http.Request
http.DefaultClient = &http.Client{
Transport: &testTransport{
Expand All @@ -240,17 +267,14 @@ func Test_webhookNotifier_Notify(t *testing.T) {
}

errChan := make(chan error)
doneChan := notifier.Notify(tt.args.event, tt.args.newRadixBatch, tt.args.updatedJobStatuses, errChan)
doneChan := notifier.Notify(tt.args.event, tt.args.radixBatch, tt.args.updatedJobStatuses, errChan)

var notificationErr error
select {
case <-doneChan:
break
case notificationErr = <-errChan:
break
case <-time.After(1 * time.Minute):
assert.Fail(t, "unexpected long request timeout")
break
}

if tt.fields.expectedRequest && receivedRequest == nil {
Expand All @@ -268,17 +292,17 @@ func Test_webhookNotifier_Notify(t *testing.T) {
return
}
if receivedRequest != nil {
assert.Equal(t, tt.fields.webhook, fmt.Sprintf("%s://%s", receivedRequest.URL.Scheme, receivedRequest.Host))
assert.Equal(t, tt.fields.webhookURL, fmt.Sprintf("%s://%s", receivedRequest.URL.Scheme, receivedRequest.Host))
var batchStatus modelsv1.BatchStatus
if body, _ := io.ReadAll(receivedRequest.Body); len(body) > 0 {
if err := json.Unmarshal(body, &batchStatus); err != nil {
assert.Fail(t, fmt.Sprintf("failed to decerialize the request body: %v", err))
assert.Fail(t, fmt.Sprintf("failed to deserialize the request body: %v", err))
return
}
}
assert.Equal(t, tt.args.newRadixBatch.Name, batchStatus.Name, "Not matching batch name")
assertTimesEqual(t, tt.args.newRadixBatch.Status.Condition.ActiveTime, batchStatus.Started, "batchStatus.Started")
assertTimesEqual(t, tt.args.newRadixBatch.Status.Condition.CompletionTime, batchStatus.Ended, "batchStatus.Ended")
assert.Equal(t, tt.args.radixBatch.Name, batchStatus.Name, "Not matching batch name")
assertTimesEqual(t, tt.args.radixBatch.Status.Condition.ActiveTime, batchStatus.Started, "batchStatus.Started")
assertTimesEqual(t, tt.args.radixBatch.Status.Condition.CompletionTime, batchStatus.Ended, "batchStatus.Ended")
if len(tt.args.updatedJobStatuses) != len(batchStatus.JobStatuses) {
assert.Fail(t, fmt.Sprintf("Not matching amount of updatedJobStatuses %d and JobStatuses %d", len(tt.args.updatedJobStatuses), len(batchStatus.JobStatuses)))
return
Expand Down

0 comments on commit 74add41

Please sign in to comment.