Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: niolabs/nio
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 3.3.0
Choose a base ref
...
head repository: niolabs/nio
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: master
Choose a head ref
Loading
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Framework changelog

## [3.4.2](https://github.com/niolabs/nio/tree/3.4.2) (2019-02-12)
[Full Changelog](https://github.com/niolabs/nio/compare/3.4.1...3.4.2)

## [3.4.1](https://github.com/niolabs/nio/tree/3.4.1) (2019-01-23)
[Full Changelog](https://github.com/niolabs/nio/compare/3.4.0...3.4.1)

## [3.4.0](https://github.com/niolabs/nio/tree/3.4.0) (2019-01-16)
[Full Changelog](https://github.com/niolabs/nio/compare/3.3.0...3.4.0)

## [3.3.0](https://github.com/niolabs/nio/tree/3.3.0) (2018-11-14)
[Full Changelog](https://github.com/niolabs/nio/compare/3.3.0rc1...3.3.0)

## [3.3.0rc1](https://github.com/niolabs/nio/tree/3.3.0rc1) (2018-11-08)
[Full Changelog](https://github.com/niolabs/nio/compare/3.2.1...3.3.0rc1)

2 changes: 1 addition & 1 deletion nio/__init__.py
Original file line number Diff line number Diff line change
@@ -5,4 +5,4 @@
from nio.modules.module import Module
from nio.util.discovery import discoverable

__version__ = '3.3.0'
__version__ = '3.4.2'
50 changes: 48 additions & 2 deletions nio/block/base.py
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
a custom block, extend this Block class and override the appropriate methods.
"""
from collections import defaultdict
from inspect import getargspec

from nio.block.context import BlockContext
from nio.block.terminals import Terminal, TerminalType, input, output, \
@@ -61,6 +62,8 @@ def __init__(self, status_change_callback=None):
self.__class__, TerminalType.input)
self._default_output = Terminal.get_default_terminal_on_class(
self.__class__, TerminalType.output)
self._process_signal_includes_input_id = \
len(getargspec(self.process_signal).args) == 3
self._messages = defaultdict(str)

def configure(self, context):
@@ -259,14 +262,57 @@ def process_signals(self, signals, input_id=DEFAULT_TERMINAL):
This method will be called by the block router whenever signals
are sent to the block. The method should not return the modified
signals, but rather call `notify_signals` so that the router
can route them properly.
can route them properly. To return signals see the helper method
process_signal which acts on individual incoming signals.
Args:
signals (list): A list of signals to be processed by the block
input_id: The identifier of the input terminal the signals are
being delivered to
"""
pass # pragma: no cover
out_sigs = []
for signal in signals:
# Determine whether the process_signal method has an argument
# for input_id too, or if it's just signal
if self._process_signal_includes_input_id:
res = self.process_signal(signal, input_id)
else:
res = self.process_signal(signal)
# Ignore None values or falsey ones
if not res:
continue
if isinstance(res, Signal):
res = [res]
if not isinstance(res, list):
raise TypeError(
"process_signal must return a Signal or list of Signals")
for out_sig in res:
if isinstance(out_sig, Signal):
out_sigs.append(out_sig)
if out_sigs:
self.notify_signals(out_sigs)

def process_signal(self, signal, input_id=DEFAULT_TERMINAL):
"""An optional helper method to process one signal at a time.
If your block only operates on one signal at a time it is often easier
to just implement the process_signal method, rather than the plural
process_signals. Implementing this method and not process_signals will
cause your block to keep incoming signal lists as outgoing signal lists
and allows you to return signals rather than notify them.
You can still call notify_signals from this method but any signals you
return will be grouped into a list with the other returns from the
incoming list and notified as an outgoing list.
Args:
signal (Signal): An individual signal that this block is processing
input_id: The id of the input the signal came in on
Returns:
out (Signal): A signal or list of signals to notify from this block
"""
pass

@classmethod
def get_description(cls):
63 changes: 63 additions & 0 deletions nio/block/mixins/collector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Collector Mixin

Collect or buffer output signals in your block.

If you have a block that can potentially emit high-volume signal streams, it may be desirable to collect or buffer the output streams into lists rather than bombarding subsequent blocks. Including this mixin allows a block developer to add a configurable collection window to their block.

The mixin adds a `collect` property to the block that determines how long to collect signals for before notifying.

## Quick Usage

Include the `Collector` mixin in your block and notify signals like normal. The mixin will take care of performing the actual signal emission at the right time.

```python
from nio import Block, Signal
from nio.block.mixins import Collector

class MyBlock(Collector, Block):

def start(self):
super().start()
self._job = Job(self._notify, timedelta(seconds=0.1, repeatable=True)

def _notify(self):
self.notify_signals([Signal()])

def stop(self):
self._job.cancel()
super().stop()
```

## Parameters

* Collect Timeout - How long to group signals. If set to 0 then no collection will occur, effectively disabling the mixin

## Timing

Imagine a block that is designed to notify 1 signal every second. Without this mixin, the notify timing would look like this, where `*` indicates a signal:
```
[*] [*] [*] [*] [*] [*]
|-------|-------|-------|-------|-------|-------|---
t=0 1 2 3 4 5 6
```

If we wanted to "slow the block down" without losing signals, we could add the `Collector` mixin and configure a collect timeout of 2 seconds. This would collect signals every 2 seconds and then notify. The block's timing would now start to look like this:

```
[*,*] [*,*] [*,*]
|-------|-------|-------|-------|-------|-------|---
t=0 1 2 3 4 5 6
```

Notice that no signals got lost, they were just grouped together at a slower interval.

It is also important to note that the collection timeout sets the notification timing of the block. This means that even though we are calling `notify_signals` every second in the block, the mixin is changing the notification timing to happen every time a collect window ends.

Imagine the same block that notified every second, but now with a collect timeout of 1.5 seconds. The block's timing now looks like this:
```
[1] [2,3] [4] [5,6]
|-------|-------|-------|-------|-------|-------|---
t=0 1 2 3 4 5 6
```

The signals have been numbered now to show the time that the notify was called from the block. Notice that the signal notified at the 1 second mark wasn't actually emitted from the block until the 1.5 second mark, when the collect window expired.
94 changes: 94 additions & 0 deletions nio/block/mixins/enrich/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# EnrichSignals Mixin

Enrich incoming signals with new attributes.

When a block creates some new data based on an incoming signal there are a few different ways to emit that data in an outgoing signal. The data can be its own signal, it can be merged with the incoming signal, or it can be added to the incoming signal as an attribute. The EnrichSignals mixin takes away this complexity for the block developer and instead lets the service designer make the decision on how the data should be produced.

## Quick Usage

Include the `EnrichSignals` mixin in your block inheritance and then notify the results of `get_output_signal()`.

```python
from nio import Block
from nio.block.mixins import EnrichSignals

class MyBlock(EnrichSignals, Block):

def process_signal(self, signal):
outgoing_data = {"key": "value"}
return self.get_output_signal(outgoing_data, signal)
```

This example will allow the service designer to decide how to best represent the data from the block (`{"key": "value"}`) in the outgoing signals from the block.

## Properties

All properties will be nested in an advanced property on the block called "Signal Enrichment"

* Exclude existing? - boolean - whether to exclude the incoming data on outgoing signals
* Results field - string - The name of the signal attribute to put the new data on

## Modes of Use

### Signal Data Replacement

Discard all incoming signal data and only notify the new data from the block. This is the default behavior of the mixin.

* Exclude existing? - `true`
* Results field - any/ignored

### Signal Data Merge

Merge the new data into the incoming signal data.

* Exclude existing? - `false`
* Results field - empty string (`""`)

### New Signal Attribute

Place the new data onto the incoming signal with on a given attribute

* Exclude existing? - `false`
* Results field - the name of the attribute

### Examples

For all of these examples, assume the block code is notifying a signal returned by `self.get_output_signal({"new key": "new val"}, incoming_signal)`

| Block Config | Incoming Signal | Outgoing Signal |
| ------------------------------------------------------------ | ------------------------------------------------------------ | ------------------------------------------------------------ |
| exclude existing: `true`<br />results field: `""` | {"old_key": "old val"} | {<br />"new key", "new val"<br />} |
| exclude existing: `false`<br />results field: `""` | {"old_key": "old val"} | {<br />"old_key": "old val",<br />"new key", "new val"<br />} |
| exclude existing: `false`<br />results field: `"attr"` | {"old_key": "old val"} | {<br />"old_key": "old val",<br />"attr": {"new key", "new val"}<br />} |
| exclude existing: `false`<br />results field: `""`<br />_(merge overwrite)_ | {<br />"old_key": "old val",<br />"new_key": "other val"<br />} | {<br />"old_key": "old val",<br />"new key", "new val"<br />} |

## Reference

The mixin exposes two methods to use in your block

### get_output_signal

Get an output signal based on the block's configuration of how to merge in some data

```python
def get_output_signal(self, signal_data, incoming_signal copy=True)
```

* signal_data (dict) - This is a required dictionary of the new data to merge on the signal
* incoming_signal (Signal) - The original incoming signal
* copy (bool) - defaults to `True` - whether to make a copy of the incoming signal. This will normally be true unless you want to overwrite references to the actual incoming signal when performing the merge.
* _returns:_ Signal - An outgoing signal with the `signal_data` merged in per the block's configuration

### notify_output_signals

A helper method to notify a list of signals after properly enriching them

```python
def notify_output_signals(self, signals_data, incoming_signal, copy=True, output_id=None)
```

* signals_data (dict/list) - Data about the output signals. If this is a list, it will be assumed to be a list of dictionaries whose corresponding Signals will be notified in a batch. Otherwise, it should be a single dict containing the signal data to notify.
* incoming_signal (Signal) - The original incoming signal
* copy (bool) - defaults to `True` - whether to make a copy of the incoming signal. This will normally be true unless you want to overwrite references to the actual incoming signal when performing the merge.
* output_id (str) - The output ID of the block to notify on. If not included then the default output ID will be used.
* _returns:_ None - it just notifies the signals
8 changes: 4 additions & 4 deletions nio/block/mixins/enrich/enrich_signals.py
Original file line number Diff line number Diff line change
@@ -5,17 +5,17 @@


class EnrichProperties(PropertyHolder):
enrich_field = StringProperty(
title="Results Field", default="", visible=False)
enrich_field = StringProperty(title="Results Field", default="")
exclude_existing = BoolProperty(title="Exclude Existing?", default=True)


class EnrichSignals(object):

enrich = ObjectProperty(EnrichProperties, title='Signal Enrichment',
default=EnrichProperties(), order=100)
default=EnrichProperties(), order=100,
advanced=True)

def get_output_signal(self, signal_data, incoming_signal, copy=True):
def get_output_signal(self, signal_data, incoming_signal, copy=False):
""" Get an output signal based on the block configuration.
This method will return a single Signal that consists of the fields
26 changes: 5 additions & 21 deletions nio/block/mixins/enrich/tests/test_enrich_signals_mixin.py
Original file line number Diff line number Diff line change
@@ -43,6 +43,9 @@ def test_add_to_field(self):
self.assertEqual(out_sig.key1, 'val1')
self.assertEqual(out_sig.key2, 'val2')

# the incoming signal was not copied
self.assertEqual(incoming_signal, out_sig)

def test_signal_merge(self):
""" Make sure we can add results to a new field on the signal """
blk = EnrichingBlock()
@@ -86,28 +89,9 @@ def test_copy(self):
self.assertEqual(out_sig.key1, 'val1')
self.assertEqual(incoming_signal.key1, 'updated val1')

def test_no_copy(self):
""" Make sure that incoming signals can be copied """
blk = EnrichingBlock()
self.set_up_block(blk, False, 'results')
incoming_signal = Signal({
'key1': 'val1',
'key2': 'val2'
})

result_dict = {'a': 1, 'b': 2}
out_sig = blk.get_output_signal(result_dict, incoming_signal,
copy=False)

incoming_signal.key1 = 'updated val1'

# Both signals should have the results now
self.assertTrue(hasattr(out_sig, 'results'))
self.assertTrue(hasattr(incoming_signal, 'results'))
# the incoming signal was copied
self.assertNotEqual(incoming_signal, out_sig)

# The updated value should also afect our output signal
self.assertEqual(out_sig.key1, 'updated val1')
self.assertEqual(incoming_signal.key1, 'updated val1')

def test_notified(self):
""" Make sure we notify the signals properly """
Loading