Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 32 additions & 17 deletions packages/opencode/src/session/message-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { ModelID, ProviderID } from "@/provider/schema"
import { Effect } from "effect"
import { EffectLogger } from "@/effect"
import { isMedia } from "@/util/media"
import { classifyStreamFailure } from "./stream-failure-classifier"
import { classifyStreamFailure, classifyBareTransportMessage, type TransportDisconnect } from "./stream-failure-classifier"
import { LLMTrace } from "./llm-trace"
import { RunObservability } from "./run-observability"
import { RunLifecycle } from "./run-lifecycle"
Expand Down Expand Up @@ -1344,6 +1344,25 @@ export const filterCompactedEffect = Effect.fnUntraced(function* (sessionID: Ses
return filterCompacted(stream(sessionID))
})

// Build the transport-disconnect APIError from a classified disconnect. Shared
// by the errno-code path (high priority, ahead of stream parsing) and the
// bare-message fallback (last resort, after stream parsing fails).
function transportDisconnectError(e: unknown, transport: TransportDisconnect) {
const message = (e as Error).message || ""
return new APIError(
{
// Per-errno: most transport disconnects are retryable, but a permanent one
// (e.g. ENOTFOUND, an unresolved host) is not — classifyRetry reads this so
// it stops instead of retrying into a stall.
message: message || "Connection interrupted",
isRetryable: transport.retryable,
metadata: { code: transport.code, message },
providerFailure: { kind: "transport_disconnect", code: transport.code },
},
{ cause: e },
).toObject()
}

export function fromError(
e: unknown,
ctx: { providerID: ProviderID; aborted?: boolean },
Expand All @@ -1366,21 +1385,12 @@ export function fromError(
},
{ cause: e },
).toObject()
case classifyStreamFailure(e) !== undefined: {
const transport = classifyStreamFailure(e)!
return new APIError(
{
message: (e as Error).message || "Connection interrupted",
isRetryable: true,
metadata: {
code: transport.code,
message: (e as Error).message || "",
},
providerFailure: { kind: "transport_disconnect", code: transport.code },
},
{ cause: e },
).toObject()
}
// Errno-coded transport disconnect (top-level or in the cause chain) is a
// definitive signal, so it runs ahead of stream parsing. The bare-message
// fallback does NOT run here — it is demoted below parseStreamError so a
// structured error carrying a transport phrase is not mis-grabbed.
case classifyStreamFailure(e) !== undefined:
return transportDisconnectError(e, classifyStreamFailure(e)!)
case e instanceof Error && (e as FetchDecompressionError).code === "ZlibError":
if (ctx.aborted) {
return new AbortedError({ message: e.message }, { cause: e }).toObject()
Expand Down Expand Up @@ -1425,7 +1435,7 @@ export function fromError(
},
{ cause: e },
).toObject()
default:
default: {
// A provider error can arrive raw or wrapped in an Error (the stream
// "error" part throws value.error; the iterator-throw mapper hands back a
// value). Run the stream parser for both before falling back to Unknown so
Expand Down Expand Up @@ -1456,10 +1466,15 @@ export function fromError(
).toObject()
}
} catch {}
// Last resort: a bare connection-dropped message (no errno code, not a
// structured stream error) is still a retryable transport disconnect.
const bareTransport = classifyBareTransportMessage(e)
if (bareTransport) return transportDisconnectError(e, bareTransport)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return new NamedError.Unknown(
{ message: e instanceof Error ? errorMessage(e) : JSON.stringify(e) },
{ cause: e },
).toObject()
}
}
}

