Skip to content

Commit

Permalink
Treat ResponseDefinition as optional
Browse files Browse the repository at this point in the history
  • Loading branch information
srikrsna-buf committed Nov 30, 2023
1 parent 26dd6cb commit 5b2273b
Showing 1 changed file with 9 additions and 25 deletions.
34 changes: 9 additions & 25 deletions packages/connect-node-conformance/src/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import { Code, ConnectError } from "@connectrpc/connect";
import type { ConnectRouter, HandlerContext } from "@connectrpc/connect";
import { ConformanceService } from "./gen/connectrpc/conformance/v1/service_connect.js";
import { Any } from "@bufbuild/protobuf";
Expand Down Expand Up @@ -84,19 +83,13 @@ export default ({ service }: ConnectRouter) => {
},
async *serverStream(req, ctx) {
const def = req.responseDefinition;
if (def === undefined) {
throw new ConnectError(
"missing response_definition",
Code.InvalidArgument,
);
}
appendProtoHeaders(ctx.responseHeader, def.responseHeaders);
appendProtoHeaders(ctx.responseTrailer, def.responseTrailers);
appendProtoHeaders(ctx.responseHeader, def?.responseHeaders ?? []);
appendProtoHeaders(ctx.responseTrailer, def?.responseTrailers ?? []);
const anyReq = Any.pack(req);
let reqInfo: ConformancePayload_RequestInfo | undefined =
createRequestInfo(ctx, [anyReq]);
for (const res of def.responseData) {
await wait(def.responseDelayMs);
for (const res of def?.responseData ?? []) {
await wait(def!.responseDelayMs);
yield {
payload: new ConformancePayload({
requestInfo: reqInfo,
Expand All @@ -106,7 +99,7 @@ export default ({ service }: ConnectRouter) => {
// Only echo back the request info in the first response
reqInfo = undefined;
}
if (def.error !== undefined) {
if (def?.error !== undefined) {
if (def.responseData.length === 0) {
def.error.details.push(Any.pack(createRequestInfo(ctx, [anyReq])));
}
Expand All @@ -121,26 +114,17 @@ export default ({ service }: ConnectRouter) => {
for await (const req of reqIt) {
if (def === undefined) {
def = req.responseDefinition;
if (def === undefined) {
throw new ConnectError(
"missing response_definition",
Code.InvalidArgument,
);
}
appendProtoHeaders(ctx.responseHeader, def.responseHeaders);
appendProtoHeaders(ctx.responseTrailer, def.responseTrailers);
appendProtoHeaders(ctx.responseHeader, def?.responseHeaders ?? []);
appendProtoHeaders(ctx.responseTrailer, def?.responseTrailers ?? []);
fullDuplex = req.fullDuplex;
}
reqs.push(Any.pack(req));
if (!fullDuplex) {
continue;
}
// fullDuplex, so send one of the desired responses each time we get a message on the stream
if (resNum >= def.responseData.length) {
throw new ConnectError(
"received more requests than desired responses on a full duplex stream",
Code.Aborted,
);
if (def === undefined || resNum >= def.responseData.length) {
break;
}
await wait(def.responseDelayMs);
yield {
Expand Down

0 comments on commit 5b2273b

Please sign in to comment.