Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Created two new nodes #12

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions src/fusion/index.NAIADES.braila.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// const StreamFusion = require('nrg-stream-fusion').streamFusion;
const StreamFusion = require('./main.js').streamFusion;

// water config
let smConf = {
"aggr": {
"braila_pressure": [
{ "field": "value", "tick": [
{"type": "winbuf", "winsize": 24 * 60 * 60 * 1000, "sub": [ // 24h sliding window
{"type": "ma" },
{"type": "max" },
{"type": "min" },
{"type": "variance" },
]}
]}
]
},
"fusion": {
"fusionModel": "braila_pressure5771_10d",
"connection": {
"type": "kafka"
},
"fusionTick": 90 * 1000,
"nodes": [
{
"type": "timevalue",
"nodeid": "braila_pressure5771",
"aggrConfigId": "braila_pressure",
"master": true,
"attributes": [
{ "time": 0, "attributes": [
{ type: "value", "name": "value" }
]},
{ "time": -1, "attributes": [
{ type: "value", "name": "value" }
]},
{ "time": -2, "attributes": [
{ type: "value", "name": "value" }
]},
{ "time": -3, "attributes": [
{ type: "value", "name": "value|" }
]},
{ "time": -4, "attributes": [
{ type: "value", "name": "value" }
]},
{ "time": -5, "attributes": [
{ type: "value", "name": "value" }
]},
{ "time": -6, "attributes": [
{ type: "value", "name": "value" }
]},
{ "time": -7, "attributes": [
{ type: "value", "name": "value" }
]},
{ "time": -8, "attributes": [
{ type: "value", "name": "value" }
]},
{ "time": -9, "attributes": [
{ type: "value", "name": "value" }
]}

]
}
]
}
};


// kafka connection config
let connectionConfig = {
// kafka: "172.29.12.94:9092",
kafka: "localhost:9092",
}

let brailaNodeid = ["braila_pressure5771",
"braila_pressure5771",
"braila_pressure5771",
"braila_pressure5771"];

for (var i = 0; i < 4; i++){

smConf["fusion"]["fusionModel"] = brailaNodeid[i] + '_10d';
smConf["fusion"]["nodes"][0]["nodeid"] = brailaNodeid[i];

const fusion = (new StreamFusion(connectionConfig, smConf["fusion"], smConf["aggr"]));
}
6 changes: 4 additions & 2 deletions src/fusion/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ const config = {
"connection": {
type: "kafka",
options: {
kafka: "192.168.99.100:9092",
zookeeper: "192.168.99.100:2181"
// kafka: "192.168.99.100:9092",
// zookeeper: "192.168.99.100:2181"
kafka: "loclahost:9092",
zookeeper: "localhost:2181"
}
}
}
Expand Down
119 changes: 0 additions & 119 deletions src/fusion/index.smartlamp.EMA.1d.js

This file was deleted.

96 changes: 96 additions & 0 deletions src/fusion/nodes/streamingNaiadesWeatherNode.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* streamingWeatherNode
mcerin marked this conversation as resolved.
Show resolved Hide resolved
* Weather streaming node class for heterogeneous sensor stream data fusion.
*/
const streamingNode = require('./streamingNode.js');

class streamingNaiadesWeatherNode extends streamingNode {
mcerin marked this conversation as resolved.
Show resolved Hide resolved
/**
* constructor
* @param {qm.Base} base QMiner base.
* @param {json} config Streaming node config.
* @param {json} aggrConfig Configuration of stream aggregates.
* @param {callback} porcessRecordCb Callback for invoking data fusion.
* @param {int} fusionNodeI Node id in fusion object.
* @param {object} parent Pointer to parent (for processRecordCb).
*/
constructor(base, connectionConfig, config, aggrConfigs, processRecordCb, fusionNodeI, parent) {
mcerin marked this conversation as resolved.
Show resolved Hide resolved
// call super constructor
super(base, connectionConfig, config, aggrConfigs, processRecordCb, fusionNodeI, parent);

// read config options
this.datasize = config["datasize"] === undefined ? 48 : config["datasize"];
this.datatype = config["datatype"] === undefined ? "hourly" : config["datatype"];

// adding store
// generating fields
this.fields = [];
this.fieldTypes = config["fieldtypes"] === undefined ?
[ "temperature", "humidity", "pressure", "windSpeed", "windBearing", "dewPoint", "percipitation", "illuminance", "pressureTendency" ] :
config["fieldtypes"];

for (let i = 0; i < this.datasize; i++) {
for (let j in this.fieldTypes) {
let fieldName = this.fieldTypes[j] + i;
this.fields.push({ name: fieldName, type: "float" });
}
}

// creating store
this.base.createStore({
name: this.nodeId,
fields: this.fields
});
this.rawstore = this.base.store(this.nodeId);

// create appropriate stream aggregates
// with selected stream aggregates definition
super.createAggregates(aggrConfigs[config.aggrConfigId]);

// run super postConstructor
super.postConstructor();
}

/**
* processRecord()
* @param {json} rec Raw record from data source.
*/
processRecord(rec) {
// we DO NOT deal with aggregates with weather data (yet)
// TODO: we can start using streamaggregates on this type of weather
if (typeof rec == "string") {
rec = JSON.parse(rec);
}

// extract record from rec (according to the store construction)
let record = {};

if ((this.datatype in rec) && ("data" in rec[this.datatype]) && (rec[this.datatype].data.length >= this.datasize)) {
// setting stampm manually since we do not have getAggregates function
record["time"] = rec.currently.time * 1000;

// populate other record properties
// this.fieldTypes is already set from constructor
for (let i = 0; i < this.datasize; i++) {
for (let j in this.fieldTypes) {
let fieldName = this.fieldTypes[j] + i;
record[fieldName] = rec[this.datatype].data[i][this.fieldTypes[j]];
// convert potential null value to 0
if (record[fieldName] == null) rec[fieldName] = 0;
}
}

let combined = record;

// push the vector in the buffer
this.buffer.push(combined);

// call streamFusion hook for this sensor
this.processRecordCb(this.fusionNodeI, this.parent);
} else {
console.log("NO WEATHER/WEATHER RECORD TOO SHORT (" + this.datasize + " " + this.datatype + " records needed)!");
}
}
}

module.exports = streamingNaiadesWeatherNode;
Loading