Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisj committed Apr 25, 2024
1 parent 01cea05 commit c1edc33
Showing 1 changed file with 6 additions and 26 deletions.
32 changes: 6 additions & 26 deletions src/datasource/graphene/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import {
RENDER_RATIO_LIMIT,
getGrapheneFragmentKey,
isBaseSegmentId,
responseIdentity,
} from "#src/datasource/graphene/base.js";
import { decodeManifestChunk } from "#src/datasource/precomputed/backend.js";
import type { FragmentChunk, ManifestChunk } from "#src/mesh/backend.js";
Expand Down Expand Up @@ -265,7 +264,7 @@ function decodeChunkedGraphChunk(leaves: string[]) {
return final;
}

class BulkRequestProxy {
class LeavesManyProxy {
pendingRequests = new Map<
string,
[Signal<(response: any) => void>, Uint64Set, CancellationTokenSource]
Expand All @@ -281,33 +280,19 @@ class BulkRequestProxy {
bounds: string,
cancellationToken: CancellationToken,
): Promise<any> {
responseIdentity;
const { pendingRequests } = this;
let pendingRequest = pendingRequests.get(bounds);
if (!pendingRequest) {
const { parameters, credentialsProvider } = this;
const signal = new Signal<(request: any) => void>();
(signal as any).start = performance.now();
const requestCancellationToken = new CancellationTokenSource();
pendingRequest = [signal, new Uint64Set(), requestCancellationToken];
const segments = new Uint64Set();
pendingRequest = [signal, segments, requestCancellationToken];
pendingRequests.set(bounds, pendingRequest);
setTimeout(async () => {
// console.log("duration", performance.now() - (signal as any).start);
const pendingRequest = pendingRequests.get(bounds);
pendingRequests.delete(bounds);
if (!pendingRequest) {
console.error("how could this happen?");
return;
}
const [_, segments] = pendingRequest;
_;
// console.log(segments.size);
try {
if (requestCancellationToken.isCanceled) {
// TODO is this necessary?
signal.dispatch(new Error("cancelled!")); // TODO
return;
}
const response = await cancellableFetchSpecialOk(
credentialsProvider,
`${parameters.url}/leaves_many?int64_as_str=1&bounds=${bounds}`,
Expand All @@ -329,12 +314,8 @@ class BulkRequestProxy {
const [request, segments, requestCancellationToken] = pendingRequest;
segments.add(segment);
cancellationToken.add(() => {
// is it necessary to wait until there are no segments because
// console.log("cancellationToken called");
segments.delete(segment);

if (segments.size === 0) {
console.log("cancelling request");
requestCancellationToken.cancel();
}
});
Expand Down Expand Up @@ -362,15 +343,15 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters(
chunks: Map<string, ChunkedGraphChunk>;
tempChunkDataSize: Uint32Array;
tempChunkPosition: Float32Array;
requestProxy: BulkRequestProxy;
leavesManyProxy: LeavesManyProxy;

constructor(rpc: RPC, options: any) {
super(rpc, options);
this.spec = options.spec;
const rank = this.spec.rank;
this.tempChunkDataSize = new Uint32Array(rank);
this.tempChunkPosition = new Float32Array(rank);
this.requestProxy = new BulkRequestProxy(
this.leavesManyProxy = new LeavesManyProxy(
this.parameters,
this.credentialsProvider,
);
Expand All @@ -387,13 +368,12 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters(
`${chunkPosition[1]}-${chunkPosition[1] + chunkDataSize[1]}_` +
`${chunkPosition[2]}-${chunkPosition[2] + chunkDataSize[2]}`;

const request = await this.requestProxy.request(
const request = await this.leavesManyProxy.request(
chunk.segment,
bounds,
cancellationToken,
);
chunk.leaves = decodeChunkedGraphChunk(request);
// .catch((err) => console.error(err));
}

getChunk(chunkGridPosition: Float32Array, segment: Uint64) {
Expand Down

0 comments on commit c1edc33

Please sign in to comment.