From e85e8cf0b2ccd2bdcef6e813efd1f95e64b34567 Mon Sep 17 00:00:00 2001 From: Benj-AI-e Date: Thu, 5 Feb 2026 09:42:58 +0000 Subject: [PATCH] Fix flagUnion propagation for incremental buckets --- .../__tests__/flagUnionIncremental-test.ts | 178 ++++++++++++++++++ grafast/grafast/src/prepare.ts | 13 +- 2 files changed, 189 insertions(+), 2 deletions(-) create mode 100644 grafast/grafast/__tests__/flagUnionIncremental-test.ts diff --git a/grafast/grafast/__tests__/flagUnionIncremental-test.ts b/grafast/grafast/__tests__/flagUnionIncremental-test.ts new file mode 100644 index 0000000000..ac81913376 --- /dev/null +++ b/grafast/grafast/__tests__/flagUnionIncremental-test.ts @@ -0,0 +1,178 @@ +/* eslint-disable graphile-export/exhaustive-deps, graphile-export/export-methods, graphile-export/export-plans, graphile-export/export-instances, graphile-export/export-subclasses, graphile-export/no-nested */ +import { expect } from "chai"; +import { resolvePreset } from "graphile-config"; +import { it } from "mocha"; + +import type { AsyncExecutionResult } from "graphql"; +import type { UnbatchedExecutionExtra } from "../dist/index.js"; +import { + DEFAULT_ACCEPT_FLAGS, + TRAP_ERROR, + UnbatchedStep, + constant, + flagError, + grafast, + lambda, + makeGrafastSchema, + Step, +} from "../dist/index.js"; + +import { resolveStreamDefer, streamToArray } from "./incrementalUtils.ts"; + +const resolvedPreset = resolvePreset({}); +const requestContext = {}; + +class FlagUnionCheckStep extends UnbatchedStep { + isSyncAndSafe = false; + constructor($dep: Step) { + super(); + this.addDependency({ + step: $dep, + acceptFlags: DEFAULT_ACCEPT_FLAGS | TRAP_ERROR, + }); + } + unbatchedExecute( + _extra: UnbatchedExecutionExtra, + _value: unknown, + ): number { + if ((_extra._bucket.flagUnion & TRAP_ERROR) === 0) { + throw new Error("Missing flagUnion error flag"); + } + return 1; + } +} + +it("carries error flags into stream buckets", async () => { + let $bad: Step | null = null; + const schema = makeGrafastSchema({ + typeDefs: /* GraphQL */ ` + type Thing { + id: Int + check: Int + } + type Query { + list: [Thing!]! + } + `, + objects: { + Query: { + plans: { + list() { + $bad = lambda(null, () => + flagError(new Error("Root error flag")), + ); + return constant([1, 2]); + }, + }, + }, + Thing: { + plans: { + id($i: Step) { + return $i; + }, + check() { + if ($bad == null) { + throw new Error("Expected $bad to be initialised"); + } + return new FlagUnionCheckStep($bad); + }, + }, + }, + }, + enableDeferStream: true, + }); + + const source = /* GraphQL */ ` + { + list @stream(initialCount: 0) { + id + check + } + } + `; + + const result = await grafast({ + schema, + source, + resolvedPreset, + requestContext, + }); + const payloads = (await streamToArray(result)) as AsyncExecutionResult[]; + payloads.forEach((payload) => { + expect(payload.errors).to.equal(undefined); + }); + const merged = resolveStreamDefer(payloads); + expect(merged.data).to.deep.equal({ + list: [ + { id: 1, check: 1 }, + { id: 2, check: 1 }, + ], + }); +}); + +it("carries error flags into deferred buckets", async () => { + let $bad: Step | null = null; + const schema = makeGrafastSchema({ + typeDefs: /* GraphQL */ ` + type Thing { + id: Int + check: Int + } + type Query { + thing: Thing + } + `, + objects: { + Query: { + plans: { + thing() { + $bad = lambda(null, () => + flagError(new Error("Root error flag")), + ); + return constant(1); + }, + }, + }, + Thing: { + plans: { + id($i: Step) { + return $i; + }, + check() { + if ($bad == null) { + throw new Error("Expected $bad to be initialised"); + } + return new FlagUnionCheckStep($bad); + }, + }, + }, + }, + enableDeferStream: true, + }); + + const source = /* GraphQL */ ` + { + thing { + id + ... @defer { + check + } + } + } + `; + + const result = await grafast({ + schema, + source, + resolvedPreset, + requestContext, + }); + const payloads = (await streamToArray(result)) as AsyncExecutionResult[]; + payloads.forEach((payload) => { + expect(payload.errors).to.equal(undefined); + }); + const merged = resolveStreamDefer(payloads); + expect(merged.data).to.deep.equal({ + thing: { id: 1, check: 1 }, + }); +}); diff --git a/grafast/grafast/src/prepare.ts b/grafast/grafast/src/prepare.ts index 63a2029186..5664a39a79 100644 --- a/grafast/grafast/src/prepare.ts +++ b/grafast/grafast/src/prepare.ts @@ -42,6 +42,7 @@ import { establishOperationPlan } from "./establishOperationPlan.ts"; import type { ErrorBehavior, EstablishOperationPlanEvent, + ExecutionEntryFlags, GrafastExecutionArgs, GrafastTimeouts, JSONValue, @@ -104,6 +105,14 @@ const bypassGraphQLObj = Object.assign(Object.create(null), { function noop() {} +function getFlagUnionFromStore(store: Bucket["store"]): ExecutionEntryFlags { + let flagUnion = NO_FLAGS; + for (const ev of store.values()) { + flagUnion |= ev._getStateUnion(); + } + return flagUnion; +} + function processRoot( // errors should already have been handled, and this ctx isn't suitable to be reused. ctx: Omit, @@ -885,7 +894,7 @@ async function processStream( layerPlan: directLayerPlanChild, size, store, - flagUnion: NO_FLAGS, + flagUnion: getFlagUnionFromStore(store), polymorphicPathList, polymorphicType: null, iterators, @@ -1054,7 +1063,7 @@ function processSingleDeferred( layerPlan: outputPlan.layerPlan, size, store, - flagUnion: NO_FLAGS, + flagUnion: getFlagUnionFromStore(store), polymorphicPathList, polymorphicType: null, iterators,