Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ private async Task HandleStdoutAsync(ServerStreamEvent ev, long timestamp)
IsError = false
};

_execution.Logs.Stdout.Add(msg);
if (_handlers is not { SkipAccumulation: true })
{
_execution.Logs.Stdout.Add(msg);
}

if (_handlers?.OnStdout != null)
{
Expand All @@ -112,7 +115,10 @@ private async Task HandleStderrAsync(ServerStreamEvent ev, long timestamp)
IsError = true
};

_execution.Logs.Stderr.Add(msg);
if (_handlers is not { SkipAccumulation: true })
{
_execution.Logs.Stderr.Add(msg);
}

if (_handlers?.OnStderr != null)
{
Expand Down
7 changes: 7 additions & 0 deletions sdks/sandbox/csharp/src/OpenSandbox/Models/Execution.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,11 @@ public class ExecutionHandlers
/// Gets or sets the handler for execution initialization.
/// </summary>
public Func<ExecutionInit, Task>? OnInit { get; set; }

/// <summary>
/// When true, stdout/stderr messages are only delivered to handlers without
/// being accumulated in <see cref="ExecutionLogs"/>. Use for long-running
/// executions to prevent unbounded memory growth.
/// </summary>
public bool SkipAccumulation { get; set; }
}
21 changes: 17 additions & 4 deletions sdks/sandbox/go/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ type ExecutionHandlers struct {
OnResult func(ExecutionResult) error
OnComplete func(ExecutionComplete) error
OnError func(ExecutionError) error

// SkipAccumulation, when true, prevents stdout/stderr messages from being
// accumulated in the Execution struct. Messages are still delivered to handlers.
// Use for long-running executions to prevent unbounded memory growth.
SkipAccumulation bool
Comment thread
Pangjiping marked this conversation as resolved.
}

