Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions src/client/environment.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
import { AsyncLocalStorage } from "node:async_hooks";
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import {
patchMath,
createDeterministicDate,
createConsole,
setupEnvironment,
runWithWorkflowEnvironment,
} from "./environment.js";

// The workflow environment expects AsyncLocalStorage on globalThis (as provided by
// convex-backend's async_hooks runtime). Polyfill it here so tests run in Node.js.
if (
(globalThis as { AsyncLocalStorage?: typeof AsyncLocalStorage })
.AsyncLocalStorage === undefined
) {
(globalThis as { AsyncLocalStorage?: typeof AsyncLocalStorage }).AsyncLocalStorage =
AsyncLocalStorage;
}

describe("environment patching units", () => {
describe("patchMath", () => {
it("should preserve all Math methods except random", () => {
Expand Down Expand Up @@ -503,4 +516,68 @@ describe("environment patching units", () => {
expect(mockConsole.info).not.toHaveBeenCalled();
});
});

describe("workflow-scoped patching", () => {
it("keeps patched globals out of non-workflow execution", async () => {
const originalSetTimeout = globalThis.setTimeout;
const originalSetInterval = globalThis.setInterval;
const originalFetch = globalThis.fetch;
const originalDateNow = Date.now;

setupEnvironment();

await runWithWorkflowEnvironment(
() => ({ now: 12345, latest: true }),
"workflow-1",
async () => {
expect(Date.now()).toBe(12345);
expect(() => setTimeout(() => {}, 0)).toThrow(
/setTimeout isn't supported within workflows yet/,
);
expect(() => setInterval(() => {}, 0)).toThrow(
/setInterval isn't supported within workflows yet/,
);
expect(() => fetch("https://example.com")).toThrow(
/Fetch isn't currently supported within workflows/,
);

await Promise.resolve();
expect(() => setTimeout(() => {}, 0)).toThrow(
/setTimeout isn't supported within workflows yet/,
);
},
);

expect(globalThis.setTimeout).toBe(originalSetTimeout);
expect(globalThis.setInterval).toBe(originalSetInterval);
expect(globalThis.fetch).toBe(originalFetch);
expect(Date.now).toBe(originalDateNow);

const timeout = setTimeout(() => {}, 0);
clearTimeout(timeout);
});

it("keeps restricted globals available outside workflow execution", () => {
setupEnvironment();

const hasProcessOutside = "process" in globalThis;
const hasCryptoOutside = "crypto" in globalThis;

runWithWorkflowEnvironment(
() => ({ now: 5, latest: true }),
"workflow-2",
() => {
expect((globalThis as { process?: unknown }).process).toBeUndefined();
expect((globalThis as { crypto?: unknown }).crypto).toBeUndefined();
},
);

if (hasProcessOutside) {
expect((globalThis as { process?: unknown }).process).toBeDefined();
}
if (hasCryptoOutside) {
expect((globalThis as { crypto?: unknown }).crypto).toBeDefined();
}
});
});
});
164 changes: 136 additions & 28 deletions src/client/environment.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,48 @@
type GenerationState = { now: number; latest: boolean };

type WorkflowEnvironment = {
math: typeof Math;
date: typeof Date;
console: Console;
fetch: typeof globalThis.fetch;
setTimeout: typeof globalThis.setTimeout;
setInterval: typeof globalThis.setInterval;
};

type AsyncLocalStorageLike<T> = {
run<R>(store: T, callback: () => R): R;
getStore(): T | undefined;
};

type AsyncLocalStorageConstructor = new <T>() => AsyncLocalStorageLike<T>;

let workflowEnvironmentStorage: AsyncLocalStorageLike<WorkflowEnvironment> | undefined;
let globalsPatched = false;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why isn't this !!workflowEnvironmentsStorage? What's different?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Storage creation and global patching are separate phases. globalsPatched makes that boundary explicit and avoids conflating partial setup with done.


// Capture original globals before any patching occurs so createWorkflowEnvironment
// always wraps the true originals, even if called from within an active workflow context.
const originalGlobals = {
Math: globalThis.Math,
Date: globalThis.Date,
console: globalThis.console,
};

function ensureWorkflowEnvironmentStorage() {
if (workflowEnvironmentStorage !== undefined) {
return;
}

const global = globalThis as {
AsyncLocalStorage?: AsyncLocalStorageConstructor;
};
if (global.AsyncLocalStorage === undefined) {
throw new Error(
"AsyncLocalStorage is not available in this runtime. Update convex-backend to a build with async_hooks support.",
);
}
workflowEnvironmentStorage = new global.AsyncLocalStorage<WorkflowEnvironment>();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why using the global instead of an import?

Copy link
Copy Markdown
Author

@robelest robelest Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept the global path because it gives deterministic diagnostics (explicit availability check + clear error). The current node:async_hooks shim also has permissive fallbacks (createHook no-op, IDs 0) when async_hooks is absent, so it can fail less explicitly. Happy to follow up by tightening shim error behavior so import-path diagnostics match the global-path check

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. Will this work in convex-test where our v8 shim doesn’t run? I don’t know if it’s normally a global or if we’re special-casing our own behavior

}

// Simple hash function to convert a string to a 32-bit seed
function hashString(str: string): number {
let hash = 0;
Expand Down Expand Up @@ -82,41 +125,106 @@ export function createDeterministicDate(
return DeterministicDate as typeof Date;
}

export function setupEnvironment(
const unsupportedFetch: typeof globalThis.fetch = (
_input: RequestInfo | URL,
_init?: RequestInit,
) => {
throw new Error(
`Fetch isn't currently supported within workflows. Perform the fetch within an action and call it with step.runAction().`,
);
};

const unsupportedSetTimeout = ((..._args: any[]) => {
throw new Error("setTimeout isn't supported within workflows yet");
}) as unknown as typeof globalThis.setTimeout;

const unsupportedSetInterval = ((..._args: any[]) => {
throw new Error("setInterval isn't supported within workflows yet");
}) as unknown as typeof globalThis.setInterval;

function defineWorkflowAwareGlobal<T>(
globalObject: Record<string, unknown>,
Comment thread
ianmacartney marked this conversation as resolved.
key: string,
getWorkflowValue: (environment: WorkflowEnvironment) => T,
): void {
const descriptor = Object.getOwnPropertyDescriptor(globalObject, key);
if (descriptor?.configurable === false) {
return;
}

let outsideValue = globalObject[key] as T;
Object.defineProperty(globalObject, key, {
configurable: true,
enumerable: descriptor?.enumerable ?? true,
get() {
const environment = workflowEnvironmentStorage?.getStore();
if (environment !== undefined) {
return getWorkflowValue(environment);
}
return outsideValue;
},
set(value: T) {
outsideValue = value;
},
});
}

function createWorkflowEnvironment(
getGenerationState: () => GenerationState,
workflowId: string,
): void {
const global = globalThis as Record<string, unknown>;
): WorkflowEnvironment {
return {
math: patchMath(originalGlobals.Math, workflowId),
date: createDeterministicDate(originalGlobals.Date, getGenerationState),
console: createConsole(originalGlobals.console, getGenerationState),
fetch: unsupportedFetch,
setTimeout: unsupportedSetTimeout,
setInterval: unsupportedSetInterval,
};
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Patch Math with seeded random based on workflowId
global.Math = patchMath(global.Math as typeof Math, workflowId);
export function setupEnvironment(): void {
if (globalsPatched) {
return;
}

// Patch Date
const originalDate = global.Date as typeof Date;
global.Date = createDeterministicDate(originalDate, getGenerationState);
ensureWorkflowEnvironmentStorage();

// Patch console
global.console = createConsole(global.console as Console, getGenerationState);
const global = globalThis as Record<string, unknown>;
defineWorkflowAwareGlobal(global, "Math", (environment) => environment.math);
defineWorkflowAwareGlobal(global, "Date", (environment) => environment.date);
defineWorkflowAwareGlobal(global, "console", (environment) => environment.console);
defineWorkflowAwareGlobal(global, "fetch", (environment) => environment.fetch);
defineWorkflowAwareGlobal(global, "setTimeout", (environment) => environment.setTimeout);
defineWorkflowAwareGlobal(global, "setInterval", (environment) => environment.setInterval);

// Patch fetch
global.fetch = (_input: RequestInfo | URL, _init?: RequestInit) => {
throw new Error(
`Fetch isn't currently supported within workflows. Perform the fetch within an action and call it with step.runAction().`,
);
};
const restrictedGlobals = [
"process",
"Crypto",
"crypto",
"CryptoKey",
"SubtleCrypto",
];
for (const key of restrictedGlobals) {
defineWorkflowAwareGlobal(global, key, () => undefined);
}

// Remove non-deterministic globals
delete global.process;
delete global.Crypto;
delete global.crypto;
delete global.CryptoKey;
delete global.SubtleCrypto;
global.setTimeout = () => {
throw new Error("setTimeout isn't supported within workflows yet");
};
global.setInterval = () => {
throw new Error("setInterval isn't supported within workflows yet");
};
globalsPatched = true;
}

export function runWithWorkflowEnvironment<T>(
getGenerationState: () => GenerationState,
workflowId: string,
run: () => T,
): T {
setupEnvironment();
if (workflowEnvironmentStorage === undefined) {
throw new Error("AsyncLocalStorage is not initialized");
}
return workflowEnvironmentStorage.run(
createWorkflowEnvironment(getGenerationState, workflowId),
run,
);
}

function noop() {}
Expand Down
10 changes: 6 additions & 4 deletions src/client/workflowMutation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
} from "convex/values";
import { createLogger } from "../component/logging.js";
import { type JournalEntry } from "../component/schema.js";
import { setupEnvironment } from "./environment.js";
import { runWithWorkflowEnvironment } from "./environment.js";
import type { WorkflowDefinition } from "./index.js";
import { StepExecutor, type StepRequest, type WorkerResult } from "./step.js";
import { createWorkflowCtx } from "./workflowContext.js";
Expand Down Expand Up @@ -128,8 +128,6 @@ export function workflowMutation<ArgsValidator extends PropertyValidators>(
Date.now(),
workpoolOptions,
);
setupEnvironment(executor.getGenerationState.bind(executor), workflowId);

const handlerWorker = async (): Promise<WorkerResult> => {
let runResult: RunResult;
try {
Expand Down Expand Up @@ -167,7 +165,11 @@ export function workflowMutation<ArgsValidator extends PropertyValidators>(
const executorWorker = async (): Promise<WorkerResult> => {
return await executor.run();
};
const result = await Promise.race([handlerWorker(), executorWorker()]);
const result = await runWithWorkflowEnvironment(
executor.getGenerationState.bind(executor),
workflowId,
() => Promise.race([handlerWorker(), executorWorker()]),
);
switch (result.type) {
case "handlerDone": {
await ctx.runMutation(component.workflow.complete, {
Expand Down
Loading