Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Created two new nodes #12

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/fusion/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ data/*.json
db2/
db3/
db4/
db5/
db/
db2-1/

Expand Down
86 changes: 86 additions & 0 deletions src/fusion/index.NAIADES.braila.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// const StreamFusion = require('nrg-stream-fusion').streamFusion;
const StreamFusion = require('./main.js').streamFusion;

// Water-pressure fusion config for the Braila pilot.
// "aggr" defines a 24h sliding window (mean/max/min/variance) over the raw
// "value" field; "fusion" builds a 10-step (t-0 .. t-9) feature vector from
// a single master node and publishes over Kafka every 90 s.
const smConf = {
    "aggr": {
        "braila_pressure": [
            { "field": "value", "tick": [
                {"type": "winbuf", "winsize": 24 * 60 * 60 * 1000, "sub": [ // 24h sliding window
                    {"type": "ma" },
                    {"type": "max" },
                    {"type": "min" },
                    {"type": "variance" },
                ]}
            ]}
        ]
    },
    "fusion": {
        "fusionModel": "braila_pressure5771_10d",
        "connection": {
            "type": "kafka"
        },
        "fusionTick": 90 * 1000,
        "nodes": [
            {
                "type": "timevalue",
                "nodeid": "braila_pressure5771",
                "aggrConfigId": "braila_pressure",
                "master": true,
                // One "value" attribute for each of the last 10 time steps
                // (t-0 .. t-9). Generated instead of hand-written: the
                // original copy-pasted list contained a stray `"value|"`
                // typo at t-3, which this fixes.
                "attributes": Array.from({ length: 10 }, (_, t) => (
                    { "time": -t, "attributes": [
                        { type: "value", "name": "value" }
                    ]}
                ))
            }
        ]
    }
};


// kafka connection config
const connectionConfig = {
    // kafka: "172.29.12.94:9092",
    kafka: "localhost:9092",
};

// NOTE(review): all four entries are identical, so the loop below creates
// four fusions for the SAME sensor and model name. Presumably these should
// be four distinct node ids (e.g. ...5770/5771/5772/5773) — confirm with
// the pilot's sensor inventory.
const brailaNodeid = ["braila_pressure5771",
                      "braila_pressure5771",
                      "braila_pressure5771",
                      "braila_pressure5771"];

for (let i = 0; i < brailaNodeid.length; i++) {
    // each fusion instance gets its own model name and (single) master node id
    smConf["fusion"]["fusionModel"] = brailaNodeid[i] + '_10d';
    smConf["fusion"]["nodes"][0]["nodeid"] = brailaNodeid[i];

    const fusion = new StreamFusion(connectionConfig, smConf["fusion"], smConf["aggr"]);
}
6 changes: 4 additions & 2 deletions src/fusion/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ const config = {
"connection": {
type: "kafka",
options: {
kafka: "192.168.99.100:9092",
zookeeper: "192.168.99.100:2181"
// kafka: "192.168.99.100:9092",
// zookeeper: "192.168.99.100:2181"
kafka: "localhost:9092",
zookeeper: "localhost:2181"
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/fusion/index.smartlamp.EMA.1d.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,9 @@ for (let i = 1; i <= 1; i++) {
/*
// change weather feature according to time horizon
let horizon = horizons[i];

// handling changable weather features
let features = ['temperature', 'humidity', 'pressure', 'windSpeed', 'windBearing', 'cloudCover' ];
let attributes = [];

for (let i in features) {
let featureName = features[i] + horizon;
attributes.push({type: "value", name: featureName});
Expand Down
137 changes: 137 additions & 0 deletions src/fusion/nodes/streamingDebitmeterNode.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/**
 * streamingDebitmeterNode
 * Streaming debitmeter (flow-meter) node class for heterogeneous sensor
 * stream data fusion. (Header previously said "noise node" — copy-paste
 * from the template this class was derived from.)
 */
const streamingNode = require('./streamingNode.js');
class streamingDebitmeterNode extends streamingNode {
    /**
     * Coerces a possibly missing/non-numeric sensor field to a usable value.
     * Mirrors the original inline guards exactly: null, undefined and values
     * that coerce to NaN all become 0; anything else is passed through as-is.
     * TODO(review): last-value interpolation may be preferable to zero here.
     * @param {*} v Raw field value from the incoming record.
     * @returns {*} The original value, or 0 when missing/invalid.
     */
    static numOrZero(v) {
        return (isNaN(v) || v == null) ? 0 : v;
    }

