Skip to content

Commit

Permalink
Label port with direction and validate pipes using those labels
Browse files Browse the repository at this point in the history
  • Loading branch information
hovsep committed Nov 2, 2024
1 parent f383a20 commit abb0e00
Show file tree
Hide file tree
Showing 12 changed files with 321 additions and 113 deletions.
15 changes: 13 additions & 2 deletions common/labeled_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ type LabeledEntity struct {
labels LabelsCollection
}

var errLabelNotFound = errors.New("label not found")
var (
ErrLabelNotFound = errors.New("label not found")
)

// NewLabeledEntity constructor
func NewLabeledEntity(labels LabelsCollection) LabeledEntity {
Expand All @@ -28,12 +30,21 @@ func (e *LabeledEntity) Label(label string) (string, error) {
value, ok := e.labels[label]

if !ok {
return "", fmt.Errorf("%w , label: %s", errLabelNotFound, label)
return "", fmt.Errorf("label %s not found, %w", label, ErrLabelNotFound)
}

return value, nil
}

// LabelOrDefault returns label value or default value in case of any error
func (e *LabeledEntity) LabelOrDefault(label string, defaultValue string) string {
value, err := e.Label(label)
if err != nil {
return defaultValue
}
return value
}

// SetLabels overwrites labels collection
func (e *LabeledEntity) SetLabels(labels LabelsCollection) {
e.labels = labels
Expand Down
21 changes: 15 additions & 6 deletions component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@ func New(name string) *Component {
DescribedEntity: common.NewDescribedEntity(""),
LabeledEntity: common.NewLabeledEntity(nil),
Chainable: common.NewChainable(),
inputs: port.NewCollection(),
outputs: port.NewCollection(),
inputs: port.NewCollection().WithDefaultLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionIn,
}),
outputs: port.NewCollection().WithDefaultLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionOut,
}),
}
}

Expand Down Expand Up @@ -144,9 +148,13 @@ func (c *Component) Outputs() *port.Collection {

// OutputByName is shortcut method
func (c *Component) OutputByName(name string) *port.Port {
if c.HasChainError() {
return port.New("").WithChainError(c.ChainError())
}
outputPort := c.Outputs().ByName(name)
if outputPort.HasChainError() {
return port.New("").WithChainError(outputPort.ChainError())
c.SetChainError(outputPort.ChainError())
return port.New("").WithChainError(c.ChainError())
}
return outputPort
}
Expand All @@ -158,7 +166,8 @@ func (c *Component) InputByName(name string) *port.Port {
}
inputPort := c.Inputs().ByName(name)
if inputPort.HasChainError() {
return port.New("").WithChainError(inputPort.ChainError())
c.SetChainError(inputPort.ChainError())
return port.New("").WithChainError(c.ChainError())
}
return inputPort
}
Expand Down Expand Up @@ -222,7 +231,7 @@ func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
return
}

