diff --git a/craft_cli/printer.py b/craft_cli/printer.py index 59b8a79b..07e12f96 100644 --- a/craft_cli/printer.py +++ b/craft_cli/printer.py @@ -29,6 +29,7 @@ import time import weakref from collections.abc import Callable +from contextlib import suppress from dataclasses import dataclass, field from datetime import datetime from functools import lru_cache @@ -50,6 +51,21 @@ ANSI_CLEAR_LINE_TO_END = "\x1b[K" # ANSI escape code to clear the rest of the line. ANSI_HIDE_CURSOR = "\x1b[?25l" ANSI_SHOW_CURSOR = "\x1b[?25h" +ANSI_RESET = "\x1b[0m" + + +def _safe_print(*args: Any, **kwargs: Any) -> None: + """Print to a stream, ignoring BrokenPipeError from downstream consumers.""" + with suppress(BrokenPipeError): + print(*args, **kwargs) + + +def reset_terminal_style(stream: TextIO | None) -> None: + """Reset ANSI terminal style on the given stream if supported.""" + if stream is None: + return + if _stream_is_terminal(stream) and _supports_ansi_escape_sequences(): + _safe_print(ANSI_RESET, end="", flush=True, file=stream) @dataclass @@ -121,6 +137,24 @@ def _format_term_line( return previous_line_end + _fill_line(text + spintext) +def _format_term_lines( + previous_line_end: str, text: str, spintext: str, *, ephemeral: bool +) -> str: + """Format one or more terminal lines, clearing each rendered line fully.""" + lines = text.split("\n") + if len(lines) == 1: + return _format_term_line(previous_line_end, text, spintext, ephemeral=ephemeral) + + formatted = [ + _format_term_line(previous_line_end, lines[0], "", ephemeral=ephemeral) + ] + formatted.extend( + _format_term_line("\n", line, "", ephemeral=ephemeral) for line in lines[1:-1] + ) + formatted.append(_format_term_line("\n", lines[-1], spintext, ephemeral=ephemeral)) + return "".join(formatted) + + class _Spinner(threading.Thread): """A supervisor thread that will repeat long-standing messages with a spinner besides it. @@ -317,7 +351,7 @@ def _write_line_terminal( # We don't need to rewrite the same ephemeral message repeatedly. should_overwrite = spintext or message.end_line or not message.ephemeral if should_overwrite or message != self.prv_msg: - line = _format_term_line( + line = _format_term_lines( previous_line_end, text, spintext, ephemeral=message.ephemeral ) print(line, end="", flush=True, file=message.stream) @@ -436,9 +470,9 @@ def _show(self, msg: _MessageInfo) -> None: def _log(self, message: _MessageInfo) -> None: """Write the line message to the log file.""" - # prepare the text with (maybe) the timestamp timestamp_str = message.created_at.isoformat(sep=" ", timespec="milliseconds") - self.log.write(f"{timestamp_str} {message.text}\n") + for line in message.text.split("\n"): + self.log.write(f"{timestamp_str} {line}\n") # Flush the file: protect a bit in case of crashes, and multiprocess-based # parallelism. self.log.flush() diff --git a/tests/integration/test_messages_integration.py b/tests/integration/test_messages_integration.py index d9074df9..b7c6c0be 100644 --- a/tests/integration/test_messages_integration.py +++ b/tests/integration/test_messages_integration.py @@ -84,6 +84,7 @@ def remove_control_characters(string: str) -> str: string.replace(printer.ANSI_CLEAR_LINE_TO_END, "") .replace(printer.ANSI_HIDE_CURSOR, "") .replace(printer.ANSI_SHOW_CURSOR, "") + .replace(printer.ANSI_RESET, "") ) @@ -1186,6 +1187,29 @@ def test_error_multiline_brief(capsys): assert_outputs(capsys, emit, expected_err=expected, expected_log=expected) +@pytest.mark.parametrize("output_is_terminal", [True]) +def test_multiline_permanent_progress_overwrites_temporary_progress(capsys): + """A permanent multiline progress should fully overwrite a previous temporary one.""" + emit = Emitter() + emit.init(EmitterMode.BRIEF, "testapp", GREETING) + + emit.progress("foo foo foo", permanent=False) + emit.progress("bar\nbar", permanent=True) + emit.ended_ok() + + expected_err = [ + Line("foo foo foo", permanent=False), + Line("bar", permanent=True), + Line("bar", permanent=True), + ] + expected_log = [ + Line("foo foo foo"), + Line("bar"), + Line("bar"), + ] + assert_outputs(capsys, emit, expected_err=expected_err, expected_log=expected_log) + + @pytest.mark.parametrize( "mode", [