diff --git a/modelopt/onnx/quantization/autotune/__init__.py b/modelopt/onnx/quantization/autotune/__init__.py new file mode 100644 index 000000000..91e86889f --- /dev/null +++ b/modelopt/onnx/quantization/autotune/__init__.py @@ -0,0 +1,57 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Pattern-Based Q/DQ Autotuning for ONNX Models. + +This package provides automated optimization of Quantize/Dequantize (Q/DQ) node placement +in ONNX computation graphs to minimize TensorRT inference latency. It uses pattern-based +region analysis to efficiently explore and optimize Q/DQ insertion strategies. +""" + +# Core data structures +from .benchmark import TensorRTPyBenchmark, TrtExecBenchmark +from .common import ( + AutotunerError, + AutotunerNotInitializedError, + InsertionScheme, + InvalidSchemeError, + Region, + RegionType, +) +from .insertion_points import ( + ChildRegionInputInsertionPoint, + ChildRegionOutputInsertionPoint, + NodeInputInsertionPoint, + ResolvedInsertionPoint, +) +from .region_pattern import RegionPattern +from .region_search import CombinedRegionSearch + +__all__ = [ + "AutotunerError", + "AutotunerNotInitializedError", + "ChildRegionInputInsertionPoint", + "ChildRegionOutputInsertionPoint", + "CombinedRegionSearch", + "InsertionScheme", + "InvalidSchemeError", + "NodeInputInsertionPoint", + "Region", + "RegionPattern", + "RegionType", + "ResolvedInsertionPoint", + "TensorRTPyBenchmark", + "TrtExecBenchmark", +] diff --git a/modelopt/onnx/quantization/autotune/benchmark.py b/modelopt/onnx/quantization/autotune/benchmark.py new file mode 100644 index 000000000..c0850765f --- /dev/null +++ b/modelopt/onnx/quantization/autotune/benchmark.py @@ -0,0 +1,722 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""TensorRT Utilities and Benchmark Module. + +This module provides comprehensive TensorRT utilities including: +- Benchmark framework for measuring TensorRT engine performance +- Graph utilities for tensor analysis + +**Benchmark Classes:** +- Benchmark: Abstract base class defining the benchmarking interface +- TrtExecBenchmark: Uses trtexec command-line tool for benchmarking +- TensorRTPyBenchmark: Uses TensorRT Python API for direct engine profiling +""" + +import contextlib +import ctypes +import importlib.util +import os +import re +import shutil +import subprocess # nosec B404 +import tempfile +import time +from abc import ABC, abstractmethod +from pathlib import Path + +import numpy as np + +from modelopt.onnx.logging_config import logger +from modelopt.onnx.quantization.ort_utils import _check_for_tensorrt + +TRT_AVAILABLE = importlib.util.find_spec("tensorrt") is not None +if TRT_AVAILABLE: + import tensorrt as trt + +CUDART_AVAILABLE = importlib.util.find_spec("cuda") is not None +if CUDART_AVAILABLE: + try: + from cuda import cudart + except ImportError: + with contextlib.suppress(ImportError): + from cuda.bindings import runtime as cudart + + +class Benchmark(ABC): + """Abstract base class for TensorRT model benchmarking. + + This class defines the interface that all benchmark implementations must follow. + It provides a consistent API for measuring inference latency of ONNX models + when converted to TensorRT engines. + + Attributes: + timing_cache_file: Path to the TensorRT timing cache file. + warmup_runs: Number of warmup iterations before timing. + timing_runs: Number of iterations for latency measurement. + plugin_libraries: List of paths to plugin libraries. + logger: Logger instance for this benchmark. + + Subclasses must implement: + run(): Execute the benchmark and return latency in milliseconds. + """ + + def __init__( + self, + timing_cache_file: str | None = None, + warmup_runs: int = 5, + timing_runs: int = 10, + plugin_libraries: list[str] | None = None, + ): + """Initialize the benchmark. + + Args: + timing_cache_file: Path to timing cache file to accelerate engine builds. + If None, uses '/tmp/trtexec_timing.cache' as default. + warmup_runs: Number of warmup iterations before timing measurements. + timing_runs: Number of iterations for latency measurement. Results + are averaged across these runs. + plugin_libraries: List of paths to TensorRT plugin shared libraries (.so files). + These plugins will be loaded during engine building. + If None, no custom plugins are loaded. + """ + self.timing_cache_file = timing_cache_file or "/tmp/trtexec_timing.cache" # nosec B108 + self.warmup_runs = warmup_runs + self.timing_runs = timing_runs + self.plugin_libraries = plugin_libraries or [] + self.logger = logger + + @abstractmethod + def run(self, path_or_bytes: str | bytes, log_file: str | None = None) -> float: + """Run benchmark on the given ONNX model. + + Args: + path_or_bytes: Path to the ONNX model (str) or raw model data (bytes) + log_file: Optional path to save benchmark logs + + Returns: + Measured latency in milliseconds, or float("inf") on failure + """ + raise NotImplementedError("Subclasses must implement this method") + + def __call__(self, path_or_bytes: str | bytes, log_file: str | None = None) -> float: + """Convenience method to call benchmark as a function. + + Args: + path_or_bytes: Path to the ONNX model (str) or raw model data (bytes) + log_file: Optional path to save benchmark logs + + Returns: + Measured latency in milliseconds + """ + return self.run(path_or_bytes, log_file) + + def _write_log_file(self, file: Path | str | None, content: str) -> None: + if file is None: + return + if isinstance(file, str): + file = Path(file) + try: + file.parent.mkdir(parents=True, exist_ok=True) + file.write_text(content) + self.logger.debug(f"Saved logs to: {file}") + except Exception as e: + self.logger.warning(f"Failed to save logs to {file}: {e}") + + +class TrtExecBenchmark(Benchmark): + """TensorRT benchmark using trtexec command-line tool. + + This implementation uses the trtexec binary to build engines and measure + inference latency. It is the most straightforward method and closely + mirrors standard TensorRT workflows. + """ + + def __init__( + self, + timing_cache_file: str | None = None, + warmup_runs: int = 5, + timing_runs: int = 10, + plugin_libraries: list[str] | None = None, + trtexec_path: str = "trtexec", + trtexec_args: list[str] | None = None, + ): + """Initialize the trtexec benchmark. + + Args: + timing_cache_file: Path to TensorRT timing cache file for faster + subsequent builds. Defaults to '/tmp/trtexec_timing.cache'. + warmup_runs: Number of warmup iterations before timing measurements. + timing_runs: Number of iterations for latency measurement. Results + are averaged across these runs. + plugin_libraries: List of paths to TensorRT plugin shared libraries (.so files). + These plugins will be loaded by trtexec during engine building. + If None, no custom plugins are loaded. + trtexec_path: Path to trtexec binary. Defaults to 'trtexec' which + looks for the binary in PATH. + trtexec_args: Additional command-line arguments to pass to trtexec. + These are appended after the standard arguments. + Example: ['--fp16', '--workspace=4096', '--verbose'] + """ + super().__init__(timing_cache_file, warmup_runs, timing_runs, plugin_libraries) + self.trtexec_path = trtexec_path + self.trtexec_args = trtexec_args if trtexec_args is not None else [] + self.temp_dir = tempfile.mkdtemp(prefix="trtexec_benchmark_") + self.engine_path = os.path.join(self.temp_dir, "engine.trt") + self.temp_model_path = os.path.join(self.temp_dir, "temp_model.onnx") + self.logger.debug(f"Created temporary engine directory: {self.temp_dir}") + self.logger.debug(f"Temporary model path: {self.temp_model_path}") + self.latency_pattern = r"\[I\]\s+Latency:.*?median\s*=\s*([\d.]+)\s*ms" + + self._base_cmd = [ + self.trtexec_path, + f"--avgRuns={self.timing_runs}", + f"--iterations={self.timing_runs}", + f"--warmUp={self.warmup_runs}", + "--stronglyTyped", + f"--saveEngine={self.engine_path}", + f"--timingCacheFile={self.timing_cache_file}", + ] + + for plugin_lib in self.plugin_libraries: + plugin_path = Path(plugin_lib).resolve() + if not plugin_path.exists(): + self.logger.warning(f"Plugin library not found: {plugin_path}") + continue + self._base_cmd.append(f"--staticPlugins={plugin_path}") + self.logger.debug(f"Added plugin library: {plugin_path}") + + trtexec_args = self.trtexec_args or [] + has_remote_config = any("--remoteAutoTuningConfig" in arg for arg in trtexec_args) + + if has_remote_config: + try: + _check_for_tensorrt(min_version="10.16") + self.logger.debug("TensorRT Python API version >= 10.16 detected") + return + except ImportError: + self.logger.warning( + "Remote autotuning is not supported with TensorRT version < 10.16" + "Removing --remoteAutoTuningConfig from trtexec arguments" + ) + trtexec_args = [ + arg for arg in trtexec_args if "--remoteAutoTuningConfig" not in arg + ] + self._base_cmd.extend(trtexec_args) + + self.logger.debug(f"Base command template: {' '.join(self._base_cmd)}") + + def __del__(self): + """Cleanup temporary directory.""" + if hasattr(self, "temp_dir"): + try: + shutil.rmtree(self.temp_dir, ignore_errors=True) + self.logger.debug(f"Cleaned up temporary directory: {self.temp_dir}") + except Exception as e: + self.logger.warning(f"Failed to cleanup temporary directory: {e}") + + def run( + self, + path_or_bytes: str | bytes, + log_file: str | None = None, + ) -> float: + """Run benchmark using trtexec. + + Args: + path_or_bytes: Path to the ONNX model (str) or raw model data (bytes) + log_file: Optional path to save trtexec logs + + Returns: + Measured median latency in milliseconds + """ + if not os.path.exists(self.timing_cache_file): + self.logger.debug(f"Will create timing cache: {self.timing_cache_file}") + + try: + model_path = path_or_bytes + if isinstance(model_path, bytes): + with open(self.temp_model_path, "wb") as f: + f.write(model_path) + model_path = self.temp_model_path + self.logger.debug(f"Wrote model bytes to temporary file: {model_path}") + + cmd = [*self._base_cmd, f"--onnx={model_path}"] + self.logger.debug(f"Running: {' '.join(cmd)}") + result = subprocess.run(cmd, capture_output=True, text=True) # nosec B603 + self._write_log_file( + log_file, + "\n".join( + [ + f"Command: {' '.join(cmd)}", + f"Return code: {result.returncode}", + "=" * 80, + "STDOUT:", + "=" * 80, + result.stdout, + "\n" + "=" * 80, + "STDERR:", + "=" * 80, + result.stderr, + "\n" + "=" * 80, + ] + ), + ) + if result.returncode != 0: + self.logger.error(f"trtexec failed with return code {result.returncode}") + self.logger.error(f"stderr: {result.stderr}") + return float("inf") + + if not (match := re.search(self.latency_pattern, result.stdout, re.IGNORECASE)): + self.logger.warning("Could not parse median latency from trtexec output") + self.logger.debug(f"trtexec stdout:\n{result.stdout}") + return float("inf") + latency = float(match.group(1)) + self.logger.info(f"TrtExec benchmark (median): {latency:.2f} ms") + return latency + except FileNotFoundError: + self.logger.error(f"trtexec binary not found: {self.trtexec_path}") + self.logger.error("Please ensure TensorRT is installed and trtexec path is correct") + return float("inf") + except Exception as e: + self.logger.error(f"Benchmark failed: {e}") + return float("inf") + + +class TensorRTPyBenchmark(Benchmark): + """TensorRT benchmark using Python API with plugin support. + + This implementation directly uses the TensorRT Python API to build engines + and measure inference latency. It provides more control than trtexec and + can be faster for certain workflows as it avoids subprocess overhead. + """ + + def __init__( + self, + timing_cache_file: str | None = None, + warmup_runs: int = 5, + timing_runs: int = 20, + plugin_libraries: list[str] | None = None, + ): + """Initialize the TensorRT Python API benchmark. + + Creates persistent TensorRT objects (Logger, Builder, Runtime) and + loads the timing cache from disk if available. Optionally loads custom + TensorRT plugin libraries for models with custom operations. + + Args: + timing_cache_file: Path to TensorRT timing cache file. If None, + defaults to '/tmp/trtexec_timing.cache'. + warmup_runs: Number of warmup iterations before timing measurements. + timing_runs: Number of iterations for latency measurement. + plugin_libraries: List of paths to TensorRT plugin shared libraries (.so files). + These plugins will be loaded and registered for use during + engine building. If None, no custom plugins are loaded. + + Raises: + ImportError: If tensorrt or cuda-python (cudart) packages are not available. + FileNotFoundError: If a specified plugin library file does not exist. + RuntimeError: If plugin library loading fails. + """ + super().__init__(timing_cache_file, warmup_runs, timing_runs, plugin_libraries) + + if not TRT_AVAILABLE: + raise ImportError("TensorRT Python API not available. Please install tensorrt package.") + if not CUDART_AVAILABLE or cudart is None: + raise ImportError( + "CUDA Runtime (cudart) not available. Please install cuda-python package: pip install cuda-python" + ) + + self.trt_logger = trt.Logger(trt.Logger.WARNING) + self.builder = trt.Builder(self.trt_logger) + self.runtime = trt.Runtime(self.trt_logger) + self._loaded_plugin_handles = [] + if self.plugin_libraries: + self._load_plugin_libraries() + trt.init_libnvinfer_plugins(self.trt_logger, "") + self._plugin_registry = trt.get_plugin_registry() + + self.network_flags = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH) + self.network_flags |= 1 << int(trt.NetworkDefinitionCreationFlag.STRONGLY_TYPED) + self._timing_cache = None + self._load_timing_cache() + self._shape_configs = {} + + def _load_plugin_libraries(self): + """Load custom TensorRT plugin libraries from shared object files. + + This method loads plugin libraries using ctypes and initializes them + with the TensorRT plugin registry. Plugins must export the + initLibNvInferPlugins function to register their implementations. + + The loaded library handles are stored to prevent them from being + garbage collected during the benchmark's lifetime. + + Raises: + FileNotFoundError: If a plugin library file does not exist. + RuntimeError: If plugin initialization fails. + """ + for plugin_lib in self.plugin_libraries: + plugin_path = Path(plugin_lib).resolve() + + if not plugin_path.exists(): + raise FileNotFoundError(f"Plugin library not found: {plugin_path}") + + self.logger.info(f"Loading TensorRT plugin: {plugin_path}") + + try: + if hasattr(os, "RTLD_LAZY") and hasattr(os, "RTLD_GLOBAL"): + plugin_handle = ctypes.CDLL( + str(plugin_path), mode=os.RTLD_LAZY | os.RTLD_GLOBAL + ) + else: + # Fallback for platforms without RTLD flags (e.g., Windows) + plugin_handle = ctypes.CDLL(str(plugin_path)) + + # Store handle to prevent garbage collection + self._loaded_plugin_handles.append(plugin_handle) + + # Try to initialize plugin with TensorRT registry + # Most TensorRT plugins export initLibNvInferPlugins function + if hasattr(plugin_handle, "initLibNvInferPlugins"): + init_func = plugin_handle.initLibNvInferPlugins + # Function signature: bool initLibNvInferPlugins(void* logger, const char* namespace) + init_func.argtypes = [ctypes.c_void_p, ctypes.c_char_p] + init_func.restype = ctypes.c_bool + + # Initialize with the TensorRT logger and default namespace + success = init_func(None, b"") + if not success: + self.logger.warning( + f"Plugin initialization returned false for: {plugin_path}" + ) + else: + self.logger.info(f"Successfully initialized plugin: {plugin_path.name}") + else: + self.logger.info( + f"Plugin loaded (no initLibNvInferPlugins function): {plugin_path.name}" + ) + + except Exception as e: + raise RuntimeError(f"Failed to load plugin library {plugin_path}: {e}") from e + + def set_shapes(self, input_name: str, min_shape: list, opt_shape: list, max_shape: list): + """Set custom min/opt/max shapes for a dynamic input. + + This method allows you to specify custom shape ranges for dynamic inputs + (inputs with -1 dimensions). If not specified, the benchmark will use + default shapes (all -1 dimensions become 1). + + Args: + input_name: Name of the input tensor to configure. + min_shape: Minimum shape for this input. List of integers. + opt_shape: Optimal/default shape for this input. List of integers. + max_shape: Maximum shape for this input. List of integers. + """ + if len(min_shape) != len(opt_shape) or len(opt_shape) != len(max_shape): + raise ValueError("min_shape, opt_shape, and max_shape must have the same length") + + for i, (min_dim, opt_dim, max_dim) in enumerate(zip(min_shape, opt_shape, max_shape)): + if not (min_dim <= opt_dim <= max_dim): + raise ValueError( + f"Invalid shape range at dimension {i}: " + f"min={min_dim}, opt={opt_dim}, max={max_dim}. " + f"Must satisfy min <= opt <= max" + ) + + self._shape_configs[input_name] = (min_shape, opt_shape, max_shape) + self.logger.debug( + f"Set shapes for input '{input_name}': " + f"min={min_shape}, opt={opt_shape}, max={max_shape}" + ) + + def run( + self, + path_or_bytes: str | bytes, + log_file: str | None = None, + flush_timing_cache: bool = False, + ) -> float: + """Run benchmark using TensorRT Python API. + + Args: + path_or_bytes: Path to the ONNX model (str) or raw model data (bytes) + log_file: Optional path to save benchmark logs + + Returns: + Measured median latency in milliseconds + """ + config = network = parser = serialized_engine = engine = context = stream_handle = None + inputs, outputs = [], [] + + try: + self.logger.debug("Creating TensorRT builder...") + config = self.builder.create_builder_config() + config.set_flag(trt.BuilderFlag.DIRECT_IO) + if not config.set_timing_cache(self._timing_cache, ignore_mismatch=True): + self.logger.warning("Failed to set timing cache to builder config") + network = self.builder.create_network(self.network_flags) + parser = trt.OnnxParser(network, self.trt_logger) + if isinstance(path_or_bytes, bytes): + self.logger.debug(f"Parsing ONNX model from bytes (size: {len(path_or_bytes)})") + model_data = path_or_bytes + else: + self.logger.debug(f"Parsing ONNX model: {path_or_bytes}") + with open(path_or_bytes, "rb") as f: + model_data = f.read() + + if not parser.parse(model_data): + self.logger.error("Failed to parse ONNX model") + for error_idx in range(parser.num_errors): + self.logger.error(f" {parser.get_error(error_idx)}") + return float("inf") + + has_dynamic_shapes = any( + any(dim == -1 for dim in network.get_input(i).shape) + for i in range(network.num_inputs) + ) + + if has_dynamic_shapes: + profile = self.builder.create_optimization_profile() + for i in range(network.num_inputs): + input_tensor = network.get_input(i) + input_name = input_tensor.name + shape = list(input_tensor.shape) + + if input_name in self._shape_configs: + min_shape, opt_shape, max_shape = self._shape_configs[input_name] + self.logger.debug( + f"Using custom shapes for input '{input_name}': " + f"min={min_shape}, opt={opt_shape}, max={max_shape}" + ) + else: + min_shape = [1 if dim == -1 else dim for dim in shape] + opt_shape = [1 if dim == -1 else dim for dim in shape] + max_shape = [1 if dim == -1 else dim for dim in shape] + self.logger.debug( + f"Using default shapes for input '{input_name}': {opt_shape}" + ) + + profile.set_shape(input_name, min_shape, opt_shape, max_shape) + + config.add_optimization_profile(profile) + + self.logger.debug("Building TensorRT engine...") + build_start = time.perf_counter() + serialized_engine = self.builder.build_serialized_network(network, config) + build_time = time.perf_counter() - build_start + + if serialized_engine is None: + self.logger.error("Failed to build TensorRT engine") + return float("inf") + + self.logger.debug(f"Engine built successfully in {build_time:.2f}s") + + if flush_timing_cache: + self._save_timing_cache() + + engine = self.runtime.deserialize_cuda_engine(serialized_engine) + + if engine is None: + self.logger.error("Failed to deserialize engine") + return float("inf") + + context = engine.create_execution_context() + + inputs = [] + outputs = [] + stream_handle = None + + def _alloc_pinned_host(size: int, dtype: np.dtype): + nbytes = size * np.dtype(dtype).itemsize + err, host_ptr = cudart.cudaMallocHost(nbytes) + if err != cudart.cudaError_t.cudaSuccess: + raise RuntimeError(f"cudaMallocHost failed: {err}") + addr = int(host_ptr) if hasattr(host_ptr, "__int__") else host_ptr + ctype = np.ctypeslib.as_ctypes_type(dtype) + arr = np.ctypeslib.as_array((ctype * size).from_address(addr)) + return host_ptr, arr + + for i in range(engine.num_io_tensors): + tensor_name = engine.get_tensor_name(i) + dtype = trt.nptype(engine.get_tensor_dtype(tensor_name)) + shape = context.get_tensor_shape(tensor_name) + + size = trt.volume(shape) + nbytes = size * np.dtype(dtype).itemsize + + err, device_ptr = cudart.cudaMalloc(nbytes) + if err != cudart.cudaError_t.cudaSuccess: + raise RuntimeError(f"cudaMalloc failed: {err}") + + host_ptr, host_mem = _alloc_pinned_host(size, dtype) + + if engine.get_tensor_mode(tensor_name) == trt.TensorIOMode.INPUT: + np.copyto(host_mem, np.random.randn(size).astype(dtype)) + inputs.append( + { + "host_ptr": host_ptr, + "host": host_mem, + "device_ptr": device_ptr, + "nbytes": nbytes, + "name": tensor_name, + } + ) + else: + outputs.append( + { + "host_ptr": host_ptr, + "host": host_mem, + "device_ptr": device_ptr, + "nbytes": nbytes, + "name": tensor_name, + } + ) + + context.set_tensor_address(tensor_name, int(device_ptr)) + + err, stream_handle = cudart.cudaStreamCreate() + if err != cudart.cudaError_t.cudaSuccess: + raise RuntimeError(f"cudaStreamCreate failed: {err}") + + h2d = cudart.cudaMemcpyKind.cudaMemcpyHostToDevice + d2h = cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost + + self.logger.debug(f"Running {self.warmup_runs} warmup iterations...") + for _ in range(self.warmup_runs): + for inp in inputs: + cudart.cudaMemcpyAsync( + inp["device_ptr"], inp["host_ptr"], inp["nbytes"], h2d, stream_handle + ) + context.execute_async_v3(stream_handle) + for out in outputs: + cudart.cudaMemcpyAsync( + out["host_ptr"], out["device_ptr"], out["nbytes"], d2h, stream_handle + ) + cudart.cudaStreamSynchronize(stream_handle) + + self.logger.debug(f"Running {self.timing_runs} timing iterations...") + latencies = [] + + for _ in range(self.timing_runs): + for inp in inputs: + cudart.cudaMemcpyAsync( + inp["device_ptr"], inp["host_ptr"], inp["nbytes"], h2d, stream_handle + ) + + cudart.cudaStreamSynchronize(stream_handle) + start = time.perf_counter() + context.execute_async_v3(stream_handle) + cudart.cudaStreamSynchronize(stream_handle) + end = time.perf_counter() + + latency_ms = (end - start) * 1000.0 + latencies.append(latency_ms) + + for out in outputs: + cudart.cudaMemcpyAsync( + out["host_ptr"], out["device_ptr"], out["nbytes"], d2h, stream_handle + ) + + latencies = np.array(latencies) + median_latency = float(np.median(latencies)) + mean_latency = float(np.mean(latencies)) + std_latency = float(np.std(latencies)) + min_latency = float(np.min(latencies)) + max_latency = float(np.max(latencies)) + + self.logger.info( + f"TensorRT Python API benchmark: min={min_latency:.3f}ms, max={max_latency:.3f}ms, " + f"mean={mean_latency:.3f}ms, std={std_latency:.3f}ms, median={median_latency:.3f}ms" + ) + + model_info = ( + f"" + if isinstance(path_or_bytes, bytes) + else path_or_bytes + ) + self._write_log_file( + log_file, + "\n".join( + [ + "TensorRT Python API Benchmark", + f"Model: {model_info}", + f"Build time: {build_time:.2f}s", + f"Warmup runs: {self.warmup_runs}", + f"Timing runs: {self.timing_runs}", + "Latency Statistics:", + f" Min: {min_latency:.3f} ms", + f" Max: {max_latency:.3f} ms", + f" Mean: {mean_latency:.3f} ms", + f" Std: {std_latency:.3f} ms", + f" Median: {median_latency:.3f} ms", + f"All latencies: {latencies.tolist()}", + ] + ), + ) + return median_latency + except Exception as e: + self.logger.error(f"Benchmark failed: {e}", exc_info=True) + return float("inf") + finally: + try: + for buf in inputs + outputs: + if "host_ptr" in buf: + cudart.cudaFreeHost(buf["host_ptr"]) + if "device_ptr" in buf: + cudart.cudaFree(buf["device_ptr"]) + if stream_handle is not None: + cudart.cudaStreamDestroy(stream_handle) + del ( + inputs, + outputs, + stream_handle, + context, + engine, + serialized_engine, + parser, + network, + config, + ) + except Exception as cleanup_error: + self.logger.warning(f"Error during cleanup: {cleanup_error}") + + def _load_timing_cache(self): + """Load timing cache from file or create a new one.""" + config = self.builder.create_builder_config() + if os.path.exists(self.timing_cache_file): + try: + with open(self.timing_cache_file, "rb") as f: + timing_cache_data = f.read() + self._timing_cache = config.create_timing_cache(timing_cache_data) + self.logger.debug(f"Loaded timing cache from: {self.timing_cache_file}") + except Exception as e: + self.logger.warning(f"Failed to load timing cache: {e}") + self.logger.debug("Creating new timing cache") + self._timing_cache = None + + if self._timing_cache is None: + self._timing_cache = config.create_timing_cache(b"") + self.logger.debug("Created new timing cache") + del config + + def _save_timing_cache(self): + """Save timing cache to file.""" + try: + if self._timing_cache is not None: + timing_cache_data = self._timing_cache.serialize() + with open(self.timing_cache_file, "wb") as f: + f.write(timing_cache_data) + self.logger.debug(f"Saved timing cache to: {self.timing_cache_file}") + except Exception as e: + self.logger.warning(f"Failed to save timing cache: {e}") diff --git a/tests/_test_utils/onnx/quantization/autotune/models.py b/tests/_test_utils/onnx/quantization/autotune/models.py new file mode 100644 index 000000000..4090cfef3 --- /dev/null +++ b/tests/_test_utils/onnx/quantization/autotune/models.py @@ -0,0 +1,47 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Shared test ONNX models for autotuner unit tests. + +Model creation functions live here; tests import and call them directly. +""" + +import onnx +from onnx import helper + + +def _create_simple_conv_onnx_model(): + """Build ONNX model: Input -> Conv -> Relu -> Output (minimal for autotuner tests).""" + input_tensor = helper.make_tensor_value_info("input", onnx.TensorProto.FLOAT, [1, 3, 224, 224]) + output_tensor = helper.make_tensor_value_info( + "output", onnx.TensorProto.FLOAT, [1, 64, 224, 224] + ) + conv_node = helper.make_node( + "Conv", inputs=["input", "conv_weight"], outputs=["conv_out"], name="conv" + ) + relu_node = helper.make_node("Relu", inputs=["conv_out"], outputs=["output"], name="relu") + graph = helper.make_graph( + [conv_node, relu_node], + "simple_conv", + [input_tensor], + [output_tensor], + initializer=[ + helper.make_tensor( + "conv_weight", onnx.TensorProto.FLOAT, [64, 3, 3, 3], [0.1] * (64 * 3 * 3 * 3) + ) + ], + ) + return helper.make_model(graph, producer_name="test") diff --git a/tests/gpu/onnx/quantization/autotune/test_benchmark.py b/tests/gpu/onnx/quantization/autotune/test_benchmark.py new file mode 100644 index 000000000..a089d99d9 --- /dev/null +++ b/tests/gpu/onnx/quantization/autotune/test_benchmark.py @@ -0,0 +1,124 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""GPU tests for autotune Benchmark (TrtExecBenchmark, TensorRTPyBenchmark).""" + +import contextlib +import os +import shutil +import tempfile + +import pytest +from _test_utils.onnx.quantization.autotune.models import _create_simple_conv_onnx_model + +from modelopt.onnx.quantization.autotune import TensorRTPyBenchmark, TrtExecBenchmark + + +@pytest.fixture +def simple_conv_model_bytes(): + """ONNX model bytes: Input -> Conv -> Relu -> Output (from _test_utils).""" + model = _create_simple_conv_onnx_model() + return model.SerializeToString() + + +@pytest.fixture +def simple_conv_model_path(simple_conv_model_bytes, tmp_path): + """Path to ONNX model file (same graph as simple_conv_model_bytes).""" + path = tmp_path / "simple_conv.onnx" + path.write_bytes(simple_conv_model_bytes) + return str(path) + + +class TestTensorRTPyBenchmark: + """Tests for TensorRTPyBenchmark (TensorRT Python API + cudart).""" + + @pytest.fixture(autouse=True) + def _require_tensorrt_and_cudart(self): + pytest.importorskip("tensorrt") + try: + from cuda import cudart # noqa: F401 + except ImportError: + try: + from cuda.bindings import runtime # noqa: F401 + except ImportError: + pytest.skip("cuda-python (cudart) not available", allow_module_level=False) + + def test_run_with_bytes(self, simple_conv_model_bytes): + """TensorRTPyBenchmark accepts model bytes and returns finite latency.""" + benchmark = TensorRTPyBenchmark(warmup_runs=1, timing_runs=2) + latency_ms = benchmark.run(simple_conv_model_bytes) + assert isinstance(latency_ms, float) + assert latency_ms > 0 + assert latency_ms != float("inf") + + def test_run_with_path(self, simple_conv_model_path): + """TensorRTPyBenchmark accepts model path and returns finite latency.""" + benchmark = TensorRTPyBenchmark(warmup_runs=1, timing_runs=2) + latency_ms = benchmark.run(simple_conv_model_path) + assert isinstance(latency_ms, float) + assert latency_ms > 0 + assert latency_ms != float("inf") + + def test_callable(self, simple_conv_model_bytes): + """Benchmark is callable and returns same as run().""" + benchmark = TensorRTPyBenchmark(warmup_runs=1, timing_runs=2) + latency_ms = benchmark(simple_conv_model_bytes) + assert isinstance(latency_ms, float) + assert latency_ms > 0 + + +class TestTrtExecBenchmark: + """Tests for TrtExecBenchmark (trtexec CLI).""" + + @pytest.fixture(autouse=True) + def _require_trtexec(self): + if shutil.which("trtexec") is None: + pytest.skip("trtexec not found in PATH", allow_module_level=False) + + def test_run_with_path(self, simple_conv_model_path): + """TrtExecBenchmark accepts model path and returns finite latency.""" + with tempfile.NamedTemporaryFile(suffix=".cache", delete=False) as f: + cache_path = f.name + try: + benchmark = TrtExecBenchmark( + timing_cache_file=cache_path, + warmup_runs=1, + timing_runs=2, + ) + latency_ms = benchmark.run(simple_conv_model_path) + assert isinstance(latency_ms, float) + assert latency_ms > 0 + assert latency_ms != float("inf") + finally: + with contextlib.suppress(OSError): + os.unlink(cache_path) + + def test_run_with_bytes(self, simple_conv_model_bytes): + """TrtExecBenchmark accepts model bytes (writes temp file) and returns finite latency.""" + with tempfile.NamedTemporaryFile(suffix=".cache", delete=False) as f: + cache_path = f.name + try: + benchmark = TrtExecBenchmark( + timing_cache_file=cache_path, + warmup_runs=1, + timing_runs=2, + ) + latency_ms = benchmark.run(simple_conv_model_bytes) + assert isinstance(latency_ms, float) + assert latency_ms > 0 + assert latency_ms != float("inf") + finally: + with contextlib.suppress(OSError): + os.unlink(cache_path)