if !c.inputs.AnyHasSignals() {
if !c.Inputs().AnyHasSignals() {
//No inputs set, stop here
activationResult = c.newActivationResultNoInput()
return
Expand Down Expand Up @@ -251,7 +260,7 @@ func (c *Component) FlushOutputs() *Component {
return c
}

ports, err := c.outputs.Ports()
ports, err := c.Outputs().Ports()
if err != nil {
return c.WithChainError(err)
}
Expand Down
78 changes: 56 additions & 22 deletions component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ func TestComponent_FlushOutputs(t *testing.T) {
{
name: "happy path",
getComponent: func() *Component {
sink := port.New("sink")
sink := port.New("sink").WithLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionIn,
})
c := New("c1").WithOutputs("o1", "o2")
c.Outputs().ByNames("o1").PutSignals(signal.New(777))
c.Outputs().ByNames("o2").PutSignals(signal.New(888))
Expand Down Expand Up @@ -118,12 +120,16 @@ func TestComponent_Inputs(t *testing.T) {
{
name: "no inputs",
component: New("c1"),
want: port.NewCollection(),
want: port.NewCollection().WithDefaultLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionIn,
}),
},
{
name: "with inputs",
component: New("c1").WithInputs("i1", "i2"),
want: port.NewCollection().With(port.New("i1"), port.New("i2")),
want: port.NewCollection().WithDefaultLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionIn,
}).With(port.New("i1"), port.New("i2")),
},
}
for _, tt := range tests {
Expand All @@ -142,12 +148,16 @@ func TestComponent_Outputs(t *testing.T) {
{
name: "no outputs",
component: New("c1"),
want: port.NewCollection(),
want: port.NewCollection().WithDefaultLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionOut,
}),
},
{
name: "with outputs",
component: New("c1").WithOutputs("o1", "o2"),
want: port.NewCollection().With(port.New("o1"), port.New("o2")),
want: port.NewCollection().WithDefaultLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionOut,
}).With(port.New("o1"), port.New("o2")),
},
}
for _, tt := range tests {
Expand Down Expand Up @@ -219,9 +229,13 @@ func TestComponent_WithDescription(t *testing.T) {
DescribedEntity: common.NewDescribedEntity("descr"),
LabeledEntity: common.NewLabeledEntity(nil),
Chainable: common.NewChainable(),
inputs: port.NewCollection(),
outputs: port.NewCollection(),
f: nil,
inputs: port.NewCollection().WithDefaultLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionIn,
}),
outputs: port.NewCollection().WithDefaultLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionOut,
}),
f: nil,
},
},
}
Expand Down Expand Up @@ -253,9 +267,13 @@ func TestComponent_WithInputs(t *testing.T) {
DescribedEntity: common.NewDescribedEntity(""),
LabeledEntity: common.NewLabeledEntity(nil),
Chainable: common.NewChainable(),
inputs: port.NewCollection().With(port.New("p1"), port.New("p2")),
outputs: port.NewCollection(),
f: nil,
inputs: port.NewCollection().WithDefaultLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionIn,
}).With(port.New("p1"), port.New("p2")),
outputs: port.NewCollection().WithDefaultLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionOut,
}),
f: nil,
},
},
{
Expand All @@ -269,9 +287,13 @@ func TestComponent_WithInputs(t *testing.T) {
DescribedEntity: common.NewDescribedEntity(""),
LabeledEntity: common.NewLabeledEntity(nil),
Chainable: common.NewChainable(),
inputs: port.NewCollection(),
outputs: port.NewCollection(),
f: nil,
inputs: port.NewCollection().WithDefaultLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionIn,
}),
outputs: port.NewCollection().WithDefaultLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionOut,
}),
f: nil,
},
},
}
Expand Down Expand Up @@ -303,9 +325,13 @@ func TestComponent_WithOutputs(t *testing.T) {
DescribedEntity: common.NewDescribedEntity(""),
LabeledEntity: common.NewLabeledEntity(nil),
Chainable: common.NewChainable(),
inputs: port.NewCollection(),
outputs: port.NewCollection().With(port.New("p1"), port.New("p2")),
f: nil,
inputs: port.NewCollection().WithDefaultLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionIn,
}),
outputs: port.NewCollection().WithDefaultLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionOut,
}).With(port.New("p1"), port.New("p2")),
f: nil,
},
},
{
Expand All @@ -319,9 +345,13 @@ func TestComponent_WithOutputs(t *testing.T) {
DescribedEntity: common.NewDescribedEntity(""),
LabeledEntity: common.NewLabeledEntity(nil),
Chainable: common.NewChainable(),
inputs: port.NewCollection(),
outputs: port.NewCollection(),
f: nil,
inputs: port.NewCollection().WithDefaultLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionIn,
}),
outputs: port.NewCollection().WithDefaultLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionOut,
}),
f: nil,
},
},
}
Expand Down Expand Up @@ -668,12 +698,16 @@ func TestComponent_WithLabels(t *testing.T) {
func TestComponent_ShortcutMethods(t *testing.T) {
t.Run("InputByName", func(t *testing.T) {
c := New("c").WithInputs("a", "b", "c")
assert.Equal(t, port.New("b"), c.InputByName("b"))
assert.Equal(t, port.New("b").WithLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionIn,
}), c.InputByName("b"))
})

t.Run("OutputByName", func(t *testing.T) {
c := New("c").WithOutputs("a", "b", "c")
assert.Equal(t, port.New("b"), c.OutputByName("b"))
assert.Equal(t, port.New("b").WithLabels(common.LabelsCollection{
port.DirectionLabel: port.DirectionOut,
}), c.OutputByName("b"))
})
}

Expand Down
22 changes: 10 additions & 12 deletions export/dot/dot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ type dotExporter struct {
}

const (
nodeIDLabel = "export/dot/id"
portKindInput = "input"
portKindOutput = "output"
nodeIDLabel = "export/dot/id"
)

