feat(grpc): add FlushCache RPC and profiling support for gRPC mode #1088
Kangyan-Zhou wants to merge 1 commit into lightseekorg:main
Conversation
📝 Walkthrough
The changes implement a new FlushCache RPC and profiling support for gRPC mode.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as External Client
    participant GWGrpc as Gateway gRPC Server
    participant WMgr as WorkerManager
    participant HttpWorker as HTTP Worker
    participant GrpcClient as gRPC Client
    participant SGrpc as Sglang gRPC Server
    participant ReqMgr as RequestManager
    participant Scheduler as Scheduler Process

    Client->>GWGrpc: FlushCache(timeout_s)
    activate GWGrpc
    GWGrpc->>WMgr: flush_cache_all()
    activate WMgr
    par HTTP Fan-out
        WMgr->>HttpWorker: POST /flush_cache
        activate HttpWorker
        HttpWorker-->>WMgr: success/failure
        deactivate HttpWorker
    and gRPC Fan-out
        WMgr->>GrpcClient: flush_cache(timeout_s)
        activate GrpcClient
        GrpcClient->>SGrpc: FlushCache RPC
        activate SGrpc
        SGrpc->>ReqMgr: send_communicator_req(FlushCacheReqInput)
        activate ReqMgr
        ReqMgr->>Scheduler: send via ZMQ
        activate Scheduler
        Scheduler-->>ReqMgr: FlushCacheReqOutput
        deactivate Scheduler
        ReqMgr-->>SGrpc: FlushCacheResponse(success, message)
        deactivate ReqMgr
        SGrpc-->>GrpcClient: FlushCacheResponse
        deactivate SGrpc
        GrpcClient-->>WMgr: (success, message)
        deactivate GrpcClient
    end
    WMgr-->>GWGrpc: FlushCacheResult (aggregated)
    deactivate WMgr
    GWGrpc-->>Client: FlushCacheResult
    deactivate GWGrpc
```
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 3 passed
Code Review
This pull request introduces profiling capabilities to the GrpcRequestManager and adds a new ZMQ socket (recv_from_tokenizer) to handle communicator responses directly from the scheduler. It also implements start_profile and stop_profile methods, along with a callback mechanism in serve_grpc to allow external components to interact with the GrpcRequestManager upon initialization. Feedback includes addressing potential compatibility issues with TimeoutError, improving ZMQ error handling consistency, and using UUIDs for profile identifiers to prevent collisions.
```python
    async def _execute_profile(self, req: ProfileReq):
        try:
            results = await self.profile_communicator(req, timeout=600.0)
        except TimeoutError:
```
In Python versions prior to 3.11, TimeoutError is not a built-in exception, and asyncio.wait_for raises asyncio.TimeoutError. Using the bare TimeoutError here will cause a NameError on those Python versions when a timeout occurs. It is safer to use asyncio.TimeoutError for compatibility.
```diff
-        except TimeoutError:
+        except asyncio.TimeoutError:
```
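To illustrate the compatibility point (a minimal sketch, not code from this PR): on Python 3.10 and earlier, `asyncio.wait_for` raises `asyncio.TimeoutError`, which only became an alias of the built-in `TimeoutError` in Python 3.11, so catching the `asyncio` name is the portable choice:

```python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "done"

async def call_with_timeout(timeout_s):
    # asyncio.wait_for raises asyncio.TimeoutError on timeout; on
    # Python 3.11+ this is an alias of the built-in TimeoutError, but
    # on 3.10 and earlier they are distinct classes.
    try:
        return await asyncio.wait_for(slow_operation(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return None

result = asyncio.run(call_with_timeout(0.01))
print(result)  # None: the 10 s sleep exceeded the 10 ms budget
```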
```python
            except zmq.error.ZMQError as e:
                if self.gracefully_exit:
                    break
                logger.error(f"ZMQ error in tokenizer loop: {e}")
                break
```
To maintain consistency with handle_loop and avoid logic duplication, consider refactoring the ZMQ error handling into a shared helper function. The tokenizer loop should handle zmq.error.Again and include a traceback for ZMQError to ensure transient timeouts don't unexpectedly terminate the task and to provide better debugging information.
```diff
-            except zmq.error.ZMQError as e:
-                if self.gracefully_exit:
-                    break
-                logger.error(f"ZMQ error in tokenizer loop: {e}")
-                break
+            except zmq.error.Again:
+                if self.gracefully_exit:
+                    break
+                continue
+            except zmq.error.ZMQError as e:
+                if self.gracefully_exit:
+                    logger.debug(f"ZMQ recv interrupted during shutdown: {e}")
+                    break
+                logger.error(f"ZMQ error in tokenizer loop: {e}\n{get_exception_traceback()}")
+                break
```
References
- If a code block is identified as duplicated across multiple functions or modules, consider refactoring to unify the logic into a shared helper function or class. This improves maintainability and reduces the chance of inconsistencies when changes are needed.
Fixed — added zmq.error.Again handler and logger.debug on graceful shutdown to match handle_loop's pattern exactly.
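The distinction the reviewer draws (transient `zmq.error.Again` continues the loop, fatal `ZMQError` logs and breaks, and both exit quietly after a graceful shutdown) can be sketched without pyzmq, using stand-in exception classes and a fake socket:

```python
class Again(Exception):
    """Stand-in for zmq.error.Again (transient recv timeout)."""

class ZMQError(Exception):
    """Stand-in for zmq.error.ZMQError (socket-level error)."""

def recv_loop(socket, state):
    # Mirrors the suggested handler: Again continues the loop,
    # ZMQError logs and breaks, and both exit quietly once a
    # graceful shutdown has been requested.
    events = []
    while True:
        try:
            msg = socket.recv()
        except Again:
            if state["gracefully_exit"]:
                break
            continue  # transient timeout, keep polling
        except ZMQError as e:
            if state["gracefully_exit"]:
                break  # interrupted during shutdown, not an error
            events.append(f"error: {e}")
            break
        if msg is None:
            break  # normal-termination sentinel for this sketch
        events.append(msg)
    return events

class FakeSocket:
    def __init__(self, script):
        self.script = list(script)
    def recv(self):
        item = self.script.pop(0)
        if isinstance(item, Exception):
            raise item
        return item

sock = FakeSocket(["a", Again(), "b", ZMQError("boom")])
print(recv_loop(sock, {"gracefully_exit": False}))
# ['a', 'b', 'error: boom']
```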
```python
                with_stack=with_stack,
                record_shapes=record_shapes,
                profile_by_stage=profile_by_stage,
                profile_id=str(time.time()),
```
Using time.time() for a profile ID can lead to collisions if multiple profiles are started in rapid succession and results in a string representation that may be less ideal for filenames (containing dots). Using a UUID is more robust and consistent with how request_id is generated elsewhere in this class.
```diff
-                profile_id=str(time.time()),
+                profile_id=f"profile-{uuid.uuid4().hex}",
```
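A quick illustration of why the UUID form is preferable (a sketch, not PR code): `uuid4` IDs are effectively collision-free even when generated back-to-back, and the hex form contains no dots that complicate filenames:

```python
import uuid

# Two IDs generated in rapid succession are still distinct, and
# the hex form is filename-safe (no dots, unlike str(time.time())).
u1 = f"profile-{uuid.uuid4().hex}"
u2 = f"profile-{uuid.uuid4().hex}"
print(u1 != u2)   # True
print("." in u1)  # False
```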
Force-pushed from cd99331 to 63ae959
Force-pushed from 9738eca to b4034da
Force-pushed from ad792bb to 4754d5a
Hybrid approach for admin/observability in gRPC mode:

gRPC native (FlushCache RPC):
- Proto: FlushCacheRequest/FlushCacheResponse messages
- Servicer: FlushCache handler with timeout and multi-DP error handling
- Rust client: flush_cache method returning (success, message)
- Router: flush_cache_all updated to fan out to gRPC workers natively

Transport layer (GrpcRequestManager):
- recv_from_tokenizer socket + _handle_tokenizer_loop for communicator responses sent via scheduler's send_to_tokenizer path
- send_communicator_req: generic thin transport for sglang-side logic
- Communicators for profile, flush_cache, get_internal_state
- on_request_manager_ready async callback to serve_grpc()

Signed-off-by: Kangyan Zhou <zky314343421@gmail.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Force-pushed from 4754d5a to 18d209c
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 18d209c868
```python
                    "flush_cache_communicator",
                    timeout=comm_timeout,
                )
            except TimeoutError:
```
Catch asyncio timeout in FlushCache handler
send_communicator_req() is backed by asyncio.wait_for, which raises asyncio.TimeoutError on Python 3.10, but this handler catches built-in TimeoutError, so timeout cases bypass this branch and get reported by the generic Exception path as INTERNAL instead of DEADLINE_EXCEEDED. This misclassifies real scheduler timeouts and breaks client retry/diagnostic behavior for FlushCache requests under load or partial scheduler stalls.
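A minimal sketch of the classification this comment asks for, using hypothetical helper names and plain status strings in place of real gRPC status codes: catching `asyncio.TimeoutError` lets a scheduler stall surface as DEADLINE_EXCEEDED instead of falling through to the generic INTERNAL path.

```python
import asyncio

async def flush_cache_handler(do_flush, timeout_s):
    # do_flush is a hypothetical coroutine factory standing in for
    # send_communicator_req(); the point is the exception ordering:
    # asyncio.TimeoutError must be caught before the generic Exception.
    try:
        await asyncio.wait_for(do_flush(), timeout=timeout_s)
        return ("OK", "cache flushed")
    except asyncio.TimeoutError:
        return ("DEADLINE_EXCEEDED", "scheduler did not respond in time")
    except Exception as e:
        return ("INTERNAL", str(e))

async def stalled_flush():
    await asyncio.sleep(5)  # simulates a partial scheduler stall

code, msg = asyncio.run(flush_cache_handler(stalled_flush, 0.01))
print(code)  # DEADLINE_EXCEEDED
```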
```rust
            async move {
                let url = worker.url().to_string();
                match worker.get_grpc_client().await {
                    Ok(Some(grpc_client)) => match grpc_client.flush_cache(0.0).await {
```
Bound gRPC flush calls with per-worker timeout
The gRPC fan-out path awaits grpc_client.flush_cache(0.0) without any timeout wrapper, unlike the HTTP branch which enforces REQUEST_TIMEOUT; as a result, one slow or unreachable gRPC worker can stall join_all and keep /flush_cache hanging for an unbounded/very long time. This turns an admin endpoint into a potential blocker during partial outages, so each gRPC flush should be wrapped with a per-worker timeout and treated as a failed worker on expiry.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@grpc_servicer/smg_grpc_servicer/sglang/server.py`:
- Around line 120-121: The on_request_manager_ready callback is awaited after
creating request_manager and scheduler_procs but before the main shutdown guard,
so any exception leaks child processes/sockets; wrap the await
on_request_manager_ready(request_manager, server_args, scheduler_info) in a
try/except (or move the await inside the existing startup/teardown guard) and on
exception perform clean shutdown steps: call request_manager.shutdown() (or its
stop/close method), stop/bootstrap_server.shutdown() (or the server close
routine), and terminate/join each process in scheduler_procs before re-raising
the exception; ensure you reference the same symbols (on_request_manager_ready,
request_manager, bootstrap_server, scheduler_procs) to locate and run their
existing cleanup paths.
In `@model_gateway/src/routers/grpc/client.rs`:
- Around line 237-246: The match in GrpcClient::flush_cache omits the Self::Mlx
variant causing a non-exhaustive match; add an explicit branch for Self::Mlx in
the flush_cache method (alongside Self::Sglang, Self::Vllm, Self::Trtllm). If
the Mlx client exposes a flush_cache method, call it the same way as in
Self::Sglang (await, unpack resp.success and resp.message, return Ok); if it
does not support flush_cache, return Err(tonic::Status::unimplemented(...)) for
Self::Mlx so the match is exhaustive.
In `@model_gateway/src/worker/manager.rs`:
- Around line 734-753: gRPC fan-out currently awaits
grpc_client.flush_cache(0.0) with join_all, causing unbounded concurrency and
infinite waits; modify the async closure built from grpc_workers (the one that
clones Arc and calls get_grpc_client()) to wrap the flush_cache(...) call with
tokio::time::timeout(REQUEST_TIMEOUT, ...), handle Timeout errors as an Err
result string, and then replace the future::join_all(grpc_futures).await
aggregation with
stream::iter(grpc_futures).buffer_unordered(MAX_CONCURRENT).collect().await so
concurrency is bounded and each flush respects REQUEST_TIMEOUT; keep handling of
Ok(Some(grpc_client)), Ok(None), and Err from get_grpc_client unchanged but
adapt error strings for timeouts and cancelled tasks.
📒 Files selected for processing (7)
- crates/grpc_client/proto/sglang_scheduler.proto
- crates/grpc_client/src/sglang_scheduler.rs
- grpc_servicer/smg_grpc_servicer/sglang/request_manager.py
- grpc_servicer/smg_grpc_servicer/sglang/server.py
- grpc_servicer/smg_grpc_servicer/sglang/servicer.py
- model_gateway/src/routers/grpc/client.rs
- model_gateway/src/worker/manager.rs
```python
        if on_request_manager_ready is not None:
            await on_request_manager_ready(request_manager, server_args, scheduler_info)
```
Clean up launched resources if the startup hook fails.
Line 120 awaits the hook after the scheduler processes and GrpcRequestManager already exist, but before serve_grpc() reaches its shutdown path. If that callback raises, startup aborts with child processes and ZMQ sockets still alive.
Move this hook under the same startup/teardown guard as the rest of the server bootstrap, or explicitly shut down request_manager, bootstrap_server, and scheduler_procs before re-raising.
```rust
    pub async fn flush_cache(&self, timeout_s: f32) -> Result<(bool, String), tonic::Status> {
        match self {
            Self::Sglang(client) => {
                let resp = client.flush_cache(timeout_s).await?;
                Ok((resp.success, resp.message))
            }
            Self::Vllm(_) | Self::Trtllm(_) => Err(tonic::Status::unimplemented(
                "flush_cache not supported for this backend",
            )),
        }
```
Handle the GrpcClient::Mlx variant in this match.
The match self on line 238 is non-exhaustive. The GrpcClient enum includes the Mlx variant, but this method does not handle it, preventing compilation.
Suggested fix:

```diff
 pub async fn flush_cache(&self, timeout_s: f32) -> Result<(bool, String), tonic::Status> {
     match self {
         Self::Sglang(client) => {
             let resp = client.flush_cache(timeout_s).await?;
             Ok((resp.success, resp.message))
         }
-        Self::Vllm(_) | Self::Trtllm(_) => Err(tonic::Status::unimplemented(
+        Self::Vllm(_) | Self::Trtllm(_) | Self::Mlx(_) => Err(tonic::Status::unimplemented(
             "flush_cache not supported for this backend",
         )),
     }
 }
```
```rust
        let grpc_futures: Vec<_> = grpc_workers
            .iter()
            .map(|w| {
                let worker = Arc::clone(w);
                async move {
                    let url = worker.url().to_string();
                    match worker.get_grpc_client().await {
                        Ok(Some(grpc_client)) => match grpc_client.flush_cache(0.0).await {
                            Ok((true, _)) => (url, Ok(())),
                            Ok((false, msg)) => (url, Err(format!("flush failed: {msg}"))),
                            Err(e) => (url, Err(format!("gRPC error: {e}"))),
                        },
                        Ok(None) => (url, Err("no gRPC client available".to_string())),
                        Err(e) => (url, Err(format!("failed to get gRPC client: {e}"))),
                    }
                }
            })
            .collect();

        let grpc_results: Vec<_> = future::join_all(grpc_futures).await;
```
Apply REQUEST_TIMEOUT and concurrency bounding to the gRPC fan-out, matching the HTTP path.
The HTTP fan-out path enforces .timeout(REQUEST_TIMEOUT) (5 seconds) and .buffer_unordered(MAX_CONCURRENT) (32 workers), but the gRPC path calls flush_cache(0.0) (infinite timeout) and uses join_all() (unbounded concurrency). A single stalled gRPC worker can hang the entire cache-flush operation indefinitely; firing all gRPC flushes at once risks overwhelming the fleet on deployments with many workers.
Code comparison. HTTP path (lines 69, 81):

```rust
.timeout(REQUEST_TIMEOUT)
.buffer_unordered(MAX_CONCURRENT)
```

gRPC path (lines 740, 753):

```rust
grpc_client.flush_cache(0.0).await
future::join_all(grpc_futures).await
```

Wrap each flush_cache() call with tokio::time::timeout(REQUEST_TIMEOUT, ...) and replace join_all() with stream::iter(grpc_futures).buffer_unordered(MAX_CONCURRENT).collect() to match the HTTP implementation.
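The gateway code in question is Rust, but the same pattern (a per-worker timeout plus bounded fan-out concurrency) can be illustrated with a Python asyncio analogue; all names here are hypothetical, and the semaphore plays the role of `buffer_unordered`:

```python
import asyncio

async def flush_one(url, delay):
    # Stand-in for a per-worker flush RPC; delay simulates worker latency.
    await asyncio.sleep(delay)
    return (url, "ok")

async def flush_all(workers, per_worker_timeout=0.05, max_concurrent=2):
    # Bound concurrency with a semaphore and bound each call with
    # wait_for, so one stalled worker cannot hang the whole fan-out.
    sem = asyncio.Semaphore(max_concurrent)

    async def guarded(url, delay):
        async with sem:
            try:
                return await asyncio.wait_for(
                    flush_one(url, delay), per_worker_timeout
                )
            except asyncio.TimeoutError:
                return (url, "timeout")

    return await asyncio.gather(*(guarded(u, d) for u, d in workers))

workers = [("w1", 0.0), ("w2", 10.0), ("w3", 0.0)]
print(asyncio.run(flush_all(workers)))
# [('w1', 'ok'), ('w2', 'timeout'), ('w3', 'ok')]
```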
Summary

Adds admin/observability support for gRPC-mode engines using a hybrid approach.

gRPC native: FlushCache RPC
- `FlushCache` RPC with `FlushCacheRequest`/`FlushCacheResponse` messages
- `FlushCache` handler using `send_communicator_req` with timeout handling and multi-DP-rank error aggregation
- `flush_cache` method added to `SglangSchedulerClient`
- `flush_cache` dispatcher returning a `(success, message)` tuple
- `flush_cache_all` updated to handle gRPC workers natively (no HTTP sidecar needed)

Transport layer: GrpcRequestManager
- `recv_from_tokenizer` socket to receive communicator responses (profile, flush_cache, get_internal_state) sent via the scheduler's `send_to_tokenizer` path
- `_handle_tokenizer_loop` with proper `zmq.error.Again` handling
- `send_communicator_req()`: a generic thin transport method for sglang-side business logic
- `on_request_manager_ready` async callback to `serve_grpc()`, passing `(request_manager, server_args, scheduler_info)`

Motivation

gRPC mode previously had no way to flush the KV cache, trigger profiler traces, or query server info. The router's `/flush_cache` silently no-oped for gRPC workers (it was filtered to HTTP workers only). This blocked `bench_serving.py` and benchmarking workflows against gRPC deployments.

Companion PR

sgl-project/sglang#22500

Architecture

Endpoints covered: `/flush_cache`, `/start_profile` and `/stop_profile`, `/server_info`, `/metrics`

Test plan

- `cargo check` passes for smg-grpc-client and smg
- `/flush_cache` through the router returns 200 (confirmed gRPC path: `total_http_workers: 0, workers_flushed: 1`)

🤖 Generated with Claude Code