diff --git a/charts/nginx-gateway-fabric/templates/tmp-nginx-agent-conf.yaml b/charts/nginx-gateway-fabric/templates/tmp-nginx-agent-conf.yaml index 586a8d38d0..e2126ee9e1 100644 --- a/charts/nginx-gateway-fabric/templates/tmp-nginx-agent-conf.yaml +++ b/charts/nginx-gateway-fabric/templates/tmp-nginx-agent-conf.yaml @@ -18,5 +18,8 @@ data: - configuration - certificates - metrics + {{- if .Values.nginx.plus }} + - api-action + {{- end }} log: level: debug diff --git a/deploy/experimental-nginx-plus/deploy.yaml b/deploy/experimental-nginx-plus/deploy.yaml index a6964b19f2..54bb27fc93 100644 --- a/deploy/experimental-nginx-plus/deploy.yaml +++ b/deploy/experimental-nginx-plus/deploy.yaml @@ -163,6 +163,7 @@ data: - configuration - certificates - metrics + - api-action log: level: debug kind: ConfigMap diff --git a/deploy/nginx-plus/deploy.yaml b/deploy/nginx-plus/deploy.yaml index 8274bbc7e8..7d545dce91 100644 --- a/deploy/nginx-plus/deploy.yaml +++ b/deploy/nginx-plus/deploy.yaml @@ -158,6 +158,7 @@ data: - configuration - certificates - metrics + - api-action log: level: debug kind: ConfigMap diff --git a/deploy/snippets-filters-nginx-plus/deploy.yaml b/deploy/snippets-filters-nginx-plus/deploy.yaml index 5a5d69827e..6a7a6640c6 100644 --- a/deploy/snippets-filters-nginx-plus/deploy.yaml +++ b/deploy/snippets-filters-nginx-plus/deploy.yaml @@ -160,6 +160,7 @@ data: - configuration - certificates - metrics + - api-action log: level: debug kind: ConfigMap diff --git a/internal/mode/static/handler.go b/internal/mode/static/handler.go index a5f9142d4d..6c86e2f7ee 100644 --- a/internal/mode/static/handler.go +++ b/internal/mode/static/handler.go @@ -185,7 +185,12 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log h.setLatestConfiguration(&cfg) if h.cfg.plus { - h.cfg.nginxUpdater.UpdateUpstreamServers() + // TODO(sberman): hardcode this deployment name until we support provisioning data planes + deployment := types.NamespacedName{ + Name: "tmp-nginx-deployment", + Namespace: h.cfg.gatewayPodConfig.Namespace, + } + err = h.cfg.nginxUpdater.UpdateUpstreamServers(ctx, deployment, cfg) } else { err = h.updateNginxConf(ctx, cfg) } @@ -314,7 +319,9 @@ func (h *eventHandlerImpl) updateNginxConf(ctx context.Context, conf dataplane.C // If using NGINX Plus, update upstream servers using the API. if h.cfg.plus { - h.cfg.nginxUpdater.UpdateUpstreamServers() + if err := h.cfg.nginxUpdater.UpdateUpstreamServers(ctx, deployment, conf); err != nil { + return err + } } return nil diff --git a/internal/mode/static/nginx/agent/agent.go b/internal/mode/static/nginx/agent/agent.go index a13df4cb71..527a7305d7 100644 --- a/internal/mode/static/nginx/agent/agent.go +++ b/internal/mode/static/nginx/agent/agent.go @@ -2,13 +2,19 @@ package agent import ( "context" + "errors" "fmt" "github.com/go-logr/logr" + pb "github.com/nginx/agent/v3/api/grpc/mpi/v1" + "google.golang.org/protobuf/types/known/structpb" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/broadcast" agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/resolver" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate @@ -17,8 +23,16 @@ import ( // NginxUpdater is an interface for updating NGINX using the NGINX agent. type NginxUpdater interface { - UpdateConfig(context.Context, types.NamespacedName, []File) error - UpdateUpstreamServers() + UpdateConfig( + ctx context.Context, + deploymentNsName types.NamespacedName, + files []File, + ) error + UpdateUpstreamServers( + ctx context.Context, + deploymentNsName types.NamespacedName, + conf dataplane.Configuration, + ) error } // NginxUpdaterImpl implements the NginxUpdater interface. @@ -76,10 +90,76 @@ func (n *NginxUpdaterImpl) UpdateConfig(ctx context.Context, nsName types.Namesp // UpdateUpstreamServers sends an APIRequest to the agent to update upstream servers using the NGINX Plus API. // Only applicable when using NGINX Plus. -func (n *NginxUpdaterImpl) UpdateUpstreamServers() { +func (n *NginxUpdaterImpl) UpdateUpstreamServers( + ctx context.Context, + nsName types.NamespacedName, + conf dataplane.Configuration, +) error { if !n.plus { - return + return nil } n.logger.Info("Updating upstream servers using NGINX Plus API") + + deployment := n.nginxDeployments.GetOrStore(ctx, nsName) + if deployment == nil { + return fmt.Errorf("failed to register nginx deployment %q", nsName.Name) + } + broadcaster := deployment.GetBroadcaster() + + var updateErr error + for _, upstream := range conf.Upstreams { + msg := broadcast.NginxAgentMessage{ + Type: broadcast.APIRequest, + NGINXPlusAction: &pb.NGINXPlusAction{ + Action: &pb.NGINXPlusAction_UpdateHttpUpstreamServers{ + UpdateHttpUpstreamServers: buildUpstreamServers(upstream), + }, + }, + } + + if err := broadcaster.Send(msg); err != nil { + updateErr = errors.Join(updateErr, fmt.Errorf( + "couldn't update upstream %q via the API: %w", upstream.Name, err)) + } + } + + return updateErr +} + +func buildUpstreamServers(upstream dataplane.Upstream) *pb.UpdateHTTPUpstreamServers { + servers := make([]*structpb.Struct, 0, len(upstream.Endpoints)) + + for _, endpoint := range upstream.Endpoints { + port, format := getPortAndIPFormat(endpoint) + value := fmt.Sprintf(format, endpoint.Address, port) + + server := &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "server": structpb.NewStringValue(value), + }, + } + + servers = append(servers, server) + } + + return &pb.UpdateHTTPUpstreamServers{ + HttpUpstreamName: upstream.Name, + Servers: servers, + } +} + +func getPortAndIPFormat(ep resolver.Endpoint) (string, string) { + var port string + + if ep.Port != 0 { + port = fmt.Sprintf(":%d", ep.Port) + } + + format := "%s%s" + if ep.IPv6 { + format = "[%s]%s" + } + + return port, format } diff --git a/internal/mode/static/nginx/agent/agentfakes/fake_nginx_updater.go b/internal/mode/static/nginx/agent/agentfakes/fake_nginx_updater.go index 6ed045639b..77e1b5438d 100644 --- a/internal/mode/static/nginx/agent/agentfakes/fake_nginx_updater.go +++ b/internal/mode/static/nginx/agent/agentfakes/fake_nginx_updater.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane" "k8s.io/apimachinery/pkg/types" ) @@ -23,9 +24,18 @@ type FakeNginxUpdater struct { updateConfigReturnsOnCall map[int]struct { result1 error } - UpdateUpstreamServersStub func() + UpdateUpstreamServersStub func(context.Context, types.NamespacedName, dataplane.Configuration) error updateUpstreamServersMutex sync.RWMutex updateUpstreamServersArgsForCall []struct { + arg1 context.Context + arg2 types.NamespacedName + arg3 dataplane.Configuration + } + updateUpstreamServersReturns struct { + result1 error + } + updateUpstreamServersReturnsOnCall map[int]struct { + result1 error } invocations map[string][][]interface{} invocationsMutex sync.RWMutex @@ -99,16 +109,25 @@ func (fake *FakeNginxUpdater) UpdateConfigReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeNginxUpdater) UpdateUpstreamServers() { +func (fake *FakeNginxUpdater) UpdateUpstreamServers(arg1 context.Context, arg2 types.NamespacedName, arg3 dataplane.Configuration) error { fake.updateUpstreamServersMutex.Lock() + ret, specificReturn := fake.updateUpstreamServersReturnsOnCall[len(fake.updateUpstreamServersArgsForCall)] fake.updateUpstreamServersArgsForCall = append(fake.updateUpstreamServersArgsForCall, struct { - }{}) + arg1 context.Context + arg2 types.NamespacedName + arg3 dataplane.Configuration + }{arg1, arg2, arg3}) stub := fake.UpdateUpstreamServersStub - fake.recordInvocation("UpdateUpstreamServers", []interface{}{}) + fakeReturns := fake.updateUpstreamServersReturns + fake.recordInvocation("UpdateUpstreamServers", []interface{}{arg1, arg2, arg3}) fake.updateUpstreamServersMutex.Unlock() if stub != nil { - fake.UpdateUpstreamServersStub() + return stub(arg1, arg2, arg3) } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 } func (fake *FakeNginxUpdater) UpdateUpstreamServersCallCount() int { @@ -117,12 +136,42 @@ func (fake *FakeNginxUpdater) UpdateUpstreamServersCallCount() int { return len(fake.updateUpstreamServersArgsForCall) } -func (fake *FakeNginxUpdater) UpdateUpstreamServersCalls(stub func()) { +func (fake *FakeNginxUpdater) UpdateUpstreamServersCalls(stub func(context.Context, types.NamespacedName, dataplane.Configuration) error) { fake.updateUpstreamServersMutex.Lock() defer fake.updateUpstreamServersMutex.Unlock() fake.UpdateUpstreamServersStub = stub } +func (fake *FakeNginxUpdater) UpdateUpstreamServersArgsForCall(i int) (context.Context, types.NamespacedName, dataplane.Configuration) { + fake.updateUpstreamServersMutex.RLock() + defer fake.updateUpstreamServersMutex.RUnlock() + argsForCall := fake.updateUpstreamServersArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeNginxUpdater) UpdateUpstreamServersReturns(result1 error) { + fake.updateUpstreamServersMutex.Lock() + defer fake.updateUpstreamServersMutex.Unlock() + fake.UpdateUpstreamServersStub = nil + fake.updateUpstreamServersReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeNginxUpdater) UpdateUpstreamServersReturnsOnCall(i int, result1 error) { + fake.updateUpstreamServersMutex.Lock() + defer fake.updateUpstreamServersMutex.Unlock() + fake.UpdateUpstreamServersStub = nil + if fake.updateUpstreamServersReturnsOnCall == nil { + fake.updateUpstreamServersReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.updateUpstreamServersReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeNginxUpdater) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() diff --git a/internal/mode/static/nginx/agent/broadcast/broadcast.go b/internal/mode/static/nginx/agent/broadcast/broadcast.go index 877bc06cf3..ca71958c61 100644 --- a/internal/mode/static/nginx/agent/broadcast/broadcast.go +++ b/internal/mode/static/nginx/agent/broadcast/broadcast.go @@ -3,20 +3,24 @@ package broadcast import ( "context" "errors" - "time" + "sync" pb "github.com/nginx/agent/v3/api/grpc/mpi/v1" ) +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate + +//counterfeiter:generate . Broadcaster + // Broadcaster defines an interface for consumers to subscribe to File updates. type Broadcaster interface { - Subscribe() (chan FileOverviewMessage, chan error) - Send(FileOverviewMessage) error - CancelSubscription(chan FileOverviewMessage) + Subscribe() (chan NginxAgentMessage, chan<- error) + Send(NginxAgentMessage) error + CancelSubscription(chan NginxAgentMessage) } type subscriberChannels struct { - listenCh chan FileOverviewMessage + listenCh chan NginxAgentMessage responseCh chan error } @@ -25,19 +29,21 @@ type subscriberChannels struct { // about this Deployment. The agent Subscription will then send a response of whether or not // the configuration was successfully applied. type DeploymentBroadcaster struct { - publishCh chan FileOverviewMessage + publishCh chan NginxAgentMessage subCh chan subscriberChannels - unsubCh chan chan FileOverviewMessage - listeners map[chan FileOverviewMessage]chan error + unsubCh chan chan NginxAgentMessage + listeners map[chan NginxAgentMessage]chan error + errorCh chan error } // NewDeploymentBroadcaster returns a new instance of a DeploymentBroadcaster. func NewDeploymentBroadcaster(ctx context.Context) *DeploymentBroadcaster { broadcaster := &DeploymentBroadcaster{ - listeners: make(map[chan FileOverviewMessage]chan error), - publishCh: make(chan FileOverviewMessage), + listeners: make(map[chan NginxAgentMessage]chan error), + publishCh: make(chan NginxAgentMessage), subCh: make(chan subscriberChannels), - unsubCh: make(chan chan FileOverviewMessage), + unsubCh: make(chan chan NginxAgentMessage), + errorCh: make(chan error), } go broadcaster.run(ctx) @@ -46,8 +52,8 @@ func NewDeploymentBroadcaster(ctx context.Context) *DeploymentBroadcaster { // Subscribe allows a listener to subscribe to broadcast messages. It returns the channel // to listen on for messages, as well as a channel to respond on. -func (b *DeploymentBroadcaster) Subscribe() (chan FileOverviewMessage, chan error) { - listenCh := make(chan FileOverviewMessage) +func (b *DeploymentBroadcaster) Subscribe() (chan NginxAgentMessage, chan<- error) { + listenCh := make(chan NginxAgentMessage) responseCh := make(chan error) sc := subscriberChannels{ listenCh: listenCh, @@ -58,42 +64,16 @@ func (b *DeploymentBroadcaster) Subscribe() (chan FileOverviewMessage, chan erro return listenCh, responseCh } -// Send the fileOverviews to all listeners. Wait for a response from each listener +// Send the message to all listeners. Wait for a response from each listener // or until a timeout. -func (b *DeploymentBroadcaster) Send(fileOverviews FileOverviewMessage) error { - b.publishCh <- fileOverviews - - timeout := 15 * time.Second - timeoutErr := errors.New("timeout waiting for data plane response") - - done := make(chan error, len(b.listeners)) - - for _, responseCh := range b.listeners { - go func(ch chan error) { - select { - case res := <-ch: - done <- res - case <-time.After(timeout): - done <- timeoutErr - } - }(responseCh) - } - - var err error - for range len(b.listeners) { - select { - case res := <-done: - err = errors.Join(err, res) - case <-time.After(timeout): - err = errors.Join(err, timeoutErr) - } - } +func (b *DeploymentBroadcaster) Send(message NginxAgentMessage) error { + b.publishCh <- message - return err + return <-b.errorCh } // CancelSubscription removes a Subscriber from the channel list. -func (b *DeploymentBroadcaster) CancelSubscription(channel chan FileOverviewMessage) { +func (b *DeploymentBroadcaster) CancelSubscription(channel chan NginxAgentMessage) { b.unsubCh <- channel } @@ -120,20 +100,52 @@ func (b *DeploymentBroadcaster) run(ctx context.Context) { case msgCh := <-b.unsubCh: delete(b.listeners, msgCh) case msg := <-b.publishCh: - for msgCh := range b.listeners { - select { - case msgCh <- msg: - default: - } + var wg sync.WaitGroup + wg.Add(len(b.listeners)) + + responses := make(chan error, len(b.listeners)) + for msgCh, responseCh := range b.listeners { + go func() { + defer wg.Done() + + // send message and wait for it to be read + msgCh <- msg + // wait for response + res := <-responseCh + // add response to the list of responses + responses <- res + }() + } + wg.Wait() + + var err error + for range len(b.listeners) { + err = errors.Join(err, <-responses) } + b.errorCh <- err } } } -// FileOverviewMessage is sent to all subscribers to send to the nginx agents for a ConfigApplyRequest. -type FileOverviewMessage struct { +// MessageType is the type of message to be sent. +type MessageType int + +const ( + // ConfigApplyRequest sends files to update nginx configuration. + ConfigApplyRequest MessageType = iota + // APIRequest sends an NGINX Plus API request to update configuration. + APIRequest +) + +// NginxAgentMessage is sent to all subscribers to send to the nginx agents for either a ConfigApplyRequest +// or an APIActionRequest. +type NginxAgentMessage struct { // ConfigVersion is the hashed configuration version of the included files. ConfigVersion string + // NGINXPlusAction is an NGINX Plus API action to be sent. + NGINXPlusAction *pb.NGINXPlusAction // FileOverviews contain the overviews of all files to be sent. FileOverviews []*pb.File + // Type defines the type of message to be sent. + Type MessageType } diff --git a/internal/mode/static/nginx/agent/broadcast/broadcastfakes/fake_broadcaster.go b/internal/mode/static/nginx/agent/broadcast/broadcastfakes/fake_broadcaster.go new file mode 100644 index 0000000000..97c02b203f --- /dev/null +++ b/internal/mode/static/nginx/agent/broadcast/broadcastfakes/fake_broadcaster.go @@ -0,0 +1,220 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package broadcastfakes + +import ( + "sync" + + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/broadcast" +) + +type FakeBroadcaster struct { + CancelSubscriptionStub func(chan broadcast.NginxAgentMessage) + cancelSubscriptionMutex sync.RWMutex + cancelSubscriptionArgsForCall []struct { + arg1 chan broadcast.NginxAgentMessage + } + SendStub func(broadcast.NginxAgentMessage) error + sendMutex sync.RWMutex + sendArgsForCall []struct { + arg1 broadcast.NginxAgentMessage + } + sendReturns struct { + result1 error + } + sendReturnsOnCall map[int]struct { + result1 error + } + SubscribeStub func() (chan broadcast.NginxAgentMessage, chan<- error) + subscribeMutex sync.RWMutex + subscribeArgsForCall []struct { + } + subscribeReturns struct { + result1 chan broadcast.NginxAgentMessage + result2 chan<- error + } + subscribeReturnsOnCall map[int]struct { + result1 chan broadcast.NginxAgentMessage + result2 chan<- error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeBroadcaster) CancelSubscription(arg1 chan broadcast.NginxAgentMessage) { + fake.cancelSubscriptionMutex.Lock() + fake.cancelSubscriptionArgsForCall = append(fake.cancelSubscriptionArgsForCall, struct { + arg1 chan broadcast.NginxAgentMessage + }{arg1}) + stub := fake.CancelSubscriptionStub + fake.recordInvocation("CancelSubscription", []interface{}{arg1}) + fake.cancelSubscriptionMutex.Unlock() + if stub != nil { + fake.CancelSubscriptionStub(arg1) + } +} + +func (fake *FakeBroadcaster) CancelSubscriptionCallCount() int { + fake.cancelSubscriptionMutex.RLock() + defer fake.cancelSubscriptionMutex.RUnlock() + return len(fake.cancelSubscriptionArgsForCall) +} + +func (fake *FakeBroadcaster) CancelSubscriptionCalls(stub func(chan broadcast.NginxAgentMessage)) { + fake.cancelSubscriptionMutex.Lock() + defer fake.cancelSubscriptionMutex.Unlock() + fake.CancelSubscriptionStub = stub +} + +func (fake *FakeBroadcaster) CancelSubscriptionArgsForCall(i int) chan broadcast.NginxAgentMessage { + fake.cancelSubscriptionMutex.RLock() + defer fake.cancelSubscriptionMutex.RUnlock() + argsForCall := fake.cancelSubscriptionArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeBroadcaster) Send(arg1 broadcast.NginxAgentMessage) error { + fake.sendMutex.Lock() + ret, specificReturn := fake.sendReturnsOnCall[len(fake.sendArgsForCall)] + fake.sendArgsForCall = append(fake.sendArgsForCall, struct { + arg1 broadcast.NginxAgentMessage + }{arg1}) + stub := fake.SendStub + fakeReturns := fake.sendReturns + fake.recordInvocation("Send", []interface{}{arg1}) + fake.sendMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeBroadcaster) SendCallCount() int { + fake.sendMutex.RLock() + defer fake.sendMutex.RUnlock() + return len(fake.sendArgsForCall) +} + +func (fake *FakeBroadcaster) SendCalls(stub func(broadcast.NginxAgentMessage) error) { + fake.sendMutex.Lock() + defer fake.sendMutex.Unlock() + fake.SendStub = stub +} + +func (fake *FakeBroadcaster) SendArgsForCall(i int) broadcast.NginxAgentMessage { + fake.sendMutex.RLock() + defer fake.sendMutex.RUnlock() + argsForCall := fake.sendArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeBroadcaster) SendReturns(result1 error) { + fake.sendMutex.Lock() + defer fake.sendMutex.Unlock() + fake.SendStub = nil + fake.sendReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeBroadcaster) SendReturnsOnCall(i int, result1 error) { + fake.sendMutex.Lock() + defer fake.sendMutex.Unlock() + fake.SendStub = nil + if fake.sendReturnsOnCall == nil { + fake.sendReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.sendReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeBroadcaster) Subscribe() (chan broadcast.NginxAgentMessage, chan<- error) { + fake.subscribeMutex.Lock() + ret, specificReturn := fake.subscribeReturnsOnCall[len(fake.subscribeArgsForCall)] + fake.subscribeArgsForCall = append(fake.subscribeArgsForCall, struct { + }{}) + stub := fake.SubscribeStub + fakeReturns := fake.subscribeReturns + fake.recordInvocation("Subscribe", []interface{}{}) + fake.subscribeMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeBroadcaster) SubscribeCallCount() int { + fake.subscribeMutex.RLock() + defer fake.subscribeMutex.RUnlock() + return len(fake.subscribeArgsForCall) +} + +func (fake *FakeBroadcaster) SubscribeCalls(stub func() (chan broadcast.NginxAgentMessage, chan<- error)) { + fake.subscribeMutex.Lock() + defer fake.subscribeMutex.Unlock() + fake.SubscribeStub = stub +} + +func (fake *FakeBroadcaster) SubscribeReturns(result1 chan broadcast.NginxAgentMessage, result2 chan<- error) { + fake.subscribeMutex.Lock() + defer fake.subscribeMutex.Unlock() + fake.SubscribeStub = nil + fake.subscribeReturns = struct { + result1 chan broadcast.NginxAgentMessage + result2 chan<- error + }{result1, result2} +} + +func (fake *FakeBroadcaster) SubscribeReturnsOnCall(i int, result1 chan broadcast.NginxAgentMessage, result2 chan<- error) { + fake.subscribeMutex.Lock() + defer fake.subscribeMutex.Unlock() + fake.SubscribeStub = nil + if fake.subscribeReturnsOnCall == nil { + fake.subscribeReturnsOnCall = make(map[int]struct { + result1 chan broadcast.NginxAgentMessage + result2 chan<- error + }) + } + fake.subscribeReturnsOnCall[i] = struct { + result1 chan broadcast.NginxAgentMessage + result2 chan<- error + }{result1, result2} +} + +func (fake *FakeBroadcaster) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.cancelSubscriptionMutex.RLock() + defer fake.cancelSubscriptionMutex.RUnlock() + fake.sendMutex.RLock() + defer fake.sendMutex.RUnlock() + fake.subscribeMutex.RLock() + defer fake.subscribeMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeBroadcaster) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ broadcast.Broadcaster = new(FakeBroadcaster) diff --git a/internal/mode/static/nginx/agent/command.go b/internal/mode/static/nginx/agent/command.go index c6d294b680..7dd79c8f63 100644 --- a/internal/mode/static/nginx/agent/command.go +++ b/internal/mode/static/nginx/agent/command.go @@ -19,6 +19,7 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/broadcast" agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc" grpcContext "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/meta" @@ -72,6 +73,8 @@ func (cs *commandService) CreateConnection( instances := resource.GetInstances() if len(instances) != 2 { + // TODO(sberman): wait for DataPlaneStatusRequest to send us the instance data. + // Somehow utilize the connections map to track the instanceID? return nil, status.Errorf(codes.InvalidArgument, "connection request does not contain agent and nginx instances") } @@ -139,9 +142,19 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error case <-ctx.Done(): return ctx.Err() case msg := <-listenerCh: - if err := in.Send(buildRequest(msg.FileOverviews, conn.InstanceID, msg.ConfigVersion)); err != nil { + var req *pb.ManagementPlaneRequest + switch msg.Type { + case broadcast.ConfigApplyRequest: + req = buildRequest(msg.FileOverviews, conn.InstanceID, msg.ConfigVersion) + case broadcast.APIRequest: + req = buildPlusAPIRequest(msg.NGINXPlusAction, conn.InstanceID) + default: + panic(fmt.Sprintf("unknown request type %d", msg.Type)) + } + + if err := in.Send(req); err != nil { cs.logger.Error(err, "error sending request to agent") - sendResponse(responseCh, err) + responseCh <- err return err } @@ -188,7 +201,7 @@ func (cs *commandService) waitForConnection( func (cs *commandService) listenForDataPlaneResponse( ctx context.Context, in pb.CommandService_SubscribeServer, - responseCh chan error, + responseCh chan<- error, ) { for { select { @@ -204,9 +217,9 @@ func (cs *commandService) listenForDataPlaneResponse( res := dataPlaneResponse.GetCommandResponse() if res.GetStatus() != pb.CommandResponse_COMMAND_STATUS_OK { err := fmt.Errorf("bad response from agent: %s; error: %s", res.GetMessage(), res.GetError()) - sendResponse(responseCh, err) + responseCh <- err } else { - sendResponse(responseCh, nil) + responseCh <- nil } } } @@ -233,13 +246,21 @@ func buildRequest(fileOverviews []*pb.File, instanceID, version string) *pb.Mana } } -// sendResponse either sends an error or nil back to the broadcaster so it -// can relay the status of the ConfigApply back to the caller. -// We do not wait for the value to be read before returning. -func sendResponse(responseCh chan error, err error) { - select { - case responseCh <- err: - default: +func buildPlusAPIRequest(action *pb.NGINXPlusAction, instanceID string) *pb.ManagementPlaneRequest { + return &pb.ManagementPlaneRequest{ + MessageMeta: &pb.MessageMeta{ + MessageId: meta.GenerateMessageID(), + CorrelationId: meta.GenerateMessageID(), + Timestamp: timestamppb.Now(), + }, + Request: &pb.ManagementPlaneRequest_ActionRequest{ + ActionRequest: &pb.APIActionRequest{ + InstanceId: instanceID, + Action: &pb.APIActionRequest_NginxPlusAction{ + NginxPlusAction: action, + }, + }, + }, } } diff --git a/internal/mode/static/nginx/agent/deployment.go b/internal/mode/static/nginx/agent/deployment.go index b542deac97..34d5a2fd0d 100644 --- a/internal/mode/static/nginx/agent/deployment.go +++ b/internal/mode/static/nginx/agent/deployment.go @@ -61,7 +61,7 @@ func (d *Deployment) GetFile(name, hash string) []byte { } // SetFiles updates the nginx files and fileOverviews for the deployment and returns the message to send. -func (d *Deployment) SetFiles(files []File) broadcast.FileOverviewMessage { +func (d *Deployment) SetFiles(files []File) broadcast.NginxAgentMessage { d.lock.Lock() defer d.lock.Unlock() @@ -91,7 +91,8 @@ func (d *Deployment) SetFiles(files []File) broadcast.FileOverviewMessage { d.configVersion = filesHelper.GenerateConfigVersion(fileOverviews) d.fileOverviews = fileOverviews - return broadcast.FileOverviewMessage{ + return broadcast.NginxAgentMessage{ + Type: broadcast.ConfigApplyRequest, FileOverviews: fileOverviews, ConfigVersion: d.configVersion, }