// NewDotExporter returns exporter with default configuration
Expand Down Expand Up @@ -148,7 +146,7 @@ func (d *dotExporter) addPipes(graph *dot.Graph, components fmeshcomponent.Compo
destPort.DeleteLabel(nodeIDLabel)

// Any source port in any pipe is always output port, so we can build its node ID
srcPortNode := graph.FindNodeByID(getPortID(c.Name(), portKindOutput, srcPort.Name()))
srcPortNode := graph.FindNodeByID(getPortID(c.Name(), port.DirectionOut, srcPort.Name()))
destPortNode := graph.FindNodeByID(destPortID)

graph.Edge(srcPortNode, destPortNode, func(a *dot.AttributesMap) {
Expand Down Expand Up @@ -177,7 +175,7 @@ func (d *dotExporter) addComponents(graph *dot.Graph, components fmeshcomponent.
return err
}
for _, p := range inputPorts {
portNode := d.getPortNode(c, p, portKindInput, componentSubgraph)
portNode := d.getPortNode(c, p, componentSubgraph)
componentSubgraph.Edge(portNode, componentNode)
}

Expand All @@ -187,23 +185,23 @@ func (d *dotExporter) addComponents(graph *dot.Graph, components fmeshcomponent.
return err
}
for _, p := range outputPorts {
portNode := d.getPortNode(c, p, portKindOutput, componentSubgraph)
portNode := d.getPortNode(c, p, componentSubgraph)
componentSubgraph.Edge(componentNode, portNode)
}
}
return nil
}

// getPortNode creates and returns a node representing one port
func (d *dotExporter) getPortNode(c *fmeshcomponent.Component, port *port.Port, portKind string, componentSubgraph *dot.Graph) *dot.Node {
portID := getPortID(c.Name(), portKind, port.Name())
func (d *dotExporter) getPortNode(c *fmeshcomponent.Component, p *port.Port, componentSubgraph *dot.Graph) *dot.Node {
portID := getPortID(c.Name(), p.LabelOrDefault(port.DirectionLabel, ""), p.Name())

//Mark ports to be able to find their respective nodes later when adding pipes
port.AddLabel(nodeIDLabel, portID)
p.AddLabel(nodeIDLabel, portID)

portNode := componentSubgraph.NodeWithID(portID, func(a *dot.AttributesMap) {
setAttrMap(a, d.config.Port.Node)
a.Attr("label", port.Name()).Attr("group", c.Name())
a.Attr("label", p.Name()).Attr("group", c.Name())
})

return portNode
Expand Down Expand Up @@ -329,8 +327,8 @@ func getCycleStats(activationCycle *cycle.Cycle) []*statEntry {
}

// getPortID returns unique ID used to locate ports while building pipe edges
func getPortID(componentName string, portKind string, portName string) string {
return fmt.Sprintf("component/%s/%s/%s", componentName, portKind, portName)
func getPortID(componentName string, portDirection string, portName string) string {
return fmt.Sprintf("component/%s/%s/%s", componentName, portDirection, portName)
}

// setAttrMap sets all attributes to target
Expand Down
1 change: 0 additions & 1 deletion integration_tests/computation/math_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func Test_Math(t *testing.T) {
})

c1.OutputByName("res").PipeTo(c2.InputByName("num"))

return fmesh.New("fm").WithComponents(c1, c2).WithConfig(fmesh.Config{
ErrorHandlingStrategy: fmesh.StopOnFirstErrorOrPanic,
CyclesLimit: 10,
Expand Down
20 changes: 14 additions & 6 deletions port/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ type PortMap map[string]*Port
type Collection struct {
*common.Chainable
ports PortMap
// Labels added by default to each port in collection
defaultLabels common.LabelsCollection
}

// NewCollection creates empty collection
func NewCollection() *Collection {
return &Collection{
Chainable: common.NewChainable(),
ports: make(PortMap),
Chainable: common.NewChainable(),
ports: make(PortMap),
defaultLabels: common.LabelsCollection{},
}
}

Expand All @@ -32,7 +35,7 @@ func (collection *Collection) ByName(name string) *Port {
port, ok := collection.ports[name]
if !ok {
collection.SetChainError(ErrPortNotFoundInCollection)
return New("").WithChainError(ErrPortNotFoundInCollection)
return New("").WithChainError(collection.ChainError())
}
return port
}
Expand All @@ -43,7 +46,7 @@ func (collection *Collection) ByNames(names ...string) *Collection {
return NewCollection().WithChainError(collection.ChainError())
}

selectedPorts := NewCollection()
selectedPorts := NewCollection().WithDefaultLabels(collection.defaultLabels)

for _, name := range names {
if p, ok := collection.ports[name]; ok {
Expand Down Expand Up @@ -131,7 +134,7 @@ func (collection *Collection) Flush() *Collection {
// PipeTo creates pipes from each port in collection to given destination ports
func (collection *Collection) PipeTo(destPorts ...*Port) *Collection {
for _, p := range collection.ports {
p.PipeTo(destPorts...)
p = p.PipeTo(destPorts...)

if p.HasChainError() {
return collection.WithChainError(p.ChainError())
Expand All @@ -151,7 +154,7 @@ func (collection *Collection) With(ports ...*Port) *Collection {
if port.HasChainError() {
return collection.WithChainError(port.ChainError())
}

port.AddLabels(collection.defaultLabels)
collection.ports[port.Name()] = port
}

Expand Down Expand Up @@ -225,3 +228,8 @@ func (collection *Collection) WithChainError(err error) *Collection {
func (collection *Collection) Len() int {
return len(collection.ports)
}

func (collection *Collection) WithDefaultLabels(labels common.LabelsCollection) *Collection {
collection.defaultLabels = labels
return collection
}
Loading

0 comments on commit abb0e00

Please sign in to comment.