Skip to content

Commit

Permalink
events: Add helpers for event listeners (#1051)
Browse files Browse the repository at this point in the history
* Simplify code
* Support EventTarget and EventEmitter
  • Loading branch information
sonnyp authored Jan 9, 2025
1 parent 3df2b08 commit 649ac4a
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 94 deletions.
68 changes: 21 additions & 47 deletions packages/connection/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { EventEmitter, promise } from "@xmpp/events";
import { EventEmitter, promise, listeners } from "@xmpp/events";
import jid from "@xmpp/jid";
import xml from "@xmpp/xml";
import StreamError from "./lib/StreamError.js";
Expand All @@ -8,13 +8,14 @@ const NS_STREAM = "urn:ietf:params:xml:ns:xmpp-streams";
const NS_JABBER_STREAM = "http://etherx.jabber.org/streams";

class Connection extends EventEmitter {
#socketListeners = null;
#parserListeners = null;

constructor(options = {}) {
super();
this.jid = null;
this.timeout = 2000;
this.options = options;
this.socketListeners = Object.create(null);
this.parserListeners = Object.create(null);
this.status = "offline";
this.socket = null;
this.parser = null;
Expand All @@ -40,7 +41,7 @@ class Connection extends EventEmitter {
this.parser.write(str);
}

_onParserError(error) {
#onParserError(error) {
// https://xmpp.org/rfcs/rfc6120.html#streams-error-conditions-bad-format
// "This error can be used instead of the more specific XML-related errors,
// such as <bad-namespace-prefix/>, <invalid-xml/>, <not-well-formed/>, <restricted-xml/>,
Expand All @@ -62,32 +63,17 @@ class Connection extends EventEmitter {

_attachSocket(socket) {
this.socket = socket;
const listeners = this.socketListeners;

listeners.data = this._onData.bind(this);

listeners.close = this.#onSocketClosed.bind(this);

listeners.connect = () => {
this._status("connect");
};

listeners.error = (error) => {
this.emit("error", error);
};

this.socket.on("close", listeners.close);
this.socket.on("data", listeners.data);
this.socket.on("error", listeners.error);
this.socket.on("connect", listeners.connect);
this.#socketListeners ??= listeners({
data: this._onData.bind(this),
close: this.#onSocketClosed.bind(this),
connect: () => this._status("connect"),
error: (error) => this.emit("error", error),
});
this.#socketListeners.subscribe(this.socket);
}

_detachSocket() {
const { socketListeners, socket } = this;
for (const k of Object.getOwnPropertyNames(socketListeners)) {
socket.removeListener(k, socketListeners[k]);
delete socketListeners[k];
}
this.socket && this.#socketListeners?.unsubscribe(this.socket);
this.socket = null;
}

Expand Down Expand Up @@ -143,29 +129,17 @@ class Connection extends EventEmitter {

_attachParser(parser) {
this.parser = parser;
const listeners = this.parserListeners;

listeners.element = this._onElement.bind(this);
listeners.error = this._onParserError.bind(this);

listeners.end = this.#onStreamClosed.bind(this);

listeners.start = (element) => {
this._status("open", element);
};

this.parser.on("error", listeners.error);
this.parser.on("element", listeners.element);
this.parser.on("end", listeners.end);
this.parser.on("start", listeners.start);
this.#parserListeners ??= listeners({
element: this._onElement.bind(this),
error: this.#onParserError.bind(this),
end: this.#onStreamClosed.bind(this),
start: (element) => this._status("open", element),
});
this.#parserListeners.subscribe(this.parser);
}

_detachParser() {
const listeners = this.parserListeners;
for (const k of Object.getOwnPropertyNames(listeners)) {
this.parser.removeListener(k, listeners[k]);
delete listeners[k];
}
this.parser && this.#parserListeners?.unsubscribe(this.parser);
this.parser = null;
this.root = null;
}
Expand Down
4 changes: 4 additions & 0 deletions packages/events/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import TimeoutError from "./lib/TimeoutError.js";
import promise from "./lib/promise.js";
import Deferred from "./lib/Deferred.js";
import procedure from "./lib/procedure.js";
import listeners from "./lib/listeners.js";
import onoff from "./lib/onoff.js";

export {
EventEmitter,
Expand All @@ -15,4 +17,6 @@ export {
promise,
Deferred,
procedure,
listeners,
onoff,
};
18 changes: 18 additions & 0 deletions packages/events/lib/listeners.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import onoff from "./onoff.js";

export default function listeners(events) {
return {
subscribe(target) {
const { on } = onoff(target);
for (const [event, handler] of Object.entries(events)) {
on(event, handler);
}
},
unsubscribe(target) {
const { off } = onoff(target);
for (const [event, handler] of Object.entries(events)) {
off(event, handler);
}
},
};
}
21 changes: 21 additions & 0 deletions packages/events/lib/onoff.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
const map = new WeakMap();

export default function onoff(target) {
let m = map.get(target);

if (!m) {
const on = (target.addEventListener ?? target.addListener).bind(target);
const off = (target.removeEventListener ?? target.removeListener).bind(
target,
);
const once = (
target.once ??
((event, handler) =>
target.addEventListener(event, handler, { once: true }))
).bind(target);
m = { on, off, once };
map.set(target, m);
}

return m;
}
14 changes: 9 additions & 5 deletions packages/events/lib/promise.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import onoff from "./onoff.js";

import TimeoutError from "./TimeoutError.js";

export default function promise(EE, event, rejectEvent = "error", timeout) {
export default function promise(target, event, rejectEvent = "error", timeout) {
return new Promise((resolve, reject) => {
let timeoutId;

const { off, once } = onoff(target);

const cleanup = () => {
clearTimeout(timeoutId);
EE.removeListener(event, onEvent);
EE.removeListener(rejectEvent, onError);
off(event, onEvent);
off(rejectEvent, onError);
};

function onError(reason) {
Expand All @@ -20,9 +24,9 @@ export default function promise(EE, event, rejectEvent = "error", timeout) {
cleanup();
}

EE.once(event, onEvent);
once(event, onEvent);
if (rejectEvent) {
EE.once(rejectEvent, onError);
once(rejectEvent, onError);
}

if (timeout) {
Expand Down
72 changes: 30 additions & 42 deletions packages/websocket/lib/Socket.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import WS from "ws";
import { EventEmitter } from "@xmpp/events";
import { EventEmitter, listeners } from "@xmpp/events";
import { parseURI } from "@xmpp/connection/lib/util.js";

// eslint-disable-next-line n/no-unsupported-features/node-builtins
Expand All @@ -8,10 +8,9 @@ const WebSocket = globalThis.WebSocket || WS;
const CODE = "ECONNERROR";

export default class Socket extends EventEmitter {
constructor() {
super();
this.listeners = Object.create(null);
}
#listeners = null;
socket = null;
url = null;

isSecure() {
if (!this.url) return false;
Expand All @@ -28,47 +27,36 @@ export default class Socket extends EventEmitter {

_attachSocket(socket) {
this.socket = socket;
const { listeners } = this;
listeners.open = () => {
this.emit("connect");
};

listeners.message = ({ data }) => this.emit("data", data);
listeners.error = (event) => {
const { url } = this;
// WS
let { error } = event;
// DOM
if (!error) {
error = new Error(`WebSocket ${CODE} ${url}`);
error.errno = CODE;
error.code = CODE;
}
this.#listeners ??= listeners({
open: () => this.emit("connect"),
message: ({ data }) => this.emit("data", data),
error: (event) => {
const { url } = this;
// WS
let { error } = event;
// DOM
if (!error) {
error = new Error(event.message || `WebSocket ${CODE} ${url}`);
error.errno = CODE;
error.code = CODE;
}

error.event = event;
error.url = url;
this.emit("error", error);
};

listeners.close = (event) => {
this._detachSocket();
this.emit("close", !event.wasClean, event);
};

this.socket.addEventListener("open", listeners.open);
this.socket.addEventListener("message", listeners.message);
this.socket.addEventListener("error", listeners.error);
this.socket.addEventListener("close", listeners.close);
error.event = event;
error.url = url;
this.emit("error", error);
},
close: (event) => {
this._detachSocket();
this.emit("close", !event.wasClean, event);
},
});
this.#listeners.subscribe(this.socket);
}

_detachSocket() {
delete this.url;
const { socket, listeners } = this;
for (const k of Object.getOwnPropertyNames(listeners)) {
socket.removeEventListener(k, listeners[k]);
delete listeners[k];
}
delete this.socket;
this.url = null;
this.socket && this.#listeners?.unsubscribe(this.socket);
this.socket = null;
}

end() {
Expand Down

0 comments on commit 649ac4a

Please sign in to comment.