-
Notifications
You must be signed in to change notification settings - Fork 17
fix(client): scope workflow global patching to execution context #213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,48 @@ | ||
| type GenerationState = { now: number; latest: boolean }; | ||
|
|
||
| type WorkflowEnvironment = { | ||
| math: typeof Math; | ||
| date: typeof Date; | ||
| console: Console; | ||
| fetch: typeof globalThis.fetch; | ||
| setTimeout: typeof globalThis.setTimeout; | ||
| setInterval: typeof globalThis.setInterval; | ||
| }; | ||
|
|
||
| type AsyncLocalStorageLike<T> = { | ||
| run<R>(store: T, callback: () => R): R; | ||
| getStore(): T | undefined; | ||
| }; | ||
|
|
||
| type AsyncLocalStorageConstructor = new <T>() => AsyncLocalStorageLike<T>; | ||
|
|
||
| let workflowEnvironmentStorage: AsyncLocalStorageLike<WorkflowEnvironment> | undefined; | ||
| let globalsPatched = false; | ||
|
|
||
| // Capture original globals before any patching occurs so createWorkflowEnvironment | ||
| // always wraps the true originals, even if called from within an active workflow context. | ||
| const originalGlobals = { | ||
| Math: globalThis.Math, | ||
| Date: globalThis.Date, | ||
| console: globalThis.console, | ||
| }; | ||
|
|
||
| function ensureWorkflowEnvironmentStorage() { | ||
| if (workflowEnvironmentStorage !== undefined) { | ||
| return; | ||
| } | ||
|
|
||
| const global = globalThis as { | ||
| AsyncLocalStorage?: AsyncLocalStorageConstructor; | ||
| }; | ||
| if (global.AsyncLocalStorage === undefined) { | ||
| throw new Error( | ||
| "AsyncLocalStorage is not available in this runtime. Update convex-backend to a build with async_hooks support.", | ||
| ); | ||
| } | ||
| workflowEnvironmentStorage = new global.AsyncLocalStorage<WorkflowEnvironment>(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why using the global instead of an import?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I kept the global path because it gives deterministic diagnostics (explicit availability check + clear error). The current node:async_hooks shim also has permissive fallbacks (createHook no-op, IDs 0) when async_hooks is absent, so it can fail less explicitly. Happy to follow up by tightening shim error behavior so import-path diagnostics match the global-path check
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gotcha. Will this work in convex-test where our v8 shim doesn’t run? I don’t know if it’s normally a global or if we’re special-casing our own behavior |
||
| } | ||
|
|
||
| // Simple hash function to convert a string to a 32-bit seed | ||
| function hashString(str: string): number { | ||
| let hash = 0; | ||
|
|
@@ -82,41 +125,106 @@ export function createDeterministicDate( | |
| return DeterministicDate as typeof Date; | ||
| } | ||
|
|
||
| export function setupEnvironment( | ||
| const unsupportedFetch: typeof globalThis.fetch = ( | ||
| _input: RequestInfo | URL, | ||
| _init?: RequestInit, | ||
| ) => { | ||
| throw new Error( | ||
| `Fetch isn't currently supported within workflows. Perform the fetch within an action and call it with step.runAction().`, | ||
| ); | ||
| }; | ||
|
|
||
| const unsupportedSetTimeout = ((..._args: any[]) => { | ||
| throw new Error("setTimeout isn't supported within workflows yet"); | ||
| }) as unknown as typeof globalThis.setTimeout; | ||
|
|
||
| const unsupportedSetInterval = ((..._args: any[]) => { | ||
| throw new Error("setInterval isn't supported within workflows yet"); | ||
| }) as unknown as typeof globalThis.setInterval; | ||
|
|
||
| function defineWorkflowAwareGlobal<T>( | ||
| globalObject: Record<string, unknown>, | ||
|
ianmacartney marked this conversation as resolved.
|
||
| key: string, | ||
| getWorkflowValue: (environment: WorkflowEnvironment) => T, | ||
| ): void { | ||
| const descriptor = Object.getOwnPropertyDescriptor(globalObject, key); | ||
| if (descriptor?.configurable === false) { | ||
| return; | ||
| } | ||
|
|
||
| let outsideValue = globalObject[key] as T; | ||
| Object.defineProperty(globalObject, key, { | ||
| configurable: true, | ||
| enumerable: descriptor?.enumerable ?? true, | ||
| get() { | ||
| const environment = workflowEnvironmentStorage?.getStore(); | ||
| if (environment !== undefined) { | ||
| return getWorkflowValue(environment); | ||
| } | ||
| return outsideValue; | ||
| }, | ||
| set(value: T) { | ||
| outsideValue = value; | ||
| }, | ||
| }); | ||
| } | ||
|
|
||
| function createWorkflowEnvironment( | ||
| getGenerationState: () => GenerationState, | ||
| workflowId: string, | ||
| ): void { | ||
| const global = globalThis as Record<string, unknown>; | ||
| ): WorkflowEnvironment { | ||
| return { | ||
| math: patchMath(originalGlobals.Math, workflowId), | ||
| date: createDeterministicDate(originalGlobals.Date, getGenerationState), | ||
| console: createConsole(originalGlobals.console, getGenerationState), | ||
| fetch: unsupportedFetch, | ||
| setTimeout: unsupportedSetTimeout, | ||
| setInterval: unsupportedSetInterval, | ||
| }; | ||
| } | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
|
|
||
| // Patch Math with seeded random based on workflowId | ||
| global.Math = patchMath(global.Math as typeof Math, workflowId); | ||
| export function setupEnvironment(): void { | ||
| if (globalsPatched) { | ||
| return; | ||
| } | ||
|
|
||
| // Patch Date | ||
| const originalDate = global.Date as typeof Date; | ||
| global.Date = createDeterministicDate(originalDate, getGenerationState); | ||
| ensureWorkflowEnvironmentStorage(); | ||
|
|
||
| // Patch console | ||
| global.console = createConsole(global.console as Console, getGenerationState); | ||
| const global = globalThis as Record<string, unknown>; | ||
| defineWorkflowAwareGlobal(global, "Math", (environment) => environment.math); | ||
| defineWorkflowAwareGlobal(global, "Date", (environment) => environment.date); | ||
| defineWorkflowAwareGlobal(global, "console", (environment) => environment.console); | ||
| defineWorkflowAwareGlobal(global, "fetch", (environment) => environment.fetch); | ||
| defineWorkflowAwareGlobal(global, "setTimeout", (environment) => environment.setTimeout); | ||
| defineWorkflowAwareGlobal(global, "setInterval", (environment) => environment.setInterval); | ||
|
|
||
| // Patch fetch | ||
| global.fetch = (_input: RequestInfo | URL, _init?: RequestInit) => { | ||
| throw new Error( | ||
| `Fetch isn't currently supported within workflows. Perform the fetch within an action and call it with step.runAction().`, | ||
| ); | ||
| }; | ||
| const restrictedGlobals = [ | ||
| "process", | ||
| "Crypto", | ||
| "crypto", | ||
| "CryptoKey", | ||
| "SubtleCrypto", | ||
| ]; | ||
| for (const key of restrictedGlobals) { | ||
| defineWorkflowAwareGlobal(global, key, () => undefined); | ||
| } | ||
|
|
||
| // Remove non-deterministic globals | ||
| delete global.process; | ||
| delete global.Crypto; | ||
| delete global.crypto; | ||
| delete global.CryptoKey; | ||
| delete global.SubtleCrypto; | ||
| global.setTimeout = () => { | ||
| throw new Error("setTimeout isn't supported within workflows yet"); | ||
| }; | ||
| global.setInterval = () => { | ||
| throw new Error("setInterval isn't supported within workflows yet"); | ||
| }; | ||
| globalsPatched = true; | ||
| } | ||
|
|
||
| export function runWithWorkflowEnvironment<T>( | ||
| getGenerationState: () => GenerationState, | ||
| workflowId: string, | ||
| run: () => T, | ||
| ): T { | ||
| setupEnvironment(); | ||
| if (workflowEnvironmentStorage === undefined) { | ||
| throw new Error("AsyncLocalStorage is not initialized"); | ||
| } | ||
| return workflowEnvironmentStorage.run( | ||
| createWorkflowEnvironment(getGenerationState, workflowId), | ||
| run, | ||
| ); | ||
| } | ||
|
|
||
| function noop() {} | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why isn't this !!workflowEnvironmentsStorage? What's different?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Storage creation and global patching are separate phases. globalsPatched makes that boundary explicit and avoids conflating partial setup with done.