Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions craft_cli/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import time
import weakref
from collections.abc import Callable
from contextlib import suppress
from dataclasses import dataclass, field
from datetime import datetime
from functools import lru_cache
Expand All @@ -50,6 +51,21 @@
ANSI_CLEAR_LINE_TO_END = "\x1b[K" # ANSI escape code to clear the rest of the line.
ANSI_HIDE_CURSOR = "\x1b[?25l"
ANSI_SHOW_CURSOR = "\x1b[?25h"
ANSI_RESET = "\x1b[0m"


def _safe_print(*args: Any, **kwargs: Any) -> None:
"""Print to a stream, ignoring BrokenPipeError from downstream consumers."""
with suppress(BrokenPipeError):
print(*args, **kwargs)


def reset_terminal_style(stream: TextIO | None) -> None:
"""Reset ANSI terminal style on the given stream if supported."""
if stream is None:
return
if _stream_is_terminal(stream) and _supports_ansi_escape_sequences():
_safe_print(ANSI_RESET, end="", flush=True, file=stream)


@dataclass
Expand Down Expand Up @@ -121,6 +137,24 @@ def _format_term_line(
return previous_line_end + _fill_line(text + spintext)


def _format_term_lines(
previous_line_end: str, text: str, spintext: str, *, ephemeral: bool
) -> str:
"""Format one or more terminal lines, clearing each rendered line fully."""
lines = text.split("\n")
if len(lines) == 1:
return _format_term_line(previous_line_end, text, spintext, ephemeral=ephemeral)

formatted = [
_format_term_line(previous_line_end, lines[0], "", ephemeral=ephemeral)
]
formatted.extend(
_format_term_line("\n", line, "", ephemeral=ephemeral) for line in lines[1:-1]
)
formatted.append(_format_term_line("\n", lines[-1], spintext, ephemeral=ephemeral))
return "".join(formatted)


class _Spinner(threading.Thread):
"""A supervisor thread that will repeat long-standing messages with a spinner besides it.

Expand Down Expand Up @@ -317,7 +351,7 @@ def _write_line_terminal(
# We don't need to rewrite the same ephemeral message repeatedly.
should_overwrite = spintext or message.end_line or not message.ephemeral
if should_overwrite or message != self.prv_msg:
line = _format_term_line(
line = _format_term_lines(
previous_line_end, text, spintext, ephemeral=message.ephemeral
)
print(line, end="", flush=True, file=message.stream)
Expand Down Expand Up @@ -436,9 +470,9 @@ def _show(self, msg: _MessageInfo) -> None:

def _log(self, message: _MessageInfo) -> None:
"""Write the line message to the log file."""
# prepare the text with (maybe) the timestamp
timestamp_str = message.created_at.isoformat(sep=" ", timespec="milliseconds")
self.log.write(f"{timestamp_str} {message.text}\n")
for line in message.text.split("\n"):
self.log.write(f"{timestamp_str} {line}\n")
# Flush the file: protect a bit in case of crashes, and multiprocess-based
# parallelism.
self.log.flush()
Expand Down
24 changes: 24 additions & 0 deletions tests/integration/test_messages_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def remove_control_characters(string: str) -> str:
string.replace(printer.ANSI_CLEAR_LINE_TO_END, "")
.replace(printer.ANSI_HIDE_CURSOR, "")
.replace(printer.ANSI_SHOW_CURSOR, "")
.replace(printer.ANSI_RESET, "")
)


Expand Down Expand Up @@ -1186,6 +1187,29 @@ def test_error_multiline_brief(capsys):
assert_outputs(capsys, emit, expected_err=expected, expected_log=expected)


@pytest.mark.parametrize("output_is_terminal", [True])
def test_multiline_permanent_progress_overwrites_temporary_progress(capsys):
"""A permanent multiline progress should fully overwrite a previous temporary one."""
emit = Emitter()
emit.init(EmitterMode.BRIEF, "testapp", GREETING)

emit.progress("foo foo foo", permanent=False)
emit.progress("bar\nbar", permanent=True)
emit.ended_ok()

expected_err = [
Line("foo foo foo", permanent=False),
Line("bar", permanent=True),
Line("bar", permanent=True),
]
expected_log = [
Line("foo foo foo"),
Line("bar"),
Line("bar"),
]
assert_outputs(capsys, emit, expected_err=expected_err, expected_log=expected_log)


@pytest.mark.parametrize(
"mode",
[
Expand Down
Loading