Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ jobs:

- name: Install dependencies
run: |
sudo apt-get install -y sysstat python3-pip
sudo apt-get install -y sysstat python3-pip numactl
sudo python3 -m pip install psycopg2-binary boto3 hdrhistogram

# async-profiler for flame graphs
Expand Down
111 changes: 94 additions & 17 deletions .github/workflows/benchmark_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,6 @@ def parse_perf_stat(filepath: Path, hardware_available: bool) -> dict:
class BenchmarkOrchestrator:
"""Main benchmark orchestration class"""

INFRA_CORES = "0-3"
AWS_REGION = "us-east-1"

def __init__(self, resp_bench_dir: Path, resp_bench_commit: str, output_file: Path,
Expand Down Expand Up @@ -919,20 +918,89 @@ def __init__(self, resp_bench_dir: Path, resp_bench_commit: str, output_file: Pa
self.variance_control = VarianceControl()
self.variance_control.setup(network_delay_ms=network_delay_ms)

# Calculate core allocation after SMT is disabled
cpu_count = os.cpu_count() or 8
if cpu_count > 4:
self.benchmark_cores = f"4-{cpu_count - 1}"
else:
self.benchmark_cores = "0-3"
print(f"WARNING: Only {cpu_count} cores, benchmark shares "
f"cores with server")
print(f"Core allocation: Server={self.INFRA_CORES}, "
f"Benchmark={self.benchmark_cores}")
# Detect NUMA topology and allocate cores accordingly
self._setup_numa_aware_cores()

# Java JAR path - find the shaded JAR dynamically
self.java_jar = self._find_java_jar()

def _get_numa_topology(self) -> dict:
"""
Detect NUMA topology from /sys filesystem.
Returns dict: {node_id: [list of cpu cores]}
"""
numa_nodes = {}
numa_base = Path("/sys/devices/system/node")

if not numa_base.exists():
print(" ⚠ NUMA topology not available, assuming single node")
cpu_count = os.cpu_count() or 8
return {0: list(range(cpu_count))}

for node_dir in sorted(numa_base.glob("node[0-9]*")):
node_id = int(node_dir.name.replace("node", ""))
cpulist_file = node_dir / "cpulist"
if cpulist_file.exists():
cpulist_str = cpulist_file.read_text().strip()
cores = self._parse_cpulist(cpulist_str)
numa_nodes[node_id] = cores

if not numa_nodes:
cpu_count = os.cpu_count() or 8
return {0: list(range(cpu_count))}

return numa_nodes

def _parse_cpulist(self, cpulist: str) -> list:
"""Parse CPU list format like '0-3,8-11' into [0,1,2,3,8,9,10,11]"""
cores = []
for part in cpulist.split(","):
if "-" in part:
start, end = part.split("-")
cores.extend(range(int(start), int(end) + 1))
else:
cores.append(int(part))
return sorted(cores)

def _setup_numa_aware_cores(self):
"""
Detect NUMA topology and allocate cores across NUMA nodes.
Server runs on node 0, benchmark runs on node 1 (if available).
"""
numa_topology = self._get_numa_topology()

print(f"NUMA topology detected: {len(numa_topology)} node(s)")
for node_id, cores in numa_topology.items():
print(f" Node {node_id}: {len(cores)} cores ({min(cores)}-{max(cores)})")

if len(numa_topology) >= 2:
# Two or more NUMA nodes: server on node 0, benchmark on node 1
node0_cores = numa_topology[0]
node1_cores = numa_topology[1]

self.infra_numa_node = 0
self.benchmark_numa_node = 1

# Use all cores on each node
self.infra_cores = f"{node0_cores[0]}-{node0_cores[-1]}"
self.benchmark_cores = f"{node1_cores[0]}-{node1_cores[-1]}"

print(f"Split NUMA allocation:")
print(f" Server: NUMA node {self.infra_numa_node}, cores {self.infra_cores}")
print(f" Benchmark: NUMA node {self.benchmark_numa_node}, cores {self.benchmark_cores}")
else:
# Single NUMA node: all cores shared by server and benchmark
node_cores = numa_topology[0]
self.infra_numa_node = 0
self.benchmark_numa_node = 0

all_cores = f"{node_cores[0]}-{node_cores[-1]}"
self.infra_cores = all_cores
self.benchmark_cores = all_cores

print(f"Single NUMA node {self.infra_numa_node}: all cores shared")
print(f" Cores: {all_cores}")

def _find_java_jar(self) -> Path:
"""Find the benchmark JAR file dynamically."""
target_dir = self.resp_bench_dir / "java/target"
Expand Down Expand Up @@ -984,22 +1052,27 @@ def _get_server_port(self) -> int:
return 7379 if self._is_cluster_mode() else 6379

def _pin_server_processes(self):
"""Pin all running valkey-server processes to designated cores."""
"""Pin all running valkey-server processes to designated cores and NUMA node."""
work_dir = self.resp_bench_dir / "work"
pinned = 0
for pid_file in work_dir.glob("*.pid"):
try:
pid = int(pid_file.read_text().strip())
# Pin CPU to infra cores
result = subprocess.run(
["taskset", "-cp", self.INFRA_CORES, str(pid)],
["taskset", "-cp", self.infra_cores, str(pid)],
capture_output=True, text=True)
if result.returncode == 0:
pinned += 1
else:
print(f" Warning: Failed to pin PID {pid}: {result.stderr}")
# Migrate memory to infra NUMA node
subprocess.run(
["migratepages", str(pid), "all", str(self.infra_numa_node)],
capture_output=True, text=True)
except (ValueError, FileNotFoundError) as e:
print(f" Warning: Could not read PID from {pid_file}: {e}")
print(f"Pinned {pinned} server processes to cores {self.INFRA_CORES}")
print(f"Pinned {pinned} server processes to NUMA node {self.infra_numa_node}, cores {self.infra_cores}")

def start_infrastructure(self):
if self.skip_infra:
Expand All @@ -1012,7 +1085,7 @@ def start_infrastructure(self):
stop_target = "server-cluster-stop" if is_cluster else "server-standalone-stop"
port = self._get_server_port()

print(f"Starting Valkey {mode_name} infrastructure on cores {self.INFRA_CORES}...")
print(f"Starting Valkey {mode_name} infrastructure on NUMA node {self.infra_numa_node}...")
subprocess.run(["make", stop_target], cwd=self.resp_bench_dir,
capture_output=True)
subprocess.run(["pkill", "-f", "valkey-server"], capture_output=True)
Expand All @@ -1023,7 +1096,8 @@ def start_infrastructure(self):
shutil.rmtree(work_dir)

result = subprocess.run(
["taskset", "-c", self.INFRA_CORES, "make", make_target],
["numactl", f"--cpunodebind={self.infra_numa_node}",
f"--membind={self.infra_numa_node}", "make", make_target],
cwd=self.resp_bench_dir, timeout=600)
if result.returncode != 0:
raise RuntimeError(
Expand Down Expand Up @@ -1060,6 +1134,9 @@ def run_benchmark(self, output_metrics: Path) -> subprocess.Popen:
server = f"localhost:{port}"

cmd = [
"numactl",
f"--cpunodebind={self.benchmark_numa_node}",
f"--membind={self.benchmark_numa_node}",
"taskset", "-c", self.benchmark_cores,
"java",
"-XX:+EnableDynamicAgentLoading", # Allow async-profiler to attach
Expand All @@ -1074,7 +1151,7 @@ def run_benchmark(self, output_metrics: Path) -> subprocess.Popen:
if self.resp_bench_commit:
cmd.extend(["--commit-id", self.resp_bench_commit])

print(f"Starting Java benchmark on cores {self.benchmark_cores}")
print(f"Starting Java benchmark on NUMA node {self.benchmark_numa_node}, cores {self.benchmark_cores}")
print(f" Server: {server}")
print(f" Driver: {self.driver_config_path}")
print(f" Workload: {self.workload_config_path}")
Expand Down