Skip to content

Commit dda1d20

Browse files
author
bcode
committed
fix(plugin): synchronous shutdown hook for OTel span drain
Revert PR #74's bus-await mechanism (v0.1.7) and replace with a synchronous shutdown hook invoked from src/index.ts's top-level finally before forceFlush. PR #74 was a no-op in headless 'bcode run' mode because the plugin's bus subscriber fiber gets interrupted by Effect scope teardown before it can process session.idle / server.instance.disposed events — confirmed by A/B trace shape comparison between v0.1.6 and v0.1.7 (identical, turn parent span missing in both). The new path is a direct function call from inside the running finally, so the event loop is alive, no scope race, no bus dependency.
1 parent 41277ae commit dda1d20

4 files changed

Lines changed: 56 additions & 29 deletions

File tree

packages/bcode-laminar/src/plugin.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,18 @@ export const LaminarPlugin: Plugin = ({ client }) => {
7575
config.experimental = { ...(config.experimental ?? {}), openTelemetry: true }
7676
}
7777
},
78+
// Synchronous end-of-turn drain. The bus-based session.idle /
79+
// server.instance.disposed events race with Effect scope teardown in
80+
// headless `bcode run` mode and don't reliably deliver, so the turn span
81+
// was historically being left un-ended and never exported. The host calls
82+
// this hook from its top-level finally before forceFlush, so span.end()
83+
// here gets its export drained by the host's existing forceFlush race.
84+
shutdown: () => {
85+
for (const [sessionId, span] of Object.entries(sessionCurrentTurnSpan)) {
86+
span.end()
87+
delete sessionCurrentTurnSpan[sessionId]
88+
}
89+
},
7890
event: async ({ event }) => {
7991
switch (event.type) {
8092
case "session.idle": {

packages/opencode/src/index.ts

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -250,13 +250,23 @@ try {
250250
}
251251
process.exitCode = 1
252252
} finally {
253+
// Give plugins a synchronous chance to end any open OTel spans before the
254+
// exporter drain below. The bus-based session.idle / server.instance.disposed
255+
// events race with Effect scope teardown and don't reliably reach plugin
256+
// subscribers in headless `bcode run` mode, so we expose a direct sync hook
257+
// (see packages/opencode/src/plugin/index.ts pluginShutdownHooks).
258+
const { pluginShutdownHooks } = await import("./plugin")
259+
for (const hook of pluginShutdownHooks) {
260+
try {
261+
hook()
262+
} catch (err) {
263+
Log.Default.error("plugin shutdown hook failed", { error: err })
264+
}
265+
}
253266
// Drain any registered OTel span processors (e.g. bcode-laminar) before
254-
// exiting. The plugin's `session.idle` event handler is invoked
255-
// fire-and-forget (`packages/opencode/src/plugin/index.ts:249`), so its
256-
// `processor.forceFlush()` Promise was never awaited — without this drain,
257-
// `process.exit()` kills any in-flight gRPC export and the final agent
258-
// span is lost. Bounded with a 3 s race so a wedged exporter cannot hang
259-
// bcode on exit. Generic to any OTel-based plugin, not laminar-specific.
267+
// exiting so the just-ended turn spans actually hit the wire. Bounded with
268+
// a 3 s race so a wedged exporter cannot hang bcode on exit. Generic to any
269+
// OTel-based plugin, not laminar-specific.
260270
const provider = trace.getTracerProvider() as { forceFlush?: () => Promise<void> }
261271
if (provider.forceFlush) {
262272
await Promise.race([

packages/opencode/src/plugin/index.ts

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ import { RuntimeFlags } from "@/effect/runtime-flags"
3232

3333
const log = Log.create({ service: "plugin" })
3434

35+
// Synchronous shutdown hooks invoked from src/index.ts's top-level finally
36+
// before forceFlush. Plugins register here when loaded; runs once per
37+
// process before process.exit(). Module-level intentionally — needs to be
38+
// reachable outside the Effect runtime.
39+
export const pluginShutdownHooks = new Set<() => void>()
40+
3541
type State = {
3642
hooks: Hooks[]
3743
}
@@ -244,38 +250,28 @@ export const layer = Layer.effect(
244250
}).pipe(Effect.ignore)
245251
}
246252

247-
// Subscribe to bus events, fiber interrupted when scope closes.
248-
// session.idle and server.instance.disposed are plugins' only chance to
249-
// drain async work (e.g. OTel span exporters) before src/index.ts's
250-
// top-level finally runs forceFlush and calls process.exit() — await
251-
// those handlers; keep the rest fire-and-forget for throughput.
253+
// Subscribe to bus events, fiber interrupted when scope closes
252254
yield* bus.subscribeAll().pipe(
253255
Stream.runForEach((input) =>
254-
Effect.promise(async () => {
255-
const awaitHook = input.type === "server.instance.disposed" || input.type === "session.idle"
256+
Effect.sync(() => {
256257
for (const hook of hooks) {
257-
try {
258-
const ret = hook["event"]?.({ event: input as any })
259-
if (awaitHook && ret) {
260-
await ret
261-
} else if (ret) {
262-
// Fire-and-forget path: surface async failures to logs instead of letting them
263-
// become unhandledRejections that hide which plugin/event broke.
264-
void Promise.resolve(ret).catch((err) =>
265-
log.error("plugin event hook failed", { error: err }),
266-
)
267-
}
268-
} catch (err) {
269-
// Catches sync throws + awaited async rejections so one bad plugin can't kill
270-
// the subscription fiber and silently disable every other plugin.
271-
log.error("plugin event hook failed", { error: err })
272-
}
258+
void hook["event"]?.({ event: input as any })
273259
}
274260
}),
275261
),
276262
Effect.forkScoped,
277263
)
278264

265+
// Register synchronous shutdown hooks for the top-level finally in
266+
// src/index.ts. Runs before forceFlush so plugins can end any open
267+
// OTel spans (e.g. bcode-laminar's turn span) — the bus-based
268+
// session.idle / server.instance.disposed paths race with scope
269+
// teardown and don't reliably deliver, so plugins need a direct sync
270+
// entry point.
271+
for (const hook of hooks) {
272+
if (hook.shutdown) pluginShutdownHooks.add(hook.shutdown)
273+
}
274+
279275
return { hooks }
280276
}),
281277
)

packages/plugin/src/index.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,15 @@ export type AuthOuathResult = AuthOAuthResult
222222
export interface Hooks {
223223
event?: (input: { event: Event }) => Promise<void>
224224
config?: (input: Config) => Promise<void>
225+
/**
226+
* Synchronous shutdown hook invoked once per process before
227+
* `process.exit()`, after the event loop has finished its last task and
228+
* before the host's OTel span exporter drain. Use this to end any
229+
* still-open OTel spans your plugin created — async work is not honored
230+
* here, but ending a span (`span.end()`) is synchronous and the host's
231+
* `forceFlush` runs right after this hook.
232+
*/
233+
shutdown?: () => void
225234
tool?: {
226235
[key: string]: ToolDefinition
227236
}

0 commit comments

Comments
 (0)