From 642a5e3e5b4dbce12251b31d53f9ed7bb76aa07e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Sat, 6 Jun 2026 20:50:52 +0800 Subject: [PATCH] feat(sdk): add skipAccumulation option to ExecutionHandlers to prevent OOM When streaming handlers are provided for long-running executions, the SDK unconditionally accumulates every stdout/stderr message in memory, causing OOM for executions with large output volumes. Add `skipAccumulation` (default false) to ExecutionHandlers across all SDK languages. When enabled, messages are delivered to handler callbacks without being appended to ExecutionLogs, allowing constant-memory streaming. Closes #981 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Internal/ExecutionEventDispatcher.cs | 10 +++++++-- .../src/OpenSandbox/Models/Execution.cs | 7 +++++++ sdks/sandbox/go/execution.go | 21 +++++++++++++++---- .../javascript/src/models/execution.ts | 6 ++++++ .../src/models/executionEventDispatcher.ts | 8 +++++-- .../execd/executions/ExecutionModels.kt | 13 ++++++++++++ .../converter/ExecutionEventDispatcher.kt | 8 +++++-- .../converter/execution_event_dispatcher.py | 6 ++++-- .../python/src/opensandbox/models/execd.py | 6 ++++++ .../src/opensandbox/models/execd_sync.py | 5 +++++ .../converter/execution_event_dispatcher.py | 6 ++++-- 11 files changed, 82 insertions(+), 14 deletions(-) diff --git a/sdks/sandbox/csharp/src/OpenSandbox/Internal/ExecutionEventDispatcher.cs b/sdks/sandbox/csharp/src/OpenSandbox/Internal/ExecutionEventDispatcher.cs index 25886f2b2..274b55b1f 100644 --- a/sdks/sandbox/csharp/src/OpenSandbox/Internal/ExecutionEventDispatcher.cs +++ b/sdks/sandbox/csharp/src/OpenSandbox/Internal/ExecutionEventDispatcher.cs @@ -95,7 +95,10 @@ private async Task HandleStdoutAsync(ServerStreamEvent ev, long timestamp) IsError = false }; - _execution.Logs.Stdout.Add(msg); + if (_handlers is not { SkipAccumulation: true }) + { + _execution.Logs.Stdout.Add(msg); + } if (_handlers?.OnStdout != null) { @@ -112,7 +115,10 @@ private async Task HandleStderrAsync(ServerStreamEvent ev, long timestamp) IsError = true }; - _execution.Logs.Stderr.Add(msg); + if (_handlers is not { SkipAccumulation: true }) + { + _execution.Logs.Stderr.Add(msg); + } if (_handlers?.OnStderr != null) { diff --git a/sdks/sandbox/csharp/src/OpenSandbox/Models/Execution.cs b/sdks/sandbox/csharp/src/OpenSandbox/Models/Execution.cs index cfe34a85c..fc27b9b7a 100644 --- a/sdks/sandbox/csharp/src/OpenSandbox/Models/Execution.cs +++ b/sdks/sandbox/csharp/src/OpenSandbox/Models/Execution.cs @@ -207,4 +207,11 @@ public class ExecutionHandlers /// Gets or sets the handler for execution initialization. /// public Func? OnInit { get; set; } + + /// + /// When true, stdout/stderr messages are only delivered to handlers without + /// being accumulated in . Use for long-running + /// executions to prevent unbounded memory growth. + /// + public bool SkipAccumulation { get; set; } } diff --git a/sdks/sandbox/go/execution.go b/sdks/sandbox/go/execution.go index 93692dd40..1f0deca44 100644 --- a/sdks/sandbox/go/execution.go +++ b/sdks/sandbox/go/execution.go @@ -104,6 +104,11 @@ type ExecutionHandlers struct { OnResult func(ExecutionResult) error OnComplete func(ExecutionComplete) error OnError func(ExecutionError) error + + // SkipAccumulation, when true, prevents stdout/stderr messages from being + // accumulated in the Execution struct. Messages are still delivered to handlers. + // Use for long-running executions to prevent unbounded memory growth. + SkipAccumulation bool } // sseErrorPayload is the nested error object in a ServerStreamEvent. @@ -146,7 +151,9 @@ func processStreamEvent(exec *Execution, event StreamEvent, handlers *ExecutionH if err := json.Unmarshal([]byte(data), &ev); err != nil { // Not JSON — treat as raw stdout msg := OutputMessage{Text: data} - exec.Stdout = append(exec.Stdout, msg) + if handlers == nil || !handlers.SkipAccumulation { + exec.Stdout = append(exec.Stdout, msg) + } if handlers != nil && handlers.OnStdout != nil { return handlers.OnStdout(msg) } @@ -163,14 +170,18 @@ func processStreamEvent(exec *Execution, event StreamEvent, handlers *ExecutionH case "stdout": msg := OutputMessage{Text: ev.Text, Timestamp: ev.Timestamp} - exec.Stdout = append(exec.Stdout, msg) + if handlers == nil || !handlers.SkipAccumulation { + exec.Stdout = append(exec.Stdout, msg) + } if handlers != nil && handlers.OnStdout != nil { return handlers.OnStdout(msg) } case "stderr": msg := OutputMessage{Text: ev.Text, Timestamp: ev.Timestamp} - exec.Stderr = append(exec.Stderr, msg) + if handlers == nil || !handlers.SkipAccumulation { + exec.Stderr = append(exec.Stderr, msg) + } if handlers != nil && handlers.OnStderr != nil { return handlers.OnStderr(msg) } @@ -239,7 +250,9 @@ func processStreamEvent(exec *Execution, event StreamEvent, handlers *ExecutionH // Unknown event type — treat as stdout if ev.Text != "" { msg := OutputMessage{Text: ev.Text, Timestamp: ev.Timestamp} - exec.Stdout = append(exec.Stdout, msg) + if handlers == nil || !handlers.SkipAccumulation { + exec.Stdout = append(exec.Stdout, msg) + } if handlers != nil && handlers.OnStdout != nil { return handlers.OnStdout(msg) } diff --git a/sdks/sandbox/javascript/src/models/execution.ts b/sdks/sandbox/javascript/src/models/execution.ts index 144236dc3..171adda2c 100644 --- a/sdks/sandbox/javascript/src/models/execution.ts +++ b/sdks/sandbox/javascript/src/models/execution.ts @@ -69,4 +69,10 @@ export interface ExecutionHandlers { onExecutionComplete?: (c: ExecutionComplete) => void | Promise; onError?: (err: ExecutionError) => void | Promise; onInit?: (init: ExecutionInit) => void | Promise; + /** + * When true, stdout/stderr messages are only delivered to handlers without + * being accumulated in the execution logs. Use for long-running executions + * to prevent unbounded memory growth. + */ + skipAccumulation?: boolean; } diff --git a/sdks/sandbox/javascript/src/models/executionEventDispatcher.ts b/sdks/sandbox/javascript/src/models/executionEventDispatcher.ts index 303fdcc04..e0f058149 100644 --- a/sdks/sandbox/javascript/src/models/executionEventDispatcher.ts +++ b/sdks/sandbox/javascript/src/models/executionEventDispatcher.ts @@ -48,13 +48,17 @@ export class ExecutionEventDispatcher { } case "stdout": { const msg: OutputMessage = { text: ev.text ?? "", timestamp: ts, isError: false }; - this.execution.logs.stdout.push(msg); + if (!this.handlers?.skipAccumulation) { + this.execution.logs.stdout.push(msg); + } await this.handlers?.onStdout?.(msg); return; } case "stderr": { const msg: OutputMessage = { text: ev.text ?? "", timestamp: ts, isError: true }; - this.execution.logs.stderr.push(msg); + if (!this.handlers?.skipAccumulation) { + this.execution.logs.stderr.push(msg); + } await this.handlers?.onStderr?.(msg); return; } diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/models/execd/executions/ExecutionModels.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/models/execd/executions/ExecutionModels.kt index a46b0938f..9c26e9d47 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/models/execd/executions/ExecutionModels.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/models/execd/executions/ExecutionModels.kt @@ -216,6 +216,12 @@ class ExecutionHandlers private constructor( * Called when code execution starts. */ val onInit: OutputHandler? = null, + /** + * When true, stdout/stderr messages are only delivered to handlers without + * being accumulated in [ExecutionLogs]. Use this for long-running executions + * to prevent unbounded memory growth. + */ + val skipAccumulation: Boolean = false, ) { companion object { @JvmStatic @@ -229,6 +235,7 @@ class ExecutionHandlers private constructor( private var onExecutionComplete: OutputHandler? = null private var onError: OutputHandler? = null private var onInit: OutputHandler? = null + private var skipAccumulation: Boolean = false fun onStdout(handler: OutputHandler): Builder { this.onStdout = handler @@ -260,6 +267,11 @@ class ExecutionHandlers private constructor( return this } + fun skipAccumulation(skip: Boolean): Builder { + this.skipAccumulation = skip + return this + } + fun build(): ExecutionHandlers { return ExecutionHandlers( onStdout = onStdout, @@ -268,6 +280,7 @@ class ExecutionHandlers private constructor( onExecutionComplete = onExecutionComplete, onError = onError, onInit = onInit, + skipAccumulation = skipAccumulation, ) } } diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/converter/ExecutionEventDispatcher.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/converter/ExecutionEventDispatcher.kt index 0bef96ef7..82f271626 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/converter/ExecutionEventDispatcher.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/adapters/converter/ExecutionEventDispatcher.kt @@ -62,7 +62,9 @@ class ExecutionEventDispatcher( ) { val stdoutText = eventNode.text ?: "" val stdoutMessage = OutputMessage(stdoutText, timestamp, false) - execution.logs.addStdout(stdoutMessage) + if (handlers?.skipAccumulation != true) { + execution.logs.addStdout(stdoutMessage) + } handlers?.onStdout?.handle(stdoutMessage) } @@ -72,7 +74,9 @@ class ExecutionEventDispatcher( ) { val stderrText = eventNode.text ?: "" val stderrMessage = OutputMessage(stderrText, timestamp, true) - execution.logs.addStderr(stderrMessage) + if (handlers?.skipAccumulation != true) { + execution.logs.addStderr(stderrMessage) + } handlers?.onStderr?.handle(stderrMessage) } diff --git a/sdks/sandbox/python/src/opensandbox/adapters/converter/execution_event_dispatcher.py b/sdks/sandbox/python/src/opensandbox/adapters/converter/execution_event_dispatcher.py index c3689eee4..a022b1cd7 100644 --- a/sdks/sandbox/python/src/opensandbox/adapters/converter/execution_event_dispatcher.py +++ b/sdks/sandbox/python/src/opensandbox/adapters/converter/execution_event_dispatcher.py @@ -80,7 +80,8 @@ async def _handle_stdout(self, event_node: EventNode, timestamp: int) -> None: timestamp=timestamp, is_error=False, ) - self.execution.logs.add_stdout(message) + if not (self.handlers and self.handlers.skip_accumulation): + self.execution.logs.add_stdout(message) if self.handlers and self.handlers.on_stdout: await self.handlers.on_stdout(message) @@ -91,7 +92,8 @@ async def _handle_stderr(self, event_node: EventNode, timestamp: int) -> None: timestamp=timestamp, is_error=True, ) - self.execution.logs.add_stderr(message) + if not (self.handlers and self.handlers.skip_accumulation): + self.execution.logs.add_stderr(message) if self.handlers and self.handlers.on_stderr: await self.handlers.on_stderr(message) diff --git a/sdks/sandbox/python/src/opensandbox/models/execd.py b/sdks/sandbox/python/src/opensandbox/models/execd.py index f7997ff1c..0ab218728 100644 --- a/sdks/sandbox/python/src/opensandbox/models/execd.py +++ b/sdks/sandbox/python/src/opensandbox/models/execd.py @@ -260,6 +260,12 @@ async def handle_stdout(msg: OutputMessage): on_init: AsyncOutputHandler | None = Field( default=None, description="Async handler for execution init" ) + skip_accumulation: bool = Field( + default=False, + description="When True, stdout/stderr messages are only delivered to handlers " + "without being accumulated in ExecutionLogs. Use for long-running " + "executions to prevent unbounded memory growth.", + ) model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True) diff --git a/sdks/sandbox/python/src/opensandbox/models/execd_sync.py b/sdks/sandbox/python/src/opensandbox/models/execd_sync.py index d0375fa56..520cbd4b3 100644 --- a/sdks/sandbox/python/src/opensandbox/models/execd_sync.py +++ b/sdks/sandbox/python/src/opensandbox/models/execd_sync.py @@ -39,5 +39,10 @@ class ExecutionHandlersSync(BaseModel): on_execution_complete: SyncOutputHandler | None = Field(default=None, alias="on_execution_complete") on_error: SyncOutputHandler | None = Field(default=None) on_init: SyncOutputHandler | None = Field(default=None) + skip_accumulation: bool = Field( + default=False, + description="When True, stdout/stderr messages are only delivered to handlers " + "without being accumulated in ExecutionLogs.", + ) model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True) diff --git a/sdks/sandbox/python/src/opensandbox/sync/adapters/converter/execution_event_dispatcher.py b/sdks/sandbox/python/src/opensandbox/sync/adapters/converter/execution_event_dispatcher.py index 0dfb50d7d..5227cd0d4 100644 --- a/sdks/sandbox/python/src/opensandbox/sync/adapters/converter/execution_event_dispatcher.py +++ b/sdks/sandbox/python/src/opensandbox/sync/adapters/converter/execution_event_dispatcher.py @@ -67,13 +67,15 @@ def _handle_init(self, event_node: EventNode, timestamp: int) -> None: def _handle_stdout(self, event_node: EventNode, timestamp: int) -> None: message = OutputMessage(text=event_node.text or "", timestamp=timestamp, is_error=False) - self.execution.logs.add_stdout(message) + if not (self.handlers and self.handlers.skip_accumulation): + self.execution.logs.add_stdout(message) if self.handlers and self.handlers.on_stdout: self.handlers.on_stdout(message) def _handle_stderr(self, event_node: EventNode, timestamp: int) -> None: message = OutputMessage(text=event_node.text or "", timestamp=timestamp, is_error=True) - self.execution.logs.add_stderr(message) + if not (self.handlers and self.handlers.skip_accumulation): + self.execution.logs.add_stderr(message) if self.handlers and self.handlers.on_stderr: self.handlers.on_stderr(message)