From 08e9abe3fb30c5a44a5f90cca4ed438d321a0284 Mon Sep 17 00:00:00 2001 From: Alexey Krainev Date: Fri, 14 Feb 2025 13:55:00 +0500 Subject: [PATCH] =?UTF-8?q?=D0=9E=D0=B1=D0=BD=D0=BE=D0=B2=D0=B8=D0=BB=20?= =?UTF-8?q?=D1=81=D0=BE=D0=BA=D0=B5=D1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drivers/Device.mts | 20 ++-- library/client/home/devices/media.ts | 4 +- library/utils/websocket.ts | 159 ++++++++++++++------------- 3 files changed, 92 insertions(+), 91 deletions(-) diff --git a/drivers/Device.mts b/drivers/Device.mts index 015c2bd..2f83c82 100644 --- a/drivers/Device.mts +++ b/drivers/Device.mts @@ -191,16 +191,16 @@ export default class Device extends Homey.Device { if (trackId !== this.#lastTrackId) { const track = await this.#yandex.api.music.getTrack(trackId).catch(() => undefined); this.#lastTrackId = track?.id || trackId; - this.#lastTrackAlbumId = track?.albums?.[0]?.id || undefined; - this.#lastTrackLyrics = []; + this.#lastTrackAlbumId = track?.albums?.[0]?.id; const [likes, dislikes] = await Promise.all([ this.#yandex.api.music.getLikes(this.#userId), this.#yandex.api.music.getDislikes(this.#userId), - this.updateTrackCover(state, track), - this.updateTrackLyrics(state, track) + this.updateCover(state, track) ]); + if (track) await this.updateLyrics(track); + capabilities.speaker_track = track?.title || state.playerState?.title || ""; capabilities.speaker_artist = track?.artists?.map((a) => a.name)?.join(", ") || state.playerState?.subtitle || ""; capabilities.speaker_album = track?.albums?.[0]?.title || state.playerState?.playlistId || ""; @@ -208,8 +208,8 @@ export default class Device extends Homey.Device { capabilities.media_dislike = !!dislikes.find(dislike => dislike.id === track?.id); } - await this.handleLyricsSync(state); await this.handleAliceState(state); + await this.handleLyricsSync(state); await Promise.all( Object.entries(capabilities).map(async ([capability, value]) => { @@ -226,7 +226,7 @@ export default class Device extends Homey.Device { ); }; - private async updateTrackCover(state: Types.GlagolState, track?: any) { + private async updateCover(state: Types.GlagolState, track?: any) { const trackImage = track?.coverUri || track?.ogImage || state.playerState?.extra?.coverURI || ""; const imageQuality = this.getSetting("image_quality") || 500; @@ -236,12 +236,12 @@ export default class Device extends Homey.Device { await this.#image.update(); } - private async updateTrackLyrics(state: Types.GlagolState, track?: any) { - const trackId = state.playerState?.id || track?.id; + private async updateLyrics(track: Types.MusicTrack) { let lyricsValues = [{ id: "none", title: "Нет текста песни" }]; - + + this.#lastTrackLyrics = []; if (track?.lyricsInfo?.hasAvailableSyncLyrics) { - const lyrics = await this.#yandex.api.music.getLyrics(trackId).catch(() => ""); + const lyrics = await this.#yandex.api.music.getLyrics(track.id).catch(() => ""); const lyricsLines = lyrics.split("\n"); const values = lyricsLines.map(line => { const time = line.split("[")[1].split("] ")[0]; diff --git a/library/client/home/devices/media.ts b/library/client/home/devices/media.ts index 888a987..958059d 100644 --- a/library/client/home/devices/media.ts +++ b/library/client/home/devices/media.ts @@ -86,7 +86,7 @@ export class YandexMediaDevice extends EventEmitter { options: { rejectUnauthorized: false }, heartbeat: 10, message: { - transform: async (payload) => ({ + modify: async (payload) => ({ id: randomUUID(), sentTime: Date.now(), conversationToken: this.#conversationToken, @@ -94,7 +94,7 @@ export class YandexMediaDevice extends EventEmitter { }), encode: async (payload) => JSON.stringify(payload), decode: async (message) => parseJson(message.toString()), - identify: async (payload, message) => + match: async (payload, message) => message.requestId === payload.id && message.requestSentTime === payload.sentTime, }, }); diff --git a/library/utils/websocket.ts b/library/utils/websocket.ts index 4102f84..68734b5 100644 --- a/library/utils/websocket.ts +++ b/library/utils/websocket.ts @@ -8,41 +8,51 @@ export type Options = { options?: ClientOptions | ClientRequestArgs; closeCodes?: number[]; heartbeat?: number; - message?: { - transform?: (payload: any) => PromiseLike; - encode?: (payload: any) => PromiseLike; - decode?: (message: Buffer | ArrayBuffer | Buffer[]) => PromiseLike; - identify?: (payload: any, message: any) => PromiseLike; - }; + message?: MessageOptions; }; -const DefaultOptions = { - closeCodes: [1000, 1005, 1006], - heartbeat: 0, - message: { - transform: async (payload: any) => payload, - encode: async (payload: any) => payload, - decode: async (message: Buffer | ArrayBuffer | Buffer[]) => message, - identify: async () => true, - }, -}; +export type MessageOptions = { + modify?: (payload: any) => PromiseLike; + encode?: (payload: any) => PromiseLike; + decode?: (message: Buffer | ArrayBuffer | Buffer[]) => PromiseLike; + match?: (payload: any, message: any) => PromiseLike; +} + +type ReconnectSocketOptions = Required }>; export class ReconnectSocket extends EventEmitter { #websocket?: WebSocket; - #connectPromise?: Promise; + #websocketConnect?: Promise; + #websocketDisconnect?: Promise; #heartbeatTimeout?: NodeJS.Timeout; #reconnectTimeout?: NodeJS.Timeout; #reconnectAttempt = 0; - constructor(public options: Options) { + options: ReconnectSocketOptions; + + constructor(options: Options) { super(); + this.options = { + protocols: [], + options: {}, + closeCodes: [1000, 1005, 1006], + heartbeat: 0, + message: { + modify: async (payload) => payload, + encode: async (payload) => payload, + decode: async (message) => message, + match: async () => true, + ...options.message + }, + ...options + } as ReconnectSocketOptions; } async connect(timeoutSec = 10) { - if (this.#connectPromise) return this.#connectPromise; - if (this.#websocket?.readyState === WebSocket.OPEN) return Promise.resolve(); + if (this.#websocketConnect) return this.#websocketConnect; + if (this.#websocket?.readyState === WebSocket.OPEN) return; - this.#connectPromise = (async () => { + this.#websocketConnect = (async () => { try { const address = await this.options.address(); const { protocols, options } = this.options; @@ -50,7 +60,7 @@ export class ReconnectSocket extends EventEmitter { await new Promise((resolve, reject) => { this.#websocket = new WebSocket(address, protocols, options); this.#websocket.on("ping", () => this.#heartbeat()); - this.#websocket.on("error", console.error); + this.#websocket.on("error", reject); this.#websocket.once("open", async () => { this.#reconnectAttempt = 0; @@ -59,106 +69,97 @@ export class ReconnectSocket extends EventEmitter { resolve(); }); - this.#websocket.on("close", async (code, reason) => { - if (this.#reconnectAttempt === 0) { - this.emit("disconnect"); - return reject("Ошибка подключения"); - } - - const closeCodes = this.options.closeCodes ?? DefaultOptions.closeCodes; - if (this.#reconnectAttempt > 3 || closeCodes.includes(code)) { - this.#cleanup(); - this.emit("disconnect"); - } - await this.#reconnect(); + this.#websocket.once("close", async (code) => { + if (!this.options.closeCodes.includes(code)) return this.#reconnect(); + this.#cleanup(); + this.emit("disconnect"); + reject("Ошибка подключения"); }); this.#websocket.on("message", async (message) => { this.#heartbeat(); - - // Декодируем сообщение или возвращаем оригинал - const decode = this.options.message?.decode ?? DefaultOptions.message.decode; - await decode(message).then((decoded) => this.emit("message", decoded), console.error); + await this.options.message.decode(message) + .then((decoded) => this.emit("message", decoded), console.error); }); - setTimeout(() => reject(new Error("Таймаут подключения")), timeoutSec * 1000); + setTimeout(() => reject(new Error("Ошибка подключения: таймаут")), timeoutSec * 1000); }); } finally { - this.#connectPromise = undefined; + this.#websocketConnect = undefined; } })(); - return this.#connectPromise.catch((error) => { - this.#cleanup(); - return Promise.reject(error); - }); + return this.#websocketConnect; } async disconnect() { - return new Promise((resolve) => { - if (!this.#websocket) return resolve(); - this.#websocket.once("close", resolve); - this.#websocket.close(1000); - }); + if (this.#websocketDisconnect) return this.#websocketDisconnect; + if (!this.#websocket || this.#websocket.readyState === WebSocket.CLOSED) return; + + this.#websocketDisconnect = (async () => { + await new Promise((resolve) => { + this.once("disconnect", resolve); + this.#websocket?.close(1000); + }); + this.#websocketDisconnect = undefined; + })(); + + return this.#websocketDisconnect; } async send(payload: any, timeoutSec = 10) { await this.connect(); - const transform = this.options.message?.transform ?? DefaultOptions.message.transform; - const transformed = await transform(payload); - - const encode = this.options.message?.encode ?? DefaultOptions.message.encode; - const encoded = await encode(transformed); + const { modify, encode, decode, match } = this.options.message; + const modified = await modify(payload); + const encoded = await encode(modified); return new Promise((resolve, reject) => { const listener = async (message: RawData) => { - const decode = this.options.message?.decode ?? DefaultOptions.message.decode; - const decoded = await decode(message).then(undefined, reject); - - const identify = this.options.message?.identify ?? DefaultOptions.message.identify; - const identified = await identify(transformed, decoded).then(undefined, reject); - - if (identified) { - resolve(decoded); - this.#websocket?.off("message", listener); - } + await decode(message).then(decoded => { + return match(modified, decoded).then(matched => { + if (!matched) return; + resolve(decoded); + this.#websocket?.removeListener("message", listener); + }); + }, reject); }; - this.#websocket?.on("message", listener); - // @ts-ignore + this.#websocket?.addListener("message", listener); this.#websocket?.send(encoded); - setTimeout(() => reject(new Error("Таймаут отправки")), timeoutSec * 1000); + + setTimeout(() => { + reject(new Error("Ошибка отправки: таймаут")); + this.#websocket?.removeListener("message", listener); + }, timeoutSec * 1000); }); } #heartbeat() { - const timeoutSec = this.options.heartbeat ?? DefaultOptions.heartbeat; + clearTimeout(this.#heartbeatTimeout); + const timeoutSec = this.options.heartbeat; if (timeoutSec <= 0) return; - clearTimeout(this.#heartbeatTimeout); - this.#heartbeatTimeout = setTimeout(async () => { - await this.#reconnect().catch(console.error); + this.#heartbeatTimeout = setTimeout(() => { + this.#websocket?.terminate(); + this.#reconnect(); }, timeoutSec * 1000); } - async #reconnect() { + #reconnect() { clearTimeout(this.#reconnectTimeout); - const delay = Math.min(1000 * 2 ** (this.#reconnectAttempt - 1), 30000); - const jitter = Math.random() * 1000; + const timeoutMs = Math.min(1000 * 2 ** this.#reconnectAttempt, 30000); this.#reconnectAttempt += 1; this.#reconnectTimeout = setTimeout(async () => { await this.connect().catch(console.error); - }, delay + jitter); + }, timeoutMs); } #cleanup() { - if (this.#websocket) { - this.#websocket.removeAllListeners(); - this.#websocket = undefined; - } clearTimeout(this.#heartbeatTimeout); clearTimeout(this.#reconnectTimeout); + this.#websocket?.removeAllListeners(); + this.#websocket = undefined; } }