From 85880f11b54aa39ad36a35f838fffffdf3ff2506 Mon Sep 17 00:00:00 2001 From: hovsep Date: Thu, 24 Oct 2024 02:47:09 +0300 Subject: [PATCH] Add integration test --- component/activation_result.go | 1 + component/collection.go | 2 +- component/component.go | 20 ++- fmesh.go | 22 ++- .../error_handling/chainable_api_test.go | 154 ++++++++++++++++++ port/collection.go | 4 +- port/errors.go | 7 + port/port.go | 3 + signal/errors.go | 8 + signal/group.go | 5 +- 10 files changed, 217 insertions(+), 9 deletions(-) create mode 100644 integration_tests/error_handling/chainable_api_test.go create mode 100644 port/errors.go create mode 100644 signal/errors.go diff --git a/component/activation_result.go b/component/activation_result.go index 30422f7..8ec3619 100644 --- a/component/activation_result.go +++ b/component/activation_result.go @@ -72,6 +72,7 @@ const ( func NewActivationResult(componentName string) *ActivationResult { return &ActivationResult{ componentName: componentName, + Chainable: common.NewChainable(), } } diff --git a/component/collection.go b/component/collection.go index 87c2038..fa9d3c1 100644 --- a/component/collection.go +++ b/component/collection.go @@ -24,7 +24,7 @@ func NewCollection() *Collection { // ByName returns a component by its name func (c *Collection) ByName(name string) *Component { if c.HasChainError() { - return nil + return New("").WithChainError(c.ChainError()) } component, ok := c.components[name] diff --git a/component/component.go b/component/component.go index 15a0d13..5fffd7f 100644 --- a/component/component.go +++ b/component/component.go @@ -138,10 +138,12 @@ func (c *Component) OutputByName(name string) *port.Port { // InputByName is shortcut method func (c *Component) InputByName(name string) *port.Port { + if c.HasChainError() { + return port.New("").WithChainError(c.ChainError()) + } inputPort := c.Inputs().ByName(name) if inputPort.HasChainError() { c.SetChainError(inputPort.ChainError()) - return nil } return inputPort } @@ -159,6 +161,22 @@ func (c *Component) hasActivationFunction() bool { // @TODO: hide this method from user // @TODO: can we remove named return ? func (c *Component) MaybeActivate() (activationResult *ActivationResult) { + //Bubble up chain errors from ports + for _, p := range c.Inputs().PortsOrNil() { + if p.HasChainError() { + c.Inputs().SetChainError(p.ChainError()) + c.SetChainError(c.Inputs().ChainError()) + break + } + } + for _, p := range c.Outputs().PortsOrNil() { + if p.HasChainError() { + c.Outputs().SetChainError(p.ChainError()) + c.SetChainError(c.Outputs().ChainError()) + break + } + } + if c.HasChainError() { activationResult = NewActivationResult(c.Name()).WithChainError(c.ChainError()) return diff --git a/fmesh.go b/fmesh.go index 0b265d5..43304ce 100644 --- a/fmesh.go +++ b/fmesh.go @@ -46,7 +46,7 @@ func New(name string) *FMesh { // Components getter func (fm *FMesh) Components() *component.Collection { if fm.HasChainError() { - return nil + return component.NewCollection().WithChainError(fm.ChainError()) } return fm.components } @@ -106,6 +106,9 @@ func (fm *FMesh) runCycle() (*cycle.Cycle, error) { } for _, c := range components { + if c.HasChainError() { + fm.SetChainError(c.ChainError()) + } wg.Add(1) go func(component *component.Component, cycle *cycle.Cycle) { @@ -135,6 +138,10 @@ func (fm *FMesh) drainComponents(cycle *cycle.Cycle) error { for _, c := range components { activationResult := cycle.ActivationResults().ByComponentName(c.Name()) + if activationResult.HasChainError() { + return activationResult.ChainError() + } + if !activationResult.Activated() { // Component did not activate, so it did not create new output signals, hence nothing to drain continue @@ -173,6 +180,15 @@ func (fm *FMesh) Run() (cycle.Collection, error) { cycleNumber := 0 for { cycleResult, err := fm.runCycle() + + //Bubble up chain errors from activation results + for _, ar := range cycleResult.ActivationResults() { + if ar.HasChainError() { + fm.SetChainError(ar.ChainError()) + break + } + } + if err != nil { return nil, err } @@ -180,6 +196,10 @@ func (fm *FMesh) Run() (cycle.Collection, error) { allCycles = allCycles.With(cycleResult) mustStop, err := fm.mustStop(cycleResult) + if err != nil { + return nil, err + } + if mustStop { return allCycles, err } diff --git a/integration_tests/error_handling/chainable_api_test.go b/integration_tests/error_handling/chainable_api_test.go new file mode 100644 index 0000000..6f61e27 --- /dev/null +++ b/integration_tests/error_handling/chainable_api_test.go @@ -0,0 +1,154 @@ +package error_handling + +import ( + "errors" + "github.com/hovsep/fmesh" + "github.com/hovsep/fmesh/component" + "github.com/hovsep/fmesh/port" + "github.com/hovsep/fmesh/signal" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_Signal(t *testing.T) { + tests := []struct { + name string + test func(t *testing.T) + }{ + { + name: "no errors", + test: func(t *testing.T) { + sig := signal.New(123) + _, err := sig.Payload() + assert.False(t, sig.HasChainError()) + assert.NoError(t, err) + + _ = sig.PayloadOrDefault(555) + assert.False(t, sig.HasChainError()) + + _ = sig.PayloadOrNil() + assert.False(t, sig.HasChainError()) + }, + }, + { + name: "error propagated from group to signal", + test: func(t *testing.T) { + emptyGroup := signal.NewGroup() + + sig := emptyGroup.First() + assert.True(t, sig.HasChainError()) + assert.Error(t, sig.ChainError()) + + _, err := sig.Payload() + assert.Error(t, err) + assert.EqualError(t, err, signal.ErrNoSignalsInGroup.Error()) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, tt.test) + } +} + +func Test_FMesh(t *testing.T) { + tests := []struct { + name string + test func(t *testing.T) + }{ + { + name: "no errors", + test: func(t *testing.T) { + fm := fmesh.New("test").WithComponents( + component.New("c1").WithInputs("num1", "num2"). + WithOutputs("sum").WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + num1 := inputs.ByName("num1").FirstSignalPayloadOrDefault(0).(int) + num2 := inputs.ByName("num2").FirstSignalPayloadOrDefault(0).(int) + outputs.ByName("sum").PutSignals(signal.New(num1 + num2)) + return nil + }), + ) + + fm.Components().ByName("c1").InputByName("num1").PutSignals(signal.New(10)) + fm.Components().ByName("c1").InputByName("num2").PutSignals(signal.New(5)) + + _, err := fm.Run() + assert.False(t, fm.HasChainError()) + assert.NoError(t, err) + }, + }, + { + name: "error propagated from component", + test: func(t *testing.T) { + fm := fmesh.New("test").WithComponents( + component.New("c1"). + WithInputs("num1", "num2"). + WithOutputs("sum"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + num1 := inputs.ByName("num1").FirstSignalPayloadOrDefault(0).(int) + num2 := inputs.ByName("num2").FirstSignalPayloadOrDefault(0).(int) + outputs.ByName("sum").PutSignals(signal.New(num1 + num2)) + return nil + }). + WithChainError(errors.New("some error in component")), + ) + + fm.Components().ByName("c1").InputByName("num1").PutSignals(signal.New(10)) + fm.Components().ByName("c1").InputByName("num2").PutSignals(signal.New(5)) + + _, err := fm.Run() + assert.True(t, fm.HasChainError()) + assert.Error(t, err) + assert.EqualError(t, err, "some error in component") + }, + }, + { + name: "error propagated from port", + test: func(t *testing.T) { + fm := fmesh.New("test").WithComponents( + component.New("c1"). + WithInputs("num1", "num2"). + WithOutputs("sum"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + num1 := inputs.ByName("num1").FirstSignalPayloadOrDefault(0).(int) + num2 := inputs.ByName("num2").FirstSignalPayloadOrDefault(0).(int) + outputs.ByName("sum").PutSignals(signal.New(num1 + num2)) + return nil + }), + ) + + fm.Components().ByName("c1").InputByName("num777").PutSignals(signal.New(10)) + fm.Components().ByName("c1").InputByName("num2").PutSignals(signal.New(5)) + + _, err := fm.Run() + assert.True(t, fm.HasChainError()) + assert.Error(t, err) + assert.EqualError(t, err, "port not found") + }, + }, + { + name: "error propagated from signal", + test: func(t *testing.T) { + fm := fmesh.New("test").WithComponents( + component.New("c1").WithInputs("num1", "num2"). + WithOutputs("sum").WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + num1 := inputs.ByName("num1").FirstSignalPayloadOrDefault(0).(int) + num2 := inputs.ByName("num2").FirstSignalPayloadOrDefault(0).(int) + outputs.ByName("sum").PutSignals(signal.New(num1 + num2)) + return nil + }), + ) + + fm.Components().ByName("c1").InputByName("num1").PutSignals(signal.New(10).WithChainError(errors.New("some error in input signal"))) + fm.Components().ByName("c1").InputByName("num2").PutSignals(signal.New(5)) + + _, err := fm.Run() + assert.True(t, fm.HasChainError()) + assert.Error(t, err) + assert.EqualError(t, err, "some error in input signal") + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, tt.test) + } +} diff --git a/port/collection.go b/port/collection.go index 2bc6d7b..2b3d444 100644 --- a/port/collection.go +++ b/port/collection.go @@ -1,7 +1,6 @@ package port import ( - "errors" "github.com/hovsep/fmesh/common" "github.com/hovsep/fmesh/signal" ) @@ -31,8 +30,7 @@ func (collection *Collection) ByName(name string) *Port { } port, ok := collection.ports[name] if !ok { - collection.SetChainError(errors.New("port not found")) - return nil + return New("").WithChainError(ErrPortNotFoundInCollection) } return port } diff --git a/port/errors.go b/port/errors.go new file mode 100644 index 0000000..f69c5d5 --- /dev/null +++ b/port/errors.go @@ -0,0 +1,7 @@ +package port + +import "errors" + +var ( + ErrPortNotFoundInCollection = errors.New("port not found") +) diff --git a/port/port.go b/port/port.go index 8c336a1..2d2223b 100644 --- a/port/port.go +++ b/port/port.go @@ -45,6 +45,9 @@ func (p *Port) Pipes() *Group { // setSignals sets buffer field func (p *Port) setSignals(signals *signal.Group) { + if signals.HasChainError() { + p.SetChainError(signals.ChainError()) + } p.buffer = signals } diff --git a/signal/errors.go b/signal/errors.go new file mode 100644 index 0000000..7091e37 --- /dev/null +++ b/signal/errors.go @@ -0,0 +1,8 @@ +package signal + +import "errors" + +var ( + ErrNoSignalsInGroup = errors.New("group has no signals") + ErrInvalidSignal = errors.New("signal is invalid") +) diff --git a/signal/group.go b/signal/group.go index 7f94e96..7316734 100644 --- a/signal/group.go +++ b/signal/group.go @@ -1,7 +1,6 @@ package signal import ( - "errors" "github.com/hovsep/fmesh/common" ) @@ -33,7 +32,7 @@ func (g *Group) First() *Signal { } if len(g.signals) == 0 { - return New(nil).WithChainError(errors.New("group has no signals")) + return New(nil).WithChainError(ErrNoSignalsInGroup) } return g.signals[0] @@ -76,7 +75,7 @@ func (g *Group) With(signals ...*Signal) *Group { copy(newSignals, g.signals) for i, sig := range signals { if sig == nil { - return g.WithChainError(errors.New("signal is nil")) + return g.WithChainError(ErrInvalidSignal) } if sig.HasChainError() {