// sseErrorPayload is the nested error object in a ServerStreamEvent.
Expand Down Expand Up @@ -146,7 +151,9 @@ func processStreamEvent(exec *Execution, event StreamEvent, handlers *ExecutionH
if err := json.Unmarshal([]byte(data), &ev); err != nil {
// Not JSON — treat as raw stdout
msg := OutputMessage{Text: data}
exec.Stdout = append(exec.Stdout, msg)
if handlers == nil || !handlers.SkipAccumulation {
exec.Stdout = append(exec.Stdout, msg)
}
if handlers != nil && handlers.OnStdout != nil {
return handlers.OnStdout(msg)
}
Expand All @@ -163,14 +170,18 @@ func processStreamEvent(exec *Execution, event StreamEvent, handlers *ExecutionH

case "stdout":
msg := OutputMessage{Text: ev.Text, Timestamp: ev.Timestamp}
exec.Stdout = append(exec.Stdout, msg)
if handlers == nil || !handlers.SkipAccumulation {
exec.Stdout = append(exec.Stdout, msg)
}
if handlers != nil && handlers.OnStdout != nil {
return handlers.OnStdout(msg)
}

case "stderr":
msg := OutputMessage{Text: ev.Text, Timestamp: ev.Timestamp}
exec.Stderr = append(exec.Stderr, msg)
if handlers == nil || !handlers.SkipAccumulation {
exec.Stderr = append(exec.Stderr, msg)
}
if handlers != nil && handlers.OnStderr != nil {
return handlers.OnStderr(msg)
}
Expand Down Expand Up @@ -239,7 +250,9 @@ func processStreamEvent(exec *Execution, event StreamEvent, handlers *ExecutionH
// Unknown event type — treat as stdout
if ev.Text != "" {
msg := OutputMessage{Text: ev.Text, Timestamp: ev.Timestamp}
exec.Stdout = append(exec.Stdout, msg)
if handlers == nil || !handlers.SkipAccumulation {
exec.Stdout = append(exec.Stdout, msg)
}
if handlers != nil && handlers.OnStdout != nil {
return handlers.OnStdout(msg)
}
Expand Down
6 changes: 6 additions & 0 deletions sdks/sandbox/javascript/src/models/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,10 @@ export interface ExecutionHandlers {
onExecutionComplete?: (c: ExecutionComplete) => void | Promise<void>;
onError?: (err: ExecutionError) => void | Promise<void>;
onInit?: (init: ExecutionInit) => void | Promise<void>;
/**
* When true, stdout/stderr messages are only delivered to handlers without
* being accumulated in the execution logs. Use for long-running executions
* to prevent unbounded memory growth.
*/
skipAccumulation?: boolean;
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,17 @@ export class ExecutionEventDispatcher {
}
case "stdout": {
const msg: OutputMessage = { text: ev.text ?? "", timestamp: ts, isError: false };
this.execution.logs.stdout.push(msg);
if (!this.handlers?.skipAccumulation) {
this.execution.logs.stdout.push(msg);
}
await this.handlers?.onStdout?.(msg);
return;
}
case "stderr": {
const msg: OutputMessage = { text: ev.text ?? "", timestamp: ts, isError: true };
this.execution.logs.stderr.push(msg);
if (!this.handlers?.skipAccumulation) {
this.execution.logs.stderr.push(msg);
}
await this.handlers?.onStderr?.(msg);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ class ExecutionHandlers private constructor(
* Called when code execution starts.
*/
val onInit: OutputHandler<ExecutionInit>? = null,
/**
* When true, stdout/stderr messages are only delivered to handlers without
* being accumulated in [ExecutionLogs]. Use this for long-running executions
* to prevent unbounded memory growth.
*/
val skipAccumulation: Boolean = false,
) {
companion object {
@JvmStatic
Expand All @@ -229,6 +235,7 @@ class ExecutionHandlers private constructor(
private var onExecutionComplete: OutputHandler<ExecutionComplete>? = null
private var onError: OutputHandler<ExecutionError>? = null
private var onInit: OutputHandler<ExecutionInit>? = null
private var skipAccumulation: Boolean = false

fun onStdout(handler: OutputHandler<OutputMessage>): Builder {
this.onStdout = handler
Expand Down Expand Up @@ -260,6 +267,11 @@ class ExecutionHandlers private constructor(
return this
}

fun skipAccumulation(skip: Boolean): Builder {
this.skipAccumulation = skip
return this
}

fun build(): ExecutionHandlers {
return ExecutionHandlers(
onStdout = onStdout,
Expand All @@ -268,6 +280,7 @@ class ExecutionHandlers private constructor(
onExecutionComplete = onExecutionComplete,
onError = onError,
onInit = onInit,
skipAccumulation = skipAccumulation,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ class ExecutionEventDispatcher(
) {
val stdoutText = eventNode.text ?: ""
val stdoutMessage = OutputMessage(stdoutText, timestamp, false)
execution.logs.addStdout(stdoutMessage)
if (handlers?.skipAccumulation != true) {
execution.logs.addStdout(stdoutMessage)
}
handlers?.onStdout?.handle(stdoutMessage)
}

Expand All @@ -72,7 +74,9 @@ class ExecutionEventDispatcher(
) {
val stderrText = eventNode.text ?: ""
val stderrMessage = OutputMessage(stderrText, timestamp, true)
execution.logs.addStderr(stderrMessage)
if (handlers?.skipAccumulation != true) {
execution.logs.addStderr(stderrMessage)
}
handlers?.onStderr?.handle(stderrMessage)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ async def _handle_stdout(self, event_node: EventNode, timestamp: int) -> None:
timestamp=timestamp,
is_error=False,
)
self.execution.logs.add_stdout(message)
if not (self.handlers and self.handlers.skip_accumulation):
self.execution.logs.add_stdout(message)
if self.handlers and self.handlers.on_stdout:
await self.handlers.on_stdout(message)

Expand All @@ -91,7 +92,8 @@ async def _handle_stderr(self, event_node: EventNode, timestamp: int) -> None:
timestamp=timestamp,
is_error=True,
)
self.execution.logs.add_stderr(message)
if not (self.handlers and self.handlers.skip_accumulation):
self.execution.logs.add_stderr(message)
if self.handlers and self.handlers.on_stderr:
await self.handlers.on_stderr(message)

Expand Down
6 changes: 6 additions & 0 deletions sdks/sandbox/python/src/opensandbox/models/execd.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,12 @@ async def handle_stdout(msg: OutputMessage):
on_init: AsyncOutputHandler | None = Field(
default=None, description="Async handler for execution init"
)
skip_accumulation: bool = Field(
default=False,
description="When True, stdout/stderr messages are only delivered to handlers "
"without being accumulated in ExecutionLogs. Use for long-running "
"executions to prevent unbounded memory growth.",
)

model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)

Expand Down
5 changes: 5 additions & 0 deletions sdks/sandbox/python/src/opensandbox/models/execd_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,10 @@ class ExecutionHandlersSync(BaseModel):
on_execution_complete: SyncOutputHandler | None = Field(default=None, alias="on_execution_complete")
on_error: SyncOutputHandler | None = Field(default=None)
on_init: SyncOutputHandler | None = Field(default=None)
skip_accumulation: bool = Field(
default=False,
description="When True, stdout/stderr messages are only delivered to handlers "
"without being accumulated in ExecutionLogs.",
)

model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ def _handle_init(self, event_node: EventNode, timestamp: int) -> None:

def _handle_stdout(self, event_node: EventNode, timestamp: int) -> None:
message = OutputMessage(text=event_node.text or "", timestamp=timestamp, is_error=False)
self.execution.logs.add_stdout(message)
if not (self.handlers and self.handlers.skip_accumulation):
self.execution.logs.add_stdout(message)
if self.handlers and self.handlers.on_stdout:
self.handlers.on_stdout(message)

def _handle_stderr(self, event_node: EventNode, timestamp: int) -> None:
message = OutputMessage(text=event_node.text or "", timestamp=timestamp, is_error=True)
self.execution.logs.add_stderr(message)
if not (self.handlers and self.handlers.skip_accumulation):
self.execution.logs.add_stderr(message)
if self.handlers and self.handlers.on_stderr:
self.handlers.on_stderr(message)

Expand Down
Loading