diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml
index 4e5fc705..c7f99421 100644
--- a/.github/workflows/benchmark.yml
+++ b/.github/workflows/benchmark.yml
@@ -416,7 +416,7 @@ jobs:

       - name: Install dependencies
         run: |
-          sudo apt-get install -y sysstat python3-pip
+          sudo apt-get install -y sysstat python3-pip numactl
           sudo python3 -m pip install psycopg2-binary boto3 hdrhistogram

           # async-profiler for flame graphs
diff --git a/.github/workflows/benchmark_orchestrator.py b/.github/workflows/benchmark_orchestrator.py
index e590dcae..7d75226c 100755
--- a/.github/workflows/benchmark_orchestrator.py
+++ b/.github/workflows/benchmark_orchestrator.py
@@ -885,7 +885,6 @@ def parse_perf_stat(filepath: Path, hardware_available: bool) -> dict:
 class BenchmarkOrchestrator:
     """Main benchmark orchestration class"""

-    INFRA_CORES = "0-3"
     AWS_REGION = "us-east-1"

     def __init__(self, resp_bench_dir: Path, resp_bench_commit: str, output_file: Path,
@@ -919,20 +918,107 @@ def __init__(self, resp_bench_dir: Path, resp_bench_commit: str, output_file: Pa
         self.variance_control = VarianceControl()
         self.variance_control.setup(network_delay_ms=network_delay_ms)

-        # Calculate core allocation after SMT is disabled
-        cpu_count = os.cpu_count() or 8
-        if cpu_count > 4:
-            self.benchmark_cores = f"4-{cpu_count - 1}"
-        else:
-            self.benchmark_cores = "0-3"
-            print(f"WARNING: Only {cpu_count} cores, benchmark shares "
-                  f"cores with server")
-        print(f"Core allocation: Server={self.INFRA_CORES}, "
-              f"Benchmark={self.benchmark_cores}")
+        # Detect NUMA topology and allocate cores accordingly
+        self._setup_numa_aware_cores()

         # Java JAR path - find the shaded JAR dynamically
         self.java_jar = self._find_java_jar()

+    def _get_numa_topology(self) -> dict:
+        """
+        Detect NUMA topology from the /sys filesystem.
+
+        Returns a dict mapping node id to its sorted list of CPU cores.
+        Nodes without CPUs (memory-only nodes expose an empty cpulist) are
+        skipped. Falls back to a single node 0 holding every CPU reported
+        by os.cpu_count() (or 8) when no topology can be read.
+        """
+        numa_nodes = {}
+        numa_base = Path("/sys/devices/system/node")
+
+        if not numa_base.exists():
+            print(" ⚠ NUMA topology not available, assuming single node")
+            cpu_count = os.cpu_count() or 8
+            return {0: list(range(cpu_count))}
+
+        for node_dir in sorted(numa_base.glob("node[0-9]*")):
+            node_id = int(node_dir.name.replace("node", ""))
+            cpulist_file = node_dir / "cpulist"
+            if cpulist_file.exists():
+                cores = self._parse_cpulist(cpulist_file.read_text())
+                if cores:
+                    numa_nodes[node_id] = cores
+
+        if not numa_nodes:
+            cpu_count = os.cpu_count() or 8
+            return {0: list(range(cpu_count))}
+
+        return numa_nodes
+
+    def _parse_cpulist(self, cpulist: str) -> list:
+        """Parse CPU list format like '0-3,8-11' into [0,1,2,3,8,9,10,11].
+
+        Blank input (e.g. the cpulist of a memory-only node) yields [].
+        """
+        cores = []
+        for part in cpulist.split(","):
+            part = part.strip()
+            if not part:
+                continue
+            if "-" in part:
+                start, end = part.split("-")
+                cores.extend(range(int(start), int(end) + 1))
+            else:
+                cores.append(int(part))
+        return sorted(cores)
+
+    def _format_cpulist(self, cores: list) -> str:
+        """Format cores as a CPU list, e.g. [0,1,2,3,8,9] -> '0-3,8-9'."""
+        ranges = []
+        for core in sorted(set(cores)):
+            if ranges and core == ranges[-1][1] + 1:
+                ranges[-1][1] = core
+            else:
+                ranges.append([core, core])
+        return ",".join(
+            str(lo) if lo == hi else f"{lo}-{hi}" for lo, hi in ranges)
+
+    def _setup_numa_aware_cores(self):
+        """
+        Detect NUMA topology and allocate cores across NUMA nodes.
+
+        The server runs on the lowest-numbered node with CPUs and the
+        benchmark on the next one (if available); with a single node both
+        share all of its cores.
+        """
+        numa_topology = self._get_numa_topology()
+        node_ids = sorted(numa_topology)
+
+        print(f"NUMA topology detected: {len(node_ids)} node(s)")
+        for node_id in node_ids:
+            cores = numa_topology[node_id]
+            print(f" Node {node_id}: {len(cores)} cores "
+                  f"({self._format_cpulist(cores)})")
+
+        # Node ids are not guaranteed to be 0 and 1, so pick by sorted order
+        self.infra_numa_node = node_ids[0]
+        self.benchmark_numa_node = node_ids[1] if len(node_ids) >= 2 else node_ids[0]
+
+        # Use all cores on each node; a node's cpulist may be non-contiguous,
+        # so keep the exact list instead of collapsing it to first-last
+        self.infra_cores = self._format_cpulist(
+            numa_topology[self.infra_numa_node])
+        self.benchmark_cores = self._format_cpulist(
+            numa_topology[self.benchmark_numa_node])
+
+        if len(node_ids) >= 2:
+            print("Split NUMA allocation:")
+            print(f" Server: NUMA node {self.infra_numa_node}, cores {self.infra_cores}")
+            print(f" Benchmark: NUMA node {self.benchmark_numa_node}, cores {self.benchmark_cores}")
+        else:
+            print(f"Single NUMA node {self.infra_numa_node}: all cores shared")
+            print(f" Cores: {self.infra_cores}")
+
     def _find_java_jar(self) -> Path:
         """Find the benchmark JAR file dynamically."""
         target_dir = self.resp_bench_dir / "java/target"
@@ -984,22 +1070,29 @@ def _get_server_port(self) -> int:
         return 7379 if self._is_cluster_mode() else 6379

     def _pin_server_processes(self):
-        """Pin all running valkey-server processes to designated cores."""
+        """Pin all running valkey-server processes to designated cores and NUMA node."""
         work_dir = self.resp_bench_dir / "work"
         pinned = 0
         for pid_file in work_dir.glob("*.pid"):
             try:
                 pid = int(pid_file.read_text().strip())
+                # Pin CPU to infra cores
                 result = subprocess.run(
-                    ["taskset", "-cp", self.INFRA_CORES, str(pid)],
+                    ["taskset", "-cp", self.infra_cores, str(pid)],
                     capture_output=True, text=True)
                 if result.returncode == 0:
                     pinned += 1
                 else:
                     print(f" Warning: Failed to pin PID {pid}: {result.stderr}")
+                # Migrate memory to infra NUMA node
+                migrate = subprocess.run(
+                    ["migratepages", str(pid), "all", str(self.infra_numa_node)],
+                    capture_output=True, text=True)
+                if migrate.returncode != 0:
+                    print(f" Warning: Failed to migrate memory of PID {pid}: {migrate.stderr}")
             except (ValueError, FileNotFoundError) as e:
                 print(f" Warning: Could not read PID from {pid_file}: {e}")
-        print(f"Pinned {pinned} server processes to cores {self.INFRA_CORES}")
+        print(f"Pinned {pinned} server processes to NUMA node {self.infra_numa_node}, cores {self.infra_cores}")

     def start_infrastructure(self):
         if self.skip_infra:
@@ -1012,7 +1105,7 @@ def start_infrastructure(self):
         stop_target = "server-cluster-stop" if is_cluster else "server-standalone-stop"
         port = self._get_server_port()

-        print(f"Starting Valkey {mode_name} infrastructure on cores {self.INFRA_CORES}...")
+        print(f"Starting Valkey {mode_name} infrastructure on NUMA node {self.infra_numa_node}...")
         subprocess.run(["make", stop_target], cwd=self.resp_bench_dir, capture_output=True)
         subprocess.run(["pkill", "-f", "valkey-server"], capture_output=True)

@@ -1023,7 +1116,8 @@ def start_infrastructure(self):
             shutil.rmtree(work_dir)

         result = subprocess.run(
-            ["taskset", "-c", self.INFRA_CORES, "make", make_target],
+            ["numactl", f"--cpunodebind={self.infra_numa_node}",
+             f"--membind={self.infra_numa_node}", "make", make_target],
             cwd=self.resp_bench_dir, timeout=600)
         if result.returncode != 0:
             raise RuntimeError(
@@ -1060,6 +1154,9 @@ def run_benchmark(self, output_metrics: Path) -> subprocess.Popen:
         server = f"localhost:{port}"

         cmd = [
+            "numactl",
+            f"--cpunodebind={self.benchmark_numa_node}",
+            f"--membind={self.benchmark_numa_node}",
             "taskset", "-c", self.benchmark_cores,
             "java",
             "-XX:+EnableDynamicAgentLoading",  # Allow async-profiler to attach
@@ -1074,7 +1171,7 @@ def run_benchmark(self, output_metrics: Path) -> subprocess.Popen:
         if self.resp_bench_commit:
             cmd.extend(["--commit-id", self.resp_bench_commit])

-        print(f"Starting Java benchmark on cores {self.benchmark_cores}")
+        print(f"Starting Java benchmark on NUMA node {self.benchmark_numa_node}, cores {self.benchmark_cores}")
         print(f" Server: {server}")
         print(f" Driver: {self.driver_config_path}")
         print(f" Workload: {self.workload_config_path}")