Skip to content

Commit

Permalink
Обновил сокет
Browse files Browse the repository at this point in the history
  • Loading branch information
kraineff committed Feb 14, 2025
1 parent 4954dab commit 08e9abe
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 91 deletions.
20 changes: 10 additions & 10 deletions drivers/Device.mts
Original file line number Diff line number Diff line change
Expand Up @@ -191,25 +191,25 @@ export default class Device extends Homey.Device {
if (trackId !== this.#lastTrackId) {
const track = await this.#yandex.api.music.getTrack(trackId).catch(() => undefined);
this.#lastTrackId = track?.id || trackId;
this.#lastTrackAlbumId = track?.albums?.[0]?.id || undefined;
this.#lastTrackLyrics = [];
this.#lastTrackAlbumId = track?.albums?.[0]?.id;

const [likes, dislikes] = await Promise.all([
this.#yandex.api.music.getLikes(this.#userId),
this.#yandex.api.music.getDislikes(this.#userId),
this.updateTrackCover(state, track),
this.updateTrackLyrics(state, track)
this.updateCover(state, track)
]);

if (track) await this.updateLyrics(track);

capabilities.speaker_track = track?.title || state.playerState?.title || "";
capabilities.speaker_artist = track?.artists?.map((a) => a.name)?.join(", ") || state.playerState?.subtitle || "";
capabilities.speaker_album = track?.albums?.[0]?.title || state.playerState?.playlistId || "";
capabilities.media_like = !!likes.find(like => like.id === track?.id);
capabilities.media_dislike = !!dislikes.find(dislike => dislike.id === track?.id);
}

await this.handleLyricsSync(state);
await this.handleAliceState(state);
await this.handleLyricsSync(state);

await Promise.all(
Object.entries(capabilities).map(async ([capability, value]) => {
Expand All @@ -226,7 +226,7 @@ export default class Device extends Homey.Device {
);
};

private async updateTrackCover(state: Types.GlagolState, track?: any) {
private async updateCover(state: Types.GlagolState, track?: any) {
const trackImage = track?.coverUri || track?.ogImage || state.playerState?.extra?.coverURI || "";
const imageQuality = this.getSetting("image_quality") || 500;

Expand All @@ -236,12 +236,12 @@ export default class Device extends Homey.Device {
await this.#image.update();
}

private async updateTrackLyrics(state: Types.GlagolState, track?: any) {
const trackId = state.playerState?.id || track?.id;
private async updateLyrics(track: Types.MusicTrack) {
let lyricsValues = [{ id: "none", title: "Нет текста песни" }];


this.#lastTrackLyrics = [];
if (track?.lyricsInfo?.hasAvailableSyncLyrics) {
const lyrics = await this.#yandex.api.music.getLyrics(trackId).catch(() => "");
const lyrics = await this.#yandex.api.music.getLyrics(track.id).catch(() => "");
const lyricsLines = lyrics.split("\n");
const values = lyricsLines.map(line => {
const time = line.split("[")[1].split("] ")[0];
Expand Down
4 changes: 2 additions & 2 deletions library/client/home/devices/media.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ export class YandexMediaDevice extends EventEmitter {
options: { rejectUnauthorized: false },
heartbeat: 10,
message: {
transform: async (payload) => ({
modify: async (payload) => ({
id: randomUUID(),
sentTime: Date.now(),
conversationToken: this.#conversationToken,
payload,
}),
encode: async (payload) => JSON.stringify(payload),
decode: async (message) => parseJson(message.toString()),
identify: async (payload, message) =>
match: async (payload, message) =>
message.requestId === payload.id && message.requestSentTime === payload.sentTime,
},
});
Expand Down
159 changes: 80 additions & 79 deletions library/utils/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,59 @@ export type Options = {
options?: ClientOptions | ClientRequestArgs;
closeCodes?: number[];
heartbeat?: number;
message?: {
transform?: (payload: any) => PromiseLike<any>;
encode?: (payload: any) => PromiseLike<any>;
decode?: (message: Buffer | ArrayBuffer | Buffer[]) => PromiseLike<any>;
identify?: (payload: any, message: any) => PromiseLike<boolean>;
};
message?: MessageOptions;
};

const DefaultOptions = {
closeCodes: [1000, 1005, 1006],
heartbeat: 0,
message: {
transform: async (payload: any) => payload,
encode: async (payload: any) => payload,
decode: async (message: Buffer | ArrayBuffer | Buffer[]) => message,
identify: async () => true,
},
};
export type MessageOptions = {
modify?: (payload: any) => PromiseLike<any>;
encode?: (payload: any) => PromiseLike<any>;
decode?: (message: Buffer | ArrayBuffer | Buffer[]) => PromiseLike<any>;
match?: (payload: any, message: any) => PromiseLike<boolean>;
}

type ReconnectSocketOptions = Required<Options & { message: Required<MessageOptions> }>;

export class ReconnectSocket extends EventEmitter {
#websocket?: WebSocket;
#connectPromise?: Promise<void>;
#websocketConnect?: Promise<void>;
#websocketDisconnect?: Promise<void>;
#heartbeatTimeout?: NodeJS.Timeout;
#reconnectTimeout?: NodeJS.Timeout;
#reconnectAttempt = 0;

constructor(public options: Options) {
options: ReconnectSocketOptions;

constructor(options: Options) {
super();
this.options = {
protocols: [],
options: {},
closeCodes: [1000, 1005, 1006],
heartbeat: 0,
message: {
modify: async (payload) => payload,
encode: async (payload) => payload,
decode: async (message) => message,
match: async () => true,
...options.message
},
...options
} as ReconnectSocketOptions;
}

async connect(timeoutSec = 10) {
if (this.#connectPromise) return this.#connectPromise;
if (this.#websocket?.readyState === WebSocket.OPEN) return Promise.resolve();
if (this.#websocketConnect) return this.#websocketConnect;
if (this.#websocket?.readyState === WebSocket.OPEN) return;

this.#connectPromise = (async () => {
this.#websocketConnect = (async () => {
try {
const address = await this.options.address();
const { protocols, options } = this.options;

await new Promise<void>((resolve, reject) => {
this.#websocket = new WebSocket(address, protocols, options);
this.#websocket.on("ping", () => this.#heartbeat());
this.#websocket.on("error", console.error);
this.#websocket.on("error", reject);

this.#websocket.once("open", async () => {
this.#reconnectAttempt = 0;
Expand All @@ -59,106 +69,97 @@ export class ReconnectSocket extends EventEmitter {
resolve();
});

this.#websocket.on("close", async (code, reason) => {
if (this.#reconnectAttempt === 0) {
this.emit("disconnect");
return reject("Ошибка подключения");
}

const closeCodes = this.options.closeCodes ?? DefaultOptions.closeCodes;
if (this.#reconnectAttempt > 3 || closeCodes.includes(code)) {
this.#cleanup();
this.emit("disconnect");
}
await this.#reconnect();
this.#websocket.once("close", async (code) => {
if (!this.options.closeCodes.includes(code)) return this.#reconnect();
this.#cleanup();
this.emit("disconnect");
reject("Ошибка подключения");
});

this.#websocket.on("message", async (message) => {
this.#heartbeat();

// Декодируем сообщение или возвращаем оригинал
const decode = this.options.message?.decode ?? DefaultOptions.message.decode;
await decode(message).then((decoded) => this.emit("message", decoded), console.error);
await this.options.message.decode(message)
.then((decoded) => this.emit("message", decoded), console.error);
});

setTimeout(() => reject(new Error("Таймаут подключения")), timeoutSec * 1000);
setTimeout(() => reject(new Error("Ошибка подключения: таймаут")), timeoutSec * 1000);
});
} finally {
this.#connectPromise = undefined;
this.#websocketConnect = undefined;
}
})();

return this.#connectPromise.catch((error) => {
this.#cleanup();
return Promise.reject(error);
});
return this.#websocketConnect;
}

async disconnect() {
return new Promise<void>((resolve) => {
if (!this.#websocket) return resolve();
this.#websocket.once("close", resolve);
this.#websocket.close(1000);
});
if (this.#websocketDisconnect) return this.#websocketDisconnect;
if (!this.#websocket || this.#websocket.readyState === WebSocket.CLOSED) return;

this.#websocketDisconnect = (async () => {
await new Promise<void>((resolve) => {
this.once("disconnect", resolve);
this.#websocket?.close(1000);
});
this.#websocketDisconnect = undefined;
})();

return this.#websocketDisconnect;
}

async send(payload: any, timeoutSec = 10) {
await this.connect();

const transform = this.options.message?.transform ?? DefaultOptions.message.transform;
const transformed = await transform(payload);

const encode = this.options.message?.encode ?? DefaultOptions.message.encode;
const encoded = await encode(transformed);
const { modify, encode, decode, match } = this.options.message;
const modified = await modify(payload);
const encoded = await encode(modified);

return new Promise<any>((resolve, reject) => {
const listener = async (message: RawData) => {
const decode = this.options.message?.decode ?? DefaultOptions.message.decode;
const decoded = await decode(message).then(undefined, reject);

const identify = this.options.message?.identify ?? DefaultOptions.message.identify;
const identified = await identify(transformed, decoded).then(undefined, reject);

if (identified) {
resolve(decoded);
this.#websocket?.off("message", listener);
}
await decode(message).then(decoded => {
return match(modified, decoded).then(matched => {
if (!matched) return;
resolve(decoded);
this.#websocket?.removeListener("message", listener);
});
}, reject);
};

this.#websocket?.on("message", listener);
// @ts-ignore
this.#websocket?.addListener("message", listener);
this.#websocket?.send(encoded);
setTimeout(() => reject(new Error("Таймаут отправки")), timeoutSec * 1000);

setTimeout(() => {
reject(new Error("Ошибка отправки: таймаут"));
this.#websocket?.removeListener("message", listener);
}, timeoutSec * 1000);
});
}

#heartbeat() {
const timeoutSec = this.options.heartbeat ?? DefaultOptions.heartbeat;
clearTimeout(this.#heartbeatTimeout);
const timeoutSec = this.options.heartbeat;
if (timeoutSec <= 0) return;

clearTimeout(this.#heartbeatTimeout);
this.#heartbeatTimeout = setTimeout(async () => {
await this.#reconnect().catch(console.error);
this.#heartbeatTimeout = setTimeout(() => {
this.#websocket?.terminate();
this.#reconnect();
}, timeoutSec * 1000);
}

async #reconnect() {
#reconnect() {
clearTimeout(this.#reconnectTimeout);
const delay = Math.min(1000 * 2 ** (this.#reconnectAttempt - 1), 30000);
const jitter = Math.random() * 1000;
const timeoutMs = Math.min(1000 * 2 ** this.#reconnectAttempt, 30000);

this.#reconnectAttempt += 1;
this.#reconnectTimeout = setTimeout(async () => {
await this.connect().catch(console.error);
}, delay + jitter);
}, timeoutMs);
}

#cleanup() {
if (this.#websocket) {
this.#websocket.removeAllListeners();
this.#websocket = undefined;
}
clearTimeout(this.#heartbeatTimeout);
clearTimeout(this.#reconnectTimeout);
this.#websocket?.removeAllListeners();
this.#websocket = undefined;
}
}

0 comments on commit 08e9abe

Please sign in to comment.