diff --git a/src/client/environment.test.ts b/src/client/environment.test.ts index d3c9ae17..c0579d14 100644 --- a/src/client/environment.test.ts +++ b/src/client/environment.test.ts @@ -1,10 +1,23 @@ +import { AsyncLocalStorage } from "node:async_hooks"; import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; import { patchMath, createDeterministicDate, createConsole, + setupEnvironment, + runWithWorkflowEnvironment, } from "./environment.js"; +// The workflow environment expects AsyncLocalStorage on globalThis (as provided by +// convex-backend's async_hooks runtime). Polyfill it here so tests run in Node.js. +if ( + (globalThis as { AsyncLocalStorage?: typeof AsyncLocalStorage }) + .AsyncLocalStorage === undefined +) { + (globalThis as { AsyncLocalStorage?: typeof AsyncLocalStorage }).AsyncLocalStorage = + AsyncLocalStorage; +} + describe("environment patching units", () => { describe("patchMath", () => { it("should preserve all Math methods except random", () => { @@ -503,4 +516,68 @@ describe("environment patching units", () => { expect(mockConsole.info).not.toHaveBeenCalled(); }); }); + + describe("workflow-scoped patching", () => { + it("keeps patched globals out of non-workflow execution", async () => { + const originalSetTimeout = globalThis.setTimeout; + const originalSetInterval = globalThis.setInterval; + const originalFetch = globalThis.fetch; + const originalDateNow = Date.now; + + setupEnvironment(); + + await runWithWorkflowEnvironment( + () => ({ now: 12345, latest: true }), + "workflow-1", + async () => { + expect(Date.now()).toBe(12345); + expect(() => setTimeout(() => {}, 0)).toThrow( + /setTimeout isn't supported within workflows yet/, + ); + expect(() => setInterval(() => {}, 0)).toThrow( + /setInterval isn't supported within workflows yet/, + ); + expect(() => fetch("https://example.com")).toThrow( + /Fetch isn't currently supported within workflows/, + ); + + await Promise.resolve(); + expect(() => setTimeout(() => {}, 0)).toThrow( + /setTimeout isn't supported within workflows yet/, + ); + }, + ); + + expect(globalThis.setTimeout).toBe(originalSetTimeout); + expect(globalThis.setInterval).toBe(originalSetInterval); + expect(globalThis.fetch).toBe(originalFetch); + expect(Date.now).toBe(originalDateNow); + + const timeout = setTimeout(() => {}, 0); + clearTimeout(timeout); + }); + + it("keeps restricted globals available outside workflow execution", () => { + setupEnvironment(); + + const hasProcessOutside = "process" in globalThis; + const hasCryptoOutside = "crypto" in globalThis; + + runWithWorkflowEnvironment( + () => ({ now: 5, latest: true }), + "workflow-2", + () => { + expect((globalThis as { process?: unknown }).process).toBeUndefined(); + expect((globalThis as { crypto?: unknown }).crypto).toBeUndefined(); + }, + ); + + if (hasProcessOutside) { + expect((globalThis as { process?: unknown }).process).toBeDefined(); + } + if (hasCryptoOutside) { + expect((globalThis as { crypto?: unknown }).crypto).toBeDefined(); + } + }); + }); }); diff --git a/src/client/environment.ts b/src/client/environment.ts index cd1bcc2a..c28b34c1 100644 --- a/src/client/environment.ts +++ b/src/client/environment.ts @@ -1,5 +1,48 @@ type GenerationState = { now: number; latest: boolean }; +type WorkflowEnvironment = { + math: typeof Math; + date: typeof Date; + console: Console; + fetch: typeof globalThis.fetch; + setTimeout: typeof globalThis.setTimeout; + setInterval: typeof globalThis.setInterval; +}; + +type AsyncLocalStorageLike = { + run(store: T, callback: () => R): R; + getStore(): T | undefined; +}; + +type AsyncLocalStorageConstructor = new () => AsyncLocalStorageLike; + +let workflowEnvironmentStorage: AsyncLocalStorageLike | undefined; +let globalsPatched = false; + +// Capture original globals before any patching occurs so createWorkflowEnvironment +// always wraps the true originals, even if called from within an active workflow context. +const originalGlobals = { + Math: globalThis.Math, + Date: globalThis.Date, + console: globalThis.console, +}; + +function ensureWorkflowEnvironmentStorage() { + if (workflowEnvironmentStorage !== undefined) { + return; + } + + const global = globalThis as { + AsyncLocalStorage?: AsyncLocalStorageConstructor; + }; + if (global.AsyncLocalStorage === undefined) { + throw new Error( + "AsyncLocalStorage is not available in this runtime. Update convex-backend to a build with async_hooks support.", + ); + } + workflowEnvironmentStorage = new global.AsyncLocalStorage(); +} + // Simple hash function to convert a string to a 32-bit seed function hashString(str: string): number { let hash = 0; @@ -82,41 +125,106 @@ export function createDeterministicDate( return DeterministicDate as typeof Date; } -export function setupEnvironment( +const unsupportedFetch: typeof globalThis.fetch = ( + _input: RequestInfo | URL, + _init?: RequestInit, +) => { + throw new Error( + `Fetch isn't currently supported within workflows. Perform the fetch within an action and call it with step.runAction().`, + ); +}; + +const unsupportedSetTimeout = ((..._args: any[]) => { + throw new Error("setTimeout isn't supported within workflows yet"); +}) as unknown as typeof globalThis.setTimeout; + +const unsupportedSetInterval = ((..._args: any[]) => { + throw new Error("setInterval isn't supported within workflows yet"); +}) as unknown as typeof globalThis.setInterval; + +function defineWorkflowAwareGlobal( + globalObject: Record, + key: string, + getWorkflowValue: (environment: WorkflowEnvironment) => T, +): void { + const descriptor = Object.getOwnPropertyDescriptor(globalObject, key); + if (descriptor?.configurable === false) { + return; + } + + let outsideValue = globalObject[key] as T; + Object.defineProperty(globalObject, key, { + configurable: true, + enumerable: descriptor?.enumerable ?? true, + get() { + const environment = workflowEnvironmentStorage?.getStore(); + if (environment !== undefined) { + return getWorkflowValue(environment); + } + return outsideValue; + }, + set(value: T) { + outsideValue = value; + }, + }); +} + +function createWorkflowEnvironment( getGenerationState: () => GenerationState, workflowId: string, -): void { - const global = globalThis as Record; +): WorkflowEnvironment { + return { + math: patchMath(originalGlobals.Math, workflowId), + date: createDeterministicDate(originalGlobals.Date, getGenerationState), + console: createConsole(originalGlobals.console, getGenerationState), + fetch: unsupportedFetch, + setTimeout: unsupportedSetTimeout, + setInterval: unsupportedSetInterval, + }; +} - // Patch Math with seeded random based on workflowId - global.Math = patchMath(global.Math as typeof Math, workflowId); +export function setupEnvironment(): void { + if (globalsPatched) { + return; + } - // Patch Date - const originalDate = global.Date as typeof Date; - global.Date = createDeterministicDate(originalDate, getGenerationState); + ensureWorkflowEnvironmentStorage(); - // Patch console - global.console = createConsole(global.console as Console, getGenerationState); + const global = globalThis as Record; + defineWorkflowAwareGlobal(global, "Math", (environment) => environment.math); + defineWorkflowAwareGlobal(global, "Date", (environment) => environment.date); + defineWorkflowAwareGlobal(global, "console", (environment) => environment.console); + defineWorkflowAwareGlobal(global, "fetch", (environment) => environment.fetch); + defineWorkflowAwareGlobal(global, "setTimeout", (environment) => environment.setTimeout); + defineWorkflowAwareGlobal(global, "setInterval", (environment) => environment.setInterval); - // Patch fetch - global.fetch = (_input: RequestInfo | URL, _init?: RequestInit) => { - throw new Error( - `Fetch isn't currently supported within workflows. Perform the fetch within an action and call it with step.runAction().`, - ); - }; + const restrictedGlobals = [ + "process", + "Crypto", + "crypto", + "CryptoKey", + "SubtleCrypto", + ]; + for (const key of restrictedGlobals) { + defineWorkflowAwareGlobal(global, key, () => undefined); + } - // Remove non-deterministic globals - delete global.process; - delete global.Crypto; - delete global.crypto; - delete global.CryptoKey; - delete global.SubtleCrypto; - global.setTimeout = () => { - throw new Error("setTimeout isn't supported within workflows yet"); - }; - global.setInterval = () => { - throw new Error("setInterval isn't supported within workflows yet"); - }; + globalsPatched = true; +} + +export function runWithWorkflowEnvironment( + getGenerationState: () => GenerationState, + workflowId: string, + run: () => T, +): T { + setupEnvironment(); + if (workflowEnvironmentStorage === undefined) { + throw new Error("AsyncLocalStorage is not initialized"); + } + return workflowEnvironmentStorage.run( + createWorkflowEnvironment(getGenerationState, workflowId), + run, + ); } function noop() {} diff --git a/src/client/workflowMutation.ts b/src/client/workflowMutation.ts index 5afb2a6b..d85c2fb0 100644 --- a/src/client/workflowMutation.ts +++ b/src/client/workflowMutation.ts @@ -15,7 +15,7 @@ import { } from "convex/values"; import { createLogger } from "../component/logging.js"; import { type JournalEntry } from "../component/schema.js"; -import { setupEnvironment } from "./environment.js"; +import { runWithWorkflowEnvironment } from "./environment.js"; import type { WorkflowDefinition } from "./index.js"; import { StepExecutor, type StepRequest, type WorkerResult } from "./step.js"; import { createWorkflowCtx } from "./workflowContext.js"; @@ -128,8 +128,6 @@ export function workflowMutation( Date.now(), workpoolOptions, ); - setupEnvironment(executor.getGenerationState.bind(executor), workflowId); - const handlerWorker = async (): Promise => { let runResult: RunResult; try { @@ -167,7 +165,11 @@ export function workflowMutation( const executorWorker = async (): Promise => { return await executor.run(); }; - const result = await Promise.race([handlerWorker(), executorWorker()]); + const result = await runWithWorkflowEnvironment( + executor.getGenerationState.bind(executor), + workflowId, + () => Promise.race([handlerWorker(), executorWorker()]), + ); switch (result.type) { case "handlerDone": { await ctx.runMutation(component.workflow.complete, {