Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V.0.1.0 #48

Merged
merged 2 commits into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 32 additions & 26 deletions component/activation_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ package component
import (
"errors"
"fmt"
"github.com/hovsep/fmesh/port"
)

// ActivationResult defines the result (possibly an error) of the activation of given component in given cycle
type ActivationResult struct {
componentName string
activated bool
inputsMetadata port.MetadataMap //Contains the info about length of input ports during the activation (required for correct i2i piping)
code ActivationResultCode
err error
componentName string
activated bool
stateBefore *StateSnapshot //Contains the info about length of input ports during the activation (required for correct i2i piping)
stateAfter *StateSnapshot
code ActivationResultCode
err error
}

// ActivationResultCode denotes a specific info about how a component been activated or why not activated at all
Expand Down Expand Up @@ -94,61 +94,67 @@ func (ar *ActivationResult) WithError(err error) *ActivationResult {
return ar
}

func (ar *ActivationResult) WithInputsMetadata(meta port.MetadataMap) *ActivationResult {
ar.inputsMetadata = meta
func (ar *ActivationResult) WithStateBefore(snapshot *StateSnapshot) *ActivationResult {
ar.stateBefore = snapshot
return ar
}

func (ar *ActivationResult) InputsMetadata() port.MetadataMap {
return ar.inputsMetadata
func (ar *ActivationResult) StateBefore() *StateSnapshot {
return ar.stateBefore
}

func (ar *ActivationResult) WithStateAfter(snapshot *StateSnapshot) *ActivationResult {
ar.stateAfter = snapshot
return ar
}

func (ar *ActivationResult) StateAfter() *StateSnapshot {
return ar.stateAfter
}

// newActivationResultOK builds a specific activation result
func (c *Component) newActivationResultOK() *ActivationResult {
return NewActivationResult(c.Name()).
SetActivated(true).
WithActivationCode(ActivationCodeOK).
WithInputsMetadata(c.Inputs().GetPortsMetadata())
WithActivationCode(ActivationCodeOK)

}

// newActivationCodeNoInput builds a specific activation result
func (c *Component) newActivationCodeNoInput() *ActivationResult {
// newActivationResultNoInput builds a specific activation result
func (c *Component) newActivationResultNoInput() *ActivationResult {
return NewActivationResult(c.Name()).
SetActivated(false).
WithActivationCode(ActivationCodeNoInput)
}

// newActivationCodeNoFunction builds a specific activation result
func (c *Component) newActivationCodeNoFunction() *ActivationResult {
// newActivationResultNoFunction builds a specific activation result
func (c *Component) newActivationResultNoFunction() *ActivationResult {
return NewActivationResult(c.Name()).
SetActivated(false).
WithActivationCode(ActivationCodeNoFunction)
}

// newActivationCodeWaitingForInput builds a specific activation result
func (c *Component) newActivationCodeWaitingForInput() *ActivationResult {
// newActivationResultWaitingForInput builds a specific activation result
func (c *Component) newActivationResultWaitingForInput() *ActivationResult {
return NewActivationResult(c.Name()).
SetActivated(false).
WithActivationCode(ActivationCodeWaitingForInput)
}

// newActivationCodeReturnedError builds a specific activation result
func (c *Component) newActivationCodeReturnedError(err error) *ActivationResult {
// newActivationResultReturnedError builds a specific activation result
func (c *Component) newActivationResultReturnedError(err error) *ActivationResult {
return NewActivationResult(c.Name()).
SetActivated(true).
WithActivationCode(ActivationCodeReturnedError).
WithError(fmt.Errorf("component returned an error: %w", err)).
WithInputsMetadata(c.Inputs().GetPortsMetadata())
WithError(fmt.Errorf("component returned an error: %w", err))
}

// newActivationCodePanicked builds a specific activation result
func (c *Component) newActivationCodePanicked(err error) *ActivationResult {
// newActivationResultPanicked builds a specific activation result
func (c *Component) newActivationResultPanicked(err error) *ActivationResult {
return NewActivationResult(c.Name()).
SetActivated(true).
WithActivationCode(ActivationCodePanicked).
WithError(err).
WithInputsMetadata(c.Inputs().GetPortsMetadata())
WithError(err)
}

// isWaitingForInput tells whether component is waiting for specific inputs
Expand Down
6 changes: 3 additions & 3 deletions component/activation_result_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestActivationResultCollection_Add(t *testing.T) {
args: args{
activationResults: []*ActivationResult{
NewComponent("c1").newActivationResultOK(),
NewComponent("c2").newActivationCodeReturnedError(errors.New("oops")),
NewComponent("c2").newActivationResultReturnedError(errors.New("oops")),
},
},
assertions: func(t *testing.T, collection ActivationResultCollection) {
Expand All @@ -53,8 +53,8 @@ func TestActivationResultCollection_Add(t *testing.T) {
),
args: args{
activationResults: []*ActivationResult{
NewComponent("c4").newActivationCodeNoInput(),
NewComponent("c5").newActivationCodePanicked(errors.New("panic")),
NewComponent("c4").newActivationResultNoInput(),
NewComponent("c5").newActivationResultPanicked(errors.New("panic")),
},
},
assertions: func(t *testing.T, collection ActivationResultCollection) {
Expand Down
48 changes: 37 additions & 11 deletions component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ import (
"github.com/hovsep/fmesh/port"
)

// @TODO add getter\setter and constructor
type StateSnapshot struct {
InputPorts port.MetadataMap
OutputPorts port.MetadataMap
}

type ActivationFunc func(inputs port.Collection, outputs port.Collection) error

// Component defines a main building block of FMesh
Expand Down Expand Up @@ -87,57 +93,77 @@ func (c *Component) hasActivationFunction() bool {
return c.f != nil
}

func (c *Component) getStateSnapshot() *StateSnapshot {
return &StateSnapshot{
InputPorts: c.Inputs().GetPortsMetadata(),
OutputPorts: c.Outputs().GetPortsMetadata(),
}
}

// MaybeActivate tries to run the activation function if all required conditions are met
// @TODO: hide this method from user
func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
stateBeforeActivation := c.getStateSnapshot()

defer func() {
if r := recover(); r != nil {
activationResult = c.newActivationCodePanicked(fmt.Errorf("panicked with: %v", r))
activationResult = c.newActivationResultPanicked(fmt.Errorf("panicked with: %v", r)).
WithStateBefore(stateBeforeActivation).
WithStateAfter(c.getStateSnapshot())
}
}()

if !c.hasActivationFunction() {
//Activation function is not set (maybe useful while the mesh is under development)
activationResult = c.newActivationCodeNoFunction()
activationResult = c.newActivationResultNoFunction().
WithStateBefore(stateBeforeActivation).
WithStateAfter(c.getStateSnapshot())

return
}

if !c.inputs.AnyHasSignals() {
//No inputs set, stop here
activationResult = c.newActivationCodeNoInput()

activationResult = c.newActivationResultNoInput().
WithStateBefore(stateBeforeActivation).
WithStateAfter(c.getStateSnapshot())
return
}

//Invoke the activation func
err := c.f(c.Inputs(), c.Outputs())

if errors.Is(err, errWaitingForInputs) {
activationResult = c.newActivationCodeWaitingForInput()
activationResult = c.newActivationResultWaitingForInput().
WithStateBefore(stateBeforeActivation).
WithStateAfter(c.getStateSnapshot())

return
}

if err != nil {
activationResult = c.newActivationCodeReturnedError(err)
activationResult = c.newActivationResultReturnedError(err).
WithStateBefore(stateBeforeActivation).
WithStateAfter(c.getStateSnapshot())

return
}

activationResult = c.newActivationResultOK()
activationResult = c.newActivationResultOK().
WithStateBefore(stateBeforeActivation).
WithStateAfter(c.getStateSnapshot())

return
}

// FlushInputs ...
// @TODO: hide this method from user
func (c *Component) FlushInputs() {
c.inputs.Flush(false)
c.Inputs().Flush(false)
}

// FlushAndClearOutputs flushes output ports and clears flushed ones
// FlushOutputs ...
// @TODO: hide this method from user
func (c *Component) FlushAndClearOutputs() {
c.outputs.Flush(true)
func (c *Component) FlushOutputs(activationResult *ActivationResult) {
c.Outputs().FlushProcessedSignals(activationResult.StateAfter().OutputPorts)
}
68 changes: 53 additions & 15 deletions component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,53 +99,91 @@ func TestComponent_Description(t *testing.T) {
}
}

func TestComponent_FlushAndClearOutputs(t *testing.T) {
func TestComponent_FlushOutputs(t *testing.T) {
sink := port.New("sink")

componentWithNoOutputs := NewComponent("c1")
componentWithCleanOutputs := NewComponent("c1").WithOutputs("o1", "o2")

componentWithAllOutputsSet := NewComponent("c1").WithOutputs("o1", "o2")
componentWithAllOutputsSet.Outputs().ByName("o1").PutSignals(signal.New(777))
componentWithAllOutputsSet.Outputs().ByName("o1").PutSignals(signal.New(888))
componentWithAllOutputsSet.Outputs().ByName("o1").PipeTo(sink)
componentWithAllOutputsSet.Outputs().ByName("o2").PipeTo(sink)
componentWithAllOutputsSet.Outputs().ByNames("o1").PutSignals(signal.New(777))
componentWithAllOutputsSet.Outputs().ByNames("o2").PutSignals(signal.New(888))
componentWithAllOutputsSet.Outputs().ByNames("o1", "o2").PipeTo(sink)

tests := []struct {
name string
component *Component
destPort *port.Port //Where the component flushes ALL it's inputs
assertions func(t *testing.T, componentAfterFlush *Component, destPort *port.Port)
name string
component *Component
activationResult *ActivationResult
destPort *port.Port //Where the component flushes ALL it's inputs
assertions func(t *testing.T, componentAfterFlush *Component, destPort *port.Port)
}{
{
name: "no outputs",
component: NewComponent("c1"),
destPort: nil,
component: componentWithNoOutputs,
activationResult: componentWithNoOutputs.newActivationResultOK().
WithStateBefore(&StateSnapshot{
InputPorts: port.MetadataMap{},
OutputPorts: port.MetadataMap{},
}).
WithStateAfter(&StateSnapshot{
InputPorts: port.MetadataMap{},
OutputPorts: port.MetadataMap{},
}),
destPort: nil,
assertions: func(t *testing.T, componentAfterFlush *Component, destPort *port.Port) {
assert.NotNil(t, componentAfterFlush.Outputs())
assert.Empty(t, componentAfterFlush.Outputs())
},
},
{
name: "output has no signal set",
component: NewComponent("c1").WithOutputs("o1", "o2"),
destPort: nil,
component: componentWithCleanOutputs,
activationResult: componentWithCleanOutputs.newActivationResultOK().
WithStateBefore(&StateSnapshot{
InputPorts: port.MetadataMap{},
OutputPorts: port.MetadataMap{},
}).
WithStateAfter(&StateSnapshot{
InputPorts: port.MetadataMap{},
OutputPorts: port.MetadataMap{},
}),
destPort: nil,
assertions: func(t *testing.T, componentAfterFlush *Component, destPort *port.Port) {
assert.False(t, componentAfterFlush.Outputs().AnyHasSignals())
},
},
{
name: "happy path",
component: componentWithAllOutputsSet,
destPort: sink,
activationResult: componentWithAllOutputsSet.newActivationResultOK().
WithStateBefore(&StateSnapshot{
InputPorts: port.MetadataMap{},
OutputPorts: port.MetadataMap{},
}).
WithStateAfter(&StateSnapshot{
InputPorts: port.MetadataMap{},
OutputPorts: port.MetadataMap{
"o1": &port.Metadata{
SignalBufferLen: 1,
},
"o2": &port.Metadata{
SignalBufferLen: 1,
},
},
}),
destPort: sink,
assertions: func(t *testing.T, componentAfterFlush *Component, destPort *port.Port) {
assert.Contains(t, destPort.Signals().AllPayloads(), 777)
assert.Contains(t, destPort.Signals().AllPayloads(), 888)
assert.Len(t, destPort.Signals().AllPayloads(), 2)
// Signals are disposed when port is flushed
assert.False(t, componentAfterFlush.Outputs().AnyHasSignals())
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.component.FlushAndClearOutputs()
tt.component.FlushOutputs(tt.activationResult)
tt.assertions(t, tt.component, tt.destPort)
})
}
Expand Down
19 changes: 14 additions & 5 deletions fmesh.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package fmesh

import (
"errors"
"github.com/hovsep/fmesh/component"
"github.com/hovsep/fmesh/cycle"
"sync"
)

// @TODO: move this to fm.Config
const maxCyclesAllowed = 100 //Dev mode

// FMesh is the functional mesh
type FMesh struct {
name string
Expand Down Expand Up @@ -88,8 +92,9 @@
continue
}

c.FlushAndClearOutputs() // Just flush and clear
c.FlushInputs() // Inputs are a bit trickier
c.FlushOutputs(activationResult)

c.FlushInputs() // Inputs are a bit trickier

//Check if a component wait for inputs and wants to keep existing input
keepInputs := c.WantsToKeepInputs(activationResult)
Expand All @@ -98,7 +103,7 @@
// Inputs can not be just cleared, instead we remove signals which
// have been used (been set on inputs) during the last activation cycle
// thus not affecting ones the component could have been received from i2i pipes
c.Inputs().DisposeProcessedSignals(activationResult.InputsMetadata())
c.Inputs().DisposeProcessedSignals(activationResult.StateBefore().InputPorts)
}

}
Expand All @@ -111,7 +116,7 @@
cycleResult := fm.runCycle()
allCycles = allCycles.Add(cycleResult)

mustStop, err := fm.mustStop(cycleResult)
mustStop, err := fm.mustStop(cycleResult, len(allCycles))
if mustStop {
return allCycles, err
}
Expand All @@ -120,7 +125,11 @@
}
}

func (fm *FMesh) mustStop(cycleResult *cycle.Cycle) (bool, error) {
func (fm *FMesh) mustStop(cycleResult *cycle.Cycle, cycleNum int) (bool, error) {
if cycleNum >= maxCyclesAllowed {
return true, errors.New("reached max allowed cycles")

Check warning on line 130 in fmesh.go

View check run for this annotation

Codecov / codecov/patch

fmesh.go#L130

Added line #L130 was not covered by tests
}

//Check if we are done (no components activated during the cycle => all inputs are processed)
if !cycleResult.HasActivatedComponents() {
return true, nil
Expand Down
Loading
Loading