Skip to content

Commit

Permalink
Merge pull request #119 from hasbel/unsubscribe
Browse files Browse the repository at this point in the history
Add observable properties to mqtt binding
  • Loading branch information
danielpeintner authored Jul 3, 2019
2 parents a890adc + b9428c9 commit 71be695
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 8 deletions.
5 changes: 4 additions & 1 deletion packages/binding-http/src/http-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ export default class HttpClient implements ProtocolClient {
}
});
});
req.on("error", (err: any) => error(err));
req.on("error", (err: any) => {
if (error) error(err);
if (complete) complete();
});

req.flushHeaders();
req.end();
Expand Down
34 changes: 32 additions & 2 deletions packages/binding-mqtt/src/mqtt-broker-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,40 @@ export default class MqttBrokerServer implements ProtocolServer {
// TODO clean-up on destroy and stop
this.things.set(name, thing);

for (let propertyName in thing.properties) {
let topic = "/" + encodeURIComponent(name) + "/properties/" + encodeURIComponent(propertyName);
let property = thing.properties[propertyName];

let subscription = property.subscribe(
(data) => {
let content;
try {
content = ContentSerdes.get().valueToContent(data, property.data);
} catch(err) {
console.warn(`MqttServer cannot process data for Property '${propertyName}': ${err.message}`);
subscription.unsubscribe();
return;
}
console.log(`MqttBrokerServer at ${this.brokerURI} publishing to Property topic '${propertyName}' `);
this.broker.publish(topic, content.body);
}
);

let href = this.brokerURI + topic;
let form = new TD.Form(href, ContentSerdes.DEFAULT);
form.op = ["observeproperty", "unobserveproperty"];
thing.properties[propertyName].forms.push(form);
console.log(`MqttBrokerServer at ${this.brokerURI} assigns '${href}' to property '${propertyName}'`);
}

for (let actionName in thing.actions) {
let topic = "/" + encodeURIComponent(name) + "/actions/" + encodeURIComponent(actionName);
this.broker.subscribe(topic);

let href = this.brokerURI + topic;
thing.actions[actionName].forms.push(new TD.Form(href, ContentSerdes.DEFAULT));
let form = new TD.Form(href, ContentSerdes.DEFAULT);
form.op = ["invokeaction"];
thing.actions[actionName].forms.push(form);
console.log(`MqttBrokerServer at ${this.brokerURI} assigns '${href}' to Action '${actionName}'`);
}

Expand Down Expand Up @@ -150,7 +178,9 @@ export default class MqttBrokerServer implements ProtocolServer {
);

let href = this.brokerURI + topic;
event.forms.push(new TD.Form(href, ContentSerdes.DEFAULT));
let form = new TD.Form(href, ContentSerdes.DEFAULT);
form.op = ["subscribeevent", "unsubscribeevent"];
event.forms.push(form);
console.log(`MqttBrokerServer at ${this.brokerURI} assigns '${href}' to Event '${eventName}'`);
}

Expand Down
7 changes: 6 additions & 1 deletion packages/binding-mqtt/src/mqtt-client-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,21 @@ import MqttClient from "./mqtt-client";
export default class MqttClientFactory implements ProtocolClientFactory {

public readonly scheme: string = "mqtt";
private readonly clients: Array<ProtocolClient> = [];

getClient = (): ProtocolClient => {
return new MqttClient();
let client = new MqttClient();
this.clients.push(client);
return client;
}

init(): boolean {
return true;
}

destroy(): boolean {
console.log(`MqttClientFactory stopping all clients for '${this.scheme}'`);
this.clients.forEach((client) => client.stop())
return true;
}
}
18 changes: 14 additions & 4 deletions packages/binding-mqtt/src/mqtt-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ export default class MqttClient implements ProtocolClient {
error(error);
});


return new Subscription(()=>{this.client.end()});
return new Subscription(()=>{this.client.unsubscribe(topic)});
}


Expand Down Expand Up @@ -107,14 +106,25 @@ export default class MqttClient implements ProtocolClient {

});
}

unlinkResource = (form: Form): Promise<void> => {
//TODO: Implement
throw new Error('Method not implemented.');
let requestUri = url.parse(form['href']);
let topic = requestUri.pathname;

return new Promise<void>((resolve, reject) => {
if(this.client && this.client.connected) {
this.client.unsubscribe(topic);
console.log(`MqttClient unsubscribed from topic '${topic}'`);
}
resolve()
});
}

start = (): boolean => {
return true;
}
stop = (): boolean => {
if(this.client) this.client.end();
return true;
}

Expand Down

0 comments on commit 71be695

Please sign in to comment.