    /**
     * constructor
     * @param {qm.Base} base QMiner base.
     * @param {json} connectionConfig Connection (e.g. Kafka) config.
     * @param {json} config Streaming node config.
     * @param {json} aggrConfigs Configuration of stream aggregates.
     * @param {callback} processRecordCb Callback for invoking data fusion.
     * @param {int} fusionNodeI Node id in fusion object.
     * @param {object} parent Pointer to parent (for processRecordCb).
     */
    constructor(base, connectionConfig, config, aggrConfigs, processRecordCb, fusionNodeI, parent) {
        // call super constructor
        super(base, connectionConfig, config, aggrConfigs, processRecordCb, fusionNodeI, parent);
        // remembering callback and fusionNodeId
        this.fusionNodeI = fusionNodeI;
        this.processRecordCb = processRecordCb;
        this.parent = parent;
        // remember nodeid name
        this.nodeId = config.nodeid;
        // NOTE(review): String.replace with a string pattern only replaces
        // the FIRST "-"; node ids with multiple dashes would keep the rest.
        this.storeName = this.nodeId.replace("-", "_");

        // creating empty buffer of partial feature vectors
        this.buffer = [];
        // current position within buffer
        this.position = 0;

        // adding store
        // NOTE(review): "batery_capacity" (sic) is the wire/schema key used
        // by the data source — do not "fix" the spelling here.
        this.base.createStore({
            name: this.storeName,
            fields: [
                { name: "Time", type: "datetime" },
                { name: "flow_rate_value", type: "float" },
                { name: "totalizer1", type: "float" },
                { name: "totalizer2", type: "float" },
                { name: "consumer_totalizer", type: "float" },
                { name: "analog_input1", type: "float" },
                { name: "analog_input2", type: "float" },
                { name: "batery_capacity", type: "float" },
                { name: "alarms_in_decimal", type: "float" }
            ]
        });
        this.rawstore = this.base.store(this.storeName);

        // initialize last timestamp
        this.lastTimestamp = 0;

        // create appropriate stream aggregates
        // with selected stream aggregates definition
        super.createAggregates(aggrConfigs[config.aggrConfigId]);
        // run super postConstructor
        super.postConstructor();
    }

    /**
     * processRecord()
     * Sanitizes one raw record, stores it as a ghost record, triggers the
     * stream aggregates, buffers the combined feature vector and notifies
     * the fusion parent.
     * @param {json|string} rec Raw record (object or JSON string) from the data source.
     */
    processRecord(rec) {
        if (typeof rec == "string") {
            rec = JSON.parse(rec);
        }

        // source timestamp is in seconds; store expects milliseconds
        let unixts = rec["time"] * 1000;

        const clean = streamingDebitmeterNode.numOrZero;
        let flow_rate_value = clean(rec["flow_rate_value"]);
        let totalizer1 = clean(rec["totalizer1"]);
        let totalizer2 = clean(rec["totalizer2"]);
        let consumer_totalizer = clean(rec["consumer_totalizer"]);
        let analog_input1 = clean(rec["analog_input1"]);
        let analog_input2 = clean(rec["analog_input2"]);
        let batery_capacity = clean(rec["batery_capacity"]);
        let alarms_in_decimal = clean(rec["alarms_in_decimal"]);

        // drop out-of-order or duplicate timestamps
        if (unixts <= this.lastTimestamp) {
            console.log("Debitmeter - double timestamp.");
            return;
        }

        if (isNaN(unixts)) {
            console.log(this.nodeId, "Timestamp is NaN!");
            return;
        }

        // create ghost store record
        this.rawRecord = this.rawstore.newRecord({
            Time: unixts,
            flow_rate_value: flow_rate_value,
            totalizer1: totalizer1,
            totalizer2: totalizer2,
            consumer_totalizer: consumer_totalizer,
            analog_input1: analog_input1,
            analog_input2: analog_input2,
            batery_capacity: batery_capacity,
            alarms_in_decimal: alarms_in_decimal
        });

        // trigger stream aggregates bound to Raw store - first stage of resampling
        this.rawstore.triggerOnAddCallbacks(this.rawRecord);
        this.lastTimestamp = unixts;

        // reading current aggregates
        let aggregates = super.getAggregates();
        // combining it with current state vector
        // (NOTE: same object reference — current values are merged into
        // `aggregates` itself, as in the original code)
        let combined = aggregates;
        // update combined vector with current values
        combined["flow_rate_value"] = flow_rate_value;
        combined["totalizer1"] = totalizer1;
        combined["totalizer2"] = totalizer2;
        combined["consumer_totalizer"] = consumer_totalizer;
        combined["analog_input1"] = analog_input1;
        combined["analog_input2"] = analog_input2;
        combined["batery_capacity"] = batery_capacity;
        combined["alarms_in_decimal"] = alarms_in_decimal;

        // push the vector in the buffer
        this.buffer.push(combined);

        // send aggregate to Kafka
        super.broadcastAggregates(aggregates);

        // call streamFusion hook for this sensor
        this.processRecordCb(this.fusionNodeI, this.parent);
    }

}

