From 57a3e872182273e6838eee23f915e57280a8d5e8 Mon Sep 17 00:00:00 2001 From: Lior Sventitzky Date: Sun, 22 Feb 2026 09:45:30 +0000 Subject: [PATCH 1/4] fix numa core pinning, added jvm tuning Signed-off-by: Lior Sventitzky --- .github/workflows/benchmark.yml | 2 +- .github/workflows/benchmark_orchestrator.py | 100 ++++++++++++++++---- 2 files changed, 85 insertions(+), 17 deletions(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 4e5fc705..c7f99421 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -416,7 +416,7 @@ jobs: - name: Install dependencies run: | - sudo apt-get install -y sysstat python3-pip + sudo apt-get install -y sysstat python3-pip numactl sudo python3 -m pip install psycopg2-binary boto3 hdrhistogram # async-profiler for flame graphs diff --git a/.github/workflows/benchmark_orchestrator.py b/.github/workflows/benchmark_orchestrator.py index e590dcae..99a150fa 100755 --- a/.github/workflows/benchmark_orchestrator.py +++ b/.github/workflows/benchmark_orchestrator.py @@ -885,7 +885,6 @@ def parse_perf_stat(filepath: Path, hardware_available: bool) -> dict: class BenchmarkOrchestrator: """Main benchmark orchestration class""" - INFRA_CORES = "0-3" AWS_REGION = "us-east-1" def __init__(self, resp_bench_dir: Path, resp_bench_commit: str, output_file: Path, @@ -919,20 +918,84 @@ def __init__(self, resp_bench_dir: Path, resp_bench_commit: str, output_file: Pa self.variance_control = VarianceControl() self.variance_control.setup(network_delay_ms=network_delay_ms) - # Calculate core allocation after SMT is disabled - cpu_count = os.cpu_count() or 8 - if cpu_count > 4: - self.benchmark_cores = f"4-{cpu_count - 1}" - else: - self.benchmark_cores = "0-3" - print(f"WARNING: Only {cpu_count} cores, benchmark shares " - f"cores with server") - print(f"Core allocation: Server={self.INFRA_CORES}, " - f"Benchmark={self.benchmark_cores}") + # Detect NUMA topology and allocate cores accordingly + self._setup_numa_aware_cores() # Java JAR path - find the shaded JAR dynamically self.java_jar = self._find_java_jar() + def _get_numa_topology(self) -> dict: + """ + Detect NUMA topology from /sys filesystem. + Returns dict: {node_id: [list of cpu cores]} + """ + numa_nodes = {} + numa_base = Path("/sys/devices/system/node") + + if not numa_base.exists(): + print(" ⚠ NUMA topology not available, assuming single node") + cpu_count = os.cpu_count() or 8 + return {0: list(range(cpu_count))} + + for node_dir in sorted(numa_base.glob("node[0-9]*")): + node_id = int(node_dir.name.replace("node", "")) + cpulist_file = node_dir / "cpulist" + if cpulist_file.exists(): + cpulist_str = cpulist_file.read_text().strip() + cores = self._parse_cpulist(cpulist_str) + numa_nodes[node_id] = cores + + if not numa_nodes: + cpu_count = os.cpu_count() or 8 + return {0: list(range(cpu_count))} + + return numa_nodes + + def _parse_cpulist(self, cpulist: str) -> list: + """Parse CPU list format like '0-3,8-11' into [0,1,2,3,8,9,10,11]""" + cores = [] + for part in cpulist.split(","): + if "-" in part: + start, end = part.split("-") + cores.extend(range(int(start), int(end) + 1)) + else: + cores.append(int(part)) + return sorted(cores) + + def _setup_numa_aware_cores(self): + """ + Detect NUMA topology and allocate cores from a single NUMA node. + Uses the NUMA node with the most cores. + """ + numa_topology = self._get_numa_topology() + + print(f"NUMA topology detected: {len(numa_topology)} node(s)") + for node_id, cores in numa_topology.items(): + print(f" Node {node_id}: {len(cores)} cores ({min(cores)}-{max(cores)})") + + # Pick the NUMA node with the most cores + best_node = max(numa_topology.keys(), key=lambda n: len(numa_topology[n])) + node_cores = numa_topology[best_node] + + self.numa_node = best_node + + # Allocate cores: first 4 for server, rest for benchmark + if len(node_cores) >= 8: + self.infra_cores = f"{node_cores[0]}-{node_cores[3]}" + self.benchmark_cores = f"{node_cores[4]}-{node_cores[-1]}" + elif len(node_cores) > 4: + self.infra_cores = f"{node_cores[0]}-{node_cores[3]}" + self.benchmark_cores = f"{node_cores[4]}-{node_cores[-1]}" + else: + # Not enough cores, share them + self.infra_cores = f"{node_cores[0]}-{node_cores[-1]}" + self.benchmark_cores = self.infra_cores + print(f"WARNING: Only {len(node_cores)} cores on NUMA node {best_node}, " + f"benchmark shares cores with server") + + print(f"NUMA node {best_node} selected") + print(f"Core allocation: Server={self.infra_cores}, Benchmark={self.benchmark_cores}") + def _find_java_jar(self) -> Path: """Find the benchmark JAR file dynamically.""" target_dir = self.resp_bench_dir / "java/target" @@ -991,7 +1054,7 @@ def _pin_server_processes(self): try: pid = int(pid_file.read_text().strip()) result = subprocess.run( - ["taskset", "-cp", self.INFRA_CORES, str(pid)], + ["taskset", "-cp", self.infra_cores, str(pid)], capture_output=True, text=True) if result.returncode == 0: pinned += 1 @@ -999,7 +1062,7 @@ def _pin_server_processes(self): print(f" Warning: Failed to pin PID {pid}: {result.stderr}") except (ValueError, FileNotFoundError) as e: print(f" Warning: Could not read PID from {pid_file}: {e}") - print(f"Pinned {pinned} server processes to cores {self.INFRA_CORES}") + print(f"Pinned {pinned} server processes to cores {self.infra_cores}") def start_infrastructure(self): if self.skip_infra: @@ -1012,7 +1075,7 @@ def start_infrastructure(self): stop_target = "server-cluster-stop" if is_cluster else "server-standalone-stop" port = self._get_server_port() - print(f"Starting Valkey {mode_name} infrastructure on cores {self.INFRA_CORES}...") + print(f"Starting Valkey {mode_name} infrastructure...") subprocess.run(["make", stop_target], cwd=self.resp_bench_dir, capture_output=True) subprocess.run(["pkill", "-f", "valkey-server"], capture_output=True) @@ -1023,7 +1086,7 @@ def start_infrastructure(self): shutil.rmtree(work_dir) result = subprocess.run( - ["taskset", "-c", self.INFRA_CORES, "make", make_target], + ["make", make_target], cwd=self.resp_bench_dir, timeout=600) if result.returncode != 0: raise RuntimeError( @@ -1060,9 +1123,14 @@ def run_benchmark(self, output_metrics: Path) -> subprocess.Popen: server = f"localhost:{port}" cmd = [ + "numactl", + f"--cpunodebind={self.numa_node}", + f"--membind={self.numa_node}", "taskset", "-c", self.benchmark_cores, "java", "-XX:+EnableDynamicAgentLoading", # Allow async-profiler to attach + "-Xms4g", "-Xmx4g", # Fixed heap size to avoid resize pauses + "-XX:+AlwaysPreTouch", # Pre-fault heap pages at startup "-jar", str(self.java_jar), "--server", server, "--driver", str(self.driver_config_path), @@ -1074,7 +1142,7 @@ def run_benchmark(self, output_metrics: Path) -> subprocess.Popen: if self.resp_bench_commit: cmd.extend(["--commit-id", self.resp_bench_commit]) - print(f"Starting Java benchmark on cores {self.benchmark_cores}") + print(f"Starting Java benchmark on NUMA node {self.numa_node}, cores {self.benchmark_cores}") print(f" Server: {server}") print(f" Driver: {self.driver_config_path}") print(f" Workload: {self.workload_config_path}") From d0daff924991b842abf643f9479bd78d6247b8be Mon Sep 17 00:00:00 2001 From: Lior Sventitzky Date: Sun, 22 Feb 2026 09:50:38 +0000 Subject: [PATCH 2/4] remove jvm tuning Signed-off-by: Lior Sventitzky --- .github/workflows/benchmark_orchestrator.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/benchmark_orchestrator.py b/.github/workflows/benchmark_orchestrator.py index 99a150fa..a416e86a 100755 --- a/.github/workflows/benchmark_orchestrator.py +++ b/.github/workflows/benchmark_orchestrator.py @@ -1129,8 +1129,6 @@ def run_benchmark(self, output_metrics: Path) -> subprocess.Popen: "taskset", "-c", self.benchmark_cores, "java", "-XX:+EnableDynamicAgentLoading", # Allow async-profiler to attach - "-Xms4g", "-Xmx4g", # Fixed heap size to avoid resize pauses - "-XX:+AlwaysPreTouch", # Pre-fault heap pages at startup "-jar", str(self.java_jar), "--server", server, "--driver", str(self.driver_config_path), From e66335a82e05d74726f8c1512503927eb72f24b5 Mon Sep 17 00:00:00 2001 From: Lior Sventitzky Date: Sun, 22 Feb 2026 13:17:50 +0000 Subject: [PATCH 3/4] added numa separation for server and benchmarks Signed-off-by: Lior Sventitzky --- .github/workflows/benchmark_orchestrator.py | 71 +++++++++++++-------- 1 file changed, 44 insertions(+), 27 deletions(-) diff --git a/.github/workflows/benchmark_orchestrator.py b/.github/workflows/benchmark_orchestrator.py index a416e86a..3adf7a58 100755 --- a/.github/workflows/benchmark_orchestrator.py +++ b/.github/workflows/benchmark_orchestrator.py @@ -964,8 +964,8 @@ def _parse_cpulist(self, cpulist: str) -> list: def _setup_numa_aware_cores(self): """ - Detect NUMA topology and allocate cores from a single NUMA node. - Uses the NUMA node with the most cores. + Detect NUMA topology and allocate cores across NUMA nodes. + Server runs on node 0, benchmark runs on node 1 (if available). """ numa_topology = self._get_numa_topology() @@ -973,28 +973,39 @@ def _setup_numa_aware_cores(self): for node_id, cores in numa_topology.items(): print(f" Node {node_id}: {len(cores)} cores ({min(cores)}-{max(cores)})") - # Pick the NUMA node with the most cores - best_node = max(numa_topology.keys(), key=lambda n: len(numa_topology[n])) - node_cores = numa_topology[best_node] + if len(numa_topology) >= 2: + # Two or more NUMA nodes: server on node 0, benchmark on node 1 + node0_cores = numa_topology[0] + node1_cores = numa_topology[1] - self.numa_node = best_node + self.infra_numa_node = 0 + self.benchmark_numa_node = 1 - # Allocate cores: first 4 for server, rest for benchmark - if len(node_cores) >= 8: - self.infra_cores = f"{node_cores[0]}-{node_cores[3]}" - self.benchmark_cores = f"{node_cores[4]}-{node_cores[-1]}" - elif len(node_cores) > 4: - self.infra_cores = f"{node_cores[0]}-{node_cores[3]}" - self.benchmark_cores = f"{node_cores[4]}-{node_cores[-1]}" + # Use all cores on each node + self.infra_cores = f"{node0_cores[0]}-{node0_cores[-1]}" + self.benchmark_cores = f"{node1_cores[0]}-{node1_cores[-1]}" + + print(f"Split NUMA allocation:") + print(f" Server: NUMA node {self.infra_numa_node}, cores {self.infra_cores}") + print(f" Benchmark: NUMA node {self.benchmark_numa_node}, cores {self.benchmark_cores}") else: - # Not enough cores, share them - self.infra_cores = f"{node_cores[0]}-{node_cores[-1]}" - self.benchmark_cores = self.infra_cores - print(f"WARNING: Only {len(node_cores)} cores on NUMA node {best_node}, " - f"benchmark shares cores with server") + # Single NUMA node: 1 core for server, rest for benchmark + node_cores = numa_topology[0] + self.infra_numa_node = 0 + self.benchmark_numa_node = 0 + + if len(node_cores) >= 2: + self.infra_cores = str(node_cores[0]) + self.benchmark_cores = f"{node_cores[1]}-{node_cores[-1]}" + else: + # Only 1 core, share it + self.infra_cores = str(node_cores[0]) + self.benchmark_cores = self.infra_cores + print(f"WARNING: Only 1 core, benchmark shares core with server") - print(f"NUMA node {best_node} selected") - print(f"Core allocation: Server={self.infra_cores}, Benchmark={self.benchmark_cores}") + print(f"Single NUMA node {self.infra_numa_node}:") + print(f" Server cores: {self.infra_cores}") + print(f" Benchmark cores: {self.benchmark_cores}") def _find_java_jar(self) -> Path: """Find the benchmark JAR file dynamically.""" @@ -1047,12 +1058,13 @@ def _get_server_port(self) -> int: return 7379 if self._is_cluster_mode() else 6379 def _pin_server_processes(self): - """Pin all running valkey-server processes to designated cores.""" + """Pin all running valkey-server processes to designated cores and NUMA node.""" work_dir = self.resp_bench_dir / "work" pinned = 0 for pid_file in work_dir.glob("*.pid"): try: pid = int(pid_file.read_text().strip()) + # Pin CPU to infra cores result = subprocess.run( ["taskset", "-cp", self.infra_cores, str(pid)], capture_output=True, text=True) @@ -1060,9 +1072,13 @@ def _pin_server_processes(self): pinned += 1 else: print(f" Warning: Failed to pin PID {pid}: {result.stderr}") + # Migrate memory to infra NUMA node + subprocess.run( + ["migratepages", str(pid), "all", str(self.infra_numa_node)], + capture_output=True, text=True) except (ValueError, FileNotFoundError) as e: print(f" Warning: Could not read PID from {pid_file}: {e}") - print(f"Pinned {pinned} server processes to cores {self.infra_cores}") + print(f"Pinned {pinned} server processes to NUMA node {self.infra_numa_node}, cores {self.infra_cores}") def start_infrastructure(self): if self.skip_infra: @@ -1075,7 +1091,7 @@ def start_infrastructure(self): stop_target = "server-cluster-stop" if is_cluster else "server-standalone-stop" port = self._get_server_port() - print(f"Starting Valkey {mode_name} infrastructure...") + print(f"Starting Valkey {mode_name} infrastructure on NUMA node {self.infra_numa_node}...") subprocess.run(["make", stop_target], cwd=self.resp_bench_dir, capture_output=True) subprocess.run(["pkill", "-f", "valkey-server"], capture_output=True) @@ -1086,7 +1102,8 @@ def start_infrastructure(self): shutil.rmtree(work_dir) result = subprocess.run( - ["make", make_target], + ["numactl", f"--cpunodebind={self.infra_numa_node}", + f"--membind={self.infra_numa_node}", "make", make_target], cwd=self.resp_bench_dir, timeout=600) if result.returncode != 0: raise RuntimeError( @@ -1124,8 +1141,8 @@ def run_benchmark(self, output_metrics: Path) -> subprocess.Popen: cmd = [ "numactl", - f"--cpunodebind={self.numa_node}", - f"--membind={self.numa_node}", + f"--cpunodebind={self.benchmark_numa_node}", + f"--membind={self.benchmark_numa_node}", "taskset", "-c", self.benchmark_cores, "java", "-XX:+EnableDynamicAgentLoading", # Allow async-profiler to attach @@ -1140,7 +1157,7 @@ def run_benchmark(self, output_metrics: Path) -> subprocess.Popen: if self.resp_bench_commit: cmd.extend(["--commit-id", self.resp_bench_commit]) - print(f"Starting Java benchmark on NUMA node {self.numa_node}, cores {self.benchmark_cores}") + print(f"Starting Java benchmark on NUMA node {self.benchmark_numa_node}, cores {self.benchmark_cores}") print(f" Server: {server}") print(f" Driver: {self.driver_config_path}") print(f" Workload: {self.workload_config_path}") From 1e2cf8441635df3434a9714a899d6d6f4dc56879 Mon Sep 17 00:00:00 2001 From: Lior Sventitzky Date: Sun, 22 Feb 2026 15:21:08 +0000 Subject: [PATCH 4/4] changes single numa to use all cores Signed-off-by: Lior Sventitzky --- .github/workflows/benchmark_orchestrator.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/.github/workflows/benchmark_orchestrator.py b/.github/workflows/benchmark_orchestrator.py index 3adf7a58..7d75226c 100755 --- a/.github/workflows/benchmark_orchestrator.py +++ b/.github/workflows/benchmark_orchestrator.py @@ -989,23 +989,17 @@ def _setup_numa_aware_cores(self): print(f" Server: NUMA node {self.infra_numa_node}, cores {self.infra_cores}") print(f" Benchmark: NUMA node {self.benchmark_numa_node}, cores {self.benchmark_cores}") else: - # Single NUMA node: 1 core for server, rest for benchmark + # Single NUMA node: all cores shared by server and benchmark node_cores = numa_topology[0] self.infra_numa_node = 0 self.benchmark_numa_node = 0 - if len(node_cores) >= 2: - self.infra_cores = str(node_cores[0]) - self.benchmark_cores = f"{node_cores[1]}-{node_cores[-1]}" - else: - # Only 1 core, share it - self.infra_cores = str(node_cores[0]) - self.benchmark_cores = self.infra_cores - print(f"WARNING: Only 1 core, benchmark shares core with server") - - print(f"Single NUMA node {self.infra_numa_node}:") - print(f" Server cores: {self.infra_cores}") - print(f" Benchmark cores: {self.benchmark_cores}") + all_cores = f"{node_cores[0]}-{node_cores[-1]}" + self.infra_cores = all_cores + self.benchmark_cores = all_cores + + print(f"Single NUMA node {self.infra_numa_node}: all cores shared") + print(f" Cores: {all_cores}") def _find_java_jar(self) -> Path: """Find the benchmark JAR file dynamically."""