Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing network authentication and segregation logic to the nodes domain #804

Draft
wants to merge 12 commits into
base: staging
Choose a base branch
from
3 changes: 2 additions & 1 deletion src/client/ClientService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ class ClientService {
const conn = evt.detail;
const streamHandler = (evt: wsEvents.EventWebSocketConnectionStream) => {
const stream = evt.detail;
// If the RPCServer is stopping or stopped then we want to reject new streams outright
if (!this.rpcServer[running] || this.rpcServer[status] === 'stopping') {
stream.cancel(Error('TMP RPCServer not running'));
stream.cancel(new errors.ErrorClientServiceNotRunning());
return;
}
this.rpcServer.handleStream(stream);
Expand Down
98 changes: 62 additions & 36 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,18 @@ const abortPendingConnectionsReason = Symbol(
'abort pending connections reason',
);

const timerCancellationReason = Symbol('timer cancellation reason');

const activePunchCancellationReason = Symbol(
'active punch cancellation reason',
);

/**
* NodeConnectionManager is a server that manages all node connections.
* It manages both initiated and received connections.
*
* It acts like a phone call system.
* It can maintain mulitple calls to other nodes.
* It can maintain multiple calls to other nodes.
* There's no guarantee that we need to make it.
*
* Node connections make use of the QUIC protocol.
Expand Down Expand Up @@ -144,7 +150,7 @@ class NodeConnectionManager {
public readonly connectionHolePunchIntervalTime: number;

/**
* Max parse buffer size before RPC parser throws an parse error.
* Max parse buffer size before RPC parser throws a parse error.
*/
public readonly rpcParserBufferSize: number;

Expand Down Expand Up @@ -187,8 +193,8 @@ class NodeConnectionManager {
protected quicServer: QUICServer;

/**
* Data structure to store all NodeConnections. If a connection to a node n does
* not exist, no entry for n will exist in the map. Alternatively, if a
* Data structure to store all NodeConnections. If a connection to a node `N` does
* not exist, no entry for `N` will exist in the map. Alternatively, if a
* connection is currently being instantiated by some thread, an entry will
* exist in the map, but only with the lock (no connection object). Once a
* connection is instantiated, the entry in the map is updated to include the
Expand Down Expand Up @@ -244,7 +250,7 @@ class NodeConnectionManager {
const connectionAndTimer = connectionsEntry.connections[connectionId];
if (connectionAndTimer == null) utils.never('should have a connection');
connectionAndTimer.usageCount += 1;
connectionAndTimer.timer?.cancel();
connectionAndTimer.timer?.cancel(timerCancellationReason);
connectionAndTimer.timer = null;
void stream.closedP.finally(() => {
connectionAndTimer.usageCount -= 1;
Expand All @@ -262,6 +268,8 @@ class NodeConnectionManager {
await this.destroyConnection(nodeId, false, connectionId),
delay,
});
// Prevent unhandled exceptions when cancelling
connectionAndTimer.timer.catch(() => {});
}
});
};
Expand All @@ -282,7 +290,7 @@ class NodeConnectionManager {
};

/**
* Redispatches `QUICSOcket` or `QUICServer` error events as `NodeConnectionManager` error events.
* Redispatches `QUICSocket` or `QUICServer` error events as `NodeConnectionManager` error events.
* This should trigger the destruction of the `NodeConnection` through the
* `EventNodeConnectionError` -> `EventNodeConnectionClose` event path.
*/
Expand All @@ -298,7 +306,7 @@ class NodeConnectionManager {

/**
* Handle unexpected stoppage of the QUICSocket. Not expected to happen
* without error but we have it just in case.
* without error, but we have it just in case.
*/
protected handleEventQUICSocketStopped = (
_evt: quicEvents.EventQUICSocketStopped,
Expand All @@ -313,7 +321,7 @@ class NodeConnectionManager {

/**
* Handle unexpected stoppage of the QUICServer. Not expected to happen
* without error but we have it just in case.
* without error, but we have it just in case.
*/
protected handleEventQUICServerStopped = (
_evt: quicEvents.EventQUICServerStopped,
Expand All @@ -328,8 +336,8 @@ class NodeConnectionManager {

/**
* Handles `EventQUICServerConnection` events. These are reverser or server
* peer initated connections that needs to be handled and added to the
* connectio map.
* peer initiated connections that needs to be handled and added to the
* connection map.
*/
protected handleEventQUICServerConnection = (
evt: quicEvents.EventQUICServerConnection,
Expand Down Expand Up @@ -372,7 +380,6 @@ class NodeConnectionManager {
.nodesConnectionHolePunchIntervalTime,
rpcParserBufferSize = config.defaultsSystem.rpcParserBufferSize,
rpcCallTimeoutTime = config.defaultsSystem.rpcCallTimeoutTime,

logger,
}: {
keyRing: KeyRing;
Expand Down Expand Up @@ -571,25 +578,26 @@ class NodeConnectionManager {
);
this.quicSocket.removeEventListener(EventAll.name, this.handleEventAll);

const destroyProms: Array<Promise<void>> = [];
const destroyConnectionPs: Array<Promise<void>> = [];
const cancelSignallingPs: Array<PromiseCancellable<void> | Promise<void>> =
[];
for (const [nodeId] of this.connections) {
// It exists so we want to destroy it
const destroyProm = this.destroyConnection(
IdInternal.fromString<NodeId>(nodeId),
force,
);
destroyProms.push(destroyProm);
destroyConnectionPs.push(destroyProm);
}
await Promise.all(destroyProms);
const signallingProms: Array<PromiseCancellable<void> | Promise<void>> = [];
for (const [, activePunch] of this.activeHolePunchPs) {
signallingProms.push(activePunch);
activePunch.cancel();
cancelSignallingPs.push(activePunch);
activePunch.cancel(activePunchCancellationReason);
}
for (const activeSignal of this.activeSignalFinalPs) {
signallingProms.push(activeSignal);
cancelSignallingPs.push(activeSignal);
}
await Promise.allSettled(signallingProms);
await Promise.all(destroyConnectionPs);
await Promise.allSettled(cancelSignallingPs);
await this.quicServer.stop({ force: true });
await this.quicSocket.stop({ force: true });
await this.rpcServer.stop({ force: true });
Expand Down Expand Up @@ -628,7 +636,7 @@ class NodeConnectionManager {

// Increment usage count, and cancel timer
connectionAndTimer.usageCount += 1;
connectionAndTimer.timer?.cancel();
connectionAndTimer.timer?.cancel(timerCancellationReason);
connectionAndTimer.timer = null;
// Return tuple of [ResourceRelease, Resource]
return [
Expand All @@ -648,9 +656,15 @@ class NodeConnectionManager {
);
connectionAndTimer.timer = new Timer({
handler: async () =>
await this.destroyConnection(targetNodeId, false),
await this.destroyConnection(
targetNodeId,
false,
connectionAndTimer.connection.connectionId,
),
delay,
});
// Prevent unhandled exceptions when cancelling
connectionAndTimer.timer.catch(() => {});
}
},
connectionAndTimer.connection,
Expand Down Expand Up @@ -693,7 +707,7 @@ class NodeConnectionManager {
): AsyncGenerator<T, TReturn, TNext> {
const acquire = this.acquireConnection(targetNodeId);
const [release, conn] = await acquire();
let caughtError;
let caughtError: Error | undefined;
try {
if (conn == null) utils.never('NodeConnection should exist');
return yield* g(conn);
Expand Down Expand Up @@ -924,6 +938,8 @@ class NodeConnectionManager {
await this.destroyConnection(nodeId, false, connectionId),
delay: this.getStickyTimeoutValue(nodeId, true),
});
// Prevent unhandled exceptions when cancelling
newConnAndTimer.timer.catch(() => {});
entry = {
activeConnection: connectionId,
connections: {
Expand All @@ -940,6 +956,8 @@ class NodeConnectionManager {
entry.activeConnection > connectionId,
),
});
// Prevent unhandled exceptions when cancelling
newConnAndTimer.timer.catch(() => {});
// Updating existing entry
entry.connections[connectionId] = newConnAndTimer;
// If the new connection ID is less than the old then replace it
Expand Down Expand Up @@ -991,7 +1009,8 @@ class NodeConnectionManager {
);
destroyPs.push(connAndTimer.connection.destroy({ force }));
// Destroying TTL timer
if (connAndTimer.timer != null) connAndTimer.timer.cancel();
connAndTimer.timer?.cancel(timerCancellationReason);
connAndTimer.timer = null;
delete connections[connectionId];
}
}
Expand All @@ -1013,7 +1032,7 @@ class NodeConnectionManager {
/**
* Will determine how long to keep a node around for.
*
* Timeout is scaled linearly from 1 min to 2 hours based on it's bucket.
* Timeout is scaled linearly from 1 min to 2 hours based on its bucket.
* The value will be symmetric for two nodes,
* they will assign the same timeout for each other.
*/
Expand Down Expand Up @@ -1129,7 +1148,7 @@ class NodeConnectionManager {
try {
while (true) {
const message = keysUtils.getRandomBytes(32);
// Since the intention is to abstract away the success/failure of the holepunch operation,
// Since the intention is to abstract away the success/failure of the hole-punch operation,
// We should catch any errors thrown out of this, as the caller does not expect the method to throw
await this.quicSocket
.send(Buffer.from(message), port, host)
Expand Down Expand Up @@ -1215,7 +1234,7 @@ class NodeConnectionManager {
* Will validate the message, and initiate hole punching in the background and return immediately.
* Attempts to the same host and port are coalesced.
* Attempts to the same host are limited by a semaphore.
* Active attempts are tracked inside of the `activeHolePunchPs` set and are cancelled and awaited when the
* Active attempts are tracked inside the `activeHolePunchPs` set and are cancelled and awaited when the
* `NodeConnectionManager` stops.
*/
@ready(new nodesErrors.ErrorNodeManagerNotRunning())
Expand All @@ -1231,17 +1250,24 @@ class NodeConnectionManager {
}
const holePunchAttempt = new PromiseCancellable<void>(
async (res, rej, signal) => {
await semaphore!.withF(async () => {
this.holePunch(host, port, { signal })
.finally(() => {
this.activeHolePunchPs.delete(id);
if (semaphore!.count === 0) {
this.activeHolePunchAddresses.delete(host);
}
})
.then(res, rej);
});
await semaphore!
.withF(async () => {
await this.holePunch(host, port, { signal });
})
.finally(() => {
this.activeHolePunchPs.delete(id);
if (semaphore!.count === 0) {
this.activeHolePunchAddresses.delete(host);
}
})
.then(res, rej);
},
).finally(() => {
this.activeHolePunchPs.delete(id);
});
holePunchAttempt.then(
() => {},
() => {},
);
this.activeHolePunchPs.set(id, holePunchAttempt);
}
Expand Down
Loading
Loading