Skip to content

Commit

Permalink
Merge pull request #284 from MatrixAI/nodeconnection-objectmap
Browse files Browse the repository at this point in the history
Changing `getConnectionToNode` structure, removing `ForwardProxy.openConnection` from `NodeConnection`. Fixes #282
  • Loading branch information
joshuakarp authored Nov 9, 2021
2 parents 2732d79 + bcfe675 commit 475266f
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 56 deletions.
2 changes: 1 addition & 1 deletion src/agent/GRPCClientAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class GRPCClientAgent extends GRPCClient<AgentServiceClient> {

public async start({
tlsConfig,
timeout = Infinity,
timeout = 25000,
}: {
tlsConfig?: TLSConfig;
timeout?: number;
Expand Down
2 changes: 1 addition & 1 deletion src/grpc/GRPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class GRPCClient<T extends Client> {
tlsConfig,
secure,
session,
timeout = Infinity,
timeout = 25000,
}: {
clientConstructor: new (
address: string,
Expand Down
25 changes: 16 additions & 9 deletions src/nodes/NodeConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import * as claimsUtils from '../claims/utils';
import * as claimsErrors from '../claims/errors';
import * as keysUtils from '../keys/utils';
import * as vaultsUtils from '../vaults/utils';
import * as grpcErrors from '../grpc/errors';
import { GRPCClientAgent } from '../agent';
import * as utilsPB from '../proto/js/polykey/v1/utils/utils_pb';
import * as nodesPB from '../proto/js/polykey/v1/nodes/nodes_pb';
Expand Down Expand Up @@ -148,16 +149,14 @@ class NodeConnection {
Buffer.from(egressAddress),
);
// 2. Ask fwdProxy for connection to target (the revProxy of other node)
// 2. Start sending hole-punching packets to the target (via the client start -
// this establishes a HTTP CONNECT request with the forward proxy)
// 3. Relay the egress port to the broker/s (such that they can inform the other node)
// 4. Start sending hole-punching packets to other node (done in openConnection())
// Done in parallel
try {
await Promise.all([
this.fwdProxy.openConnection(
this.targetNodeId,
this.ingressHost,
this.ingressPort,
),
this.client.start(),
Array.from(brokerConnections, ([_, conn]) =>
conn.sendHolePunchMessage(
this.keyManager.getNodeId(),
Expand All @@ -169,12 +168,14 @@ class NodeConnection {
]);
} catch (e) {
await this.stop();
// If we catch an error, re-throw it to handle it.
// If the connection times out, re-throw this with a higher level nodes exception
if (e instanceof grpcErrors.ErrorGRPCClientTimeout) {
throw new nodesErrors.ErrorNodeConnectionTimeout();
}
throw e;
}
// 5. When finished, you have a connection to other node
// Then you can create/start the GRPCClient, and perform the request
await this.client.start({});
// The GRPCClient is ready to be used for requests
this.logger.info(
`Started NodeConnection from ${this.keyManager.getNodeId()} to ${
this.targetNodeId
Expand All @@ -183,8 +184,14 @@ class NodeConnection {
}

public async stop() {
this.logger.info('Stopping NodeConnection');
await this.client.stop();
await this.fwdProxy.closeConnection(this.ingressHost, this.ingressPort);
// Await this.fwdProxy.closeConnection(this.ingressHost, this.ingressPort);
this.logger.info(
`Stopped NodeConnection from ${this.keyManager.getNodeId()} to ${
this.targetNodeId
}`,
);
}

public async destroy() {
Expand Down
91 changes: 50 additions & 41 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import * as claimsUtils from '../claims/utils';
import { GRPCClientAgent } from '../agent';
import * as nodesPB from '../proto/js/polykey/v1/nodes/nodes_pb';
import { ForwardProxy, ReverseProxy } from '../network';
import { Mutex } from 'async-mutex';
import { Mutex, MutexInterface } from 'async-mutex';
import {
CreateDestroyStartStop,
ready,
Expand Down Expand Up @@ -419,68 +419,77 @@ class NodeManager {
/**
* Treat this node as the client, and attempt to create/retrieve an existing
* undirectional connection to another node (server).
* ObjectMap pattern adapted from:
* https://gist.github.com/CMCDragonkai/f58f08e7eaab0430ed4467ca35527a42
*/
@ready(new nodesErrors.ErrorNodeManagerNotStarted())
public async getConnectionToNode(
targetNodeId: NodeId,
): Promise<NodeConnection> {
const connLock = this.connections.get(targetNodeId);
// If there's already an entry in the map, we have 2 cases:
// 1. The connection already exists
// 2. The connection is currently being created by another concurrent thread
if (connLock != null) {
// Return the connection if it already exists
if (connLock.connection != null) {
return connLock.connection;
let connection: NodeConnection | undefined;
let lock: MutexInterface;
let connAndLock = this.connections.get(targetNodeId);
if (connAndLock != null) {
({ connection, lock } = connAndLock);
if (connection != null) {
return connection;
}
// Otherwise, it's expected to be currently being created by some other thread
// Wait for the lock to release
let release;
try {
release = await connLock.lock.acquire();
release = await lock.acquire();
({ connection, lock } = connAndLock);
if (connection != null) {
return connection;
}
connection = await this.establishNodeConnection(targetNodeId, lock);
connAndLock.connection = connection;
return connection;
} finally {
release();
}
// Once the lock is released, then it's sufficient to recursively call the
// function. It will most likely enter the case where we already have an
// entry in the map (or, an error occurred, and the entry is removed - in
// which case, this thread will create the connection).
return await this.getConnectionToNode(targetNodeId);

// Otherwise, we need to create an entry
} else {
const lock = new Mutex();
this.connections.set(targetNodeId, { lock });
lock = new Mutex();
connAndLock = { lock };
this.connections.set(targetNodeId, connAndLock);
let release;
try {
release = await lock.acquire();
const targetAddress = await this.findNode(targetNodeId);
const connection = await NodeConnection.createNodeConnection({
targetNodeId: targetNodeId,
targetHost: targetAddress.ip,
targetPort: targetAddress.port,
forwardProxy: this.fwdProxy,
keyManager: this.keyManager,
logger: this.logger,
});
await connection.start({
brokerConnections: this.brokerNodeConnections,
});
// Add it to the map of active connections
this.connections.set(targetNodeId, { connection, lock });
connection = await this.establishNodeConnection(targetNodeId, lock);
connAndLock.connection = connection;
return connection;
} catch (e) {
// We need to make sure to delete any added lock if we encounter an error
// Otherwise, we can enter a state where we have a lock in the map, but
// no NodeConnection being created
this.connections.delete(targetNodeId);
throw e;
} finally {
release();
}
}
}

/**
* Strictly a helper function for this.getConnectionToNode. Do not call this
* function anywhere else.
* To create a connection to a node, always use getConnectionToNode.
*/
@ready(new nodesErrors.ErrorNodeManagerNotStarted())
protected async establishNodeConnection(
targetNodeId: NodeId,
lock: MutexInterface,
): Promise<NodeConnection> {
const targetAddress = await this.findNode(targetNodeId);
const connection = await NodeConnection.createNodeConnection({
targetNodeId: targetNodeId,
targetHost: targetAddress.ip,
targetPort: targetAddress.port,
forwardProxy: this.fwdProxy,
keyManager: this.keyManager,
logger: this.logger,
});
await connection.start({
brokerConnections: this.brokerNodeConnections,
});
// Add it to the map of active connections
this.connections.set(targetNodeId, { connection, lock });
return connection;
}

/**
* Create and start a connection to a broker node. Assumes that a direct
* connection to the broker can be established (i.e. no hole punching required).
Expand Down
5 changes: 5 additions & 0 deletions src/nodes/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class ErrorNodeConnectionNotStarted extends ErrorNodes {}

class ErrorNodeConnectionDestroyed extends ErrorNodes {}

class ErrorNodeConnectionTimeout extends ErrorNodes {
description: 'A node connection could not be established (timed out)';
}

class ErrorNodeConnectionNotExist extends ErrorNodes {}

class ErrorNodeConnectionInfoNotExist extends ErrorNodes {}
Expand Down Expand Up @@ -51,6 +55,7 @@ export {
ErrorNodeGraphInvalidBucketIndex,
ErrorNodeConnectionNotStarted,
ErrorNodeConnectionDestroyed,
ErrorNodeConnectionTimeout,
ErrorNodeConnectionNotExist,
ErrorNodeConnectionInfoNotExist,
ErrorNodeConnectionPublicKeyNotFound,
Expand Down
6 changes: 3 additions & 3 deletions tests/bin/nodes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ describe('CLI Nodes', () => {
expect(result2.code).toBe(1); // Should fail with no response. for automation purposes.
expect(result2.stdout).toContain('No response received');
},
global.failedConnectionTimeout,
global.failedConnectionTimeout * 2,
);
test(
"Should return failure if can't find the node",
Expand All @@ -223,7 +223,7 @@ describe('CLI Nodes', () => {
expect(result2.stdout).toContain('message');
expect(result2.stdout).toContain('Failed to resolve node ID');
},
global.failedConnectionTimeout,
global.failedConnectionTimeout * 2,
);
test('Should return success when pinging a live node', async () => {
const commands = genCommands(['ping', remoteOnlineNodeId]);
Expand Down Expand Up @@ -333,7 +333,7 @@ describe('CLI Nodes', () => {
expect(result2.stdout).toContain('success');
expect(result2.stdout).toContain('false');
},
global.failedConnectionTimeout,
global.failedConnectionTimeout * 2,
);
});
describe('commandAddNode', () => {
Expand Down
33 changes: 32 additions & 1 deletion tests/nodes/NodeManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,37 @@ describe('NodeManager', () => {
expect(finalConnLock).toBeDefined();
expect(finalConnLock?.lock.isLocked()).toBeFalsy();
});
test(
'unable to create new connection to offline node',
async () => {
// Add the dummy node
await nodeManager.setNode(dummyNode, {
ip: '125.0.0.1' as Host,
port: 55555 as Port,
});
// @ts-ignore accessing protected NodeConnectionMap
expect(nodeManager.connections.size).toBe(0);

await expect(() =>
nodeManager.getConnectionToNode(dummyNode),
).rejects.toThrow(nodesErrors.ErrorNodeConnectionTimeout);
// @ts-ignore accessing protected NodeConnectionMap
expect(nodeManager.connections.size).toBe(1);
// @ts-ignore accessing protected NodeConnectionMap
const connLock = nodeManager.connections.get(dummyNode);
// There should still be an entry in the connection map, but it should
// only contain a lock - no connection.
expect(connLock).toBeDefined();
expect(connLock?.lock).toBeDefined();
expect(connLock?.connection).toBeUndefined();

// Undo the initial dummy node add
// @ts-ignore - get the NodeGraph reference
const nodeGraph = nodeManager.nodeGraph;
await nodeGraph.unsetNode(dummyNode);
},
global.failedConnectionTimeout * 2,
);
});

test(
Expand Down Expand Up @@ -312,7 +343,7 @@ describe('NodeManager', () => {

await testUtils.cleanupRemoteKeynode(server);
},
global.failedConnectionTimeout,
global.failedConnectionTimeout * 2,
);
test('knows node (true and false case)', async () => {
// Known node
Expand Down

0 comments on commit 475266f

Please sign in to comment.