Conversation
Revert "Check" This reverts commit dddaf1b. Check graph pool TODO: -> this requires additional patch in MP to reset initialized_ flag in CalculatorGraph and verify if that works. Previous MP tests with reruns worked due to using AddVectorSink which changes the underlying graph and does not use OutputStreamPollers. Need to verify if change in MP will enable graph pool or we need to go back to thread pool. Rebase POC MP FW test POC part 2 WIP to stash
…downgrade SPDLOG_ERROR to SPDLOG_DEBUG
… queue-size-0 rejection, warn+clamp for exceeding hw threads, log cleanup
Pull request overview
This PR introduces a MediaPipe graph pooling mechanism ("GraphQueue") to reuse pre-initialized CalculatorGraph instances across requests, adds a `# OVMS_GRAPH_QUEUE_SIZE:` pbtxt directive (also emitted by graph exporters), and updates executors/tests/docs to exercise the new queue path (including LLM/OpenAI flows).

Changes:
- Add `GraphQueue` + observer-swapping mechanism to reuse MediaPipe graphs (unary + streaming) and track per-graph timestamps.
- Parse the `# OVMS_GRAPH_QUEUE_SIZE:` directive during graph validation and initialize the pool accordingly; update graph export to emit the directive.
- Expand and adjust unit/stress tests, configs, and docs to cover queue behavior and directive parsing.
Reviewed changes
Copilot reviewed 47 out of 48 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| src/test/test_utils.hpp | Adjust test graph definition utilities for sidePacketMaps now being a pointer/shared object. |
| src/test/stress_test_utils.hpp | Add queue-enabled configs and timing instrumentation for stress tests. |
| src/test/streaming_test.cpp | Add StreamingQueueTest cases and update executor construction signature. |
| src/test/pythonnode_test.cpp | Update mocked executor construction to new side-packets + guard API. |
| src/test/pull_hf_model_test.cpp | Update graph header stripping to account for optional queue directive line. |
| src/test/mediapipeflow_test.cpp | Minor robustness/logging tweaks; add directive parsing tests. |
| src/test/mediapipe_framework_test.cpp | Add POC/perf-style tests around output stream observers and graph creation approaches. |
| src/test/mediapipe/graphdummyadapterfull_dummyinputnames_newpath.pbtxt | Add a new test graph file used for reload path testing. |
| src/test/mediapipe/graph_queue_dummyadapterfull_dummyinputnames_newpath.pbtxt | Add queue-enabled test graph with directive header for reload path testing. |
| src/test/mediapipe/graph_queue_dummyadapterfull_dummyinputnames.pbtxt | Add queue-enabled test graph with directive header. |
| src/test/mediapipe/graph_gpt_with_queue.pbtxt | Add queue-enabled OpenAI mock graph with directive header. |
| src/test/mediapipe/config_mediapipe_openai_chat_completions_mock_with_queue.json | Add config pointing to queue-enabled OpenAI mock graph. |
| src/test/mediapipe/config_mediapipe_openai_chat_completions_mock.json | Formatting-only change. |
| src/test/llm/lm_cb_regular_queue.pbtxt | Add queue-enabled LLM test graph with directive header. |
| src/test/llm/llmnode_test.cpp | Add LLM HTTP tests using queue-enabled graph/config and reuse behavior. |
| src/test/llm/config_queue.json | Add LLM queue test config. |
| src/test/http_openai_handler_test.cpp | Add OpenAI handler tests for queue-enabled config and clarify non-queue behavior. |
| src/test/graph_export_test.cpp | Update graph export tests to validate queue header and strip generated headers robustly. |
| src/test/ensemble_config_change_stress.cpp | Add stress tests for queue-enabled graph add/remove/reload scenarios. |
| src/systeminfo.hpp | Minor formatting change. |
| src/systeminfo.cpp | Make getCoreCount() robust when hardware_concurrency() returns 0. |
| src/python/BUILD | Widen visibility of pythonnoderesources target (now public). |
| src/mediapipe_internal/outputstreamobserver.hpp | Introduce output stream observer interface, null observer, and holder. |
| src/mediapipe_internal/mediapipegraphexecutor.hpp | Add queue-aware execution paths, observer swapping, and LLM execution context handling. |
| src/mediapipe_internal/mediapipegraphexecutor.cpp | Implement queue-aware constructor and LLM execution context lifecycle helpers. |
| src/mediapipe_internal/mediapipegraphdefinition.hpp | Change side packets storage to shared ptr; add queue member and queue init hooks. |
| src/mediapipe_internal/mediapipegraphdefinition.cpp | Implement directive parsing and queue initialization; integrate into validation/reload/retire. |
| src/mediapipe_internal/mediapipegraphconfig.hpp | Add GraphQueueSizeValue representation and queue size resolution helpers. |
| src/mediapipe_internal/graphqueue.hpp | Add GraphQueue, GraphHelper, and GraphIdGuard API. |
| src/mediapipe_internal/graphqueue.cpp | Implement graph pool initialization, observer installation, and cleanup. |
| src/mediapipe_internal/graph_side_packets.hpp | Split side packet maps into a dedicated header including LLM execution context holders. |
| src/mediapipe_internal/graph_executor_constants.hpp | Centralize side packet tags and timestamp constants. |
| src/logging.cpp | Adjust default log pattern to use %f fractional seconds. |
| src/llm/http_llm_calculator.cc | Add optional shared execution-context side packet support and mutex-protected access. |
| src/llm/BUILD | Add dependency on new graph_side_packets library. |
| src/kfs_frontend/kfs_graph_executor_impl.hpp | Declare requestHasInputSidePackets helper for queue-stream rejection. |
| src/kfs_frontend/kfs_graph_executor_impl.cpp | Implement requestHasInputSidePackets and adjust timestamp/side-packet handling. |
| src/http_frontend/http_graph_executor_impl.hpp | Declare requestHasInputSidePackets for HTTP payload. |
| src/http_frontend/http_graph_executor_impl.cpp | Stub requestHasInputSidePackets for HTTP payload. |
| src/graph_export/graph_export.cpp | Emit queue directive line in generated graph headers. |
| src/graph_export/BUILD | Add systeminfo dependency. |
| src/cli_parser.cpp | Propagate restWorkers into export settings. |
| src/capi_frontend/server_settings.hpp | Add optional restWorkers to export settings. |
| src/BUILD | Add Bazel targets for new mediapipe_internal headers/libs and wire them into main build/test deps. |
| docs/mediapipe.md | Document graph pool feature and directive semantics. |
| demos/common/export_models/export_model.py | Prepend queue directive to exported graphs (but file currently contains an accidental build log). |
| demos/benchmark/v3/benchmark.py | Print latency percentiles in benchmark output. |
| common_settings.bzl | Remove -Wno-deprecated-declarations from static test copts. |
| |:------|:---------| | ||
| | `AUTO` | Pool size is set to the number of hardware threads (`std::thread::hardware_concurrency()`), or 16 if detection fails | | ||
| | Positive integer (e.g. `4`) | Pool size set to the given value (must not exceed hardware thread count) | | ||
| | `0` or `-1` | Graph pool disabled — falls back to per-request graph creation | |
The documentation says `# OVMS_GRAPH_QUEUE_SIZE: 0` disables the pool, but the current parser rejects 0 as invalid. Update the docs to match the implementation, or accept 0 as a valid 'disabled' value (consistent with the doc and PR description).
Suggested change:

