Skip to content

Commit

Permalink
impl channel refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
shinyoshiaki committed May 21, 2024
1 parent 85946c0 commit ffea3b6
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 19 deletions.
12 changes: 9 additions & 3 deletions packages/ice/examples/turn_turn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,19 @@ const password = "";

turn1.turn.onData.subscribe((data, addr) => {
console.log("turn1 onData", data.toString(), addr);
turn1.sendData(Buffer.from("pong"), turn2.turn.mappedAddress);
turn1.sendData(
Buffer.from("pong " + new Date().toISOString()),
turn2.turn.relayedAddress,
);
});
turn2.turn.onData.subscribe((data, addr) => {
console.log("turn2 onData", data.toString(), addr);
});

setInterval(() => {
turn2.sendData(Buffer.from("ping"), turn1.turn.mappedAddress);
}, 1000);
turn2.sendData(
Buffer.from("ping " + new Date().toISOString()),
turn1.turn.relayedAddress,
);
}, 3000);
})();
73 changes: 57 additions & 16 deletions packages/ice/src/turn/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import PCancelable from "p-cancelable";
import Event from "rx.mini";
import { setTimeout } from "timers/promises";

import { int } from "../../../common/src";
import type { InterfaceAddresses } from "../../../common/src/network";
import type { Candidate } from "../candidate";
import { TransactionFailed } from "../exceptions";
Expand All @@ -18,6 +19,7 @@ import type { Address, Protocol } from "../types/model";

const log = debug("werift-ice:packages/ice/src/turn/protocol.ts");

const DEFAULT_CHANNEL_REFRESH_TIME = 500;
const DEFAULT_ALLOCATION_LIFETIME = 600;
const TCP_TRANSPORT = 0x06000000;
const UDP_TRANSPORT = 0x11000000;
Expand Down Expand Up @@ -94,19 +96,28 @@ class TurnClient implements Protocol {
mappedAddress!: Address;
refreshHandle?: Future;
channelNumber = 0x4000;
channel?: { number: number; address: Address };
channelByAddr: { [addr: string]: { number: number; address: Address } } = {};
addrByChannel: { [channel: number]: Address } = {};
localCandidate!: Candidate;
/**sec */
channelRefreshTime =
this.options.channelRefreshTime ?? DEFAULT_CHANNEL_REFRESH_TIME;

onDatagramReceived: (data: Buffer, addr: Address) => void = () => {};

private channelBinding?: Promise<void>;
channelRefreshAt = 0;

constructor(
public server: Address,
public username: string,
public password: string,
public lifetime: number,
public transport: Transport,
public options: {
/**sec */
channelRefreshTime?: number;
} = {},
) {}

async connectionMade() {
Expand All @@ -116,12 +127,13 @@ class TurnClient implements Protocol {
}

private handleChannelData(data: Buffer) {
const [, length] = jspack.Unpack("!HH", data.slice(0, 4));
const [channel, length] = jspack.Unpack("!HH", data.slice(0, 4));
const addr = this.addrByChannel[channel];

if (this.channel?.address) {
const payload = data.slice(4, 4 + length);
this.onDatagramReceived(payload, this.channel.address);
this.onData.execute(payload, this.channel.address);
if (addr) {
const payload = data.subarray(4, 4 + length);
this.onDatagramReceived(payload, addr);
this.onData.execute(payload, addr);
}
}

Expand Down Expand Up @@ -174,6 +186,7 @@ class TurnClient implements Protocol {
this.relayedAddress = response.getAttributeValue("XOR-RELAYED-ADDRESS");
this.mappedAddress = response.getAttributeValue("XOR-MAPPED-ADDRESS");
const exp = response.getAttributeValue("LIFETIME");
log("connect", this.relayedAddress, this.mappedAddress, { exp });

this.refreshHandle = future(this.refresh(exp));
}
Expand Down Expand Up @@ -202,14 +215,20 @@ class TurnClient implements Protocol {

while (run) {
// refresh before expire
await setTimeout((5 / 6) * exp * 1000);
const delay = (5 / 6) * exp * 1000;
log("refresh delay", delay, { exp });
await setTimeout(delay);

const request = new Message(methods.REFRESH, classes.REQUEST);
request.setAttribute("LIFETIME", exp);

await this.requestWithRetry(request, this.server).catch((e) => {
log("refresh error", e);
});
try {
const [message] = await this.requestWithRetry(request, this.server);
exp = message.getAttributeValue("LIFETIME");
log("refresh", { exp });
} catch (error) {
log("refresh error", error);
}
}
});

Expand Down Expand Up @@ -297,26 +316,48 @@ class TurnClient implements Protocol {
if (this.channelBinding) {
await this.channelBinding;
}
if (!this.channel) {
this.channel = { number: this.channelNumber++, address: addr };

this.channelBinding = this.channelBind(this.channel.number, addr);
let channel = this.channelByAddr[addr.join("")];

if (!channel) {
this.channelByAddr[addr.join("")] = {
number: this.channelNumber++,
address: addr,
};
channel = this.channelByAddr[addr.join("")];
this.addrByChannel[channel.number] = addr;

this.channelBinding = this.channelBind(channel.number, addr);
await this.channelBinding.catch((e) => {
log("channelBind error", e);
throw e;
});
this.channelRefreshAt = int(Date.now() / 1000) + this.channelRefreshTime;
this.channelBinding = undefined;
log("channelBind", channel);
} else if (this.channelRefreshAt < int(Date.now() / 1000)) {
this.channelBinding = this.channelBind(channel.number, addr);
this.channelRefreshAt = int(Date.now() / 1000) + this.channelRefreshTime;
await this.channelBinding.catch((e) => {
// [
// 400,
// "You cannot use the same channel number with different peer\u0000\u0000",
// ]
log("channelBind error", e);
throw e;
});
this.channelBinding = undefined;
log("channelBind", this.channel);
log("channelBind refresh", channel);
}
return this.channel;
return channel;
}

private async channelBind(channelNumber: number, addr: Address) {
const request = new Message(methods.CHANNEL_BIND, classes.REQUEST);
request
.setAttribute("CHANNEL-NUMBER", channelNumber)
.setAttribute("XOR-PEER-ADDRESS", addr);
const [response] = await this.request(request, this.server);
const [response] = await this.requestWithRetry(request, this.server);
if (response.messageMethod !== methods.CHANNEL_BIND) {
throw new Error("should be CHANNEL_BIND");
}
Expand Down

0 comments on commit ffea3b6

Please sign in to comment.