Expand Down
18 changes: 17 additions & 1 deletion packages/opencode/src/session/retry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,30 @@ describe("classifyRetry — reads providerFailure.kind (slice ④)", () => {

// Transient kinds always retry, even if isRetryable is false and the status is
// not 5xx — the classification, not the SDK flag, is the source of truth.
for (const kind of ["rate_limit", "server_overload", "transport_disconnect", "decompression"] as const) {
for (const kind of ["rate_limit", "server_overload", "decompression"] as const) {
test(`transient kind ${kind} retries even when isRetryable is false and status is not 5xx`, () => {
expect(
classifyRetry(makeAPIError({ isRetryable: false, statusCode: 400, providerFailure: { kind } }))?.kind,
).toBe("unknown")
})
}

// transport_disconnect honors the per-errno isRetryable the stream classifier
// sets (#1105b): most transport errnos are transient, but a permanent one
// (e.g. ENOTFOUND — unresolved host) is marked isRetryable=false and must not
// auto-retry into a stall.
test("transport_disconnect retries when the classifier marked it retryable", () => {
expect(
classifyRetry(makeAPIError({ isRetryable: true, providerFailure: { kind: "transport_disconnect" } }))?.kind,
).toBe("unknown")
})

test("transport_disconnect does not retry when the classifier marked it non-retryable", () => {
expect(
classifyRetry(makeAPIError({ isRetryable: false, providerFailure: { kind: "transport_disconnect" } })),
).toBeUndefined()
})

test("unknown kind falls back to the legacy isRetryable + 5xx gate", () => {
expect(
classifyRetry(makeAPIError({ isRetryable: true, statusCode: 404, providerFailure: { kind: "unknown" } }))?.kind,
Expand Down
26 changes: 13 additions & 13 deletions packages/opencode/src/session/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@ export type { RetryAction } from "./retry-classification"
// always retry. classifyRetry reads these so the retry decision follows the
// canonical classification instead of re-deriving it from the provider SDK's
// isRetryable flag; `unknown` or absent kinds fall back to that legacy signal.
// transport_disconnect is handled separately: it honors the per-errno
// isRetryable the stream classifier set (most errnos transient, ENOTFOUND not).
const RETRY_TERMINAL_KINDS = new Set<ProviderFailureKind>(["auth", "invalid_request", "quota_exhausted"])
const RETRY_TRANSIENT_KINDS = new Set<ProviderFailureKind>([
"rate_limit",
"server_overload",
"transport_disconnect",
"decompression",
])
const RETRY_TRANSIENT_KINDS = new Set<ProviderFailureKind>(["rate_limit", "server_overload", "decompression"])

export const RETRY_INITIAL_DELAY = 2000
export const RETRY_BACKOFF_FACTOR = 2
Expand Down Expand Up @@ -85,16 +82,19 @@ export function classifyRetry(error: Err): RetryClassification | undefined {
const status = error.data.statusCode
const kind = error.data.providerFailure?.kind
// Retry/stop gate. Prefer the canonical providerFailure.kind: terminal kinds
// never retry, transient kinds always do. When the kind is `unknown` or the
// row predates providerFailure, fall back to the legacy signal — isRetryable,
// or a 5xx the provider SDK didn't explicitly mark retryable. The two agree
// for every classified kind today.
// never retry, transient kinds always do. transport_disconnect honors the
// per-errno isRetryable the stream classifier set (most errnos transient,
// ENOTFOUND not). When the kind is `unknown` or the row predates
// providerFailure, fall back to the legacy signal — isRetryable, or a 5xx the
// provider SDK didn't explicitly mark retryable.
const retryable =
kind && RETRY_TERMINAL_KINDS.has(kind)
? false
: kind && RETRY_TRANSIENT_KINDS.has(kind)
? true
: error.data.isRetryable || (status !== undefined && status >= 500)
: kind === "transport_disconnect"
? error.data.isRetryable
: kind && RETRY_TRANSIENT_KINDS.has(kind)
? true
: error.data.isRetryable || (status !== undefined && status >= 500)
if (!retryable) return undefined

// Strict 3-way AND: opencode provider + FreeUsageLimitError marker in body
Expand Down
156 changes: 89 additions & 67 deletions packages/opencode/src/session/stream-failure-classifier.test.ts
Original file line number Diff line number Diff line change
@@ -1,85 +1,107 @@
import { describe, expect, test } from "bun:test"
import { classifyStreamFailure } from "./stream-failure-classifier"
import { classifyStreamFailure, classifyBareTransportMessage } from "./stream-failure-classifier"

describe("classifyStreamFailure", () => {
describe("transport disconnect — retryable", () => {
test("ECONNRESET SystemError", () => {
const error = Object.assign(new Error("read ECONNRESET"), { code: "ECONNRESET", syscall: "read" })
const result = classifyStreamFailure(error)
expect(result).toEqual({
kind: "provider_transport_disconnect",
retryable: true,
code: "ECONNRESET",
})
})
// Every transport errno and its expected retryability. ENOTFOUND is the only
// permanent one (unresolved host); everything else is transient. Driven as a
// matrix so adding/renaming a code in TRANSPORT_CODES forces a matching row.
const TRANSPORT_CODE_MATRIX: ReadonlyArray<readonly [code: string, retryable: boolean]> = [
["ECONNRESET", true],
["ECONNREFUSED", true],
["ETIMEDOUT", true],
["ECONNABORTED", true],
["EPIPE", true],
["EHOSTUNREACH", true],
["ENETUNREACH", true],
["EAI_AGAIN", true],
["ENOTFOUND", false],
["UND_ERR_SOCKET", true],
["UND_ERR_CONNECT_TIMEOUT", true],
["UND_ERR_HEADERS_TIMEOUT", true],
["UND_ERR_BODY_TIMEOUT", true],
]

test("UND_ERR_SOCKET — TypeError('terminated') with cause.code", () => {
const cause = Object.assign(new Error("other side closed"), { code: "UND_ERR_SOCKET" })
const error = new TypeError("terminated", { cause })
const result = classifyStreamFailure(error)
expect(result).toEqual({
kind: "provider_transport_disconnect",
retryable: true,
code: "UND_ERR_SOCKET",
})
describe("classifyStreamFailure — transport code matrix", () => {
for (const [code, retryable] of TRANSPORT_CODE_MATRIX) {
test(`top-level ${code} → transport disconnect (retryable=${retryable})`, () => {
const error = Object.assign(new Error(`failed: ${code}`), { code })
expect(classifyStreamFailure(error)).toEqual({ kind: "provider_transport_disconnect", retryable, code })
})

test("ECONNREFUSED SystemError", () => {
const error = Object.assign(new Error("connect ECONNREFUSED"), { code: "ECONNREFUSED", syscall: "connect" })
const result = classifyStreamFailure(error)
expect(result).toEqual({
kind: "provider_transport_disconnect",
retryable: true,
code: "ECONNREFUSED",
})
test(`${code} nested in the cause chain → transport disconnect (retryable=${retryable})`, () => {
const cause = Object.assign(new Error("inner"), { code })
const error = new TypeError("fetch failed", { cause })
expect(classifyStreamFailure(error)).toEqual({ kind: "provider_transport_disconnect", retryable, code })
})
}
})

test("ETIMEDOUT SystemError", () => {
const error = Object.assign(new Error("connect ETIMEDOUT"), { code: "ETIMEDOUT", syscall: "connect" })
const result = classifyStreamFailure(error)
expect(result).toEqual({
kind: "provider_transport_disconnect",
retryable: true,
code: "ETIMEDOUT",
})
})
describe("classifyStreamFailure — not a code-based transport error", () => {
test("generic Error returns undefined", () => {
expect(classifyStreamFailure(new Error("something broke"))).toBeUndefined()
})

test("UND_ERR_SOCKET nested in cause chain", () => {
const innerCause = Object.assign(new Error("socket hang up"), { code: "UND_ERR_SOCKET" })
const midError = new Error("fetch failed", { cause: innerCause })
const outerError = new TypeError("terminated", { cause: midError })
const result = classifyStreamFailure(outerError)
expect(result).toEqual({
kind: "provider_transport_disconnect",
retryable: true,
code: "UND_ERR_SOCKET",
})
})
test("unknown errno code returns undefined", () => {
expect(classifyStreamFailure(Object.assign(new Error("x"), { code: "ESOMETHINGELSE" }))).toBeUndefined()
})

describe("non-transport errors — not classified", () => {
test("generic Error returns undefined", () => {
expect(classifyStreamFailure(new Error("something broke"))).toBeUndefined()
})
test("AbortError returns undefined", () => {
expect(classifyStreamFailure(new DOMException("The operation was aborted", "AbortError"))).toBeUndefined()
})

test("TypeError without transport cause returns undefined", () => {
expect(classifyStreamFailure(new TypeError("Cannot read properties of undefined"))).toBeUndefined()
})
test("non-Error values return undefined", () => {
expect(classifyStreamFailure("string error")).toBeUndefined()
expect(classifyStreamFailure(null)).toBeUndefined()
expect(classifyStreamFailure(42)).toBeUndefined()
})

test("AbortError returns undefined", () => {
const error = new DOMException("The operation was aborted", "AbortError")
expect(classifyStreamFailure(error)).toBeUndefined()
})
test("TypeError('terminated') without a transport-coded cause returns undefined", () => {
expect(classifyStreamFailure(new TypeError("terminated", { cause: new Error("unrelated") }))).toBeUndefined()
})

test("non-Error values return undefined", () => {
expect(classifyStreamFailure("string error")).toBeUndefined()
expect(classifyStreamFailure(null)).toBeUndefined()
expect(classifyStreamFailure(42)).toBeUndefined()
test("a bare 'socket hang up' message is NOT code-classified (the bare-message fallback owns it)", () => {
// The message fallback was moved out of classifyStreamFailure so structured
// stream parsing can run first; the code path only matches errno codes.
expect(classifyStreamFailure(new Error("socket hang up"))).toBeUndefined()
})

test("HTTP error (statusCode) with a transport-coded cause stays an API error, not transport", () => {
const cause = Object.assign(new Error("socket hang up"), { code: "UND_ERR_SOCKET" })
const error = Object.assign(new Error("400 invalid request"), { statusCode: 400, cause })
expect(classifyStreamFailure(error)).toBeUndefined()
})
})

describe("classifyBareTransportMessage — anchored bare connection messages", () => {
test("exact 'socket hang up' → retryable transport", () => {
expect(classifyBareTransportMessage(new Error("socket hang up"))).toEqual({
kind: "provider_transport_disconnect",
retryable: true,
code: "SOCKET_HANG_UP",
})
})

test("TypeError('terminated') without UND_ERR_SOCKET cause returns undefined", () => {
const error = new TypeError("terminated", { cause: new Error("unrelated") })
expect(classifyStreamFailure(error)).toBeUndefined()
test("exact 'premature close' (surrounding whitespace trimmed) → retryable transport", () => {
expect(classifyBareTransportMessage(new Error(" premature close\n"))).toEqual({
kind: "provider_transport_disconnect",
retryable: true,
code: "PREMATURE_CLOSE",
})
})

test("a longer message that merely CONTAINS the phrase is not a bare transport error", () => {
expect(
classifyBareTransportMessage(new Error("invalid request: socket hang up is not allowed")),
).toBeUndefined()
})

test("an HTTP error (statusCode) whose message is exactly 'socket hang up' is not bare transport", () => {
expect(
classifyBareTransportMessage(Object.assign(new Error("socket hang up"), { statusCode: 502 })),
).toBeUndefined()
})

test("non-Error value and unrelated message return undefined", () => {
expect(classifyBareTransportMessage("socket hang up")).toBeUndefined()
expect(classifyBareTransportMessage(new Error("something else"))).toBeUndefined()
})
})
Loading