Skip to content

Commit 08d37e4

Browse files
author
bcode
committed
fix(bcode-laminar): track pending flushes so shutdown awaits in-flight OTLP exports
rc5 confirmed the issue: the `await processor.forceFlush()` in the session.idle handler never returns (no completion log is printed) because process.exit kills the in-flight OTLP HTTP export before it can resolve. The host doesn't track these Promises because, from its perspective, they are orphan microtasks. Fix: track every processor.forceFlush() call kicked off by the bus event handlers (session.idle, session.deleted) in a closure-local pendingFlushes Set inside the plugin. The sync shutdown hook now awaits Promise.all(pendingFlushes) before returning, so the host's 3s race actually waits for the in-flight HTTP request to complete. trackFlush() wraps each Promise with .catch + .finally(delete), so rejections are swallowed instead of surfacing as unhandled rejections and settled entries are removed from the Set. The shutdown hook itself also fires a final forceFlush to drain any spans the bus handlers haven't gotten to yet (defense in depth).
1 parent 1b18379 commit 08d37e4

1 file changed

Lines changed: 44 additions & 27 deletions

File tree

packages/bcode-laminar/src/plugin.ts

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,21 @@ export const LaminarPlugin: Plugin = ({ client }) => {
6969
sdk.start()
7070
log("info", `Laminar tracing initialized → ${otlpEndpoint ?? baseUrl}`)
7171

72+
// Track forceFlush() Promises kicked off by bus event handlers
73+
// (session.idle, session.deleted). Each is fire-started by the handler
74+
// but the host doesn't await them — they're orphan microtasks. If
75+
// `process.exit()` fires before they resolve, in-flight OTLP HTTP
76+
// requests die and the span is dropped server-side.
77+
//
78+
// The sync shutdown hook awaits this set before returning so the host's
79+
// `Promise.race([hooks, 3s])` race can let the export actually finish.
80+
const pendingFlushes = new Set<Promise<void>>()
81+
const trackFlush = (p: Promise<void> | undefined): void => {
82+
if (!p) return
83+
const wrapped = p.catch(() => {}).finally(() => pendingFlushes.delete(wrapped))
84+
pendingFlushes.add(wrapped)
85+
}
86+
7287
return Promise.resolve({
7388
config: async (config) => {
7489
if (!config.experimental?.openTelemetry) {
@@ -82,22 +97,24 @@ export const LaminarPlugin: Plugin = ({ client }) => {
8297
// this hook from its top-level finally before forceFlush, so span.end()
8398
// here gets its export drained by the host's existing forceFlush race.
8499
shutdown: async () => {
85-
// End any still-open turn spans synchronously, then drain the inner
86-
// BatchSpanProcessor. The host awaits this Promise with a bounded
87-
// race so a wedged exporter cannot hang `process.exit()`.
100+
// End any still-open turn spans, then drain. Awaits both an explicit
101+
// forceFlush AND any pendingFlushes kicked off by the bus event
102+
// handlers (session.idle, session.deleted) that the host doesn't
103+
// otherwise wait for. The host's `Promise.race([hooks, 3000ms])`
104+
// bounds this so a wedged exporter cannot hang `process.exit()`.
88105
//
89-
// This runs AFTER the Effect runtime has torn down. The bus-based
90-
// session.idle / server.instance.disposed handlers may have already
91-
// emptied `sessionCurrentTurnSpan` and unregistered the global
92-
// TracerProvider via `sdk.shutdown()`. Either way, the BSP itself
93-
// still has its queue intact and can drain — we hold a direct ref
94-
// to `processor` via closure.
106+
// The pendingFlushes set is the critical fix: session.idle's await
107+
// on processor.forceFlush() is an orphan microtask from the host's
108+
// perspective — it kicks off the OTLP HTTP export but `process.exit()`
109+
// would kill the request mid-flight without us tracking it here.
95110
//
96111
// stderr writes go to v4-worker's bcode-output-<runId>.log so cloud
97112
// verification can see whether this path executed. Temporary, will
98113
// be removed once headless V4 telemetry is settled.
99114
const sessionIds = Object.keys(sessionCurrentTurnSpan)
100-
process.stderr.write(`[bcode-laminar] shutdown: ending ${sessionIds.length} open turn span(s)\n`)
115+
process.stderr.write(
116+
`[bcode-laminar] shutdown: ending ${sessionIds.length} open turn span(s), waiting on ${pendingFlushes.size} pending flush(es)\n`,
117+
)
101118
for (const sessionId of sessionIds) {
102119
const span = sessionCurrentTurnSpan[sessionId]
103120
if (!span) continue
@@ -110,14 +127,17 @@ export const LaminarPlugin: Plugin = ({ client }) => {
110127
}
111128
delete sessionCurrentTurnSpan[sessionId]
112129
}
113-
process.stderr.write(`[bcode-laminar] shutdown: forceFlush start\n`)
114130
const start = Date.now()
131+
// Kick a final flush AND wait for any in-flight ones from bus handlers.
132+
trackFlush(processor.forceFlush())
115133
try {
116-
await processor.forceFlush()
117-
process.stderr.write(`[bcode-laminar] shutdown: forceFlush done in ${Date.now() - start}ms\n`)
134+
await Promise.all(Array.from(pendingFlushes))
135+
process.stderr.write(
136+
`[bcode-laminar] shutdown: all flushes done in ${Date.now() - start}ms\n`,
137+
)
118138
} catch (err) {
119139
process.stderr.write(
120-
`[bcode-laminar] shutdown: forceFlush threw after ${Date.now() - start}ms: ${(err as Error).message}\n`,
140+
`[bcode-laminar] shutdown: flush threw after ${Date.now() - start}ms: ${(err as Error).message}\n`,
121141
)
122142
}
123143
},
@@ -128,22 +148,19 @@ export const LaminarPlugin: Plugin = ({ client }) => {
128148
const span = sessionCurrentTurnSpan[sessionId]
129149
if (span) {
130150
const sid = span.spanContext().spanId
131-
process.stderr.write(`[bcode-laminar] session.idle: ending turn span ${sid} session=${sessionId}\n`)
151+
process.stderr.write(
152+
`[bcode-laminar] session.idle: ending turn span ${sid} session=${sessionId}\n`,
153+
)
132154
span.end()
133155
delete sessionCurrentTurnSpan[sessionId]
134-
const start = Date.now()
135-
try {
136-
await processor.forceFlush()
137-
process.stderr.write(`[bcode-laminar] session.idle: forceFlush done in ${Date.now() - start}ms\n`)
138-
} catch (err) {
139-
process.stderr.write(
140-
`[bcode-laminar] session.idle: forceFlush threw after ${Date.now() - start}ms: ${(err as Error).message}\n`,
141-
)
142-
}
143156
} else {
144-
process.stderr.write(`[bcode-laminar] session.idle: no turn span for session=${sessionId}\n`)
145-
await processor.forceFlush()
157+
process.stderr.write(
158+
`[bcode-laminar] session.idle: no turn span for session=${sessionId}\n`,
159+
)
146160
}
161+
// Track the flush Promise so the sync shutdown hook can await it
162+
// before process.exit. Fire-and-forget from this fiber's POV.
163+
trackFlush(processor.forceFlush())
147164
break
148165
}
149166
case "server.instance.disposed": {
@@ -182,7 +199,7 @@ export const LaminarPlugin: Plugin = ({ client }) => {
182199
}
183200
delete subagentSessionIds[sessionId]
184201
for (const children of Object.values(subagentSessionIds)) children.delete(sessionId)
185-
await processor.forceFlush()
202+
trackFlush(processor.forceFlush())
186203
break
187204
}
188205
}

0 commit comments

Comments
 (0)