module.exports = streamingDebitmeterNode;
117 changes: 117 additions & 0 deletions src/fusion/nodes/streamingNoiseNode.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
 * streamingNoiseNode
 * Streaming noise node class for heterogeneous sensor stream data fusion.
 */
const streamingNode = require('./streamingNode.js');
class streamingNoiseNode extends streamingNode {
    /**
     * Coerces a possibly missing/non-numeric sensor field to a usable value.
     * Mirrors the original inline guards exactly: null, undefined and values
     * that coerce to NaN all become 0; anything else is passed through as-is.
     * TODO(review): last-value interpolation may be preferable to zero here.
     * @param {*} v Raw field value from the incoming record.
     * @returns {*} The original value, or 0 when missing/invalid.
     */
    static numOrZero(v) {
        return (isNaN(v) || v == null) ? 0 : v;
    }

    /**
     * constructor
     * @param {qm.Base} base QMiner base.
     * @param {json} connectionConfig Connection (e.g. Kafka) config.
     * @param {json} config Streaming node config.
     * @param {json} aggrConfigs Configuration of stream aggregates.
     * @param {callback} processRecordCb Callback for invoking data fusion.
     * @param {int} fusionNodeI Node id in fusion object.
     * @param {object} parent Pointer to parent (for processRecordCb).
     */
    constructor(base, connectionConfig, config, aggrConfigs, processRecordCb, fusionNodeI, parent) {
        // call super constructor
        super(base, connectionConfig, config, aggrConfigs, processRecordCb, fusionNodeI, parent);
        // remembering callback and fusionNodeId
        this.fusionNodeI = fusionNodeI;
        this.processRecordCb = processRecordCb;
        this.parent = parent;
        // remember nodeid name
        this.nodeId = config.nodeid;
        // NOTE(review): String.replace with a string pattern only replaces
        // the FIRST "-"; node ids with multiple dashes would keep the rest.
        this.storeName = this.nodeId.replace("-", "_");

        // creating empty buffer of partial feature vectors
        this.buffer = [];
        // current position within buffer
        this.position = 0;

        // adding store
        this.base.createStore({
            name: this.storeName,
            fields: [
                { name: "Time", type: "datetime" },
                { name: "leak_state", type: "float" },
                { name: "noise_db", type: "float" },
                { name: "spre_db", type: "float" }
            ]
        });
        this.rawstore = this.base.store(this.storeName);

        // initialize last timestamp
        this.lastTimestamp = 0;

        // create appropriate stream aggregates
        // with selected stream aggregates definition
        super.createAggregates(aggrConfigs[config.aggrConfigId]);
        // run super postConstructor
        super.postConstructor();
    }

    /**
     * processRecord()
     * Sanitizes one raw record, stores it as a ghost record, triggers the
     * stream aggregates, buffers the combined feature vector and notifies
     * the fusion parent.
     * @param {json|string} rec Raw record (object or JSON string) from the data source.
     */
    processRecord(rec) {
        if (typeof rec == "string") {
            rec = JSON.parse(rec);
        }

        // source timestamp is in seconds; store expects milliseconds
        let unixts = rec["time"] * 1000;

        const clean = streamingNoiseNode.numOrZero;
        let noise_db = clean(rec["noise_db"]);
        let leak_state = clean(rec["leak_state"]);
        let spre_db = clean(rec["spre_db"]);

        // drop out-of-order or duplicate timestamps
        if (unixts <= this.lastTimestamp) {
            console.log("Noise - double timestamp.");
            return;
        }

        if (isNaN(unixts)) {
            console.log(this.nodeId, "Timestamp is NaN!");
            return;
        }

        // create ghost store record
        this.rawRecord = this.rawstore.newRecord({
            Time: unixts,
            noise_db: noise_db,
            leak_state: leak_state,
            spre_db: spre_db
        });

        // trigger stream aggregates bound to Raw store - first stage of resampling
        this.rawstore.triggerOnAddCallbacks(this.rawRecord);
        this.lastTimestamp = unixts;

        // reading current aggregates
        let aggregates = super.getAggregates();
        // combining it with current state vector
        // (NOTE: same object reference — current values are merged into
        // `aggregates` itself, as in the original code)
        let combined = aggregates;
        // update combined vector with current values
        combined["noise_db"] = noise_db;
        combined["leak_state"] = leak_state;
        combined["spre_db"] = spre_db;

        // push the vector in the buffer
        this.buffer.push(combined);

        // send aggregate to Kafka
        super.broadcastAggregates(aggregates);

        // call streamFusion hook for this sensor
        this.processRecordCb(this.fusionNodeI, this.parent);
    }

}

module.exports = streamingNoiseNode;
Loading