feat: added cancellation and tests for some git operations
aryanjassal committed Jan 24, 2025
1 parent bd8902a commit 1094c0d
Showing 8 changed files with 274 additions and 170 deletions.
2 changes: 1 addition & 1 deletion src/client/handlers/VaultsSecretsRemove.ts
@@ -67,7 +67,7 @@ class VaultsSecretsRemove extends DuplexHandler<
// release it when cleaning up.
const acquire = await vaultManager.withVaults(
[vaultId],
async (vault) => vault.acquireWrite(ctx),
async (vault) => vault.acquireWrite(undefined, ctx),
);
vaultAcquires.push(acquire);
}
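
The only change here moves the context to the second argument of `acquireWrite`, passing `undefined` for the new first parameter. The diff doesn't show the updated signature; below is a minimal sketch of what such a shape could look like, assuming the first parameter is an optional timeout (that assumption is not taken from this commit, and all names are illustrative).

```ts
// Hypothetical shape only: an acquire method whose first parameter is an
// optional timeout and whose second is an abortable context, matching the
// call `vault.acquireWrite(undefined, ctx)` above.
interface ContextTimed {
  signal: AbortSignal;
}

async function acquireWrite(
  timeout?: number,
  ctx?: ContextTimed,
): Promise<() => Promise<void>> {
  // Bail out immediately if the caller has already aborted.
  ctx?.signal.throwIfAborted();
  // ...acquire the underlying write lock (elided)...
  return async () => {
    // ...release the lock (elided)...
  };
}
```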
30 changes: 26 additions & 4 deletions src/git/http.ts
@@ -365,9 +365,19 @@ async function* generatePackRequest(
ctx: ContextTimed,
): AsyncGenerator<Buffer, void, void> {
const [wants, haves, _capabilities] = await parsePackRequest(body);
const efsProxy = new Proxy(efs, {
get: (target, key) => {
ctx.signal.throwIfAborted();
const resource = target[key];
if (typeof resource === 'function') {
return (...args) => resource.apply(target, args);
}
return resource;
},
});
const objectIds = await gitUtils.listObjects(
{
efs: efs,
efs: efsProxy,
dir: dir,
gitDir: gitDir,
wants: wants,
@@ -377,7 +387,8 @@ async function* generatePackRequest(
);
// Reply that we have no common history and that we need to send everything
yield packetLineBuffer(gitUtils.NAK_BUFFER);
// Send everything over in pack format
// Send everything over in pack format. This method already proxies efs, so
// there's no need to double-proxy it.
yield* generatePackData(
{
efs: efs,
@@ -421,15 +432,26 @@ async function* generatePackData(
let packFile: PackObjectsResult;
// In case of errors we don't want to throw them. Throwing here would propagate the error into `isomorphic-git`
// when it consumes the response, and it handles that by logging the error out, which we don't want to happen.
const efsProxy = new Proxy(efs, {
get: (target, key) => {
ctx.signal.throwIfAborted();
const resource = target[key];
if (typeof resource === 'function') {
return (...args) => resource.apply(target, args);
}
return resource;
},
});
try {
packFile = await git.packObjects({
fs: efs,
fs: efsProxy,
dir: dir,
gitdir: gitDir,
oids: objectIds,
});
} catch {
} catch (e) {
// Return without sending any data
if (e === ctx.signal.reason) throw e;
return;
}
// Pack file will only be undefined if it was written to disk instead
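
The core cancellation technique in `http.ts` is a `Proxy` that checks the abort signal on every property access of the filesystem object, so a long-running `isomorphic-git` traversal stops at its next filesystem call once the caller aborts. In `generatePackData`, the `catch` block then rethrows only when the caught error is `ctx.signal.reason`, so a cancellation propagates while other pack errors are swallowed instead of being logged by `isomorphic-git`. A standalone sketch of the same proxy pattern follows; the wrapped object and the usage lines are illustrative stand-ins, not APIs from this repository.

```ts
// Standalone sketch of the abort-propagating Proxy used in http.ts above.
function abortable<T extends object>(target: T, signal: AbortSignal): T {
  return new Proxy(target, {
    get: (obj, key) => {
      // Every property access re-checks the signal, so a long-running
      // consumer observes cancellation at its next call into the object.
      signal.throwIfAborted();
      const resource = obj[key as keyof T];
      if (typeof resource === 'function') {
        // Re-bind methods to the original object so `this` stays correct.
        return (...args: unknown[]) =>
          (resource as (...a: unknown[]) => unknown).apply(obj, args);
      }
      return resource;
    },
  });
}

// Usage sketch: wrap a filesystem-like object before handing it to a
// long-running operation, then abort from the outside.
// const controller = new AbortController();
// const fsProxy = abortable(fs, controller.signal);
// await git.packObjects({ fs: fsProxy /* ... */ }); // throws after abort()
```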
239 changes: 114 additions & 125 deletions src/nodes/NodeManager.ts
@@ -499,17 +499,17 @@ class NodeManager {
* @param f Function to handle communication
* @param ctx
*/
public withConnF<T>(
public async withConnF<T>(
nodeId: NodeId,
ctx: Partial<ContextTimedInput> | undefined,
f: (conn: NodeConnection) => Promise<T>,
ctx?: Partial<ContextTimedInput>,
): PromiseCancellable<T>;
): Promise<T>;
@ready(new nodesErrors.ErrorNodeManagerNotRunning())
public async withConnF<T>(
nodeId: NodeId,
f: (conn: NodeConnection) => Promise<T>,
@context ctx: ContextTimed,
) {
f: (conn: NodeConnection) => Promise<T>,
): Promise<T> {
return await withF(
[this.acquireConnection(nodeId, ctx)],
async ([conn]) => {
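
This hunk reorders `withConnF` so the optional context comes before the handler and the method returns a plain `Promise<T>` rather than a `PromiseCancellable<T>`. A sketch of the resulting call shape, based only on what the diff shows (the RPC method name is a placeholder):

```ts
// Call shape after the signature change: ctx is the second positional
// argument, the handler is last. `someUnaryCall` is a placeholder method.
const value = await nodeManager.withConnF(nodeId, ctx, async (conn) => {
  const client = conn.getClient();
  return await client.methods.someUnaryCall({});
});

// Callers with no context to propagate pass `undefined` explicitly, as the
// NotificationsManager change further below does:
await nodeManager.withConnF(nodeId, undefined, async (conn) => {
  // ...
});
```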
@@ -1220,52 +1220,45 @@ class NodeManager {
@context ctx: ContextTimed,
): Promise<Record<ClaimId, SignedClaim>> {
// Verify the node's chain with its own public key
return await this.withConnF(
targetNodeId,
async (connection) => {
const claims: Record<ClaimId, SignedClaim> = {};
const client = connection.getClient();
for await (const agentClaim of await client.methods.nodesClaimsGet({
claimIdEncoded:
claimId != null
? claimsUtils.encodeClaimId(claimId)
: ('' as ClaimIdEncoded),
})) {
if (ctx.signal.aborted) throw ctx.signal.reason;
// Need to re-construct each claim
const claimId: ClaimId = claimsUtils.decodeClaimId(
agentClaim.claimIdEncoded,
)!;
const signedClaimEncoded = agentClaim.signedTokenEncoded;
const signedClaim = claimsUtils.parseSignedClaim(signedClaimEncoded);
// Verifying the claim
const issPublicKey = keysUtils.publicKeyFromNodeId(
nodesUtils.decodeNodeId(signedClaim.payload.iss)!,
);
const subPublicKey =
signedClaim.payload.typ === 'node'
? keysUtils.publicKeyFromNodeId(
nodesUtils.decodeNodeId(signedClaim.payload.iss)!,
)
: null;
const token = Token.fromSigned(signedClaim);
if (!token.verifyWithPublicKey(issPublicKey)) {
this.logger.warn('Failed to verify issuing node');
continue;
}
if (
subPublicKey != null &&
!token.verifyWithPublicKey(subPublicKey)
) {
this.logger.warn('Failed to verify subject node');
continue;
}
claims[claimId] = signedClaim;
return await this.withConnF(targetNodeId, ctx, async (connection) => {
const claims: Record<ClaimId, SignedClaim> = {};
const client = connection.getClient();
for await (const agentClaim of await client.methods.nodesClaimsGet({
claimIdEncoded:
claimId != null
? claimsUtils.encodeClaimId(claimId)
: ('' as ClaimIdEncoded),
})) {
if (ctx.signal.aborted) throw ctx.signal.reason;
// Need to re-construct each claim
const claimId: ClaimId = claimsUtils.decodeClaimId(
agentClaim.claimIdEncoded,
)!;
const signedClaimEncoded = agentClaim.signedTokenEncoded;
const signedClaim = claimsUtils.parseSignedClaim(signedClaimEncoded);
// Verifying the claim
const issPublicKey = keysUtils.publicKeyFromNodeId(
nodesUtils.decodeNodeId(signedClaim.payload.iss)!,
);
const subPublicKey =
signedClaim.payload.typ === 'node'
? keysUtils.publicKeyFromNodeId(
nodesUtils.decodeNodeId(signedClaim.payload.iss)!,
)
: null;
const token = Token.fromSigned(signedClaim);
if (!token.verifyWithPublicKey(issPublicKey)) {
this.logger.warn('Failed to verify issuing node');
continue;
}
return claims;
},
ctx,
);
if (subPublicKey != null && !token.verifyWithPublicKey(subPublicKey)) {
this.logger.warn('Failed to verify subject node');
continue;
}
claims[claimId] = signedClaim;
}
return claims;
});
}

/**
@@ -1296,83 +1289,79 @@ class NodeManager {
},
undefined,
async (token) => {
return this.withConnF(
targetNodeId,
async (conn) => {
// 2. create the agentClaim message to send
const halfSignedClaim = token.toSigned();
const halfSignedClaimEncoded =
claimsUtils.generateSignedClaim(halfSignedClaim);
const client = conn.getClient();
const stream = await client.methods.nodesCrossSignClaim();
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();
let fullySignedToken: Token<Claim>;
try {
await writer.write({
signedTokenEncoded: halfSignedClaimEncoded,
});
// 3. We expect to receive the doubly signed claim
const readStatus = await reader.read();
if (readStatus.done) {
throw new claimsErrors.ErrorEmptyStream();
}
const receivedClaim = readStatus.value;
// We need to re-construct the token from the message
const signedClaim = claimsUtils.parseSignedClaim(
receivedClaim.signedTokenEncoded,
);
fullySignedToken = Token.fromSigned(signedClaim);
// Check that the signatures are correct
const targetNodePublicKey =
keysUtils.publicKeyFromNodeId(targetNodeId);
if (
!fullySignedToken.verifyWithPublicKey(
this.keyRing.keyPair.publicKey,
) ||
!fullySignedToken.verifyWithPublicKey(targetNodePublicKey)
) {
throw new claimsErrors.ErrorDoublySignedClaimVerificationFailed();
}
return this.withConnF(targetNodeId, ctx, async (conn) => {
// 2. create the agentClaim message to send
const halfSignedClaim = token.toSigned();
const halfSignedClaimEncoded =
claimsUtils.generateSignedClaim(halfSignedClaim);
const client = conn.getClient();
const stream = await client.methods.nodesCrossSignClaim();
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();
let fullySignedToken: Token<Claim>;
try {
await writer.write({
signedTokenEncoded: halfSignedClaimEncoded,
});
// 3. We expect to receive the doubly signed claim
const readStatus = await reader.read();
if (readStatus.done) {
throw new claimsErrors.ErrorEmptyStream();
}
const receivedClaim = readStatus.value;
// We need to re-construct the token from the message
const signedClaim = claimsUtils.parseSignedClaim(
receivedClaim.signedTokenEncoded,
);
fullySignedToken = Token.fromSigned(signedClaim);
// Check that the signatures are correct
const targetNodePublicKey =
keysUtils.publicKeyFromNodeId(targetNodeId);
if (
!fullySignedToken.verifyWithPublicKey(
this.keyRing.keyPair.publicKey,
) ||
!fullySignedToken.verifyWithPublicKey(targetNodePublicKey)
) {
throw new claimsErrors.ErrorDoublySignedClaimVerificationFailed();
}

// Next stage is to process the claim for the other node
const readStatus2 = await reader.read();
if (readStatus2.done) {
throw new claimsErrors.ErrorEmptyStream();
}
const receivedClaimRemote = readStatus2.value;
// We need to re-construct the token from the message
const signedClaimRemote = claimsUtils.parseSignedClaim(
receivedClaimRemote.signedTokenEncoded,
);
// This is a singly signed claim,
// we want to verify it before signing and sending back
const signedTokenRemote = Token.fromSigned(signedClaimRemote);
if (!signedTokenRemote.verifyWithPublicKey(targetNodePublicKey)) {
throw new claimsErrors.ErrorSinglySignedClaimVerificationFailed();
}
signedTokenRemote.signWithPrivateKey(this.keyRing.keyPair);
// 4. X <- responds with double signing the X signed claim <- Y
const agentClaimedMessageRemote = claimsUtils.generateSignedClaim(
signedTokenRemote.toSigned(),
);
await writer.write({
signedTokenEncoded: agentClaimedMessageRemote,
});

// Check the stream is closed (should be closed by other side)
const finalResponse = await reader.read();
if (finalResponse.done != null) {
await writer.close();
}
} catch (e) {
await writer.abort(e);
throw e;
// Next stage is to process the claim for the other node
const readStatus2 = await reader.read();
if (readStatus2.done) {
throw new claimsErrors.ErrorEmptyStream();
}
return fullySignedToken;
},
ctx,
);
const receivedClaimRemote = readStatus2.value;
// We need to re-construct the token from the message
const signedClaimRemote = claimsUtils.parseSignedClaim(
receivedClaimRemote.signedTokenEncoded,
);
// This is a singly signed claim,
// we want to verify it before signing and sending back
const signedTokenRemote = Token.fromSigned(signedClaimRemote);
if (!signedTokenRemote.verifyWithPublicKey(targetNodePublicKey)) {
throw new claimsErrors.ErrorSinglySignedClaimVerificationFailed();
}
signedTokenRemote.signWithPrivateKey(this.keyRing.keyPair);
// 4. X <- responds with double signing the X signed claim <- Y
const agentClaimedMessageRemote = claimsUtils.generateSignedClaim(
signedTokenRemote.toSigned(),
);
await writer.write({
signedTokenEncoded: agentClaimedMessageRemote,
});

// Check the stream is closed (should be closed by other side)
const finalResponse = await reader.read();
if (finalResponse.done != null) {
await writer.close();
}
} catch (e) {
await writer.abort(e);
throw e;
}
return fullySignedToken;
});
},
tran,
);
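
In the claims-fetching loop above, the abort signal is checked once per received message (`if (ctx.signal.aborted) throw ctx.signal.reason;`), so a long stream of claims can be cancelled between items. The same per-item check, sketched over a generic async iterable (all names here are placeholders, not code from this repository):

```ts
// Generic sketch: stop consuming an async iterable between items once the
// signal aborts, throwing the abort reason rather than a generic error.
async function consumeAbortable<T>(
  source: AsyncIterable<T>,
  signal: AbortSignal,
  handle: (item: T) => void,
): Promise<void> {
  for await (const item of source) {
    if (signal.aborted) throw signal.reason;
    handle(item);
  }
}
```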
16 changes: 10 additions & 6 deletions src/notifications/NotificationsManager.ts
@@ -166,12 +166,16 @@ class NotificationsManager {
);
// The task id if a new task has been scheduled for a retry.
try {
await this.nodeManager.withConnF(nodeId, async (connection) => {
const client = connection.getClient();
await client.methods.notificationsSend({
signedNotificationEncoded: signedNotification,
});
});
await this.nodeManager.withConnF(
nodeId,
undefined,
async (connection) => {
const client = connection.getClient();
await client.methods.notificationsSend({
signedNotificationEncoded: signedNotification,
});
},
);
await this.db.del(notificationKeyPath);
} catch (e) {
this.logger.warn(