Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions grafast/grafast/__tests__/flagUnionIncremental-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/* eslint-disable graphile-export/exhaustive-deps, graphile-export/export-methods, graphile-export/export-plans, graphile-export/export-instances, graphile-export/export-subclasses, graphile-export/no-nested */
import { expect } from "chai";
import { resolvePreset } from "graphile-config";
import { it } from "mocha";

import type { AsyncExecutionResult } from "graphql";
import type { UnbatchedExecutionExtra } from "../dist/index.js";
import {
DEFAULT_ACCEPT_FLAGS,
TRAP_ERROR,
UnbatchedStep,
constant,
flagError,
grafast,
lambda,
makeGrafastSchema,
Step,
} from "../dist/index.js";

import { resolveStreamDefer, streamToArray } from "./incrementalUtils.ts";

const resolvedPreset = resolvePreset({});
const requestContext = {};

class FlagUnionCheckStep extends UnbatchedStep<number> {
isSyncAndSafe = false;
constructor($dep: Step<unknown>) {
super();
this.addDependency({
step: $dep,
acceptFlags: DEFAULT_ACCEPT_FLAGS | TRAP_ERROR,
});
}
unbatchedExecute(
_extra: UnbatchedExecutionExtra,
_value: unknown,
): number {
if ((_extra._bucket.flagUnion & TRAP_ERROR) === 0) {
throw new Error("Missing flagUnion error flag");
}
return 1;
}
}

it("carries error flags into stream buckets", async () => {
let $bad: Step<unknown> | null = null;
const schema = makeGrafastSchema({
typeDefs: /* GraphQL */ `
type Thing {
id: Int
check: Int
}
type Query {
list: [Thing!]!
}
`,
objects: {
Query: {
plans: {
list() {
$bad = lambda(null, () =>
flagError(new Error("Root error flag")),
);
return constant([1, 2]);
},
},
},
Thing: {
plans: {
id($i: Step<number>) {
return $i;
},
check() {
if ($bad == null) {
throw new Error("Expected $bad to be initialised");
}
return new FlagUnionCheckStep($bad);
},
},
},
},
enableDeferStream: true,
});

const source = /* GraphQL */ `
{
list @stream(initialCount: 0) {
id
check
}
}
`;

const result = await grafast({
schema,
source,
resolvedPreset,
requestContext,
});
const payloads = (await streamToArray(result)) as AsyncExecutionResult[];
payloads.forEach((payload) => {
expect(payload.errors).to.equal(undefined);
});
const merged = resolveStreamDefer(payloads);
expect(merged.data).to.deep.equal({
list: [
{ id: 1, check: 1 },
{ id: 2, check: 1 },
],
});
});

it("carries error flags into deferred buckets", async () => {
let $bad: Step<unknown> | null = null;
const schema = makeGrafastSchema({
typeDefs: /* GraphQL */ `
type Thing {
id: Int
check: Int
}
type Query {
thing: Thing
}
`,
objects: {
Query: {
plans: {
thing() {
$bad = lambda(null, () =>
flagError(new Error("Root error flag")),
);
return constant(1);
},
},
},
Thing: {
plans: {
id($i: Step<number>) {
return $i;
},
check() {
if ($bad == null) {
throw new Error("Expected $bad to be initialised");
}
return new FlagUnionCheckStep($bad);
},
},
},
},
enableDeferStream: true,
});

const source = /* GraphQL */ `
{
thing {
id
... @defer {
check
}
}
}
`;

const result = await grafast({
schema,
source,
resolvedPreset,
requestContext,
});
const payloads = (await streamToArray(result)) as AsyncExecutionResult[];
payloads.forEach((payload) => {
expect(payload.errors).to.equal(undefined);
});
const merged = resolveStreamDefer(payloads);
expect(merged.data).to.deep.equal({
thing: { id: 1, check: 1 },
});
});
13 changes: 11 additions & 2 deletions grafast/grafast/src/prepare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import { establishOperationPlan } from "./establishOperationPlan.ts";
import type {
ErrorBehavior,
EstablishOperationPlanEvent,
ExecutionEntryFlags,
GrafastExecutionArgs,
GrafastTimeouts,
JSONValue,
Expand Down Expand Up @@ -104,6 +105,14 @@ const bypassGraphQLObj = Object.assign(Object.create(null), {

function noop() {}

function getFlagUnionFromStore(store: Bucket["store"]): ExecutionEntryFlags {
let flagUnion = NO_FLAGS;
for (const ev of store.values()) {
flagUnion |= ev._getStateUnion();
}
return flagUnion;
}

function processRoot(
// errors should already have been handled, and this ctx isn't suitable to be reused.
ctx: Omit<OutputPlanContext, "errors">,
Expand Down Expand Up @@ -885,7 +894,7 @@ async function processStream(
layerPlan: directLayerPlanChild,
size,
store,
flagUnion: NO_FLAGS,
flagUnion: getFlagUnionFromStore(store),
polymorphicPathList,
polymorphicType: null,
iterators,
Expand Down Expand Up @@ -1054,7 +1063,7 @@ function processSingleDeferred(
layerPlan: outputPlan.layerPlan,
size,
store,
flagUnion: NO_FLAGS,
flagUnion: getFlagUnionFromStore(store),
polymorphicPathList,
polymorphicType: null,
iterators,
Expand Down
Loading