```diff
-| `0` or `-1` | Graph pool disabled — falls back to per-request graph creation |
+| `-1` | Graph pool disabled — falls back to per-request graph creation |
```
```cpp
float expVal = 13.5;
std::vector<float> data{expVal - 1, 1, 2, 3, 4, 5, 6, 7, 8, 9};
ovms::Timer<3> timer;
const std::string outputName{"output"};
int N = 1000;

absl::Status absStatus;
// here starts the new OVMS case
{  // new case of ovms
    ::mediapipe::CalculatorGraph graph;
    EXPECT_EQ(graph.Initialize(graphConfig).code(), absl::StatusCode::kOk);
    auto inputTensor = std::make_unique<ov::Tensor>(datatype, shape, data.data());
    // Install NullObserver
    // note: it's not per graph but per output
    std::shared_ptr<ovms::OutputStreamObserverI> perGraphObserverFunctor = std::make_shared<NullOutputStreamObserver>();
    MP_ERROR_STOP(graph.ObserveOutputStream(outputStreamName, [&perGraphObserverFunctor](const ::mediapipe::Packet& packet) -> absl::Status { return perGraphObserverFunctor->handlePacket(packet); }));
    // Here ends model management
    // Here starts mp graph executor
    // ovms::GraphIdGuard graphIdGuard(queue); // TODO timeout?
    // get graphIdGuard from queue
    // create FrontendAppropriateObserver
    struct MyFunctor : public OutputStreamObserverI {
        float expVal;
        MyFunctor(float expVal) :
            expVal(expVal) {
        }
        absl::Status handlePacket(const ::mediapipe::Packet& packet) override {
            const ov::Tensor& outputTensor =
                packet.Get<ov::Tensor>();
            auto datatype = ov::element::Type_t::f32;
            EXPECT_EQ(datatype, outputTensor.get_element_type());
            EXPECT_THAT(outputTensor.get_shape(), testing::ElementsAre(1, 10));
            const void* outputData = outputTensor.data();
            EXPECT_EQ(*((const float*)outputData), expVal);
            return absl::OkStatus();
        }
    };
    absStatus = graph.StartRun({});
    {
        perGraphObserverFunctor = std::make_shared<MyFunctor>(expVal);
        auto copyOfMyFunctor = perGraphObserverFunctor;
        auto inputTensor = std::make_unique<ov::Tensor>(datatype, shape, data.data());
        MP_ERROR_STOP(graph.AddPacketToInputStream(
            inputStreamName, Adopt(inputTensor.release()).At(Timestamp(timestamp++))));
    }
    std::this_thread::sleep_for(std::chrono::seconds(1));
    timer.start(0);
    for (auto i = 0; i < N; ++i) {  // iter begin
        perGraphObserverFunctor = std::make_shared<MyFunctor>(expVal);
        auto copyOfMyFunctor = perGraphObserverFunctor;
        auto inputTensor = std::make_unique<ov::Tensor>(datatype, shape, data.data());
        MP_ERROR_STOP(graph.AddPacketToInputStream(
            inputStreamName, Adopt(inputTensor.release()).At(Timestamp(timestamp++))));
        MP_ERROR_STOP(graph.WaitUntilIdle());
    }  // iter end
    timer.stop(0);
}  // end of new case ovms
{  // current ovms case
```
This test includes a hardcoded sleep_for(1s) and runs 1000 iterations of graph execution (plus a thread-pool comparison). That will significantly slow down the unit test suite and can be flaky across CI environments. Consider moving this into a benchmark/perf test, gating it behind a flag, or reducing it to a small functional assertion without timing loops/sleeps.
| "utils", | ||
| ], | ||
| visibility = ["//visibility:private"], | ||
| visibility = ["//visibility:public"], # TODO FIXME? |
This target’s visibility was widened to //visibility:public with a TODO. Exposing internal Python-side resources broadly can make dependency hygiene and layering worse over time. Prefer keeping it private and granting visibility only to the specific Bazel targets that need it (e.g., //src:mediapipe_internal_graphqueue), or split out a small public interface library if needed.
| visibility = ["//visibility:public"], # TODO FIXME? | |
| visibility = ["//visibility:private"], |
```text
destination_path = os.path.join(model_repository_path, model_name)ERROR: /ovms/src/llm/BUILD:196:16: Compiling src/llm/language_model/continuous_batching/servable.cpp failed: (Exit 1): gcc failed: error executing command (from target //src/llm:genai_servables) /usr/bin/gcc -U_FORTIFY_SOURCE -fstack-protector -Wall -Wunused-but-set-parameter -Wno-free-nonheap-object -fno-omit-frame-pointer -g0 -O2 '-D_FORTIFY_SOURCE=1' -DNDEBUG -ffunction-sections ... (remaining 156 arguments skipped)
In file included from src/llm/language_model/continuous_batching/../../../logging.hpp:24,
                 from src/llm/language_model/continuous_batching/servable.cpp:22:
src/llm/language_model/continuous_batching/llm_executor.hpp: In member function 'void ovms::LLMExecutor::printMetrics()':
src/llm/language_model/continuous_batching/llm_executor.hpp:105:104: error: 'struct ov::genai::PipelineMetrics' has no member named 'kv_cache_size_in_bytes'
  105 |     metrics.requests, metrics.scheduled_requests, formatCacheInfo(metrics.cache_usage, metrics.kv_cache_size_in_bytes, this->isDynamicKVCache));
      |                                                                                                ^~~~~~~~~~~~~~~~~~~~~~
Target //src:ovms failed to build
Use --verbose_failures to see the command lines of failed build steps.
INFO: Elapsed time: 9.590s, Critical Path: 8.22s
INFO: 64 processes: 64 internal.
FAILED: Build did NOT complete successfully
root@b6674760ad87:/ovms# bazel build --config mp_on_py_off //src:ovms
```
The line assigning destination_path appears to have an entire Bazel build log appended to it, which will make this script syntactically invalid and break model export. Remove the pasted build output and keep the assignment as a valid Python statement (the build log should not be committed into this file).
Suggested change (remove the pasted build log, keep only the valid assignment):

