feat(engine): PlainAdapter with action timeouts#15
feat(engine): PlainAdapter with action timeouts#15albertgwo wants to merge 2 commits intofeat/engine-pr2-engine-lifecyclefrom
Conversation
Introduce an adapter abstraction layer so action handler execution can be plugged into different runtimes (PlainAdapter for in-process, TemporalAdapter later for durable execution). - ExecutionAdapter interface with executeAction(), init/shutdown lifecycle - PlainAdapter: in-process execution with timeout enforcement via AbortSignal combining and Promise.race - ActionTimeoutError for timeout failures - CompiledFlow delegates registered/entry_point handler calls through the adapter; expressions/rules stay engine-native - Default adapter is PlainAdapter when none is provided
PlainAdapter unit tests: - Handler execution and result passing - ExecutionContext with AbortSignal forwarding - Timeout enforcement (50ms kills slow handler) - Fast handler with generous timeout succeeds - Pre-aborted signal rejects immediately - Default timeout vs config override behavior - Handler error propagation (non-timeout) Integration tests: - Engine with explicit PlainAdapter - Engine with custom mock adapter verifies call args - Engine without adapter defaults to PlainAdapter Lifecycle tests: - init/shutdown callable on PlainAdapter (no-op) - Custom adapter with init/shutdown lifecycle
| private readonly resolvedHandlers: ReadonlyMap<string, ResolvedHandler>, | ||
| private readonly options: EngineOptions, | ||
| ) {} | ||
| ) { | ||
| this.adapter = options.adapter ?? new PlainAdapter({ defaultTimeout: options.defaultTimeout }) | ||
| } |
There was a problem hiding this comment.
CompiledFlow stores an adapter but never calls its init or shutdown — should we await this.adapter.init?.() on startup and await this.adapter.shutdown?.() on teardown?
Finding type: Logical Bugs | Severity: 🔴 High
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
In packages/engine/src/engine/compiled-flow.ts around lines 33 to 37, the CompiledFlow
constructor currently assigns this.adapter but never calls adapter.init() or
adapter.shutdown(), so adapters that need setup/teardown will leak resources. Add an
explicit async init(): Promise<void> method on CompiledFlow that awaits
this.adapter.init?.(), and an async shutdown(): Promise<void> (or close/teardown) method
that awaits this.adapter.shutdown?.(); add a private boolean this._initialized to
prevent double-init and have execute paths (or buildCallbacks) assert or lazily await
init if it hasn't run yet. Update the constructor to set the adapter and initialized
flag to false and document that callers must call flow.init() before running, or
alternatively implement a safe lazy-init call in the first executeAction call that
awaits adapter.init() once. Ensure types are updated and add unit tests to verify init
and shutdown are invoked for a mock adapter.
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
| switch (handler.type) { | ||
| case 'registered': | ||
| case 'entry_point': | ||
| result = await handler.fn(ctx) | ||
| result = await adapter.executeAction(nodeId, handler.fn, ctx, { | ||
| metadata: node.metadata as Record<string, unknown> | undefined, | ||
| }) | ||
| break |
There was a problem hiding this comment.
The switch branch for registered/entry_point nodes now calls adapter.executeAction(...) with the node metadata, and exactly the same sequence appears again in onParallel (lines 272‑276) and onError (lines 349‑351). Each branch repeats the adapter call, metadata extraction, and state tracking, so any future change to how metadata is merged, autocancelled, or logged would need to be updated in three spots. Can we extract a helper on CompiledFlow (e.g. executeRegisteredHandler(nodeId, handler, ctx, nodeMetadata)) so this shared behavior is centralized and future updates happen in one place?
Finding type: Code Dedup and Conventions | Severity: 🟢 Low
Want Baz to fix this for you? Activate Fixer
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
| import type { ExecutionContext } from '../walker/types.js' | ||
| import type { ExecutionAdapter, ActionConfig } from './types.js' | ||
|
|
||
| /** | ||
| * Error thrown when an action handler exceeds its timeout. |
There was a problem hiding this comment.
flowprint-engine was changed but no .changeset/.md was added — should we add one documenting the engine update?
Finding type: AI Coding Guidelines | Severity: 🟢 Low
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
In packages/engine/src/adapters/plain.ts around lines 1-5, the new PlainAdapter was
added which modifies the @ruminaider/flowprint-engine package and requires a release
changeset. Add a new file under .changeset (e.g. .changeset/add-plain-adapter.md) that
contains a short summary like: "Add PlainAdapter: in-process execution adapter for
dev/testing" and tells the changesets tool to bump @ruminaider/flowprint-engine (suggest
minor). Ensure the changeset file follows the repository's changeset format (package
name -> minor) and commit it alongside this change so release metadata is present.
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
| // Reject immediately if signal is already aborted | ||
| if (combinedSignal.aborted) { | ||
| throw new ActionTimeoutError(nodeId, timeout) | ||
| } | ||
|
|
||
| // Promise.race: handler vs timeout backstop | ||
| const timeoutId = setTimeout(() => controller.abort(), timeout) | ||
| try { | ||
| const result = await Promise.race([ | ||
| handler(adapterCtx), | ||
| new Promise<never>((_, reject) => { | ||
| combinedSignal.addEventListener( |
There was a problem hiding this comment.
Combined signal aborts always reject with ActionTimeoutError even for pre‑aborts or user cancellations — should we propagate the actual abort reason or throw a distinct cancellation error instead?
Finding type: Logical Bugs | Severity: 🔴 High
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
In packages/engine/src/adapters/plain.ts around lines 65 to 83, the executeAction logic
currently treats any abort on the combined signal as a timeout and throws
ActionTimeoutError. Change it to preserve and propagate the actual abort reason: (1)
when creating the timeout backstop, call controller.abort(new ActionTimeoutError(nodeId,
timeout)) so the controller's abort reason indicates a timeout; (2) replace the
immediate check at lines ~65-68 to inspect combinedSignal.reason and if it is an
ActionTimeoutError throw it, otherwise throw or rethrow a distinct CancellationError (or
the original reason) to represent user/walk cancellations; (3) in the combinedSignal
'abort' event handler (lines ~76-82) reject with signal.reason if present, or with
ActionTimeoutError only when the reason is an ActionTimeoutError. This preserves timeout
vs cancellation semantics for callers.
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
Deploying flowprint with
|
| Latest commit: |
0d11a51
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://a30ed179.flowprint.pages.dev |
| Branch Preview URL: | https://feat-engine-pr3-plain-adapte.flowprint.pages.dev |
Code Review FindingsCritical (90-100 confidence)1. Pre-aborted signal throws misleading
setTimeout(() => controller.abort(new ActionTimeoutError(nodeId, timeout)), timeout)
// In handler:
throw combinedSignal.reason instanceof ActionTimeoutError
? combinedSignal.reason
: combinedSignal.reason ?? new Error('Action cancelled')2.
Important (80-89 confidence)3. Per-action
4.
5. Missing changeset file (85)
|
User description
Summary
Dependency
Base: `feat/engine-pr2-engine-lifecycle`
PR 7 of 16 in the execution engine implementation.
Test plan
Generated description
Below is a concise technical summary of the changes proposed in this PR:
Introduce the
ExecutionAdaptercontract and itsPlainAdapterimplementation so action handlers execute via adapters that enforce per-node timeouts, propagate metadata, and respect default limits, enabling in-process flows without Temporal. Expose the adapter surface through engine options and exports while wiringCompiledFlowto delegate handler execution to the configured adapter to preserve backward compatibility.PlainAdapterandFlowprintEngineexecution.Modified files (1)
Latest Contributors(0)
ExecutionAdaptersurface, thePlainAdapterimplementation, and how the engine now routes handler execution through adapters with timeout/cancellation support, metadata forwarding, and default wiring to keep existing flows working without Temporal.Modified files (7)
Latest Contributors(0)