diff --git a/mcrit/SpawningWorker.py b/mcrit/SpawningWorker.py index c9d318c..27759d2 100644 --- a/mcrit/SpawningWorker.py +++ b/mcrit/SpawningWorker.py @@ -12,6 +12,7 @@ from itertools import zip_longest from multiprocessing import Pool, cpu_count import subprocess +import threading from typing import Dict, List, Optional, TYPE_CHECKING, Tuple import tqdm @@ -92,24 +93,46 @@ def _executeJobPayload(self, job_payload, job): console_handle = subprocess.Popen(["python", "-m", "mcrit", "singlejobworker", "--job_id", str(job.job_id)], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # extract result_id from console_output result_id = None + stdout_lines = [] + + def reader(pipe, label, accum): + try: + for line in iter(pipe.readline, b''): + decoded_line = line.decode("utf-8", errors="replace").rstrip() + if decoded_line: + LOGGER.info("%s logs from subprocess: %s", label, decoded_line) + if accum is not None: + accum.append(decoded_line) + except Exception: + LOGGER.exception("Exception in subprocess reader thread") + finally: + pipe.close() + + t1 = threading.Thread(target=reader, args=(console_handle.stdout, "STDOUT", stdout_lines)) + t2 = threading.Thread(target=reader, args=(console_handle.stderr, "STDERR", None)) + t1.daemon = True + t2.daemon = True + t1.start() + t2.start() + try: - stdout_result, stderr_result = console_handle.communicate(timeout=self._queue_config.QUEUE_SPAWNINGWORKER_CHILDREN_TIMEOUT) - stdout_result = stdout_result.strip().decode("utf-8") - # TODO: log output from subprocess in the order it arrived - # instead of the split to stdout, stderr - if stdout_result: - LOGGER.info("STDOUT logs from subprocess: %s", stdout_result) - if stderr_result: - stderr_result = stderr_result.strip().decode("utf-8") - LOGGER.info("STDERR logs from subprocess: %s", stderr_result) - - last_line = stdout_result.split("\n")[-1] - # successful output should be just the result_id in a single line - match = re.match("(?P<result_id>[0-9a-fA-F]{24})", last_line) - if match: - 
result_id = match.group("result_id") + console_handle.wait(timeout=self._queue_config.QUEUE_SPAWNINGWORKER_CHILDREN_TIMEOUT) except subprocess.TimeoutExpired: LOGGER.error(f"Job {str(job.job_id)} running as child from SpawningWorker timed out during processing.") + console_handle.kill() + console_handle.wait() + + t1.join() + t2.join() + + if stdout_lines: + # Search backwards for result_id in case there are trailing empty lines or other output + for line in reversed(stdout_lines): + if line.strip(): + match = re.match("(?P<result_id>[0-9a-fA-F]{24})", line.strip()) + if match: + result_id = match.group("result_id") + break return result_id def _executeJob(self, job):