From 649ac4a03cd7805a9f6087ec3581234ab9264700 Mon Sep 17 00:00:00 2001 From: Sonny Date: Thu, 9 Jan 2025 20:43:22 +0100 Subject: [PATCH] events: Add helpers for event listeners (#1051) * Simplify code * Support EventTarget and EventEmitter --- packages/connection/index.js | 68 ++++++++++-------------------- packages/events/index.js | 4 ++ packages/events/lib/listeners.js | 18 ++++++++ packages/events/lib/onoff.js | 21 ++++++++++ packages/events/lib/promise.js | 14 ++++--- packages/websocket/lib/Socket.js | 72 +++++++++++++------------------- 6 files changed, 103 insertions(+), 94 deletions(-) create mode 100644 packages/events/lib/listeners.js create mode 100644 packages/events/lib/onoff.js diff --git a/packages/connection/index.js b/packages/connection/index.js index 01f0d329..262d9cc3 100644 --- a/packages/connection/index.js +++ b/packages/connection/index.js @@ -1,4 +1,4 @@ -import { EventEmitter, promise } from "@xmpp/events"; +import { EventEmitter, promise, listeners } from "@xmpp/events"; import jid from "@xmpp/jid"; import xml from "@xmpp/xml"; import StreamError from "./lib/StreamError.js"; @@ -8,13 +8,14 @@ const NS_STREAM = "urn:ietf:params:xml:ns:xmpp-streams"; const NS_JABBER_STREAM = "http://etherx.jabber.org/streams"; class Connection extends EventEmitter { + #socketListeners = null; + #parserListeners = null; + constructor(options = {}) { super(); this.jid = null; this.timeout = 2000; this.options = options; - this.socketListeners = Object.create(null); - this.parserListeners = Object.create(null); this.status = "offline"; this.socket = null; this.parser = null; @@ -40,7 +41,7 @@ class Connection extends EventEmitter { this.parser.write(str); } - _onParserError(error) { + #onParserError(error) { // https://xmpp.org/rfcs/rfc6120.html#streams-error-conditions-bad-format // "This error can be used instead of the more specific XML-related errors, // such as , , , , @@ -62,32 +63,17 @@ class Connection extends EventEmitter { _attachSocket(socket) { this.socket = socket; - const listeners = this.socketListeners; - - listeners.data = this._onData.bind(this); - - listeners.close = this.#onSocketClosed.bind(this); - - listeners.connect = () => { - this._status("connect"); - }; - - listeners.error = (error) => { - this.emit("error", error); - }; - - this.socket.on("close", listeners.close); - this.socket.on("data", listeners.data); - this.socket.on("error", listeners.error); - this.socket.on("connect", listeners.connect); + this.#socketListeners ??= listeners({ + data: this._onData.bind(this), + close: this.#onSocketClosed.bind(this), + connect: () => this._status("connect"), + error: (error) => this.emit("error", error), + }); + this.#socketListeners.subscribe(this.socket); } _detachSocket() { - const { socketListeners, socket } = this; - for (const k of Object.getOwnPropertyNames(socketListeners)) { - socket.removeListener(k, socketListeners[k]); - delete socketListeners[k]; - } + this.socket && this.#socketListeners?.unsubscribe(this.socket); this.socket = null; } @@ -143,29 +129,17 @@ class Connection extends EventEmitter { _attachParser(parser) { this.parser = parser; - const listeners = this.parserListeners; - - listeners.element = this._onElement.bind(this); - listeners.error = this._onParserError.bind(this); - - listeners.end = this.#onStreamClosed.bind(this); - - listeners.start = (element) => { - this._status("open", element); - }; - - this.parser.on("error", listeners.error); - this.parser.on("element", listeners.element); - this.parser.on("end", listeners.end); - this.parser.on("start", listeners.start); + this.#parserListeners ??= listeners({ + element: this._onElement.bind(this), + error: this.#onParserError.bind(this), + end: this.#onStreamClosed.bind(this), + start: (element) => this._status("open", element), + }); + this.#parserListeners.subscribe(this.parser); } _detachParser() { - const listeners = this.parserListeners; - for (const k of Object.getOwnPropertyNames(listeners)) { - this.parser.removeListener(k, listeners[k]); - delete listeners[k]; - } + this.parser && this.#parserListeners?.unsubscribe(this.parser); this.parser = null; this.root = null; } diff --git a/packages/events/index.js b/packages/events/index.js index e952ad71..cbe287f7 100644 --- a/packages/events/index.js +++ b/packages/events/index.js @@ -6,6 +6,8 @@ import TimeoutError from "./lib/TimeoutError.js"; import promise from "./lib/promise.js"; import Deferred from "./lib/Deferred.js"; import procedure from "./lib/procedure.js"; +import listeners from "./lib/listeners.js"; +import onoff from "./lib/onoff.js"; export { EventEmitter, @@ -15,4 +17,6 @@ export { promise, Deferred, procedure, + listeners, + onoff, }; diff --git a/packages/events/lib/listeners.js b/packages/events/lib/listeners.js new file mode 100644 index 00000000..53aa7254 --- /dev/null +++ b/packages/events/lib/listeners.js @@ -0,0 +1,18 @@ +import onoff from "./onoff.js"; + +export default function listeners(events) { + return { + subscribe(target) { + const { on } = onoff(target); + for (const [event, handler] of Object.entries(events)) { + on(event, handler); + } + }, + unsubscribe(target) { + const { off } = onoff(target); + for (const [event, handler] of Object.entries(events)) { + off(event, handler); + } + }, + }; +} diff --git a/packages/events/lib/onoff.js b/packages/events/lib/onoff.js new file mode 100644 index 00000000..6d4c10a7 --- /dev/null +++ b/packages/events/lib/onoff.js @@ -0,0 +1,21 @@ +const map = new WeakMap(); + +export default function onoff(target) { + let m = map.get(target); + + if (!m) { + const on = (target.addEventListener ?? target.addListener).bind(target); + const off = (target.removeEventListener ?? target.removeListener).bind( + target, + ); + const once = ( + target.once ?? + ((event, handler) => + target.addEventListener(event, handler, { once: true })) + ).bind(target); + m = { on, off, once }; + map.set(target, m); + } + + return m; +} diff --git a/packages/events/lib/promise.js b/packages/events/lib/promise.js index 69fb61b4..c741b9b7 100644 --- a/packages/events/lib/promise.js +++ b/packages/events/lib/promise.js @@ -1,13 +1,17 @@ +import onoff from "./onoff.js"; + import TimeoutError from "./TimeoutError.js"; -export default function promise(EE, event, rejectEvent = "error", timeout) { +export default function promise(target, event, rejectEvent = "error", timeout) { return new Promise((resolve, reject) => { let timeoutId; + const { off, once } = onoff(target); + const cleanup = () => { clearTimeout(timeoutId); - EE.removeListener(event, onEvent); - EE.removeListener(rejectEvent, onError); + off(event, onEvent); + off(rejectEvent, onError); }; function onError(reason) { @@ -20,9 +24,9 @@ export default function promise(EE, event, rejectEvent = "error", timeout) { cleanup(); } - EE.once(event, onEvent); + once(event, onEvent); if (rejectEvent) { - EE.once(rejectEvent, onError); + once(rejectEvent, onError); } if (timeout) { diff --git a/packages/websocket/lib/Socket.js b/packages/websocket/lib/Socket.js index 3227a11b..1d551bb6 100644 --- a/packages/websocket/lib/Socket.js +++ b/packages/websocket/lib/Socket.js @@ -1,5 +1,5 @@ import WS from "ws"; -import { EventEmitter } from "@xmpp/events"; +import { EventEmitter, listeners } from "@xmpp/events"; import { parseURI } from "@xmpp/connection/lib/util.js"; // eslint-disable-next-line n/no-unsupported-features/node-builtins @@ -8,10 +8,9 @@ const WebSocket = globalThis.WebSocket || WS; const CODE = "ECONNERROR"; export default class Socket extends EventEmitter { - constructor() { - super(); - this.listeners = Object.create(null); - } + #listeners = null; + socket = null; + url = null; isSecure() { if (!this.url) return false; @@ -28,47 +27,36 @@ export default class Socket extends EventEmitter { _attachSocket(socket) { this.socket = socket; - const { listeners } = this; - listeners.open = () => { - this.emit("connect"); - }; - - listeners.message = ({ data }) => this.emit("data", data); - listeners.error = (event) => { - const { url } = this; - // WS - let { error } = event; - // DOM - if (!error) { - error = new Error(`WebSocket ${CODE} ${url}`); - error.errno = CODE; - error.code = CODE; - } + this.#listeners ??= listeners({ + open: () => this.emit("connect"), + message: ({ data }) => this.emit("data", data), + error: (event) => { + const { url } = this; + // WS + let { error } = event; + // DOM + if (!error) { + error = new Error(event.message || `WebSocket ${CODE} ${url}`); + error.errno = CODE; + error.code = CODE; + } - error.event = event; - error.url = url; - this.emit("error", error); - }; - - listeners.close = (event) => { - this._detachSocket(); - this.emit("close", !event.wasClean, event); - }; - - this.socket.addEventListener("open", listeners.open); - this.socket.addEventListener("message", listeners.message); - this.socket.addEventListener("error", listeners.error); - this.socket.addEventListener("close", listeners.close); + error.event = event; + error.url = url; + this.emit("error", error); + }, + close: (event) => { + this._detachSocket(); + this.emit("close", !event.wasClean, event); + }, + }); + this.#listeners.subscribe(this.socket); } _detachSocket() { - delete this.url; - const { socket, listeners } = this; - for (const k of Object.getOwnPropertyNames(listeners)) { - socket.removeEventListener(k, listeners[k]); - delete listeners[k]; - } - delete this.socket; + this.url = null; + this.socket && this.#listeners?.unsubscribe(this.socket); + this.socket = null; } end() {