Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 38 additions & 15 deletions mcrit/SpawningWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from itertools import zip_longest
from multiprocessing import Pool, cpu_count
import subprocess
import threading
from typing import Dict, List, Optional, TYPE_CHECKING, Tuple

import tqdm
Expand Down Expand Up @@ -92,24 +93,46 @@ def _executeJobPayload(self, job_payload, job):
console_handle = subprocess.Popen(["python", "-m", "mcrit", "singlejobworker", "--job_id", str(job.job_id)], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# extract result_id from console_output
result_id = None
stdout_lines = []

def reader(pipe, label, accum):
try:
for line in iter(pipe.readline, b''):
decoded_line = line.decode("utf-8", errors="replace").rstrip()
if decoded_line:
LOGGER.info("%s logs from subprocess: %s", label, decoded_line)
if accum is not None:
accum.append(decoded_line)
except Exception:
LOGGER.exception("Exception in subprocess reader thread")
finally:
pipe.close()

t1 = threading.Thread(target=reader, args=(console_handle.stdout, "STDOUT", stdout_lines))
t2 = threading.Thread(target=reader, args=(console_handle.stderr, "STDERR", None))
t1.daemon = True
t2.daemon = True
t1.start()
t2.start()

try:
stdout_result, stderr_result = console_handle.communicate(timeout=self._queue_config.QUEUE_SPAWNINGWORKER_CHILDREN_TIMEOUT)
stdout_result = stdout_result.strip().decode("utf-8")
# TODO: log output from subprocess in the order it arrived
# instead of the split to stdout, stderr
if stdout_result:
LOGGER.info("STDOUT logs from subprocess: %s", stdout_result)
if stderr_result:
stderr_result = stderr_result.strip().decode("utf-8")
LOGGER.info("STDERR logs from subprocess: %s", stderr_result)

last_line = stdout_result.split("\n")[-1]
# successful output should be just the result_id in a single line
match = re.match("(?P<result_id>[0-9a-fA-F]{24})", last_line)
if match:
result_id = match.group("result_id")
console_handle.wait(timeout=self._queue_config.QUEUE_SPAWNINGWORKER_CHILDREN_TIMEOUT)
except subprocess.TimeoutExpired:
LOGGER.error(f"Job {str(job.job_id)} running as child from SpawningWorker timed out during processing.")
console_handle.kill()
console_handle.wait()

t1.join()
t2.join()

if stdout_lines:
# Search backwards for result_id in case there are trailing empty lines or other output
for line in reversed(stdout_lines):
if line.strip():
match = re.match("(?P<result_id>[0-9a-fA-F]{24})", line.strip())
if match:
result_id = match.group("result_id")
break
return result_id

def _executeJob(self, job):
Expand Down
Loading