From ea654227eae74cdd8731eaf8b397b49a780cc489 Mon Sep 17 00:00:00 2001 From: r0ny123 <49360849+r0ny123@users.noreply.github.com> Date: Mon, 6 Apr 2026 08:14:45 +0000 Subject: [PATCH 1/2] Log subprocess output in arrival order in SpawningWorker Modified SpawningWorker._executeJobPayload to use daemon threads for reading and logging stdout and stderr from the child process in real-time. This ensures that logs from both streams are interleaved in arrival order and appear as they happen, rather than being captured and logged only after the process terminates. The implementation is cross-platform compatible and robustly extracts the result_id from the last non-empty line of stdout. It also ensures the child process is killed upon timeout. Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com> --- mcrit/SpawningWorker.py | 52 +++++++++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/mcrit/SpawningWorker.py b/mcrit/SpawningWorker.py index c9d318c..1f1ddb8 100644 --- a/mcrit/SpawningWorker.py +++ b/mcrit/SpawningWorker.py @@ -12,6 +12,7 @@ from itertools import zip_longest from multiprocessing import Pool, cpu_count import subprocess +import threading from typing import Dict, List, Optional, TYPE_CHECKING, Tuple import tqdm @@ -92,24 +93,45 @@ def _executeJobPayload(self, job_payload, job): console_handle = subprocess.Popen(["python", "-m", "mcrit", "singlejobworker", "--job_id", str(job.job_id)], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # extract result_id from console_output result_id = None + stdout_lines = [] + + def reader(pipe, label, accum): + try: + for line in iter(pipe.readline, b''): + decoded_line = line.decode("utf-8", errors="replace").rstrip() + if decoded_line: + LOGGER.info("%s logs from subprocess: %s", label, decoded_line) + if accum is not None: + accum.append(decoded_line) + except Exception: + pass + finally: + pipe.close() + + t1 = threading.Thread(target=reader, args=(console_handle.stdout, "STDOUT", stdout_lines)) + t2 = threading.Thread(target=reader, args=(console_handle.stderr, "STDERR", None)) + t1.daemon = True + t2.daemon = True + t1.start() + t2.start() + try: - stdout_result, stderr_result = console_handle.communicate(timeout=self._queue_config.QUEUE_SPAWNINGWORKER_CHILDREN_TIMEOUT) - stdout_result = stdout_result.strip().decode("utf-8") - # TODO: log output from subprocess in the order it arrived - # instead of the split to stdout, stderr - if stdout_result: - LOGGER.info("STDOUT logs from subprocess: %s", stdout_result) - if stderr_result: - stderr_result = stderr_result.strip().decode("utf-8") - LOGGER.info("STDERR logs from subprocess: %s", stderr_result) - - last_line = stdout_result.split("\n")[-1] - # successful output should be just the result_id in a single line - match = re.match("(?P[0-9a-fA-F]{24})", last_line) - if match: - result_id = match.group("result_id") + console_handle.wait(timeout=self._queue_config.QUEUE_SPAWNINGWORKER_CHILDREN_TIMEOUT) except subprocess.TimeoutExpired: LOGGER.error(f"Job {str(job.job_id)} running as child from SpawningWorker timed out during processing.") + console_handle.kill() + + t1.join(timeout=1) + t2.join(timeout=1) + + if stdout_lines: + # Search backwards for result_id in case there are trailing empty lines or other output + for line in reversed(stdout_lines): + if line.strip(): + match = re.match("(?P[0-9a-fA-F]{24})", line.strip()) + if match: + result_id = match.group("result_id") + break return result_id def _executeJob(self, job): From fe4b6deba372e04318f0ebcb1261efa58d7ffbfd Mon Sep 17 00:00:00 2001 From: Rony <49360849+r0ny123@users.noreply.github.com> Date: Mon, 6 Apr 2026 13:48:58 +0530 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- mcrit/SpawningWorker.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/mcrit/SpawningWorker.py b/mcrit/SpawningWorker.py index 1f1ddb8..27759d2 100644 --- a/mcrit/SpawningWorker.py +++ b/mcrit/SpawningWorker.py @@ -104,7 +104,7 @@ def reader(pipe, label, accum): if accum is not None: accum.append(decoded_line) except Exception: - pass + LOGGER.exception("Exception in subprocess reader thread") finally: pipe.close() @@ -120,9 +120,10 @@ def reader(pipe, label, accum): except subprocess.TimeoutExpired: LOGGER.error(f"Job {str(job.job_id)} running as child from SpawningWorker timed out during processing.") console_handle.kill() + console_handle.wait() - t1.join(timeout=1) - t2.join(timeout=1) + t1.join() + t2.join() if stdout_lines: # Search backwards for result_id in case there are trailing empty lines or other output