```python
destination_path = os.path.join(model_repository_path, model_name)
```
```cpp
#pragma warning(pop)
#include "mediapipe_utils.hpp"
#include "packettypes.hpp"
#include "graphqueue.hpp"
```
outputstreamobserver.hpp includes graphqueue.hpp, but this header doesn’t appear to use any GraphQueue/GraphIdGuard types. Keeping this include creates a circular dependency (graphqueue.hpp also includes outputstreamobserver.hpp) and increases build times. Consider removing the #include "graphqueue.hpp" here and forward-declaring only what’s needed.
| #include "graphqueue.hpp" |
```cpp
GraphHelper(GraphHelper&& gh) :
    graph(std::move(gh.graph)),
    outStreamObservers(std::move(const_cast<std::unordered_map<std::string, std::shared_ptr<ObserverHolder>>&>(gh.outStreamObservers))),
    genAiExecutionContextMap(std::move(gh.genAiExecutionContextMap)),
    currentTimestamp(gh.currentTimestamp) {}
```
GraphHelper's move constructor casts away constness and moves from a const member (outStreamObservers). Modifying an object declared const is undefined behavior, even in a moved-from state. Consider removing the custom move ctor and keeping outStreamObservers non-const, or store the observer map behind a std::shared_ptr<const Map> so the keys are logically immutable without const_cast.
```cpp
NullOutputStreamObserver() = default;
absl::Status handlePacket(const ::mediapipe::Packet& packet) override {
    SPDLOG_ERROR("NullOutputStreamObserver::handlePacket called - graph observer was not replaced before execution");
    throw std::runtime_error("NullOutputStreamObserver should have been replaced before graph execution");
```
NullOutputStreamObserver::handlePacket throws an exception from inside the MediaPipe ObserveOutputStream callback. Exceptions crossing MediaPipe/absl boundaries can lead to std::terminate and bring down the server. Prefer returning a non-OK absl::Status (and avoid throwing) so the graph transitions to an error state safely.
Suggested change:

