From 7fdbbe37b9804d40d3017b1c6ba32e30ffcc6a45 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Sat, 2 Aug 2025 09:14:04 +0000 Subject: [PATCH 01/19] Adding files Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 53 +++++++++++++++++++++++++++++++++++ tests/test_external_server.py | 32 +++++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 tests/test_external_server.py diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index 2915f43..b78f313 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -61,6 +61,41 @@ class ValkeyAction(Enum): AOF_REWRITE = 1 +class ExternalValkeyServerHandle(object): + """Handle to an external valkey server""" + + def __init__(self, bind_ip, port): + self.bind_ip = bind_ip + self.port = port + self.client = None + + def connect(self): + print(f"ATTEMPTING CONNECTION TO: {self.bind_ip}:{self.port}") + self.client = StrictValkey(host=self.bind_ip, port=self.port) + try: + self.client.ping() + print(f"PING SUCCESSFUL TO EXTERNAL SERVER: {self.bind_ip}:{self.port}") + except Exception as e: + print(f"CONNECTION FAILED TO: {self.bind_ip}:{self.port} - {e}") + raise RuntimeError( + f"Failed to connect to external server at {self.bind_ip}:{self.port}: {e}" + ) + return self.client + + def get_new_client(self): + return StrictValkey(host=self.bind_ip, port=self.port) + + def exit(self, cleanup=True, remove_nodes_conf=True): + """No-op for external servers""" + pass + + def is_alive(self): + try: + return self.client.ping() + except: + return False + + class ValkeyServerHandle(object): """Handle to a valkey server process""" @@ -495,12 +530,29 @@ def create_server( args="", skip_teardown=False, conf_file=None, + external_server=False, ): + + if external_server: + if not bind_ip: + bind_ip = "localhost" + if not port: + raise ValueError("Port must be specified for external server") + + print(f"CONNECTING TO EXTERNAL SERVER: {bind_ip}:{port}") + external_handle = ExternalValkeyServerHandle(bind_ip, port) + valkey_cli = external_handle.connect() + print(f"CONNECTED TO EXTERNAL SERVER: {bind_ip}:{port}") + return external_handle, valkey_cli + + # Original server creation logic if not bind_ip: bind_ip = self.get_bind_ip() if not port: port = self.get_bind_port() + + print(f"CREATING LOCAL SERVER: {bind_ip}:{port}") valkey_server_handle = self.get_valkey_handle() self.server_path = server_path valkey_server = valkey_server_handle( @@ -515,6 +567,7 @@ def create_server( valkey_server.conf_file = conf_file valkey_server.args.update(args) valkey_cli = valkey_server.start() + print(f"LOCAL SERVER STARTED: {bind_ip}:{port}") return valkey_server, valkey_cli def wait_for_all_replicas_online(self, n): diff --git a/tests/test_external_server.py b/tests/test_external_server.py new file mode 100644 index 0000000..0038bc4 --- /dev/null +++ b/tests/test_external_server.py @@ -0,0 +1,32 @@ +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from valkey_test_case import ValkeyTestCase + + +class TestExternalServer(ValkeyTestCase): + def test_connect_to_external_server(self): + """Example: Connect to external Valkey server running on localhost:6379""" + print("\n=== TESTING EXTERNAL SERVER CONNECTION ===") + try: + # Connect to external server instead of creating one + server, client = self.create_server( + testdir=self.testdir, + bind_ip="localhost", + port=6379, + external_server=True, + ) + + print("\n RUNNING TEST COMMANDS ON EXTERNAL SERVER") + # Test basic operations + client.set("test_key", "test_value") + assert client.get("test_key") == b"test_value" + print("External server test completed successfully!") + + except RuntimeError as e: + # Skip test if external server not available + import pytest + + pytest.skip(f"External server not available: {e}") From 20d644e3651f93163f5459c32d973a828d83c858 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Sat, 2 Aug 2025 19:09:07 +0000 Subject: [PATCH 02/19] Adding conftest Signed-off-by: Nikhil Manglore --- tests/conftest.py | 85 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 tests/conftest.py diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..07512ca --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,85 @@ +""" +Minimal conftest.py for external server tests +""" + +import pytest +import fcntl +import socket +import os +import tempfile +from pathlib import Path + + +class PortTracker(object): + CLUSTER_BUS_PORT_OFFSET = 10000 + MIN_PORT = 5000 + MAX_PORT = 32768 + MAX_BASE_PORT = MAX_PORT - CLUSTER_BUS_PORT_OFFSET - MIN_PORT + MAX_RETRIES = 100 + LOCKS_DIR = os.path.join(tempfile.gettempdir(), "port_tracker") + + def __init__(self, node_id): + self._hash = hash(str(node_id)) + if not os.path.exists(Path(PortTracker.LOCKS_DIR)): + Path(PortTracker.LOCKS_DIR).mkdir(parents=True, exist_ok=True) + + def __enter__(self): + self.open_and_locked_files = {} + return self + + def __exit__(self, type, value, tb): + for lockfile in self.open_and_locked_files.values(): + self._try_remove(lockfile) + + def _try_remove(self, lockfile): + lockfile.close() + try: + os.remove(lockfile.name) + except: + pass + + def _next_port(self): + self._hash = hash(str(self._hash)) + return (self._hash % PortTracker.MAX_BASE_PORT) + PortTracker.MIN_PORT + + def _try_lock_port(self, port): + lockfilename = os.path.join(self.LOCKS_DIR, "port%d.lock" % port) + lockfile = open(lockfilename, "w") + try: + fcntl.flock(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) + except OSError: + self._try_remove(lockfile) + return False + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + sock.bind(("0.0.0.0", port)) + except OSError: + lockfile.close() + return False + self.open_and_locked_files[port] = lockfile + return True + + def _unlock_port(self, port): + lockfile = self.open_and_locked_files.get(port) + if lockfile: + lockfile.close() + del self.open_and_locked_files[port] + + def get_unused_port(self): + for r in range(PortTracker.MAX_RETRIES): + port = self._next_port() + if not self._try_lock_port(port): + continue + if not self._try_lock_port(port + PortTracker.CLUSTER_BUS_PORT_OFFSET): + self._unlock_port(port) + continue + return port + assert False, "Failed to find port after %d tries" % PortTracker.MAX_RETRIES + + +@pytest.fixture(scope="function", autouse=True) +def resource_port_tracker(request): + """Create port tracker for each pytest worker""" + with PortTracker(request.node.nodeid) as p: + yield p From d430a71cc10f53ea6fdb47eb94dc28b3ec8153d9 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Sat, 2 Aug 2025 20:40:51 +0000 Subject: [PATCH 03/19] Final commits Signed-off-by: Nikhil Manglore --- tests/conftest.py | 85 ----------------------------------- tests/test_external_server.py | 14 ++---- 2 files changed, 3 insertions(+), 96 deletions(-) delete mode 100644 tests/conftest.py diff --git a/tests/conftest.py b/tests/conftest.py deleted file mode 100644 index 07512ca..0000000 --- a/tests/conftest.py +++ /dev/null @@ -1,85 +0,0 @@ -""" -Minimal conftest.py for external server tests -""" - -import pytest -import fcntl -import socket -import os -import tempfile -from pathlib import Path - - -class PortTracker(object): - CLUSTER_BUS_PORT_OFFSET = 10000 - MIN_PORT = 5000 - MAX_PORT = 32768 - MAX_BASE_PORT = MAX_PORT - CLUSTER_BUS_PORT_OFFSET - MIN_PORT - MAX_RETRIES = 100 - LOCKS_DIR = os.path.join(tempfile.gettempdir(), "port_tracker") - - def __init__(self, node_id): - self._hash = hash(str(node_id)) - if not os.path.exists(Path(PortTracker.LOCKS_DIR)): - Path(PortTracker.LOCKS_DIR).mkdir(parents=True, exist_ok=True) - - def __enter__(self): - self.open_and_locked_files = {} - return self - - def __exit__(self, type, value, tb): - for lockfile in self.open_and_locked_files.values(): - self._try_remove(lockfile) - - def _try_remove(self, lockfile): - lockfile.close() - try: - os.remove(lockfile.name) - except: - pass - - def _next_port(self): - self._hash = hash(str(self._hash)) - return (self._hash % PortTracker.MAX_BASE_PORT) + PortTracker.MIN_PORT - - def _try_lock_port(self, port): - lockfilename = os.path.join(self.LOCKS_DIR, "port%d.lock" % port) - lockfile = open(lockfilename, "w") - try: - fcntl.flock(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) - except OSError: - self._try_remove(lockfile) - return False - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - try: - sock.bind(("0.0.0.0", port)) - except OSError: - lockfile.close() - return False - self.open_and_locked_files[port] = lockfile - return True - - def _unlock_port(self, port): - lockfile = self.open_and_locked_files.get(port) - if lockfile: - lockfile.close() - del self.open_and_locked_files[port] - - def get_unused_port(self): - for r in range(PortTracker.MAX_RETRIES): - port = self._next_port() - if not self._try_lock_port(port): - continue - if not self._try_lock_port(port + PortTracker.CLUSTER_BUS_PORT_OFFSET): - self._unlock_port(port) - continue - return port - assert False, "Failed to find port after %d tries" % PortTracker.MAX_RETRIES - - -@pytest.fixture(scope="function", autouse=True) -def resource_port_tracker(request): - """Create port tracker for each pytest worker""" - with PortTracker(request.node.nodeid) as p: - yield p diff --git a/tests/test_external_server.py b/tests/test_external_server.py index 0038bc4..f5ef0d1 100644 --- a/tests/test_external_server.py +++ b/tests/test_external_server.py @@ -1,15 +1,10 @@ -import sys -import os - -sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) - +from conftest import resource_port_tracker from valkey_test_case import ValkeyTestCase class TestExternalServer(ValkeyTestCase): def test_connect_to_external_server(self): """Example: Connect to external Valkey server running on localhost:6379""" - print("\n=== TESTING EXTERNAL SERVER CONNECTION ===") try: # Connect to external server instead of creating one server, client = self.create_server( @@ -19,14 +14,11 @@ def test_connect_to_external_server(self): external_server=True, ) - print("\n RUNNING TEST COMMANDS ON EXTERNAL SERVER") # Test basic operations - client.set("test_key", "test_value") - assert client.get("test_key") == b"test_value" - print("External server test completed successfully!") + client.set("hello", "world") + assert client.get("hello") == b"world" except RuntimeError as e: # Skip test if external server not available import pytest - pytest.skip(f"External server not available: {e}") From 98d5d166be34b37d7f6d18794392c773b14cc9dc Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Sun, 3 Aug 2025 03:17:09 +0000 Subject: [PATCH 04/19] Adding framework Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 27 +++++++++++++++++++++++---- tests/test_external_server.py | 5 +++++ 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index b78f313..78d5796 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -68,6 +68,7 @@ def __init__(self, bind_ip, port): self.bind_ip = bind_ip self.port = port self.client = None + self.clients = [] # Track all clients for cleanup def connect(self): print(f"ATTEMPTING CONNECTION TO: {self.bind_ip}:{self.port}") @@ -83,11 +84,27 @@ def connect(self): return self.client def get_new_client(self): - return StrictValkey(host=self.bind_ip, port=self.port) + client = StrictValkey(host=self.bind_ip, port=self.port) + self.clients.append(client) # Track for cleanup + return client def exit(self, cleanup=True, remove_nodes_conf=True): - """No-op for external servers""" - pass + """Close all connections to external server""" + # Close main client + if self.client: + try: + self.client.close() + except: + pass + self.client = None + + # Close all tracked clients + for client in self.clients: + try: + client.close() + except: + pass + self.clients.clear() def is_alive(self): try: @@ -542,6 +559,8 @@ def create_server( print(f"CONNECTING TO EXTERNAL SERVER: {bind_ip}:{port}") external_handle = ExternalValkeyServerHandle(bind_ip, port) valkey_cli = external_handle.connect() + if not skip_teardown: + self.server_list.append(external_handle) # Ensure cleanup print(f"CONNECTED TO EXTERNAL SERVER: {bind_ip}:{port}") return external_handle, valkey_cli @@ -708,7 +727,7 @@ def wait_for_value_propagate_to_replicas(self, key, value, db=0): wait_for_equal( lambda: self.replicas[i].clients[db].get(key), value, - timout=TEST_MAX_WAIT_TIME_SECONDS, + timeout=TEST_MAX_WAIT_TIME_SECONDS, ) def waitForReplicaOffsetToSyncUp(self, primary, replica): diff --git a/tests/test_external_server.py b/tests/test_external_server.py index f5ef0d1..87f9517 100644 --- a/tests/test_external_server.py +++ b/tests/test_external_server.py @@ -3,6 +3,10 @@ class TestExternalServer(ValkeyTestCase): + """ + Test suite for connecting to external Valkey servers + """ + def test_connect_to_external_server(self): """Example: Connect to external Valkey server running on localhost:6379""" try: @@ -21,4 +25,5 @@ def test_connect_to_external_server(self): except RuntimeError as e: # Skip test if external server not available import pytest + pytest.skip(f"External server not available: {e}") From 25252795e2ad11ecd1ba496c8c2c83f14bb80053 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Sun, 3 Aug 2025 23:08:30 +0000 Subject: [PATCH 05/19] Finaly additions Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 3 +- tests/test_external_server.py | 59 +++++++++++++++++++++++++++++------ 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index 78d5796..e1a7ce9 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -560,11 +560,10 @@ def create_server( external_handle = ExternalValkeyServerHandle(bind_ip, port) valkey_cli = external_handle.connect() if not skip_teardown: - self.server_list.append(external_handle) # Ensure cleanup + self.server_list.append(external_handle) print(f"CONNECTED TO EXTERNAL SERVER: {bind_ip}:{port}") return external_handle, valkey_cli - # Original server creation logic if not bind_ip: bind_ip = self.get_bind_ip() diff --git a/tests/test_external_server.py b/tests/test_external_server.py index 87f9517..8f60007 100644 --- a/tests/test_external_server.py +++ b/tests/test_external_server.py @@ -1,29 +1,70 @@ +import subprocess +import time from conftest import resource_port_tracker from valkey_test_case import ValkeyTestCase class TestExternalServer(ValkeyTestCase): """ - Test suite for connecting to external Valkey servers + Test for connecting to a external Valkey server """ + def setup_docker_server(self): + container_name = "valkey-test-external" + image_name = "valkey/valkey-bundle:latest" + + pull_result = subprocess.run( + ["docker", "pull", image_name], capture_output=True, text=True + ) + if pull_result.returncode != 0: + raise RuntimeError(f"Failed to pull Docker image: {pull_result.stderr}") + + subprocess.run(["docker", "stop", container_name], capture_output=True) + subprocess.run(["docker", "rm", container_name], capture_output=True) + + cmd = [ + "docker", + "run", + "-d", + "-p", + "6380:6379", + "--name", + container_name, + image_name, + "valkey-server", + "--maxmemory", + "0", + "--protected-mode", + "no", + ] + + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + raise RuntimeError(f"Failed to start Docker container: {result.stderr}") + + time.sleep(2) + return container_name + def test_connect_to_external_server(self): - """Example: Connect to external Valkey server running on localhost:6379""" + container_name = None try: - # Connect to external server instead of creating one + container_name = self.setup_docker_server() + server, client = self.create_server( testdir=self.testdir, bind_ip="localhost", - port=6379, + port=6380, external_server=True, ) - # Test basic operations client.set("hello", "world") assert client.get("hello") == b"world" - except RuntimeError as e: - # Skip test if external server not available - import pytest + except Exception as e: + print(f"External server test failed: {e}") + raise - pytest.skip(f"External server not available: {e}") + finally: + if container_name: + subprocess.run(["docker", "stop", container_name], capture_output=True) + subprocess.run(["docker", "rm", container_name], capture_output=True) From 4f16f00a874d020d8af34c20c1beddadfc2a3a8a Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Mon, 4 Aug 2025 23:37:36 +0000 Subject: [PATCH 06/19] Added methods to allow bloom handling Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 88 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 84 insertions(+), 4 deletions(-) diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index e1a7ce9..3dc9cf8 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -85,12 +85,10 @@ def connect(self): def get_new_client(self): client = StrictValkey(host=self.bind_ip, port=self.port) - self.clients.append(client) # Track for cleanup + self.clients.append(client) return client def exit(self, cleanup=True, remove_nodes_conf=True): - """Close all connections to external server""" - # Close main client if self.client: try: self.client.close() @@ -98,7 +96,6 @@ def exit(self, cleanup=True, remove_nodes_conf=True): pass self.client = None - # Close all tracked clients for client in self.clients: try: client.close() @@ -111,6 +108,89 @@ def is_alive(self): return self.client.ping() except: return False + + def wait_for_save_done(self, client=None): + """Wait for the save to complete on external server""" + if client is None: + client = self.client + + try: + wait_for_ne( + lambda: client.info()["rdb_bgsave_in_progress"], + 1, + timeout=TEST_MAX_WAIT_TIME_SECONDS, + ) + except: + raise RuntimeError("Save failed to complete in time") + assert client.info()["rdb_last_bgsave_status"] == "ok" + + def restart(self, remove_rdb=False, remove_nodes_conf=False, connect_client=True): + try: + result = subprocess.run( + ["docker", "ps", "--format", "{{.Names}}", "--filter", f"publish={self.port}"], + capture_output=True, text=True, check=True + ) + container_names = result.stdout.strip().split('\n') + if container_names and container_names[0]: + container_name = container_names[0] + subprocess.run(["docker", "restart", container_name], check=True) + time.sleep(3) + except: + pass + + if connect_client: + self.connect() + + def num_keys(self, db=0, client=None): + if client is None: + client = self.client + if f"db{db}" in client.info("all").keys(): + return client.info("all")[f"db{db}"]["keys"] + return 0 + + def is_rdb_done_loading(self): + """Check if RDB loading is done by examining container logs""" + try: + # Find container using this port + result = subprocess.run( + ["docker", "ps", "--format", "{{.Names}}", "--filter", f"publish={self.port}"], + capture_output=True, text=True, check=True + ) + container_names = result.stdout.strip().split('\n') + if container_names and container_names[0]: + container_name = container_names[0] + # Check container logs for RDB loading completion + logs = subprocess.run( + ["docker", "logs", container_name], + capture_output=True, text=True, check=True + ) + return "Done loading RDB" in logs.stdout or "Done loading RDB" in logs.stderr + except: + pass + return True # Fallback to assume loaded + + def _action_success_flag(self, action, client): + if action == ValkeyAction.AOF_REWRITE: + return client.info()["aof_last_bgrewrite_status"] == "ok" + else: + raise RuntimeError(f"{action} not supported") + + def wait_for_action_done(self, action, client=None): + """Wait for action to complete on external server""" + if client is None: + client = self.client + try: + if action == ValkeyAction.AOF_REWRITE: + wait_for_equal( + lambda: client.info()["aof_rewrite_in_progress"], + 1, + timeout=TEST_MAX_WAIT_TIME_SECONDS, + ) + else: + raise RuntimeError(f"{action} not supported") + except WaitTimeout: + raise RuntimeError(f"{action} failed to complete in time") + assert self._action_success_flag(action, client) class ValkeyServerHandle(object): From 515942f29616b83f76b0c7b10d223b73c1784310 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Tue, 5 Aug 2025 15:53:34 +0000 Subject: [PATCH 07/19] Fixed formatting Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 47 ++++++++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index 3dc9cf8..ae67e3d 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -85,7 +85,7 @@ def connect(self): def get_new_client(self): client = StrictValkey(host=self.bind_ip, port=self.port) - self.clients.append(client) + self.clients.append(client) return client def exit(self, cleanup=True, remove_nodes_conf=True): @@ -108,7 +108,7 @@ def is_alive(self): return self.client.ping() except: return False - + def wait_for_save_done(self, client=None): """Wait for the save to complete on external server""" if client is None: @@ -127,17 +127,26 @@ def wait_for_save_done(self, client=None): def restart(self, remove_rdb=False, remove_nodes_conf=False, connect_client=True): try: result = subprocess.run( - ["docker", "ps", "--format", "{{.Names}}", "--filter", f"publish={self.port}"], - capture_output=True, text=True, check=True + [ + "docker", + "ps", + "--format", + "{{.Names}}", + "--filter", + f"publish={self.port}", + ], + capture_output=True, + text=True, + check=True, ) - container_names = result.stdout.strip().split('\n') + container_names = result.stdout.strip().split("\n") if container_names and container_names[0]: container_name = container_names[0] subprocess.run(["docker", "restart", container_name], check=True) - time.sleep(3) + time.sleep(3) except: pass - + if connect_client: self.connect() @@ -153,18 +162,32 @@ def is_rdb_done_loading(self): try: # Find container using this port result = subprocess.run( - ["docker", "ps", "--format", "{{.Names}}", "--filter", f"publish={self.port}"], - capture_output=True, text=True, check=True + [ + "docker", + "ps", + "--format", + "{{.Names}}", + "--filter", + f"publish={self.port}", + ], + capture_output=True, + text=True, + check=True, ) - container_names = result.stdout.strip().split('\n') + container_names = result.stdout.strip().split("\n") if container_names and container_names[0]: container_name = container_names[0] # Check container logs for RDB loading completion logs = subprocess.run( ["docker", "logs", container_name], - capture_output=True, text=True, check=True + capture_output=True, + text=True, + check=True, + ) + return ( + "Done loading RDB" in logs.stdout + or "Done loading RDB" in logs.stderr ) - return "Done loading RDB" in logs.stdout or "Done loading RDB" in logs.stderr except: pass return True # Fallback to assume loaded From 6aede76221210bd851eea1644c310779a67f0b16 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Tue, 5 Aug 2025 18:33:50 +0000 Subject: [PATCH 08/19] Added new base class Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 159 ++++++++++++++-------------------------- 1 file changed, 56 insertions(+), 103 deletions(-) diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index ae67e3d..115a334 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -61,7 +61,61 @@ class ValkeyAction(Enum): AOF_REWRITE = 1 -class ExternalValkeyServerHandle(object): +class BaseValkeyHandle(object): + """Base class with common methods for both local and external servers""" + + def num_keys(self, db=0, client=None): + if client is None: + client = self.client + if f"db{db}" in client.info("all").keys(): + return client.info("all")[f"db{db}"]["keys"] + return 0 + + def _action_success_flag(self, action, client): + if action == ValkeyAction.AOF_REWRITE: + return client.info()["aof_last_bgrewrite_status"] == "ok" + else: + raise RuntimeError(f"{action} not supported") + + def wait_for_action_done(self, action, client=None): + if client is None: + client = self.client + try: + if action == ValkeyAction.AOF_REWRITE: + wait_for_equal( + lambda: client.info()["aof_rewrite_in_progress"], + 1, + timeout=TEST_MAX_WAIT_TIME_SECONDS, + ) + else: + raise RuntimeError("{} not support".format(action)) + except WaitTimeout: + raise RuntimeError("{} failed to complete in time".format(action)) + assert self._action_success_flag(action, client) + + def wait_for_save_done(self, client=None): + """Wait for the save to complete, failing if it does not complete successfully in the timeout""" + if client is None: + client = self.client + try: + wait_for_ne( + lambda: client.info()["rdb_bgsave_in_progress"], + 1, + timeout=TEST_MAX_WAIT_TIME_SECONDS, + ) + except WaitTimeout: + raise RuntimeError("Save failed to complete in time") + assert client.info()["rdb_last_bgsave_status"] == "ok" + + def is_alive(self): + try: + self.client.ping() + return True + except: + return False + + +class ExternalValkeyServerHandle(BaseValkeyHandle): """Handle to an external valkey server""" def __init__(self, bind_ip, port): @@ -103,27 +157,6 @@ def exit(self, cleanup=True, remove_nodes_conf=True): pass self.clients.clear() - def is_alive(self): - try: - return self.client.ping() - except: - return False - - def wait_for_save_done(self, client=None): - """Wait for the save to complete on external server""" - if client is None: - client = self.client - - try: - wait_for_ne( - lambda: client.info()["rdb_bgsave_in_progress"], - 1, - timeout=TEST_MAX_WAIT_TIME_SECONDS, - ) - except: - raise RuntimeError("Save failed to complete in time") - assert client.info()["rdb_last_bgsave_status"] == "ok" - def restart(self, remove_rdb=False, remove_nodes_conf=False, connect_client=True): try: result = subprocess.run( @@ -150,13 +183,6 @@ def restart(self, remove_rdb=False, remove_nodes_conf=False, connect_client=True if connect_client: self.connect() - def num_keys(self, db=0, client=None): - if client is None: - client = self.client - if f"db{db}" in client.info("all").keys(): - return client.info("all")[f"db{db}"]["keys"] - return 0 - def is_rdb_done_loading(self): """Check if RDB loading is done by examining container logs""" try: @@ -192,31 +218,8 @@ def is_rdb_done_loading(self): pass return True # Fallback to assume loaded - def _action_success_flag(self, action, client): - if action == ValkeyAction.AOF_REWRITE: - return client.info()["aof_last_bgrewrite_status"] == "ok" - else: - raise RuntimeError(f"{action} not supported") - - def wait_for_action_done(self, action, client=None): - """Wait for action to complete on external server""" - if client is None: - client = self.client - try: - if action == ValkeyAction.AOF_REWRITE: - wait_for_equal( - lambda: client.info()["aof_rewrite_in_progress"], - 1, - timeout=TEST_MAX_WAIT_TIME_SECONDS, - ) - else: - raise RuntimeError(f"{action} not supported") - except WaitTimeout: - raise RuntimeError(f"{action} failed to complete in time") - assert self._action_success_flag(action, client) - -class ValkeyServerHandle(object): +class ValkeyServerHandle(BaseValkeyHandle): """Handle to a valkey server process""" DEFAULT_BIND_IP = "0.0.0.0" @@ -425,13 +428,6 @@ def restart(self, remove_rdb=True, remove_nodes_conf=True, connect_client=True): self.exit(remove_rdb, remove_nodes_conf) self.start(connect_client=connect_client) - def is_alive(self): - try: - self.client.ping() - return True - except: - return False - def _waitForPing(self, c): try: wait_for_true(lambda: c.ping(), timeout=MAX_PING_WAIT_TIME) @@ -455,20 +451,6 @@ def connect(self): raise RuntimeError("Failed to connect or ping server") self.client = c - def wait_for_save_done(self, client=None): - """Wait for the save to complete, failing if it does not complete successfully in the timeout""" - if client is None: - client = self.client - try: - wait_for_ne( - lambda: client.info()["rdb_bgsave_in_progress"], - 1, - timeout=TEST_MAX_WAIT_TIME_SECONDS, - ) - except WaitTimeout: - raise RuntimeError("Save failed to complete in time") - assert client.info()["rdb_last_bgsave_status"] == "ok" - def wait_for_save_in_progress(self, client=None): if client is None: client = self.client @@ -496,13 +478,6 @@ def get_default_client(self, client): return self.client return client - def num_keys(self, db=0, client=None): - if client is None: - client = self.client - if f"db{db}".format(db) in client.info("all").keys(): - return client.info("all")["db{}".format(db)]["keys"] - return 0 - def is_primary_link_up(self, client=None): if client is None: client = self.client @@ -514,28 +489,6 @@ def is_primary_link_up(self, client=None): return True return False - def _action_success_flag(self, action, client): - if action == ValkeyAction.AOF_REWRITE: - return client.info()["aof_last_bgrewrite_status"] == "ok" - else: - raise RuntimeError("{} not support".format(action)) - - def wait_for_action_done(self, action, client=None): - if client is None: - client = self.client - try: - if action == ValkeyAction.AOF_REWRITE: - wait_for_equal( - lambda: client.info()["aof_rewrite_in_progress"], - 1, - timeout=TEST_MAX_WAIT_TIME_SECONDS, - ) - else: - raise RuntimeError("{} not support".format(action)) - except WaitTimeout: - raise RuntimeError("{} failed to complete in time".format(action)) - assert self._action_success_flag(action, client) - class ValkeyTestCaseBase: testdir = "test-data" From fc6f6f767d511526202e0d5c5fed64b07137171d Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Tue, 5 Aug 2025 20:38:39 +0000 Subject: [PATCH 09/19] Optimizations Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 371 ++++++++++++++++++++-------------------- 1 file changed, 185 insertions(+), 186 deletions(-) diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index 115a334..87f4d7e 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -61,184 +61,39 @@ class ValkeyAction(Enum): AOF_REWRITE = 1 -class BaseValkeyHandle(object): - """Base class with common methods for both local and external servers""" - - def num_keys(self, db=0, client=None): - if client is None: - client = self.client - if f"db{db}" in client.info("all").keys(): - return client.info("all")[f"db{db}"]["keys"] - return 0 - - def _action_success_flag(self, action, client): - if action == ValkeyAction.AOF_REWRITE: - return client.info()["aof_last_bgrewrite_status"] == "ok" - else: - raise RuntimeError(f"{action} not supported") - - def wait_for_action_done(self, action, client=None): - if client is None: - client = self.client - try: - if action == ValkeyAction.AOF_REWRITE: - wait_for_equal( - lambda: client.info()["aof_rewrite_in_progress"], - 1, - timeout=TEST_MAX_WAIT_TIME_SECONDS, - ) - else: - raise RuntimeError("{} not support".format(action)) - except WaitTimeout: - raise RuntimeError("{} failed to complete in time".format(action)) - assert self._action_success_flag(action, client) - - def wait_for_save_done(self, client=None): - """Wait for the save to complete, failing if it does not complete successfully in the timeout""" - if client is None: - client = self.client - try: - wait_for_ne( - lambda: client.info()["rdb_bgsave_in_progress"], - 1, - timeout=TEST_MAX_WAIT_TIME_SECONDS, - ) - except WaitTimeout: - raise RuntimeError("Save failed to complete in time") - assert client.info()["rdb_last_bgsave_status"] == "ok" - - def is_alive(self): - try: - self.client.ping() - return True - except: - return False - - -class ExternalValkeyServerHandle(BaseValkeyHandle): - """Handle to an external valkey server""" - - def __init__(self, bind_ip, port): - self.bind_ip = bind_ip - self.port = port - self.client = None - self.clients = [] # Track all clients for cleanup - - def connect(self): - print(f"ATTEMPTING CONNECTION TO: {self.bind_ip}:{self.port}") - self.client = StrictValkey(host=self.bind_ip, port=self.port) - try: - self.client.ping() - print(f"PING SUCCESSFUL TO EXTERNAL SERVER: {self.bind_ip}:{self.port}") - except Exception as e: - print(f"CONNECTION FAILED TO: {self.bind_ip}:{self.port} - {e}") - raise RuntimeError( - f"Failed to connect to external server at {self.bind_ip}:{self.port}: {e}" - ) - return self.client - - def get_new_client(self): - client = StrictValkey(host=self.bind_ip, port=self.port) - self.clients.append(client) - return client - - def exit(self, cleanup=True, remove_nodes_conf=True): - if self.client: - try: - self.client.close() - except: - pass - self.client = None - - for client in self.clients: - try: - client.close() - except: - pass - self.clients.clear() - - def restart(self, remove_rdb=False, remove_nodes_conf=False, connect_client=True): - try: - result = subprocess.run( - [ - "docker", - "ps", - "--format", - "{{.Names}}", - "--filter", - f"publish={self.port}", - ], - capture_output=True, - text=True, - check=True, - ) - container_names = result.stdout.strip().split("\n") - if container_names and container_names[0]: - container_name = container_names[0] - subprocess.run(["docker", "restart", container_name], check=True) - time.sleep(3) - except: - pass - - if connect_client: - self.connect() - - def is_rdb_done_loading(self): - """Check if RDB loading is done by examining container logs""" - try: - # Find container using this port - result = subprocess.run( - [ - "docker", - "ps", - "--format", - "{{.Names}}", - "--filter", - f"publish={self.port}", - ], - capture_output=True, - text=True, - check=True, - ) - container_names = result.stdout.strip().split("\n") - if container_names and container_names[0]: - container_name = container_names[0] - # Check container logs for RDB loading completion - logs = subprocess.run( - ["docker", "logs", container_name], - capture_output=True, - text=True, - check=True, - ) - return ( - "Done loading RDB" in logs.stdout - or "Done loading RDB" in logs.stderr - ) - except: - pass - return True # Fallback to assume loaded - - -class ValkeyServerHandle(BaseValkeyHandle): +class ValkeyServerHandle(object): """Handle to a valkey server process""" DEFAULT_BIND_IP = "0.0.0.0" def __init__( - self, bind_ip, port, port_tracker, server_path="valkey-server", cwd="." + self, + bind_ip, + port, + port_tracker, + server_path="valkey-server", + cwd=".", + external_mode=False, ): - self.server = None + self.external_mode = external_mode self.client = None self.port = port self.bind_ip = bind_ip - self.args = {} - self.args["port"] = self.port - self.args["logfile"] = f"logfile_{port}" - self.args["dbfilename"] = f"testrdb-{port}.rdb" - self.args["appenddirname"] = f"aof-{port}" - self.cwd = cwd - self.valkey_path = server_path - self.conf_file = None + + if external_mode: + # External server setup + self.clients = [] + else: + # Local server setup + self.server = None + self.args = {} + self.args["port"] = self.port + self.args["logfile"] = f"logfile_{port}" + self.args["dbfilename"] = f"testrdb-{port}.rdb" + self.args["appenddirname"] = f"aof-{port}" + self.cwd = cwd + self.valkey_path = server_path + self.conf_file = None @classmethod def create_from_server(self, server, db=0): @@ -250,9 +105,32 @@ def set_startup_args(self, args): self.args.update(args) def get_new_client(self): - return self.create_from_server(self) + if self.external_mode: + client = StrictValkey(host=self.bind_ip, port=self.port) + self.clients.append(client) + return client + else: + return self.create_from_server(self) def exit(self, cleanup=True, remove_nodes_conf=True): + if self.external_mode: + # External server cleanup + if self.client: + try: + self.client.close() + except: + pass + self.client = None + + for client in self.clients: + try: + client.close() + except: + pass + self.clients.clear() + return + + # Local server cleanup if self.client: try: self.client.shutdown("nosave") @@ -425,8 +303,43 @@ def start(self, wait_for_ping=True, connect_client=True): return self.client def restart(self, remove_rdb=True, remove_nodes_conf=True, connect_client=True): - self.exit(remove_rdb, remove_nodes_conf) - self.start(connect_client=connect_client) + if self.external_mode: + # Docker restart logic + try: + result = subprocess.run( + [ + "docker", + "ps", + "--format", + "{{.Names}}", + "--filter", + f"publish={self.port}", + ], + capture_output=True, + text=True, + check=True, + ) + container_names = result.stdout.strip().split("\n") + if container_names and container_names[0]: + container_name = container_names[0] + subprocess.run(["docker", "restart", container_name], check=True) + time.sleep(3) + except: + pass + + if connect_client: + self.connect() + else: + # Local server restart logic + self.exit(remove_rdb, remove_nodes_conf) + self.start(connect_client=connect_client) + + def is_alive(self): + try: + self.client.ping() + return True + except: + return False def _waitForPing(self, c): try: @@ -444,12 +357,35 @@ def wait_for_key(self, key, value): ) def connect(self): - c = self.create_from_server(self) + if self.external_mode: + self.client = StrictValkey(host=self.bind_ip, port=self.port) + try: + self.client.ping() + except Exception as e: + raise RuntimeError( + f"Failed to connect to external server at {self.bind_ip}:{self.port}: {e}" + ) + return self.client + else: + c = self.create_from_server(self) + try: + self._waitForPing(c) + except WaitTimeout: + raise RuntimeError("Failed to connect or ping server") + self.client = c + + def wait_for_save_done(self, client=None): + if client is None: + client = self.client try: - self._waitForPing(c) + wait_for_ne( + lambda: client.info()["rdb_bgsave_in_progress"], + 1, + timeout=TEST_MAX_WAIT_TIME_SECONDS, + ) except WaitTimeout: - raise RuntimeError("Failed to connect or ping server") - self.client = c + raise RuntimeError("Save failed to complete in time") + assert client.info()["rdb_last_bgsave_status"] == "ok" def wait_for_save_in_progress(self, client=None): if client is None: @@ -461,8 +397,44 @@ def wait_for_save_in_progress(self, client=None): ) def is_rdb_done_loading(self): - rdb_load_log = "Done loading RDB" - return self.verify_string_in_logfile(rdb_load_log) == True + if self.external_mode: + # Check if RDB loading is done by examining container logs + try: + # Find container using this port + result = subprocess.run( + [ + "docker", + "ps", + "--format", + "{{.Names}}", + "--filter", + f"publish={self.port}", + ], + capture_output=True, + text=True, + check=True, + ) + container_names = result.stdout.strip().split("\n") + if container_names and container_names[0]: + container_name = container_names[0] + # Check container logs for RDB loading completion + logs = subprocess.run( + ["docker", "logs", container_name], + capture_output=True, + text=True, + check=True, + ) + return ( + "Done loading RDB" in logs.stdout + or "Done loading RDB" in logs.stderr + ) + except: + pass + return True # Fallback to assume loaded + else: + # Local server logic + rdb_load_log = "Done loading RDB" + return self.verify_string_in_logfile(rdb_load_log) == True def num_replicas_online(self, client=None): if client is None: @@ -478,6 +450,13 @@ def get_default_client(self, client): return self.client return client + def num_keys(self, db=0, client=None): + if client is None: + client = self.client + if f"db{db}".format(db) in client.info("all").keys(): + return client.info("all")["db{}".format(db)]["keys"] + return 0 + def is_primary_link_up(self, client=None): if client is None: client = self.client @@ -489,6 +468,28 @@ def is_primary_link_up(self, client=None): return True return False + def _action_success_flag(self, action, client): + if action == ValkeyAction.AOF_REWRITE: + return client.info()["aof_last_bgrewrite_status"] == "ok" + else: + raise RuntimeError("{} not support".format(action)) + + def wait_for_action_done(self, action, client=None): + if client is None: + client = self.client + try: + if action == ValkeyAction.AOF_REWRITE: + wait_for_equal( + lambda: client.info()["aof_rewrite_in_progress"], + 1, + timeout=TEST_MAX_WAIT_TIME_SECONDS, + ) + else: + raise RuntimeError("{} not support".format(action)) + except WaitTimeout: + raise RuntimeError("{} failed to complete in time".format(action)) + assert self._action_success_flag(action, client) + class ValkeyTestCaseBase: testdir = "test-data" @@ -612,13 +613,13 @@ def create_server( if not port: raise ValueError("Port must be specified for external server") - print(f"CONNECTING TO EXTERNAL SERVER: {bind_ip}:{port}") - external_handle = ExternalValkeyServerHandle(bind_ip, port) - valkey_cli = external_handle.connect() + valkey_server = ValkeyServerHandle( + bind_ip, port, port_tracker=None, external_mode=True + ) + valkey_cli = valkey_server.connect() if not skip_teardown: - self.server_list.append(external_handle) - print(f"CONNECTED TO EXTERNAL SERVER: {bind_ip}:{port}") - return external_handle, valkey_cli + self.server_list.append(valkey_server) + return valkey_server, valkey_cli if not bind_ip: bind_ip = self.get_bind_ip() @@ -626,7 +627,6 @@ def create_server( if not port: port = self.get_bind_port() - print(f"CREATING LOCAL SERVER: {bind_ip}:{port}") valkey_server_handle = self.get_valkey_handle() self.server_path = server_path valkey_server = valkey_server_handle( @@ -641,7 +641,6 @@ def create_server( valkey_server.conf_file = conf_file valkey_server.args.update(args) valkey_cli = valkey_server.start() - print(f"LOCAL SERVER STARTED: {bind_ip}:{port}") return valkey_server, valkey_cli def wait_for_all_replicas_online(self, n): From f5bf1ef2b2a6737b51f86a23c130320202338445 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Tue, 5 Aug 2025 20:51:49 +0000 Subject: [PATCH 10/19] Minor formatting changes Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index 87f4d7e..37c86a8 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -375,6 +375,7 @@ def connect(self): self.client = c def wait_for_save_done(self, client=None): + """Wait for the save to complete, failing if it does not complete successfully in the timeout""" if client is None: client = self.client try: From 38aa8fedd3f68cdcf8845aedde4ad8955b893858 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Wed, 6 Aug 2025 19:33:19 +0000 Subject: [PATCH 11/19] Addressed suggestions Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 158 ++++++++++++++++++---------------------- 1 file changed, 69 insertions(+), 89 deletions(-) diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index 37c86a8..886f3c9 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -95,50 +95,38 @@ def __init__( self.valkey_path = server_path self.conf_file = None - @classmethod - def create_from_server(self, server, db=0): - logging.info(("Created regular client for port {}".format(server.port))) - r = StrictValkey(host="localhost", port=server.port, db=db) + def create_from_server(self, db=0): + logging.info(("Created regular client for port {}".format(self.port))) + r = StrictValkey(host=self.bind_ip, port=self.port, db=db) return r def set_startup_args(self, args): self.args.update(args) def get_new_client(self): + client = self.create_from_server() if self.external_mode: - client = StrictValkey(host=self.bind_ip, port=self.port) self.clients.append(client) - return client - else: - return self.create_from_server(self) + return client def exit(self, cleanup=True, remove_nodes_conf=True): - if self.external_mode: - # External server cleanup - if self.client: + # Client cleanup (same for both modes) + if self.client: + if not self.external_mode: try: - self.client.close() + self.client.shutdown("nosave") except: - pass - self.client = None + logging.warning("SHUTDOWN was unsuccessful") + self.client.close() + self.client = None + # External mode: clean up additional clients + if self.external_mode: for client in self.clients: - try: - client.close() - except: - pass + client.close() self.clients.clear() return - # Local server cleanup - if self.client: - try: - self.client.shutdown("nosave") - except: - logging.warning("SHUTDOWN was unsuccessful") - - self.client = None - if self.server: self._waitForExit() self.server = None @@ -305,27 +293,26 @@ def start(self, wait_for_ping=True, connect_client=True): def restart(self, remove_rdb=True, remove_nodes_conf=True, connect_client=True): if self.external_mode: # Docker restart logic - try: - result = subprocess.run( - [ - "docker", - "ps", - "--format", - "{{.Names}}", - "--filter", - f"publish={self.port}", - ], - capture_output=True, - text=True, - check=True, - ) - container_names = result.stdout.strip().split("\n") - if container_names and container_names[0]: - container_name = container_names[0] - subprocess.run(["docker", "restart", container_name], check=True) - time.sleep(3) - except: - pass + result = subprocess.run( + [ + "docker", + "ps", + "--format", + "{{.Names}}", + "--filter", + f"publish={self.port}", + ], + capture_output=True, + text=True, + check=True, + ) + container_names = result.stdout.strip().split("\n") + if container_names and container_names[0]: + container_name = container_names[0] + subprocess.run(["docker", "restart", container_name], check=True) + time.sleep(3) + else: + raise RuntimeError(f"No Docker container found using port {self.port}") if connect_client: self.connect() @@ -357,22 +344,17 @@ def wait_for_key(self, key, value): ) def connect(self): - if self.external_mode: - self.client = StrictValkey(host=self.bind_ip, port=self.port) - try: - self.client.ping() - except Exception as e: + self.client = self.create_from_server() + try: + self.client.ping() + except Exception as e: + if self.external_mode: raise RuntimeError( f"Failed to connect to external server at {self.bind_ip}:{self.port}: {e}" ) - return self.client - else: - c = self.create_from_server(self) - try: - self._waitForPing(c) - except WaitTimeout: - raise RuntimeError("Failed to connect or ping server") - self.client = c + else: + raise RuntimeError(f"Failed to connect or ping server: {e}") + return self.client def wait_for_save_done(self, client=None): """Wait for the save to complete, failing if it does not complete successfully in the timeout""" @@ -400,38 +382,36 @@ def wait_for_save_in_progress(self, client=None): def is_rdb_done_loading(self): if self.external_mode: # Check if RDB loading is done by examining container logs - try: - # Find container using this port - result = subprocess.run( - [ - "docker", - "ps", - "--format", - "{{.Names}}", - "--filter", - f"publish={self.port}", - ], + # Find container using this port + result = subprocess.run( + [ + "docker", + "ps", + "--format", + "{{.Names}}", + "--filter", + f"publish={self.port}", + ], + capture_output=True, + text=True, + check=True, + ) + container_names = result.stdout.strip().split("\n") + if container_names and container_names[0]: + container_name = container_names[0] + # Check container logs for RDB loading completion + logs = subprocess.run( + ["docker", "logs", container_name], capture_output=True, text=True, check=True, ) - container_names = result.stdout.strip().split("\n") - if container_names and container_names[0]: - container_name = container_names[0] - # Check container logs for RDB loading completion - logs = subprocess.run( - ["docker", "logs", container_name], - capture_output=True, - text=True, - check=True, - ) - return ( - "Done loading RDB" in logs.stdout - or "Done loading RDB" in logs.stderr - ) - except: - pass - return True # Fallback to assume loaded + return ( + "Done loading RDB" in logs.stdout + or "Done loading RDB" in logs.stderr + ) + else: + raise RuntimeError(f"No Docker container found using port {self.port}") else: # Local server logic rdb_load_log = "Done loading RDB" From 0a7f45e7291e6f9e1d318651d2c4c031a8130da2 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Wed, 6 Aug 2025 21:29:09 +0000 Subject: [PATCH 12/19] Minor Change Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index 886f3c9..7a9b6ef 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -348,12 +348,7 @@ def connect(self): try: self.client.ping() except Exception as e: - if self.external_mode: - raise RuntimeError( - f"Failed to connect to external server at {self.bind_ip}:{self.port}: {e}" - ) - else: - raise RuntimeError(f"Failed to connect or ping server: {e}") + raise RuntimeError("Failed to connect or ping server") return self.client def wait_for_save_done(self, client=None): From ccb5798f5054949586fff730ee312f4e0a8e68c5 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Wed, 6 Aug 2025 22:22:14 +0000 Subject: [PATCH 13/19] Running rdb tests now Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 44 ++++++++--------------------------------- 1 file changed, 8 insertions(+), 36 deletions(-) diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index 7a9b6ef..d44551f 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -80,18 +80,19 @@ def __init__( self.port = port self.bind_ip = bind_ip + self.args = {} + self.args["port"] = self.port + self.args["logfile"] = f"logfile_{port}" + self.args["dbfilename"] = f"testrdb-{port}.rdb" + self.cwd = cwd + if external_mode: # External server setup self.clients = [] else: # Local server setup self.server = None - self.args = {} - self.args["port"] = self.port - self.args["logfile"] = f"logfile_{port}" - self.args["dbfilename"] = f"testrdb-{port}.rdb" self.args["appenddirname"] = f"aof-{port}" - self.cwd = cwd self.valkey_path = server_path self.conf_file = None @@ -376,37 +377,8 @@ def wait_for_save_in_progress(self, client=None): def is_rdb_done_loading(self): if self.external_mode: - # Check if RDB loading is done by examining container logs - # Find container using this port - result = subprocess.run( - [ - "docker", - "ps", - "--format", - "{{.Names}}", - "--filter", - f"publish={self.port}", - ], - capture_output=True, - text=True, - check=True, - ) - container_names = result.stdout.strip().split("\n") - if container_names and container_names[0]: - container_name = container_names[0] - # Check container logs for RDB loading completion - logs = subprocess.run( - ["docker", "logs", container_name], - capture_output=True, - text=True, - check=True, - ) - return ( - "Done loading RDB" in logs.stdout - or "Done loading RDB" in logs.stderr - ) - else: - raise RuntimeError(f"No Docker container found using port {self.port}") + info = self.client.info() + return info.get('loading', 0) == 0 else: # Local server logic rdb_load_log = "Done loading RDB" From 3c09624670d991b17500d1d1dade0d6b2bf529b1 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Wed, 6 Aug 2025 22:28:29 +0000 Subject: [PATCH 14/19] Fixed formatting Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index d44551f..5e7be96 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -378,7 +378,7 @@ def wait_for_save_in_progress(self, client=None): def is_rdb_done_loading(self): if self.external_mode: info = self.client.info() - return info.get('loading', 0) == 0 + return info.get("loading", 0) == 0 else: # Local server logic rdb_load_log = "Done loading RDB" From ec8a21da5bbecf806180d80c28b594cd79d207b3 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Thu, 7 Aug 2025 21:59:44 +0000 Subject: [PATCH 15/19] Removed client stuff Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index 5e7be96..1b727e8 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -86,10 +86,7 @@ def __init__( self.args["dbfilename"] = f"testrdb-{port}.rdb" self.cwd = cwd - if external_mode: - # External server setup - self.clients = [] - else: + if not external_mode: # Local server setup self.server = None self.args["appenddirname"] = f"aof-{port}" @@ -105,10 +102,7 @@ def set_startup_args(self, args): self.args.update(args) def get_new_client(self): - client = self.create_from_server() - if self.external_mode: - self.clients.append(client) - return client + return self.create_from_server() def exit(self, cleanup=True, remove_nodes_conf=True): # Client cleanup (same for both modes) @@ -121,11 +115,8 @@ def exit(self, cleanup=True, remove_nodes_conf=True): self.client.close() self.client = None - # External mode: clean up additional clients + # No server process to clean up if we're using an external server if self.external_mode: - for client in self.clients: - client.close() - self.clients.clear() return if self.server: @@ -172,7 +163,7 @@ def _waitForExit(self): logging.warning("Server did not exit in time, killing...") if self.is_alive(): # check server is still running before kill it. - self.kill() + self.server.kill() try: self.wait_for_shutdown() except WaitTimeout: From dfd6d657898f3e19ab0e267f6379164ac32aed88 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Mon, 11 Aug 2025 01:56:21 +0000 Subject: [PATCH 16/19] moved docker commands out of valkey-test-framework Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 44 ++++++++++------------------------------- 1 file changed, 10 insertions(+), 34 deletions(-) diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index 1b727e8..fe2ec3c 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -77,22 +77,19 @@ def __init__( ): self.external_mode = external_mode self.client = None + self.server = None self.port = port self.bind_ip = bind_ip + self.valkey_path = server_path + self.conf_file = None self.args = {} self.args["port"] = self.port self.args["logfile"] = f"logfile_{port}" self.args["dbfilename"] = f"testrdb-{port}.rdb" + self.args["appenddirname"] = f"aof-{port}" self.cwd = cwd - if not external_mode: - # Local server setup - self.server = None - self.args["appenddirname"] = f"aof-{port}" - self.valkey_path = server_path - self.conf_file = None - def create_from_server(self, db=0): logging.info(("Created regular client for port {}".format(self.port))) r = StrictValkey(host=self.bind_ip, port=self.port, db=db) @@ -284,32 +281,10 @@ def start(self, wait_for_ping=True, connect_client=True): def restart(self, remove_rdb=True, remove_nodes_conf=True, connect_client=True): if self.external_mode: - # Docker restart logic - result = subprocess.run( - [ - "docker", - "ps", - "--format", - "{{.Names}}", - "--filter", - f"publish={self.port}", - ], - capture_output=True, - text=True, - check=True, + return self._test_instance.restart_external_server( + self, remove_rdb, remove_nodes_conf, connect_client ) - container_names = result.stdout.strip().split("\n") - if container_names and container_names[0]: - container_name = container_names[0] - subprocess.run(["docker", "restart", container_name], check=True) - time.sleep(3) - else: - raise RuntimeError(f"No Docker container found using port {self.port}") - - if connect_client: - self.connect() else: - # Local server restart logic self.exit(remove_rdb, remove_nodes_conf) self.start(connect_client=connect_client) @@ -552,10 +527,11 @@ def create_server( if not port: raise ValueError("Port must be specified for external server") - valkey_server = ValkeyServerHandle( - bind_ip, port, port_tracker=None, external_mode=True - ) + valkey_server = ValkeyServerHandle(bind_ip, port, port_tracker=None, external_mode=True) + + valkey_server._test_instance = self valkey_cli = valkey_server.connect() + if not skip_teardown: self.server_list.append(valkey_server) return valkey_server, valkey_cli From 2fb468b157c5e9e7fccbd2a35796cb96499bdc45 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Mon, 11 Aug 2025 20:47:38 +0000 Subject: [PATCH 17/19] Fixed formatting Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index fe2ec3c..7520b0f 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -527,8 +527,10 @@ def create_server( if not port: raise ValueError("Port must be specified for external server") - valkey_server = ValkeyServerHandle(bind_ip, port, port_tracker=None, external_mode=True) - + valkey_server = ValkeyServerHandle( + bind_ip, port, port_tracker=None, external_mode=True + ) + valkey_server._test_instance = self valkey_cli = valkey_server.connect() From 121369db9e761331a3bd0c57712b34da0280a126 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Mon, 11 Aug 2025 21:16:57 +0000 Subject: [PATCH 18/19] Minor changes Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index 7520b0f..24e6a3d 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -75,20 +75,19 @@ def __init__( cwd=".", external_mode=False, ): - self.external_mode = external_mode - self.client = None self.server = None + self.client = None + self.external_mode = external_mode self.port = port self.bind_ip = bind_ip - self.valkey_path = server_path - self.conf_file = None - self.args = {} self.args["port"] = self.port self.args["logfile"] = f"logfile_{port}" self.args["dbfilename"] = f"testrdb-{port}.rdb" self.args["appenddirname"] = f"aof-{port}" self.cwd = cwd + self.valkey_path = server_path + self.conf_file = None def create_from_server(self, db=0): logging.info(("Created regular client for port {}".format(self.port))) @@ -102,7 +101,6 @@ def get_new_client(self): return self.create_from_server() def exit(self, cleanup=True, remove_nodes_conf=True): - # Client cleanup (same for both modes) if self.client: if not self.external_mode: try: @@ -311,11 +309,12 @@ def wait_for_key(self, key, value): ) def connect(self): - self.client = self.create_from_server() + c = self.create_from_server() try: - self.client.ping() - except Exception as e: + self._waitForPing(c) + except WaitTimeout: raise RuntimeError("Failed to connect or ping server") + self.client = c return self.client def wait_for_save_done(self, client=None): From b7ccd979eaa1b4f917e9d7e7ce4b8edca0c3648c Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Tue, 12 Aug 2025 16:48:02 +0000 Subject: [PATCH 19/19] Addressed comments Signed-off-by: Nikhil Manglore --- src/valkey_test_case.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/valkey_test_case.py b/src/valkey_test_case.py index 24e6a3d..98d0d94 100644 --- a/src/valkey_test_case.py +++ b/src/valkey_test_case.py @@ -522,9 +522,9 @@ def create_server( if external_server: if not bind_ip: - bind_ip = "localhost" + raise ValueError("Bind ip must be specified for external server use") if not port: - raise ValueError("Port must be specified for external server") + raise ValueError("Port must be specified for external server use") valkey_server = ValkeyServerHandle( bind_ip, port, port_tracker=None, external_mode=True