
feat(grpc): add FlushCache RPC and profiling support for gRPC mode#1088

Open
Kangyan-Zhou wants to merge 1 commit into lightseekorg:main from Kangyan-Zhou:profile_endpoint
Conversation


@Kangyan-Zhou Kangyan-Zhou commented Apr 10, 2026

Summary

Adds admin/observability support for gRPC-mode engines using a hybrid approach:

gRPC native: FlushCache RPC

  • Proto: Added FlushCache RPC with FlushCacheRequest/FlushCacheResponse messages
  • Servicer: Implemented FlushCache handler using send_communicator_req with timeout handling and multi-DP-rank error aggregation
  • Rust client: Added flush_cache method to SglangSchedulerClient
  • Router client: Added flush_cache dispatcher returning (success, message) tuple
  • Router manager: Updated flush_cache_all to handle gRPC workers natively (no HTTP sidecar needed)

Transport layer: GrpcRequestManager

  • Added recv_from_tokenizer socket to receive communicator responses (profile, flush_cache, get_internal_state) sent via the scheduler's send_to_tokenizer path
  • Added _handle_tokenizer_loop with proper zmq.error.Again handling
  • Added send_communicator_req() — generic thin transport method for sglang-side business logic
  • Added communicators for profile, flush_cache, and get_internal_state
  • Added on_request_manager_ready async callback to serve_grpc() passing (request_manager, server_args, scheduler_info)
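
The communicator round-trip described above — send a request toward the scheduler, then resolve the matching response when it arrives on a separate receive loop — can be sketched roughly as follows. This is a minimal in-process sketch using asyncio futures; the class and method names are illustrative, not the PR's actual `_GrpcCommunicator` API:

```python
import asyncio

class Communicator:
    """Sketch of a request/response communicator: the caller parks a future,
    and a background receive loop resolves it when the response arrives."""

    def __init__(self):
        self._pending = None

    async def __call__(self, req, send, timeout=600.0):
        self._pending = asyncio.get_running_loop().create_future()
        await send(req)  # e.g. push onto the ZMQ send-to-scheduler socket
        try:
            return await asyncio.wait_for(self._pending, timeout=timeout)
        finally:
            self._pending = None

    def handle_response(self, resp):
        # Called from the tokenizer receive loop when a response arrives.
        if self._pending is not None and not self._pending.done():
            self._pending.set_result(resp)

async def demo():
    comm = Communicator()

    async def fake_send(req):
        # Simulate the scheduler replying shortly after the request is sent.
        asyncio.get_running_loop().call_later(
            0.01, comm.handle_response, {"success": True})

    return await comm({"type": "flush_cache"}, fake_send, timeout=1.0)

result = asyncio.run(demo())
print(result)  # {'success': True}
```

The real implementation dispatches by response type (FlushCacheReqOutput, profile, get_internal_state) to dedicated communicator instances; this sketch only shows the future-parking shape.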

Motivation

gRPC mode previously had no way to flush the KV cache, trigger profiler traces, or query server info. The router's /flush_cache silently no-oped for gRPC workers (filtered to HTTP-only). This blocked bench_serving.py and benchmarking workflows against gRPC deployments.

Companion PR

sgl-project/sglang#22500

Architecture

| Endpoint | Transport | Why |
|---|---|---|
| `/flush_cache` | gRPC RPC (via router) | Should fan out to all workers; router already has the HTTP route |
| `/start_profile`, `/stop_profile` | HTTP sidecar (direct engine) | PD mode needs per-worker targeting, not fan-out |
| `/server_info` | HTTP sidecar (direct engine) | Engine-level info, not router-level |
| `/metrics` | HTTP sidecar (Prometheus scraping) | Standard pattern |

Test plan

  • Rust builds clean (cargo check for smg-grpc-client and smg)
  • E2E: Single engine — FlushCache gRPC direct, sidecar endpoints all work
  • E2E: Engine + Router — generation through router 200, /flush_cache through router 200 (confirmed gRPC path: total_http_workers: 0, workers_flushed: 1)
  • CI tests

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features
    • Added a new cache flush operation accessible via the gRPC interface
    • Extended cache flush functionality to support both HTTP and gRPC backend workers for comprehensive cache management across different connection types

coderabbitai bot commented Apr 10, 2026

📝 Walkthrough

The changes implement a new FlushCache scheduler management RPC across the full gRPC stack, from protobuf definition through client wrappers, request management routing, servicer handlers, and worker manager integration to support cache flushing on both HTTP and gRPC-based workers.

Changes

| Cohort / File(s) | Summary |
|---|---|
| **Protocol Definition**<br>`crates/grpc_client/proto/sglang_scheduler.proto` | Added a new FlushCache RPC with FlushCacheRequest (timeout_s field) and FlushCacheResponse (success, message fields) message types to the SglangScheduler service. |
| **gRPC Client Wrappers**<br>`crates/grpc_client/src/sglang_scheduler.rs`, `model_gateway/src/routers/grpc/client.rs` | Added flush_cache(timeout_s) async methods to forward cache flush requests; the gateway client maps responses to (success, message) tuples and returns unimplemented for non-Sglang backends. |
| **Request Management & Routing**<br>`grpc_servicer/smg_grpc_servicer/sglang/request_manager.py` | Introduced a second ZMQ socket (recv_from_tokenizer) and a new dispatch loop (_handle_tokenizer_loop) to route FlushCacheReqOutput and other scheduler responses to dedicated _GrpcCommunicator instances; added a public send_communicator_req() helper for generic request/response round-trips. |
| **Server Initialization Hook**<br>`grpc_servicer/smg_grpc_servicer/sglang/server.py` | Extended serve_grpc() with an optional on_request_manager_ready callback invoked after request manager creation but before server startup. |
| **RPC Handler**<br>`grpc_servicer/smg_grpc_servicer/sglang/servicer.py` | Added a FlushCache() RPC handler that forwards requests via request_manager.send_communicator_req(), implements timeout and exception handling (converting to gRPC status codes), and aggregates per-target success/failure responses. |
| **Worker Flush Integration**<br>`model_gateway/src/worker/manager.rs` | Extended flush_cache_all() to handle both HTTP and gRPC workers by splitting registry workers by connection mode and performing parallel async fan-out to gRPC workers alongside the existing HTTP fan-out. |

Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as External Client
    participant GWGrpc as Gateway gRPC Server
    participant WMgr as WorkerManager
    participant HttpWorker as HTTP Worker
    participant GrpcClient as gRPC Client
    participant SGrpc as Sglang gRPC Server
    participant ReqMgr as RequestManager
    participant Scheduler as Scheduler Process

    Client->>GWGrpc: FlushCache(timeout_s)
    activate GWGrpc

    GWGrpc->>WMgr: flush_cache_all()
    activate WMgr

    par HTTP Fan-out
        WMgr->>HttpWorker: POST /flush_cache
        activate HttpWorker
        HttpWorker-->>WMgr: success/failure
        deactivate HttpWorker
    and gRPC Fan-out
        WMgr->>GrpcClient: flush_cache(timeout_s)
        activate GrpcClient
        GrpcClient->>SGrpc: FlushCache RPC
        activate SGrpc
        SGrpc->>ReqMgr: send_communicator_req(FlushCacheReqInput)
        activate ReqMgr
        ReqMgr->>Scheduler: send via ZMQ
        activate Scheduler
        Scheduler-->>ReqMgr: FlushCacheReqOutput
        deactivate Scheduler
        ReqMgr-->>SGrpc: FlushCacheResponse(success, message)
        deactivate ReqMgr
        SGrpc-->>GrpcClient: FlushCacheResponse
        deactivate SGrpc
        GrpcClient-->>WMgr: (success, message)
        deactivate GrpcClient
    end

    WMgr-->>GWGrpc: FlushCacheResult (aggregated)
    deactivate WMgr
    GWGrpc-->>Client: FlushCacheResult
    deactivate GWGrpc
```

Estimated Code Review Effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 A cache flush hops through the gRPC stacks,
With sockets and communicators on its tracks.
From proto to manager, from handler to field,
Each worker shall bow where the flush is revealed.
HTTP and gRPC dance side by side,
In timeout-bound harmony, they flush with pride! ✨

🚥 Pre-merge checks | ✅ 3

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title Check | ✅ Passed | The title accurately describes the primary change: adding a FlushCache RPC for gRPC mode with supporting infrastructure. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 83.33%, which meets the required threshold of 80.00%. |



@github-actions github-actions bot added the grpc gRPC client and router changes label Apr 10, 2026

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces profiling capabilities to the GrpcRequestManager and adds a new ZMQ socket (recv_from_tokenizer) to handle communicator responses directly from the scheduler. It also implements start_profile and stop_profile methods, along with a callback mechanism in serve_grpc to allow external components to interact with the GrpcRequestManager upon initialization. Feedback includes addressing potential compatibility issues with TimeoutError, improving ZMQ error handling consistency, and using UUIDs for profile identifiers to prevent collisions.

```python
    async def _execute_profile(self, req: ProfileReq):
        try:
            results = await self.profile_communicator(req, timeout=600.0)
        except TimeoutError:
```

**high**

In Python versions prior to 3.11, `asyncio.wait_for` raises `asyncio.TimeoutError`, which is a distinct class from the built-in `TimeoutError`, so this bare `except TimeoutError:` will not catch the timeout on those versions (since 3.11 the two are aliases). It is safer to use `asyncio.TimeoutError` for compatibility.

Suggested change:

```diff
-        except TimeoutError:
+        except asyncio.TimeoutError:
```
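
For context on this compatibility point, a small self-contained check: catching `asyncio.TimeoutError` works on every supported Python version, because on 3.11+ it is the same object as the built-in `TimeoutError`, while on 3.10 and earlier it is the class `asyncio.wait_for` actually raises:

```python
import asyncio
import sys

# Since 3.11, asyncio.TimeoutError is an alias of the built-in TimeoutError;
# on older versions it is a separate class, so a bare `except TimeoutError`
# would miss timeouts raised by asyncio.wait_for.
if sys.version_info >= (3, 11):
    assert asyncio.TimeoutError is TimeoutError

async def slow():
    await asyncio.sleep(10)

async def main():
    try:
        await asyncio.wait_for(slow(), timeout=0.01)
    except asyncio.TimeoutError:
        return "timed out"

result = asyncio.run(main())
print(result)  # timed out
```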

Comment on lines +560 to +564:

```python
            except zmq.error.ZMQError as e:
                if self.gracefully_exit:
                    break
                logger.error(f"ZMQ error in tokenizer loop: {e}")
                break
```

**medium**

To maintain consistency with handle_loop and avoid logic duplication, consider refactoring the ZMQ error handling into a shared helper function. The tokenizer loop should handle `zmq.error.Again` and include a traceback for `ZMQError` to ensure transient timeouts don't unexpectedly terminate the task and to provide better debugging information.

Suggested change:

```diff
-            except zmq.error.ZMQError as e:
-                if self.gracefully_exit:
-                    break
-                logger.error(f"ZMQ error in tokenizer loop: {e}")
-                break
+            except zmq.error.Again:
+                if self.gracefully_exit:
+                    break
+                continue
+            except zmq.error.ZMQError as e:
+                if self.gracefully_exit:
+                    logger.debug(f"ZMQ recv interrupted during shutdown: {e}")
+                    break
+                logger.error(f"ZMQ error in tokenizer loop: {e}\n{get_exception_traceback()}")
+                break
```

References:
1. If a code block is duplicated across multiple functions or modules, consider refactoring to unify the logic into a shared helper function or class. This improves maintainability and reduces the chance of inconsistencies when changes are needed.

Contributor Author

Fixed — added zmq.error.Again handler and logger.debug on graceful shutdown to match handle_loop's pattern exactly.

```python
                with_stack=with_stack,
                record_shapes=record_shapes,
                profile_by_stage=profile_by_stage,
                profile_id=str(time.time()),
```

**medium**

Using `time.time()` for a profile ID can lead to collisions if multiple profiles are started in rapid succession, and it produces a string that is less ideal for filenames (it contains a dot). Using a UUID is more robust and consistent with how request_id is generated elsewhere in this class.

Suggested change:

```diff
-            profile_id=str(time.time()),
+            profile_id=f"profile-{uuid.uuid4().hex}",
```
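
To illustrate the difference concretely (a hypothetical snippet, not the PR's code): timestamp-based IDs depend on clock resolution and carry a dot in the string, while uuid4-based IDs are effectively unique and filename-safe:

```python
import time
import uuid

# time.time() returns a float whose string form contains a dot; on platforms
# with coarse clocks, rapid successive calls can even yield identical strings.
ids_time = [str(time.time()) for _ in range(1000)]

# uuid4 hex IDs are unique for practical purposes and contain only [0-9a-f].
ids_uuid = [f"profile-{uuid.uuid4().hex}" for _ in range(1000)]

print(len(set(ids_uuid)))                    # 1000
print("." in ids_time[0], "." in ids_uuid[0])  # True False
```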

@Kangyan-Zhou Kangyan-Zhou force-pushed the profile_endpoint branch 2 times, most recently from 9738eca to b4034da Compare April 10, 2026 19:37
@github-actions github-actions bot added the model-gateway Model gateway crate changes label Apr 10, 2026
@Kangyan-Zhou Kangyan-Zhou changed the title feat(grpc-servicer): add profiling support to GrpcRequestManager feat(grpc): add FlushCache RPC and profiling support for gRPC mode Apr 10, 2026
@Kangyan-Zhou Kangyan-Zhou force-pushed the profile_endpoint branch 2 times, most recently from ad792bb to 4754d5a Compare April 10, 2026 19:52
Hybrid approach for admin/observability in gRPC mode:

gRPC native (FlushCache RPC):
- Proto: FlushCacheRequest/FlushCacheResponse messages
- Servicer: FlushCache handler with timeout and multi-DP error handling
- Rust client: flush_cache method returning (success, message)
- Router: flush_cache_all updated to fan out to gRPC workers natively

Transport layer (GrpcRequestManager):
- recv_from_tokenizer socket + _handle_tokenizer_loop for communicator
  responses sent via scheduler's send_to_tokenizer path
- send_communicator_req: generic thin transport for sglang-side logic
- Communicators for profile, flush_cache, get_internal_state
- on_request_manager_ready async callback to serve_grpc()

Signed-off-by: Kangyan Zhou <zky314343421@gmail.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@Kangyan-Zhou Kangyan-Zhou marked this pull request as ready for review April 15, 2026 03:50

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 18d209c868


```python
                    "flush_cache_communicator",
                    timeout=comm_timeout,
                )
            except TimeoutError:
```

**P1:** Catch asyncio timeout in FlushCache handler

send_communicator_req() is backed by asyncio.wait_for, which raises asyncio.TimeoutError on Python 3.10, but this handler catches built-in TimeoutError, so timeout cases bypass this branch and get reported by the generic Exception path as INTERNAL instead of DEADLINE_EXCEEDED. This misclassifies real scheduler timeouts and breaks client retry/diagnostic behavior for FlushCache requests under load or partial scheduler stalls.


```rust
            async move {
                let url = worker.url().to_string();
                match worker.get_grpc_client().await {
                    Ok(Some(grpc_client)) => match grpc_client.flush_cache(0.0).await {
```

**P2:** Bound gRPC flush calls with per-worker timeout

The gRPC fan-out path awaits grpc_client.flush_cache(0.0) without any timeout wrapper, unlike the HTTP branch which enforces REQUEST_TIMEOUT; as a result, one slow or unreachable gRPC worker can stall join_all and keep /flush_cache hanging for an unbounded/very long time. This turns an admin endpoint into a potential blocker during partial outages, so each gRPC flush should be wrapped with a per-worker timeout and treated as a failed worker on expiry.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@grpc_servicer/smg_grpc_servicer/sglang/server.py`:
- Around line 120-121: The on_request_manager_ready callback is awaited after
creating request_manager and scheduler_procs but before the main shutdown guard,
so any exception leaks child processes/sockets; wrap the await
on_request_manager_ready(request_manager, server_args, scheduler_info) in a
try/except (or move the await inside the existing startup/teardown guard) and on
exception perform clean shutdown steps: call request_manager.shutdown() (or its
stop/close method), stop/bootstrap_server.shutdown() (or the server close
routine), and terminate/join each process in scheduler_procs before re-raising
the exception; ensure you reference the same symbols (on_request_manager_ready,
request_manager, bootstrap_server, scheduler_procs) to locate and run their
existing cleanup paths.

In `@model_gateway/src/routers/grpc/client.rs`:
- Around line 237-246: The match in GrpcClient::flush_cache omits the Self::Mlx
variant causing a non-exhaustive match; add an explicit branch for Self::Mlx in
the flush_cache method (alongside Self::Sglang, Self::Vllm, Self::Trtllm). If
the Mlx client exposes a flush_cache method, call it the same way as in
Self::Sglang (await, unpack resp.success and resp.message, return Ok); if it
does not support flush_cache, return Err(tonic::Status::unimplemented(...)) for
Self::Mlx so the match is exhaustive.

In `@model_gateway/src/worker/manager.rs`:
- Around line 734-753: gRPC fan-out currently awaits
grpc_client.flush_cache(0.0) with join_all, causing unbounded concurrency and
infinite waits; modify the async closure built from grpc_workers (the one that
clones Arc and calls get_grpc_client()) to wrap the flush_cache(...) call with
tokio::time::timeout(REQUEST_TIMEOUT, ...), handle Timeout errors as an Err
result string, and then replace the future::join_all(grpc_futures).await
aggregation with
stream::iter(grpc_futures).buffer_unordered(MAX_CONCURRENT).collect().await so
concurrency is bounded and each flush respects REQUEST_TIMEOUT; keep handling of
Ok(Some(grpc_client)), Ok(None), and Err from get_grpc_client unchanged but
adapt error strings for timeouts and cancelled tasks.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 3fa01a34-e8cc-4cf9-9d81-fdf0801ed51f

📥 Commits

Reviewing files that changed from the base of the PR and between f9cffd7 and 18d209c.

📒 Files selected for processing (7)
  • crates/grpc_client/proto/sglang_scheduler.proto
  • crates/grpc_client/src/sglang_scheduler.rs
  • grpc_servicer/smg_grpc_servicer/sglang/request_manager.py
  • grpc_servicer/smg_grpc_servicer/sglang/server.py
  • grpc_servicer/smg_grpc_servicer/sglang/servicer.py
  • model_gateway/src/routers/grpc/client.rs
  • model_gateway/src/worker/manager.rs

Comment on lines +120 to +121:

```python
    if on_request_manager_ready is not None:
        await on_request_manager_ready(request_manager, server_args, scheduler_info)
```

⚠️ Potential issue | 🟠 Major

Clean up launched resources if the startup hook fails.

Line 120 awaits the hook after the scheduler processes and GrpcRequestManager already exist, but before serve_grpc() reaches its shutdown path. If that callback raises, startup aborts with child processes and ZMQ sockets still alive.

Move this hook under the same startup/teardown guard as the rest of the server bootstrap, or explicitly shut down request_manager, bootstrap_server, and scheduler_procs before re-raising.
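
The suggested guard can be sketched like this (illustrative shapes only; the actual shutdown method names in `serve_grpc()` may differ, and the fakes below stand in for the real request manager and scheduler processes):

```python
import asyncio

class FakeProc:
    """Stand-in for a scheduler child process."""
    def __init__(self):
        self.terminated = False
    def terminate(self):
        self.terminated = True
    def join(self):
        pass

class FakeRequestManager:
    """Stand-in for GrpcRequestManager; shutdown() would close ZMQ sockets."""
    def __init__(self):
        self.shut = False
    def shutdown(self):
        self.shut = True

async def run_startup_hook(hook, request_manager, scheduler_procs):
    # If the ready-callback raises, tear down already-launched resources
    # before re-raising, so child processes and sockets don't leak.
    try:
        await hook(request_manager)
    except Exception:
        request_manager.shutdown()
        for proc in scheduler_procs:
            proc.terminate()
            proc.join()
        raise

async def main():
    rm, procs = FakeRequestManager(), [FakeProc(), FakeProc()]

    async def bad_hook(_):
        raise RuntimeError("hook failed during startup")

    try:
        await run_startup_hook(bad_hook, rm, procs)
    except RuntimeError:
        pass
    return rm.shut, all(p.terminated for p in procs)

result = asyncio.run(main())
print(result)  # (True, True)
```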


Comment on lines +237 to +246:

```rust
    pub async fn flush_cache(&self, timeout_s: f32) -> Result<(bool, String), tonic::Status> {
        match self {
            Self::Sglang(client) => {
                let resp = client.flush_cache(timeout_s).await?;
                Ok((resp.success, resp.message))
            }
            Self::Vllm(_) | Self::Trtllm(_) => Err(tonic::Status::unimplemented(
                "flush_cache not supported for this backend",
            )),
        }
```

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

```shell
#!/bin/bash
set -euo pipefail
python - <<'PY'
from pathlib import Path

path = Path("model_gateway/src/routers/grpc/client.rs")
text = path.read_text()

enum_block = text.split("pub enum GrpcClient", 1)[1].split("impl GrpcClient", 1)[0]
method_block = text.split("pub async fn flush_cache", 1)[1].split("/// Fetch tokenizer bundle", 1)[0]

print("enum_has_mlx =", "Mlx(" in enum_block)
print("flush_cache_handles_mlx =", "Self::Mlx" in method_block)
PY
```

Repository: lightseekorg/smg


Handle the GrpcClient::Mlx variant in this match.

The `match self` on line 238 is non-exhaustive. The GrpcClient enum includes the Mlx variant, but this method does not handle it, preventing compilation.

Suggested fix:

```diff
     pub async fn flush_cache(&self, timeout_s: f32) -> Result<(bool, String), tonic::Status> {
         match self {
             Self::Sglang(client) => {
                 let resp = client.flush_cache(timeout_s).await?;
                 Ok((resp.success, resp.message))
             }
-            Self::Vllm(_) | Self::Trtllm(_) => Err(tonic::Status::unimplemented(
+            Self::Vllm(_) | Self::Trtllm(_) | Self::Mlx(_) => Err(tonic::Status::unimplemented(
                 "flush_cache not supported for this backend",
             )),
         }
     }
```

Comment on lines +734 to +753:

```rust
        let grpc_futures: Vec<_> = grpc_workers
            .iter()
            .map(|w| {
                let worker = Arc::clone(w);
                async move {
                    let url = worker.url().to_string();
                    match worker.get_grpc_client().await {
                        Ok(Some(grpc_client)) => match grpc_client.flush_cache(0.0).await {
                            Ok((true, _)) => (url, Ok(())),
                            Ok((false, msg)) => (url, Err(format!("flush failed: {msg}"))),
                            Err(e) => (url, Err(format!("gRPC error: {e}"))),
                        },
                        Ok(None) => (url, Err("no gRPC client available".to_string())),
                        Err(e) => (url, Err(format!("failed to get gRPC client: {e}"))),
                    }
                }
            })
            .collect();

        let grpc_results: Vec<_> = future::join_all(grpc_futures).await;
```

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

```shell
#!/bin/bash
set -euo pipefail
echo "HTTP fan-out path:"
sed -n '52,84p' model_gateway/src/worker/manager.rs
echo
echo "gRPC flush path:"
sed -n '733,759p' model_gateway/src/worker/manager.rs
```

🏁 Script executed:

```shell
#!/bin/bash
set -euo pipefail

# Get full flush_cache_all function
echo "=== Full flush_cache_all function ==="
sed -n '691,780p' model_gateway/src/worker/manager.rs

echo
echo "=== REQUEST_TIMEOUT and MAX_CONCURRENT definitions ==="
rg 'REQUEST_TIMEOUT|MAX_CONCURRENT' model_gateway/src/worker/manager.rs -B 2 -A 2
```

🏁 Script executed:

```shell
#!/bin/bash
set -euo pipefail

# Check if flush_cache_all has a caller-level timeout wrapper
echo "=== Calls to flush_cache_all ==="
rg 'flush_cache_all' model_gateway/src --type rust -B 3 -A 3

echo
echo "=== gRPC client builder for timeouts ==="
fd -t f 'grpc_client' crates/grpc_client/src | head -5 | xargs -I {} sh -c 'echo "=== {} ==="; head -100 {}'
```

🏁 Script executed:

```shell
#!/bin/bash
set -euo pipefail

echo "=== Searching for flush_cache method definition in gRPC clients ==="
fd -t f '\.rs$' crates/grpc_client/src | xargs grep -l 'flush_cache' | head -3

echo
echo "=== flush_cache method signatures ==="
rg 'fn flush_cache' crates/grpc_client/src -A 5 | head -40

echo
echo "=== gRPC timeout patterns in model_gateway ==="
rg 'timeout|Timeout|Duration' model_gateway/src/routers/grpc --type rust -c | sort | uniq -c | head -10
```

🏁 Script executed:

```shell
#!/bin/bash
set -euo pipefail

echo "=== Check for gRPC request timeout patterns in model_gateway ==="
rg 'tokio::time::timeout|timeout\(' model_gateway/src/routers/grpc --type rust -B 2 -A 2 | head -60
```

🏁 Script executed:

```shell
#!/bin/bash
set -euo pipefail
echo "=== Check if timeout behavior is documented or if 0.0 is intentional ==="
git log --all --oneline --grep="flush_cache" -- model_gateway/src/worker/manager.rs | head -5
git log --all -p -- crates/grpc_client/src/sglang_scheduler.rs | grep -A 5 -B 5 'flush_cache' | head -40
```

🏁 Script executed:

```shell
#!/bin/bash
set -euo pipefail

echo "=== Check if there's any wrapper timeout around flush_cache_all calls ==="
sed -n '1,/^async fn flush_cache/p' model_gateway/src/server.rs | tail -20

echo
echo "=== Check other RPC calls in grpc/client.rs for timeout patterns ==="
rg 'tokio::time::timeout' model_gateway/src/routers/grpc/client.rs -A 3 -B 1 | head -40
```

Apply REQUEST_TIMEOUT and concurrency bounding to the gRPC fan-out, matching the HTTP path.

The HTTP fan-out path enforces .timeout(REQUEST_TIMEOUT) (5 seconds) and .buffer_unordered(MAX_CONCURRENT) (32 workers), but the gRPC path calls flush_cache(0.0) (infinite timeout) and uses join_all() (unbounded concurrency). A single stalled gRPC worker can hang the entire cache-flush operation indefinitely; firing all gRPC flushes at once risks overwhelming the fleet on deployments with many workers.

Code comparison

HTTP path (lines 69, 81):

```rust
.timeout(REQUEST_TIMEOUT)
.buffer_unordered(MAX_CONCURRENT)
```

gRPC path (lines 740, 753):

```rust
grpc_client.flush_cache(0.0).await
future::join_all(grpc_futures).await
```

Wrap each flush_cache() call with tokio::time::timeout(REQUEST_TIMEOUT, ...) and replace join_all() with stream::iter(grpc_futures).buffer_unordered(MAX_CONCURRENT).collect() to match the HTTP implementation.
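
The same bounded fan-out pattern (a per-call timeout plus capped concurrency) can be sketched in Python terms as an analog of the proposed `tokio::time::timeout` + `buffer_unordered` fix. The constants and worker delays here are illustrative only:

```python
import asyncio

REQUEST_TIMEOUT = 0.05  # illustrative; the Rust code defines its own constant
MAX_CONCURRENT = 2      # illustrative concurrency bound

async def flush_one(url, delay):
    # Stand-in for a per-worker flush RPC; `delay` simulates worker latency.
    await asyncio.sleep(delay)
    return url, "ok"

async def flush_all(workers):
    sem = asyncio.Semaphore(MAX_CONCURRENT)  # bound in-flight requests

    async def guarded(url, delay):
        async with sem:
            try:
                # Per-worker timeout: one stalled worker can't hang the fan-out.
                return await asyncio.wait_for(
                    flush_one(url, delay), REQUEST_TIMEOUT)
            except asyncio.TimeoutError:
                return url, "timeout"

    return await asyncio.gather(*(guarded(u, d) for u, d in workers))

workers = [("w1", 0.0), ("w2", 0.0), ("w3", 10.0)]
results = asyncio.run(flush_all(workers))
print(results)  # [('w1', 'ok'), ('w2', 'ok'), ('w3', 'timeout')]
```

The stalled worker (`w3`) is reported as a failure after the timeout instead of blocking the aggregate result, which is the behavior the review asks for on the Rust side.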



Labels

grpc gRPC client and router changes model-gateway Model gateway crate changes