```diff
-    throw std::runtime_error("NullOutputStreamObserver should have been replaced before graph execution");
+    return absl::Status(absl::StatusCode::kInternal,
+        "NullOutputStreamObserver should have been replaced before graph execution");
```
```cpp
Status inferWithQueue(const RequestType* request, ResponseType* response, ExecutionContext executionContext, MetricCounterGuard& failedRequestsGuard) {
    ::mediapipe::CalculatorGraph& graph = this->guard->graph;
    auto llmContextStatus = initializeLlmExecutionContexts(this->guard->gh->genAiExecutionContextMap);
    if (!llmContextStatus.ok()) {
        return llmContextStatus;
    }
    for (auto& name : this->outputNames) {
```
In the queue path for unary inference, input side packets from the request are currently not rejected (only the streaming queue path checks requestHasInputSidePackets). This can silently ignore user-provided side packets while the non-queue path would apply them via StartRun(). Add the same requestHasInputSidePackets check (and return a clear error) in inferWithQueue to keep behavior consistent and avoid surprising clients.
```cpp
MP_RETURN_ON_FAIL(graph.Initialize(this->config), std::string("failed initialization of MediaPipe graph: ") + this->name, StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR);
enum : unsigned int {
    PROCESS,
    TIMER_END2
};
auto llmContextStatus = initializeLlmExecutionContexts(this->sidePacketMaps.genAiExecutionContextMap);
if (!llmContextStatus.ok()) {
    return llmContextStatus;
}
```
initializeLlmExecutionContexts(this->sidePacketMaps.genAiExecutionContextMap) mutates GenAiExecutionContextHolder instances that originate from MediapipeGraphDefinition::sidePacketMaps. Because GenAiExecutionContextMap stores shared_ptr holders, copying GraphSidePackets into multiple executors will share the same holders across concurrent requests in the non-queue path, causing races and cross-request context corruption. Consider creating a fresh per-executor/per-graph GenAiExecutionContextMap (new holders per node) and passing that as the llm_ctx side packet, instead of reusing the shared holders from the definition.
```cpp
Status MediapipeGraphDefinition::resolveGraphQueueSize() {
    // 1. Explicit pbtxt directive: # OVMS_GRAPH_QUEUE_SIZE: <value>
    //    Always honored regardless of env var or calculator checks.
    //    Value -1 disables the queue, AUTO or a positive integer enables it.
    //    Value 0 is rejected as invalid.
    static const std::regex directiveRegex(
        R"((?:^|\n)\s*#\s*OVMS_GRAPH_QUEUE_SIZE\s*:\s*(\S+)\s*(?:\r?\n|$))");
    std::smatch match;
    if (std::regex_search(this->chosenConfig, match, directiveRegex)) {
        std::string value = match[1].str();
        if (value == "AUTO") {
            this->mgconfig.setGraphQueueSizeAuto();
            return StatusCode::OK;
        }
        auto parsed = stoi32(value);
        if (!parsed.has_value()) {
            SPDLOG_ERROR("Invalid OVMS_GRAPH_QUEUE_SIZE value: '{}'. Expected integer or 'AUTO'.", value);
            return StatusCode::MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID;
        }
        int queueSize = parsed.value();
        if (queueSize < -1 || queueSize == 0) {
            SPDLOG_ERROR("Invalid OVMS_GRAPH_QUEUE_SIZE value: {}. Must be -1 (disabled) or a positive integer.", queueSize);
            return StatusCode::MEDIAPIPE_GRAPH_CONFIG_FILE_INVALID;
        }
        if (queueSize == -1) {
            SPDLOG_DEBUG("Graph queue explicitly disabled (OVMS_GRAPH_QUEUE_SIZE=-1) for mediapipe: {}", getName());
            return StatusCode::OK;
        }
        unsigned int maxThreads = std::thread::hardware_concurrency();
        if (maxThreads > 0 && queueSize > static_cast<int>(maxThreads)) {
            SPDLOG_WARN("OVMS_GRAPH_QUEUE_SIZE value: {} exceeds available hardware threads: {}. Clamping to {}.", queueSize, maxThreads, maxThreads);
            queueSize = static_cast<int>(maxThreads);
        }
        this->mgconfig.setGraphQueueSize(queueSize);
        return StatusCode::OK;
    }

    // 2. Default: queue disabled unless graph explicitly provides directive.
    SPDLOG_DEBUG("Graph queue disabled by default for mediapipe: {}. Add '# OVMS_GRAPH_QUEUE_SIZE: <value>' directive in graph.pbtxt to enable.", getName());
    return StatusCode::OK;
}
```
OVMS_GRAPH_QUEUE_SIZE handling here rejects a value of 0, clamps values above hardware_concurrency(), and disables the queue by default when the directive is absent. This diverges from the PR description (0/-1 disable, and default enable with AUTO sizing unless overridden) and from the docs table which states 0 disables the pool. Please align the implementation with the documented decision logic (or update the PR description/docs + tests accordingly), and consider whether oversize values should be rejected vs clamped.
The graph queue pre-initializes a pool of MediaPipe graph instances that are reused across requests, avoiding the overhead of creating and destroying graphs per request. Queue size is resolved during graph validation.

Decision logic (in priority order):

1. Explicit pbtxt directive — add a comment in the graph `.pbtxt` file:
   - `# OVMS_GRAPH_QUEUE_SIZE: AUTO` — pool sized to `std::thread::hardware_concurrency()`
   - `# OVMS_GRAPH_QUEUE_SIZE: <N>` — pool of exactly N graphs (must be ≤ hardware threads)
   - `# OVMS_GRAPH_QUEUE_SIZE: 0` or `# OVMS_GRAPH_QUEUE_SIZE: -1` — explicitly disable the pool

   This directive is always honored, regardless of env var or calculator type.
2. Environment variable — `OVMS_GRAPH_QUEUE_OFF=1` suppresses the auto-enable default. It does not override an explicit pbtxt directive. If there is no directive in the pbtxt file and no environment variable set, the graph pool is enabled by default.
3. Python calculator — if the graph contains `PythonExecutorCalculator`, the pool is automatically disabled (not yet safe for reuse).
4. Default — if none of the above apply, the pool is enabled with AUTO sizing.