Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions ip/ml_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2144,26 +2144,26 @@ def _read_remote_pid():
proc.terminate()
rc = proc.wait()
finally:
# Copy heap back BEFORE tearing down SSH — needs the master connection
# alive, and large heaps can take a while over WAN links.
if rc == 0:
if not copy_heap_back(host, new_argv, argv_rewrites):
rc = 2
logger.debug(f"Process exited with rc={rc}")
# Kill remote PIDs first (needs SSH master alive), then clean up
# local processes and temp files.
mgmt.shutdown()
cleanup_all()
cleanup_remote(host, remote_tmp, remote_src_dir)

logger.debug(f"Process exited with rc={rc}")

# If this was a heap build (ML_Heap.save_child in command), copy the
# built heap back from the remote to the local expected path.
if rc == 0:
copy_heap_back(host, new_argv, argv_rewrites)

sys.exit(rc)


def copy_heap_back(host, new_argv, argv_rewrites):
"""After a successful heap build, rsync the heap from remote to local.

Reverses argv_rewrites to determine the local destination path.
Returns True on success, False on failure.
"""
for arg in new_argv:
m = re.search(r'ML_Heap\.save_child "([^"]+)"', arg)
Expand All @@ -2172,13 +2172,24 @@ def copy_heap_back(host, new_argv, argv_rewrites):
local_heap = backward_rewrite(remote_heap, argv_rewrites, label="heap_back")
local_heap_dir = os.path.dirname(local_heap)
os.makedirs(local_heap_dir, exist_ok=True)
logger.info(f"Copying built heap from remote: {remote_heap} -> {local_heap}")
# Check remote size first so we can report progress.
try:
remote_size_str = ssh_run_stdout(
host, "stat", "--printf=%s", remote_heap).strip()
remote_size = int(remote_size_str)
logger.info(f"Copying built heap from remote: {remote_heap} "
f"({remote_size / 1e9:.1f} GB) -> {local_heap}")
except Exception:
remote_size = None
logger.info(f"Copying built heap from remote: {remote_heap} -> {local_heap}")
try:
t0 = time.time()
subprocess.run(
["rsync", "-az", "-e", "ssh " + " ".join(ssh_control_flags()),
["rsync", "-az",
"-e", "ssh " + " ".join(ssh_control_flags()),
f"{host}:{remote_heap}", local_heap],
check=True, timeout=300)
check=True,
stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
elapsed = time.time() - t0
size = os.path.getsize(local_heap)
logger.info(f"Heap copied successfully ({size} bytes, {elapsed:.1f}s)")
Expand All @@ -2192,8 +2203,11 @@ def copy_heap_back(host, new_argv, argv_rewrites):
logger.info(f"Appended digest to remote heap")
except Exception as e:
logger.debug(f"Failed to append digest to remote heap: {e}")
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
logger.warning(f"Failed to copy heap: {e}")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Failed to copy heap: {e}")
return False
return True


def cleanup_remote(host, remote_tmp, remote_src_dir):
Expand Down
Loading