From e43cf7bf4e49295cc1a168e6943bf6769c5d0fed Mon Sep 17 00:00:00 2001 From: Gertjan van Zwieten Date: Tue, 17 Mar 2026 10:34:35 +0100 Subject: [PATCH 1/3] Ruff format, check --- docs/conf.py | 25 +- tests.py | 829 +++++++++++++++++++++-------------------- treelog/__init__.py | 53 ++- treelog/_data.py | 12 +- treelog/_filter.py | 13 +- treelog/_html.py | 95 +++-- treelog/_logging.py | 9 +- treelog/_null.py | 1 - treelog/_path.py | 16 +- treelog/_record.py | 47 ++- treelog/_richoutput.py | 46 ++- treelog/_state.py | 48 +-- treelog/_stdout.py | 10 +- treelog/_tee.py | 6 +- treelog/iter.py | 484 ++++++++++++++++++------ treelog/proto.py | 8 +- 16 files changed, 1022 insertions(+), 680 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index c59af31..667a5d8 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -6,27 +6,28 @@ import sys import os -sys.path.insert(0, os.path.abspath('..')) + +sys.path.insert(0, os.path.abspath("..")) # -- Project information ----------------------------------------------------- -project = 'Treelog' -copyright = '2018, Evalf' -author = 'Evalf' +project = "Treelog" +copyright = "2018, Evalf" +author = "Evalf" # -- General configuration --------------------------------------------------- extensions = [ - 'sphinx.ext.autodoc', - 'sphinx.ext.autosummary', - 'sphinx.ext.doctest', - 'sphinx.ext.napoleon', + "sphinx.ext.autodoc", + "sphinx.ext.autosummary", + "sphinx.ext.doctest", + "sphinx.ext.napoleon", ] -master_doc = 'index' +master_doc = "index" # -- Options for HTML output ------------------------------------------------- -html_theme = 'default' -html_favicon = 'favicon.ico' -html_static_path = ['_static'] +html_theme = "default" +html_favicon = "favicon.ico" +html_static_path = ["_static"] diff --git a/tests.py b/tests.py index 7ad1829..b858e12 100644 --- a/tests.py +++ b/tests.py @@ -33,42 +33,41 @@ @treelog.withcontext def generate_test(): - 'decorated function for unit testing' + "decorated function for unit testing" - with treelog.warningfile('test.dat', 'wb') as f: - f.write(b'test3') + with treelog.warningfile("test.dat", "wb") as f: + f.write(b"test3") def generate(): - 'generate log events for unit testing' + "generate log events for unit testing" - treelog.user('my message') - with treelog.infofile('test.dat', 'w') as f: - f.write('test1') - with treelog.context('my context'): - with treelog.iter.plain('iter', 'abc') as items: + treelog.user("my message") + with treelog.infofile("test.dat", "w") as f: + f.write("test1") + with treelog.context("my context"): + with treelog.iter.plain("iter", "abc") as items: for c in items: treelog.info(c) - with treelog.context('empty'): + with treelog.context("empty"): pass - treelog.error('multiple..\n ..lines') - with treelog.userfile('test.dat', 'wb') as f: - treelog.info('generating') - f.write(b'test2') + treelog.error("multiple..\n ..lines") + with treelog.userfile("test.dat", "wb") as f: + treelog.info("generating") + f.write(b"test2") generate_test() - with treelog.context('context step={}', 0) as format: - treelog.info('foo') + with treelog.context("context step={}", 0) as format: + treelog.info("foo") format(1) - treelog.info('bar') - treelog.errordata('same.dat', b'test3') - with treelog.debugfile('dbg.jpg', 'wb', type='image/jpg') as f: - f.write(b'test4') - treelog.debug('dbg') - treelog.warning('warn') + treelog.info("bar") + treelog.errordata("same.dat", b"test3") + with treelog.debugfile("dbg.jpg", "wb", type="image/jpg") as f: + f.write(b"test4") + treelog.debug("dbg") + treelog.warning("warn") class StdoutLog(unittest.TestCase): - def test_output(self): f = io.StringIO() with treelog.set(treelog.StdoutLog(f)): @@ -76,27 +75,28 @@ def test_output(self): self.check_output(f) def check_output(self, f): - self.assertEqual(f.getvalue(), - 'my message\n' - 'test.dat [5 bytes]\n' - 'my context > iter 1 > a\n' - 'my context > iter 2 > b\n' - 'my context > iter 3 > c\n' - 'my context > multiple..\n' - ' > ..lines\n' - 'my context > test.dat > generating\n' - 'my context > test.dat [5 bytes]\n' - 'generate_test > test.dat [5 bytes]\n' - 'context step=0 > foo\n' - 'context step=1 > bar\n' - 'same.dat [5 bytes]\n' - 'dbg.jpg [image/jpg; 5 bytes]\n' - 'dbg\n' - 'warn\n') + self.assertEqual( + f.getvalue(), + "my message\n" + "test.dat [5 bytes]\n" + "my context > iter 1 > a\n" + "my context > iter 2 > b\n" + "my context > iter 3 > c\n" + "my context > multiple..\n" + " > ..lines\n" + "my context > test.dat > generating\n" + "my context > test.dat [5 bytes]\n" + "generate_test > test.dat [5 bytes]\n" + "context step=0 > foo\n" + "context step=1 > bar\n" + "same.dat [5 bytes]\n" + "dbg.jpg [image/jpg; 5 bytes]\n" + "dbg\n" + "warn\n", + ) class RichOutputLog(unittest.TestCase): - def test_output(self): f = io.StringIO() with treelog.set(treelog.RichOutputLog(f)): @@ -104,47 +104,48 @@ def test_output(self): self.check_output(f) def check_output(self, f): - self.assertEqual(f.getvalue(), - '\x1b[1;34mmy message\x1b[0m\n' - 'test.dat > ' - '\r\x1b[K' - '\x1b[1mtest.dat [5 bytes]\x1b[0m\n' - 'my context > ' - 'iter 0 ' - '> \x1b[4D1 > ' - '\x1b[1ma\x1b[0m\nmy context > iter 1 > ' - '\x1b[4D2 > ' - '\x1b[1mb\x1b[0m\nmy context > iter 2 > ' - '\x1b[4D3 > ' - '\x1b[1mc\x1b[0m\nmy context > iter 3 > ' - '\x1b[9D\x1b[K' - 'empty > ' - '\x1b[8D\x1b[K' - '\x1b[1;31mmultiple..\x1b[0m\n > \x1b[1;31m ..lines\x1b[0m\nmy context > test.dat > ' - '\x1b[1mgenerating\x1b[0m\nmy context > test.dat > ' - '\x1b[11D\x1b[K' - '\x1b[1;34mtest.dat [5 bytes]\x1b[0m\nmy context > ' - '\r\x1b[Kgenerate_test > test.dat > ' - '\x1b[11D\x1b[K' - '\x1b[1;35mtest.dat [5 bytes]\x1b[0m\ngenerate_test > ' - '\r\x1b[K' - 'context step=0 > ' - '\x1b[1mfoo\x1b[0m\n' - 'context step=0 > ' - '\x1b[4D1 > ' - '\x1b[1mbar\x1b[0m\n' - 'context step=1 > ' - '\r\x1b[K' - '\x1b[1;31msame.dat [5 bytes]\x1b[0m\n' - 'dbg.jpg > ' - '\r\x1b[K' - '\x1b[1;30mdbg.jpg [image/jpg; 5 bytes]\x1b[0m\n' - '\x1b[1;30mdbg\x1b[0m\n' - '\x1b[1;35mwarn\x1b[0m\n') + self.assertEqual( + f.getvalue(), + "\x1b[1;34mmy message\x1b[0m\n" + "test.dat > " + "\r\x1b[K" + "\x1b[1mtest.dat [5 bytes]\x1b[0m\n" + "my context > " + "iter 0 " + "> \x1b[4D1 > " + "\x1b[1ma\x1b[0m\nmy context > iter 1 > " + "\x1b[4D2 > " + "\x1b[1mb\x1b[0m\nmy context > iter 2 > " + "\x1b[4D3 > " + "\x1b[1mc\x1b[0m\nmy context > iter 3 > " + "\x1b[9D\x1b[K" + "empty > " + "\x1b[8D\x1b[K" + "\x1b[1;31mmultiple..\x1b[0m\n > \x1b[1;31m ..lines\x1b[0m\nmy context > test.dat > " + "\x1b[1mgenerating\x1b[0m\nmy context > test.dat > " + "\x1b[11D\x1b[K" + "\x1b[1;34mtest.dat [5 bytes]\x1b[0m\nmy context > " + "\r\x1b[Kgenerate_test > test.dat > " + "\x1b[11D\x1b[K" + "\x1b[1;35mtest.dat [5 bytes]\x1b[0m\ngenerate_test > " + "\r\x1b[K" + "context step=0 > " + "\x1b[1mfoo\x1b[0m\n" + "context step=0 > " + "\x1b[4D1 > " + "\x1b[1mbar\x1b[0m\n" + "context step=1 > " + "\r\x1b[K" + "\x1b[1;31msame.dat [5 bytes]\x1b[0m\n" + "dbg.jpg > " + "\r\x1b[K" + "\x1b[1;30mdbg.jpg [image/jpg; 5 bytes]\x1b[0m\n" + "\x1b[1;30mdbg\x1b[0m\n" + "\x1b[1;35mwarn\x1b[0m\n", + ) class DataLog(unittest.TestCase): - def test_output(self): with tempfile.TemporaryDirectory() as tmpdir: with treelog.set(treelog.DataLog(tmpdir)): @@ -152,118 +153,127 @@ def test_output(self): self.check_output(tmpdir) def check_output(self, tmpdir): - self.assertEqual(set(os.listdir(tmpdir)), { - 'test.dat', 'test-1.dat', 'test-2.dat', 'same.dat', 'dbg.jpg'}) - with open(os.path.join(tmpdir, 'test.dat'), 'r') as f: - self.assertEqual(f.read(), 'test1') - with open(os.path.join(tmpdir, 'test-1.dat'), 'rb') as f: - self.assertEqual(f.read(), b'test2') - with open(os.path.join(tmpdir, 'test-2.dat'), 'rb') as f: - self.assertEqual(f.read(), b'test3') - with open(os.path.join(tmpdir, 'same.dat'), 'rb') as f: - self.assertEqual(f.read(), b'test3') - with open(os.path.join(tmpdir, 'dbg.jpg'), 'r') as f: - self.assertEqual(f.read(), 'test4') - - @unittest.skipIf(not _path.supports_fd, 'dir_fd not supported on platform') + self.assertEqual( + set(os.listdir(tmpdir)), + {"test.dat", "test-1.dat", "test-2.dat", "same.dat", "dbg.jpg"}, + ) + with open(os.path.join(tmpdir, "test.dat"), "r") as f: + self.assertEqual(f.read(), "test1") + with open(os.path.join(tmpdir, "test-1.dat"), "rb") as f: + self.assertEqual(f.read(), b"test2") + with open(os.path.join(tmpdir, "test-2.dat"), "rb") as f: + self.assertEqual(f.read(), b"test3") + with open(os.path.join(tmpdir, "same.dat"), "rb") as f: + self.assertEqual(f.read(), b"test3") + with open(os.path.join(tmpdir, "dbg.jpg"), "r") as f: + self.assertEqual(f.read(), "test4") + + @unittest.skipIf(not _path.supports_fd, "dir_fd not supported on platform") def test_move_outdir(self): with tempfile.TemporaryDirectory() as tmpdir: - outdira = os.path.join(tmpdir, 'a') - outdirb = os.path.join(tmpdir, 'b') + outdira = os.path.join(tmpdir, "a") + outdirb = os.path.join(tmpdir, "b") log = treelog.DataLog(outdira) os.rename(outdira, outdirb) os.mkdir(outdira) - log.write(Data('dat', b''), level=1) - self.assertEqual(os.listdir(outdirb), ['dat']) + log.write(Data("dat", b""), level=1) + self.assertEqual(os.listdir(outdirb), ["dat"]) self.assertEqual(os.listdir(outdira), []) class HtmlLog(unittest.TestCase): - def test_output(self): with tempfile.TemporaryDirectory() as tmpdir: - with treelog.HtmlLog(tmpdir, title='test') as htmllog, treelog.set(htmllog): + with treelog.HtmlLog(tmpdir, title="test") as htmllog, treelog.set(htmllog): generate() self.check_output(tmpdir, htmllog.filename) def check_output(self, tmpdir, filename): - tests = ['b444ac06613fc8d63795be9ad0beaf55011936ac.dat', '109f4b3c50d7b0df729d299bc6f8e9ef9066971f.dat', - '3ebfa301dc59196f18593c45e519287a23297589.dat', '1ff2b3704aede04eecb51e50ca698efd50a1379b.jpg'] - self.assertEqual(filename, 'log.html') - self.assertGreater(set(os.listdir(tmpdir)), {'log.html', *tests}) - with open(os.path.join(tmpdir, 'log.html'), 'r') as f: + tests = [ + "b444ac06613fc8d63795be9ad0beaf55011936ac.dat", + "109f4b3c50d7b0df729d299bc6f8e9ef9066971f.dat", + "3ebfa301dc59196f18593c45e519287a23297589.dat", + "1ff2b3704aede04eecb51e50ca698efd50a1379b.jpg", + ] + self.assertEqual(filename, "log.html") + self.assertGreater(set(os.listdir(tmpdir)), {"log.html", *tests}) + with open(os.path.join(tmpdir, "log.html"), "r") as f: lines = f.readlines() - self.assertIn('\n', lines) - self.assertEqual(lines[lines.index('\n'):], [ - '\n', - '\n', - '
\n', - '
my message
\n', - '\n', - '
my context
\n', - '
iter 1
\n', - '
a
\n', - '
\n', - '
iter 2
\n', - '
b
\n', - '
\n', - '
iter 3
\n', - '
c
\n', - '
\n', - '
multiple..\n', - ' ..lines
\n', - '
test.dat
\n', - '
generating
\n', - '
\n', - '\n', - '
\n', - '
generate_test
\n', - '\n', - '
\n', - '
context step=0
\n', - '
foo
\n', - '
\n', - '
context step=1
\n', - '
bar
\n', - '
\n', - '\n', - '\n', - '
dbg
\n', - '
warn
\n', - '
\n']) + self.assertIn("\n", lines) + self.assertEqual( + lines[lines.index("\n") :], + [ + "\n", + '\n', + '
\n', + '
my message
\n', + '\n', + '
my context
\n', + '
iter 1
\n', + '
a
\n', + '
\n', + '
iter 2
\n', + '
b
\n', + '
\n', + '
iter 3
\n', + '
c
\n', + '
\n', + '
multiple..\n', + " ..lines
\n", + '
test.dat
\n', + '
generating
\n', + '
\n', + '\n', + '
\n', + '
generate_test
\n', + '\n', + '
\n', + '
context step=0
\n', + '
foo
\n', + '
\n', + '
context step=1
\n', + '
bar
\n', + '
\n', + '\n', + '\n', + '
dbg
\n', + '
warn
\n', + "
\n", + ], + ) for i, test in enumerate(tests, 1): - with open(os.path.join(tmpdir, test), 'rb') as f: - self.assertEqual(f.read(), b'test%i' % i) + with open(os.path.join(tmpdir, test), "rb") as f: + self.assertEqual(f.read(), b"test%i" % i) - @unittest.skipIf(not _path.supports_fd, 'dir_fd not supported on platform') + @unittest.skipIf(not _path.supports_fd, "dir_fd not supported on platform") def test_move_outdir(self): with tempfile.TemporaryDirectory() as tmpdir: - outdira = os.path.join(tmpdir, 'a') - outdirb = os.path.join(tmpdir, 'b') + outdira = os.path.join(tmpdir, "a") + outdirb = os.path.join(tmpdir, "b") with treelog.HtmlLog(outdira) as log: os.rename(outdira, outdirb) os.mkdir(outdira) - log.write(Data('dat', b''), Level.info) + log.write(Data("dat", b""), Level.info) self.assertIn( - 'da39a3ee5e6b4b0d3255bfef95601890afd80709', os.listdir(outdirb)) + "da39a3ee5e6b4b0d3255bfef95601890afd80709", os.listdir(outdirb) + ) def test_filename_sequence(self): with tempfile.TemporaryDirectory() as tmpdir: - with treelog.HtmlLog(tmpdir) as log: + with treelog.HtmlLog(tmpdir): pass - self.assertTrue(os.path.exists(os.path.join(tmpdir, 'log.html'))) - with treelog.HtmlLog(tmpdir) as log: + self.assertTrue(os.path.exists(os.path.join(tmpdir, "log.html"))) + with treelog.HtmlLog(tmpdir): pass - self.assertTrue(os.path.exists(os.path.join(tmpdir, 'log-1.html'))) - with treelog.HtmlLog(tmpdir) as log: + self.assertTrue(os.path.exists(os.path.join(tmpdir, "log-1.html"))) + with treelog.HtmlLog(tmpdir): pass - self.assertTrue(os.path.exists(os.path.join(tmpdir, 'log-2.html'))) + self.assertTrue(os.path.exists(os.path.join(tmpdir, "log-2.html"))) class RecordLog(unittest.TestCase): - simplify = False def test_output(self): @@ -271,134 +281,140 @@ def test_output(self): with treelog.set(recordlog): generate() self.check_output(recordlog._messages) - with self.subTest('replay to StdoutLog'): + with self.subTest("replay to StdoutLog"): f = io.StringIO() recordlog.replay(treelog.StdoutLog(f)) StdoutLog.check_output(self, f) - with self.subTest('replay to DataLog'), tempfile.TemporaryDirectory() as tmpdir: + with self.subTest("replay to DataLog"), tempfile.TemporaryDirectory() as tmpdir: recordlog.replay(treelog.DataLog(tmpdir)) DataLog.check_output(self, tmpdir) - with self.subTest('replay to HtmlLog'), tempfile.TemporaryDirectory() as tmpdir: - with treelog.HtmlLog(tmpdir, title='test') as htmllog: + with self.subTest("replay to HtmlLog"), tempfile.TemporaryDirectory() as tmpdir: + with treelog.HtmlLog(tmpdir, title="test") as htmllog: recordlog.replay(htmllog) HtmlLog.check_output(self, tmpdir, htmllog.filename) if not self.simplify: - with self.subTest('replay to RichOutputLog'): + with self.subTest("replay to RichOutputLog"): f = io.StringIO() recordlog.replay(treelog.RichOutputLog(f)) RichOutputLog.check_output(self, f) def check_output(self, messages): - self.assertEqual(messages, [ - ('write', 'my message', Level.user), - ('pushcontext', 'test.dat'), - ('popcontext',), - ('write', Data('test.dat', b'test1'), Level.info), - ('pushcontext', 'my context'), - ('pushcontext', 'iter 0'), - ('recontext', 'iter 1'), - ('write', 'a', Level.info), - ('recontext', 'iter 2'), - ('write', 'b', Level.info), - ('recontext', 'iter 3'), - ('write', 'c', Level.info), - ('popcontext',), - ('pushcontext', 'empty'), - ('popcontext',), - ('write', 'multiple..\n ..lines', Level.error), - ('pushcontext', 'test.dat'), - ('write', 'generating', Level.info), - ('popcontext',), - ('write', Data('test.dat', b'test2'), Level.user), - ('popcontext',), - ('pushcontext', 'generate_test'), - ('pushcontext', 'test.dat'), - ('popcontext',), - ('write', Data('test.dat', b'test3'), Level.warning), - ('popcontext',), - ('pushcontext', 'context step=0'), - ('write', 'foo', Level.info), - ('recontext', 'context step=1'), - ('write', 'bar', Level.info), - ('popcontext',), - ('write', Data('same.dat', b'test3'), Level.error), - ('pushcontext', 'dbg.jpg'), - ('popcontext',), - ('write', Data('dbg.jpg', b'test4', type='image/jpg'), Level.debug), - ('write', 'dbg', Level.debug), - ('write', 'warn', Level.warning)]) + self.assertEqual( + messages, + [ + ("write", "my message", Level.user), + ("pushcontext", "test.dat"), + ("popcontext",), + ("write", Data("test.dat", b"test1"), Level.info), + ("pushcontext", "my context"), + ("pushcontext", "iter 0"), + ("recontext", "iter 1"), + ("write", "a", Level.info), + ("recontext", "iter 2"), + ("write", "b", Level.info), + ("recontext", "iter 3"), + ("write", "c", Level.info), + ("popcontext",), + ("pushcontext", "empty"), + ("popcontext",), + ("write", "multiple..\n ..lines", Level.error), + ("pushcontext", "test.dat"), + ("write", "generating", Level.info), + ("popcontext",), + ("write", Data("test.dat", b"test2"), Level.user), + ("popcontext",), + ("pushcontext", "generate_test"), + ("pushcontext", "test.dat"), + ("popcontext",), + ("write", Data("test.dat", b"test3"), Level.warning), + ("popcontext",), + ("pushcontext", "context step=0"), + ("write", "foo", Level.info), + ("recontext", "context step=1"), + ("write", "bar", Level.info), + ("popcontext",), + ("write", Data("same.dat", b"test3"), Level.error), + ("pushcontext", "dbg.jpg"), + ("popcontext",), + ("write", Data("dbg.jpg", b"test4", type="image/jpg"), Level.debug), + ("write", "dbg", Level.debug), + ("write", "warn", Level.warning), + ], + ) def test_replay_in_current(self): recordlog = treelog.RecordLog(simplify=self.simplify) - recordlog.write('test', level=Level.info) - with treelog.set(treelog.LoggingLog()), self.assertLogs('nutils'): + recordlog.write("test", level=Level.info) + with treelog.set(treelog.LoggingLog()), self.assertLogs("nutils"): recordlog.replay() class SimplifiedRecordLog(RecordLog): - simplify = True def check_output(self, messages): - self.assertEqual(messages, [ - ('write', 'my message', Level.user), - ('write', Data('test.dat', b'test1'), Level.info), - ('pushcontext', 'my context'), - ('pushcontext', 'iter 1'), - ('write', 'a', Level.info), - ('recontext', 'iter 2'), - ('write', 'b', Level.info), - ('recontext', 'iter 3'), - ('write', 'c', Level.info), - ('popcontext',), - ('write', 'multiple..\n ..lines', Level.error), - ('pushcontext', 'test.dat'), - ('write', 'generating', Level.info), - ('popcontext',), - ('write', Data('test.dat', b'test2'), Level.user), - ('recontext', 'generate_test'), - ('write', Data('test.dat', b'test3'), Level.warning), - ('recontext', 'context step=0'), - ('write', 'foo', Level.info), - ('recontext', 'context step=1'), - ('write', 'bar', Level.info), - ('popcontext',), - ('write', Data('same.dat', b'test3'), Level.error), - ('write', Data('dbg.jpg', b'test4', type='image/jpg'), Level.debug), - ('write', 'dbg', Level.debug), - ('write', 'warn', Level.warning)]) + self.assertEqual( + messages, + [ + ("write", "my message", Level.user), + ("write", Data("test.dat", b"test1"), Level.info), + ("pushcontext", "my context"), + ("pushcontext", "iter 1"), + ("write", "a", Level.info), + ("recontext", "iter 2"), + ("write", "b", Level.info), + ("recontext", "iter 3"), + ("write", "c", Level.info), + ("popcontext",), + ("write", "multiple..\n ..lines", Level.error), + ("pushcontext", "test.dat"), + ("write", "generating", Level.info), + ("popcontext",), + ("write", Data("test.dat", b"test2"), Level.user), + ("recontext", "generate_test"), + ("write", Data("test.dat", b"test3"), Level.warning), + ("recontext", "context step=0"), + ("write", "foo", Level.info), + ("recontext", "context step=1"), + ("write", "bar", Level.info), + ("popcontext",), + ("write", Data("same.dat", b"test3"), Level.error), + ("write", Data("dbg.jpg", b"test4", type="image/jpg"), Level.debug), + ("write", "dbg", Level.debug), + ("write", "warn", Level.warning), + ], + ) class TeeLog(unittest.TestCase): - def test_output(self): f = io.StringIO() with tempfile.TemporaryDirectory() as tmpdir: datalog = treelog.DataLog(tmpdir) recordlog = treelog.RecordLog(simplify=False) richoutputlog = treelog.RichOutputLog(f) - with treelog.set(treelog.TeeLog(richoutputlog, treelog.TeeLog(datalog, recordlog))): + with treelog.set( + treelog.TeeLog(richoutputlog, treelog.TeeLog(datalog, recordlog)) + ): generate() - with self.subTest('DataLog'): + with self.subTest("DataLog"): DataLog.check_output(self, tmpdir) - with self.subTest('RecordLog'): + with self.subTest("RecordLog"): RecordLog.check_output(self, recordlog._messages) - with self.subTest('RichOutputLog'): + with self.subTest("RichOutputLog"): RichOutputLog.check_output(self, f) def test_open_datalog_datalog_samedir(self): with tempfile.TemporaryDirectory() as tmpdir: - teelog = treelog.TeeLog(treelog.DataLog( - tmpdir), treelog.DataLog(tmpdir)) - teelog.write(Data('test', b'test'), Level.info) - with open(os.path.join(tmpdir, 'test'), 'rb') as f: - self.assertEqual(f.read(), b'test') - with open(os.path.join(tmpdir, 'test-1'), 'rb') as f: - self.assertEqual(f.read(), b'test') + teelog = treelog.TeeLog(treelog.DataLog(tmpdir), treelog.DataLog(tmpdir)) + teelog.write(Data("test", b"test"), Level.info) + with open(os.path.join(tmpdir, "test"), "rb") as f: + self.assertEqual(f.read(), b"test") + with open(os.path.join(tmpdir, "test-1"), "rb") as f: + self.assertEqual(f.read(), b"test") class FilterMinLog(unittest.TestCase): - def test_output(self): recordlog = treelog.RecordLog() with treelog.set(treelog.FilterLog(recordlog, minlevel=Level.user)): @@ -406,20 +422,23 @@ def test_output(self): self.check_output(recordlog._messages) def check_output(self, messages): - self.assertEqual(messages, [ - ('write', 'my message', Level.user), - ('pushcontext', 'my context'), - ('write', 'multiple..\n ..lines', Level.error), - ('write', Data('test.dat', b'test2'), Level.user), - ('recontext', 'generate_test'), - ('write', Data('test.dat', b'test3'), Level.warning), - ('popcontext',), - ('write', Data('same.dat', b'test3'), Level.error), - ('write', 'warn', Level.warning)]) + self.assertEqual( + messages, + [ + ("write", "my message", Level.user), + ("pushcontext", "my context"), + ("write", "multiple..\n ..lines", Level.error), + ("write", Data("test.dat", b"test2"), Level.user), + ("recontext", "generate_test"), + ("write", Data("test.dat", b"test3"), Level.warning), + ("popcontext",), + ("write", Data("same.dat", b"test3"), Level.error), + ("write", "warn", Level.warning), + ], + ) class FilterMaxLog(unittest.TestCase): - def test_output(self): recordlog = treelog.RecordLog() with treelog.set(treelog.FilterLog(recordlog, maxlevel=Level.user)): @@ -427,88 +446,99 @@ def test_output(self): self.check_output(recordlog._messages) def check_output(self, messages): - self.assertEqual(messages, [ - ('write', 'my message', Level.user), - ('write', Data('test.dat', b'test1'), Level.info), - ('pushcontext', 'my context'), - ('pushcontext', 'iter 1'), - ('write', 'a', Level.info), - ('recontext', 'iter 2'), - ('write', 'b', Level.info), - ('recontext', 'iter 3'), - ('write', 'c', Level.info), - ('recontext', 'test.dat'), - ('write', 'generating', Level.info), - ('popcontext',), - ('write', Data('test.dat', b'test2'), Level.user), - ('recontext', 'context step=0'), - ('write', 'foo', Level.info), - ('recontext', 'context step=1'), - ('write', 'bar', Level.info), - ('popcontext',), - ('write', Data('dbg.jpg', b'test4', type='image/jpg'), Level.debug), - ('write', 'dbg', Level.debug)]) + self.assertEqual( + messages, + [ + ("write", "my message", Level.user), + ("write", Data("test.dat", b"test1"), Level.info), + ("pushcontext", "my context"), + ("pushcontext", "iter 1"), + ("write", "a", Level.info), + ("recontext", "iter 2"), + ("write", "b", Level.info), + ("recontext", "iter 3"), + ("write", "c", Level.info), + ("recontext", "test.dat"), + ("write", "generating", Level.info), + ("popcontext",), + ("write", Data("test.dat", b"test2"), Level.user), + ("recontext", "context step=0"), + ("write", "foo", Level.info), + ("recontext", "context step=1"), + ("write", "bar", Level.info), + ("popcontext",), + ("write", Data("dbg.jpg", b"test4", type="image/jpg"), Level.debug), + ("write", "dbg", Level.debug), + ], + ) class FilterMinMaxLog(unittest.TestCase): - def test_output(self): recordlog = treelog.RecordLog() - with treelog.set(treelog.FilterLog(recordlog, minlevel=Level.info, maxlevel=Level.warning)): + with treelog.set( + treelog.FilterLog(recordlog, minlevel=Level.info, maxlevel=Level.warning) + ): generate() self.check_output(recordlog._messages) def check_output(self, messages): - self.assertEqual(messages, [ - ('write', 'my message', Level.user), - ('write', Data('test.dat', b'test1'), Level.info), - ('pushcontext', 'my context'), - ('pushcontext', 'iter 1'), - ('write', 'a', Level.info), - ('recontext', 'iter 2'), - ('write', 'b', Level.info), - ('recontext', 'iter 3'), - ('write', 'c', Level.info), - ('recontext', 'test.dat'), - ('write', 'generating', Level.info), - ('popcontext',), - ('write', Data('test.dat', b'test2'), Level.user), - ('recontext', 'generate_test'), - ('write', Data('test.dat', b'test3'), Level.warning), - ('recontext', 'context step=0'), - ('write', 'foo', Level.info), - ('recontext', 'context step=1'), - ('write', 'bar', Level.info), - ('popcontext',), - ('write', 'warn', Level.warning)]) + self.assertEqual( + messages, + [ + ("write", "my message", Level.user), + ("write", Data("test.dat", b"test1"), Level.info), + ("pushcontext", "my context"), + ("pushcontext", "iter 1"), + ("write", "a", Level.info), + ("recontext", "iter 2"), + ("write", "b", Level.info), + ("recontext", "iter 3"), + ("write", "c", Level.info), + ("recontext", "test.dat"), + ("write", "generating", Level.info), + ("popcontext",), + ("write", Data("test.dat", b"test2"), Level.user), + ("recontext", "generate_test"), + ("write", Data("test.dat", b"test3"), Level.warning), + ("recontext", "context step=0"), + ("write", "foo", Level.info), + ("recontext", "context step=1"), + ("write", "bar", Level.info), + ("popcontext",), + ("write", "warn", Level.warning), + ], + ) class LoggingLog(unittest.TestCase): - def test_output(self): - with self.assertLogs('nutils') as cm, treelog.set(treelog.LoggingLog()): + with self.assertLogs("nutils") as cm, treelog.set(treelog.LoggingLog()): generate() self.check_output(cm.output) def check_output(self, output): - self.assertEqual(output, [ - 'Level 25:nutils:my message', - 'INFO:nutils:test.dat [5 bytes]', - 'INFO:nutils:my context > iter 1 > a', - 'INFO:nutils:my context > iter 2 > b', - 'INFO:nutils:my context > iter 3 > c', - 'ERROR:nutils:my context > multiple..\n ..lines', - 'INFO:nutils:my context > test.dat > generating', - 'Level 25:nutils:my context > test.dat [5 bytes]', - 'WARNING:nutils:generate_test > test.dat [5 bytes]', - 'INFO:nutils:context step=0 > foo', - 'INFO:nutils:context step=1 > bar', - 'ERROR:nutils:same.dat [5 bytes]', - 'WARNING:nutils:warn']) + self.assertEqual( + output, + [ + "Level 25:nutils:my message", + "INFO:nutils:test.dat [5 bytes]", + "INFO:nutils:my context > iter 1 > a", + "INFO:nutils:my context > iter 2 > b", + "INFO:nutils:my context > iter 3 > c", + "ERROR:nutils:my context > multiple..\n ..lines", + "INFO:nutils:my context > test.dat > generating", + "Level 25:nutils:my context > test.dat [5 bytes]", + "WARNING:nutils:generate_test > test.dat [5 bytes]", + "INFO:nutils:context step=0 > foo", + "INFO:nutils:context step=1 > bar", + "ERROR:nutils:same.dat [5 bytes]", + "WARNING:nutils:warn", + ], + ) class NullLog(unittest.TestCase): - def test_output(self): with treelog.set(treelog.NullLog()): generate() @@ -519,7 +549,6 @@ def test_disable(self): class Iter(unittest.TestCase): - def setUp(self): self.recordlog = treelog.RecordLog(simplify=False) c = treelog.set(self.recordlog) @@ -530,125 +559,137 @@ def assertMessages(self, *msg): self.assertEqual(self.recordlog._messages, list(msg)) def test_context(self): - with treelog.iter.plain('test', enumerate('abc')) as myiter: + with treelog.iter.plain("test", enumerate("abc")) as myiter: for i, c in myiter: - self.assertEqual(c, 'abc'[i]) - treelog.info('hi') + self.assertEqual(c, "abc"[i]) + treelog.info("hi") self.assertMessages( - ('pushcontext', 'test 0'), - ('recontext', 'test 1'), - ('write', 'hi', Level.info), - ('recontext', 'test 2'), - ('write', 'hi', Level.info), - ('recontext', 'test 3'), - ('write', 'hi', Level.info), - ('popcontext',)) + ("pushcontext", "test 0"), + ("recontext", "test 1"), + ("write", "hi", Level.info), + ("recontext", "test 2"), + ("write", "hi", Level.info), + ("recontext", "test 3"), + ("write", "hi", Level.info), + ("popcontext",), + ) def test_nocontext(self): - for i, c in treelog.iter.plain('test', enumerate('abc')): - self.assertEqual(c, 'abc'[i]) - treelog.info('hi') + for i, c in treelog.iter.plain("test", enumerate("abc")): + self.assertEqual(c, "abc"[i]) + treelog.info("hi") self.assertMessages( - ('pushcontext', 'test 0'), - ('recontext', 'test 1'), - ('write', 'hi', Level.info), - ('recontext', 'test 2'), - ('write', 'hi', Level.info), - ('recontext', 'test 3'), - ('write', 'hi', Level.info), - ('popcontext',)) + ("pushcontext", "test 0"), + ("recontext", "test 1"), + ("write", "hi", Level.info), + ("recontext", "test 2"), + ("write", "hi", Level.info), + ("recontext", "test 3"), + ("write", "hi", Level.info), + ("popcontext",), + ) def test_break_entered(self): - with warnings.catch_warnings(record=True) as w, treelog.iter.plain('test', [1, 2, 3]) as myiter: + with ( + warnings.catch_warnings(record=True) as w, + treelog.iter.plain("test", [1, 2, 3]) as myiter, + ): for item in myiter: self.assertEqual(item, 1) - treelog.info('hi') + treelog.info("hi") break gc.collect() self.assertEqual(w, []) self.assertMessages( - ('pushcontext', 'test 0'), - ('recontext', 'test 1'), - ('write', 'hi', Level.info), - ('popcontext',)) + ("pushcontext", "test 0"), + ("recontext", "test 1"), + ("write", "hi", Level.info), + ("popcontext",), + ) def test_break_notentered(self): with self.assertWarns(ResourceWarning): - for item in treelog.iter.plain('test', [1, 2, 3]): + for item in treelog.iter.plain("test", [1, 2, 3]): self.assertEqual(item, 1) - treelog.info('hi') + treelog.info("hi") break gc.collect() self.assertMessages( - ('pushcontext', 'test 0'), - ('recontext', 'test 1'), - ('write', 'hi', Level.info), - ('popcontext',)) + ("pushcontext", "test 0"), + ("recontext", "test 1"), + ("write", "hi", Level.info), + ("popcontext",), + ) def test_multiple(self): - with treelog.iter.plain('test', 'abc', [1, 2]) as items: - self.assertEqual(list(items), [('a', 1), ('b', 2)]) + with treelog.iter.plain("test", "abc", [1, 2]) as items: + self.assertEqual(list(items), [("a", 1), ("b", 2)]) def test_plain(self): - with treelog.iter.plain('test', 'abc') as items: - self.assertEqual(list(items), list('abc')) + with treelog.iter.plain("test", "abc") as items: + self.assertEqual(list(items), list("abc")) self.assertMessages( - ('pushcontext', 'test 0'), - ('recontext', 'test 1'), - ('recontext', 'test 2'), - ('recontext', 'test 3'), - ('popcontext',)) + ("pushcontext", "test 0"), + ("recontext", "test 1"), + ("recontext", "test 2"), + ("recontext", "test 3"), + ("popcontext",), + ) def test_plain_withbraces(self): - with treelog.iter.plain('t{es}t', 'abc') as items: - self.assertEqual(list(items), list('abc')) + with treelog.iter.plain("t{es}t", "abc") as items: + self.assertEqual(list(items), list("abc")) self.assertMessages( - ('pushcontext', 't{es}t 0'), - ('recontext', 't{es}t 1'), - ('recontext', 't{es}t 2'), - ('recontext', 't{es}t 3'), - ('popcontext',)) + ("pushcontext", "t{es}t 0"), + ("recontext", "t{es}t 1"), + ("recontext", "t{es}t 2"), + ("recontext", "t{es}t 3"), + ("popcontext",), + ) def test_fraction(self): - with treelog.iter.fraction('test', 'abc') as items: - self.assertEqual(list(items), list('abc')) + with treelog.iter.fraction("test", "abc") as items: + self.assertEqual(list(items), list("abc")) self.assertMessages( - ('pushcontext', 'test 0/3'), - ('recontext', 'test 1/3'), - ('recontext', 'test 2/3'), - ('recontext', 'test 3/3'), - ('popcontext',)) + ("pushcontext", "test 0/3"), + ("recontext", "test 1/3"), + ("recontext", "test 2/3"), + ("recontext", "test 3/3"), + ("popcontext",), + ) def test_percentage(self): - with treelog.iter.percentage('test', 'abc') as items: - self.assertEqual(list(items), list('abc')) + with treelog.iter.percentage("test", "abc") as items: + self.assertEqual(list(items), list("abc")) self.assertMessages( - ('pushcontext', 'test 0%'), - ('recontext', 'test 33%'), - ('recontext', 'test 67%'), - ('recontext', 'test 100%'), - ('popcontext',)) + ("pushcontext", "test 0%"), + ("recontext", "test 33%"), + ("recontext", "test 67%"), + ("recontext", "test 100%"), + ("popcontext",), + ) def test_send(self): def titles(): - a = yield 'value' + a = yield "value" while True: - a = yield 'value={!r}'.format(a) - with treelog.iter.wrap(titles(), 'abc') as items: + a = yield "value={!r}".format(a) + + with treelog.iter.wrap(titles(), "abc") as items: for i, item in enumerate(items): - self.assertEqual(item, 'abc'[i]) - treelog.info('hi') + self.assertEqual(item, "abc"[i]) + treelog.info("hi") self.assertMessages( - ('pushcontext', 'value'), - ('recontext', "value='a'"), - ('recontext', "value='b'"), - ('recontext', "value='c'"), - ('write', 'hi', Level.info), - ('popcontext',)) + ("pushcontext", "value"), + ("recontext", "value='a'"), + ("recontext", "value='b'"), + ("recontext", "value='c'"), + ("write", "hi", Level.info), + ("popcontext",), + ) class DocTest(unittest.TestCase): - def test_docs(self): doctest.testmod(treelog) diff --git a/treelog/__init__.py b/treelog/__init__.py index 3c983ad..0b4ab94 100644 --- a/treelog/__init__.py +++ b/treelog/__init__.py @@ -1,58 +1,57 @@ -'Logging framework that organizes messages in a tree' +"Logging framework that organizes messages in a tree" -__version__ = '2.0' +__version__ = "2.0" from importlib import import_module -_sub_mods = { - 'proto', - 'iter' -} +_sub_mods = {"proto", "iter"} _state_attrs = { - 'set', - 'add', - 'disable', - 'context', - 'withcontext', + "set", + "add", + "disable", + "context", + "withcontext", } _state_funcs = { level + op - for level in ['debug', 'info', 'user', 'warning', 'error'] - for op in ['', 'data', 'file'] + for level in ["debug", "info", "user", "warning", "error"] + for op in ["", "data", "file"] } _log_objs = { - 'DataLog', - 'FilterLog', - 'HtmlLog', - 'LoggingLog', - 'NullLog', - 'RecordLog', - 'RichOutputLog', - 'StdoutLog', - 'TeeLog', + "DataLog", + "FilterLog", + "HtmlLog", + "LoggingLog", + "NullLog", + "RecordLog", + "RichOutputLog", + "StdoutLog", + "TeeLog", } + def __dir__(): return ( - '__version__', + "__version__", *_sub_mods, *_state_attrs, *_state_funcs, *_log_objs, ) + def __getattr__(attr): if attr in _state_attrs: - _state = import_module(f'._state', 'treelog') + _state = import_module("._state", "treelog") obj = getattr(_state, attr) elif attr in _state_funcs: - _state = import_module(f'._state', 'treelog') + _state = import_module("._state", "treelog") obj = _state.partial(attr) elif attr in _log_objs: - m = import_module(f'._{attr[:-3].lower()}', 'treelog') + m = import_module(f"._{attr[:-3].lower()}", "treelog") obj = getattr(m, attr) elif attr in _sub_mods: - obj = import_module(f'.{attr}', 'treelog') + obj = import_module(f".{attr}", "treelog") else: raise AttributeError(attr) globals()[attr] = obj diff --git a/treelog/_data.py b/treelog/_data.py index e51e685..22ca6cf 100644 --- a/treelog/_data.py +++ b/treelog/_data.py @@ -7,9 +7,13 @@ class DataLog: - '''Output only data.''' + """Output only data.""" - def __init__(self, dirpath: str = os.curdir, names: typing.Callable[[str], typing.Iterable[str]] = sequence) -> None: + def __init__( + self, + dirpath: str = os.curdir, + names: typing.Callable[[str], typing.Iterable[str]] = sequence, + ) -> None: self._names = functools.lru_cache(maxsize=32)(names) self._path = makedirs(dirpath) @@ -24,6 +28,8 @@ def recontext(self, title: str) -> None: def write(self, msg, level: Level) -> None: if isinstance(msg, Data): - _, f = non_existent(self._path, self._names(msg.name), lambda p: p.open('xb')) + _, f = non_existent( + self._path, self._names(msg.name), lambda p: p.open("xb") + ) with f: f.write(msg.data) diff --git a/treelog/_filter.py b/treelog/_filter.py index 59eb5b1..76b4661 100644 --- a/treelog/_filter.py +++ b/treelog/_filter.py @@ -4,9 +4,14 @@ class FilterLog: - '''Filter messages based on level.''' - - def __init__(self, baselog: Log, minlevel: typing.Optional[Level] = None, maxlevel: typing.Optional[Level] = None) -> None: + """Filter messages based on level.""" + + def __init__( + self, + baselog: Log, + minlevel: typing.Optional[Level] = None, + maxlevel: typing.Optional[Level] = None, + ) -> None: self._baselog = baselog self._minlevel = minlevel self._maxlevel = maxlevel @@ -21,7 +26,7 @@ def recontext(self, title: str) -> None: self._baselog.recontext(title) def _passthrough(self, level: Level) -> bool: - '''Return True if messages of the given level should pass through.''' + """Return True if messages of the given level should pass through.""" if self._minlevel is not None and level.value < self._minlevel.value: return False if self._maxlevel is not None and level.value > self._maxlevel.value: diff --git a/treelog/_html.py b/treelog/_html.py index 467091b..fad9d4a 100644 --- a/treelog/_html.py +++ b/treelog/_html.py @@ -1,4 +1,3 @@ -import contextlib import hashlib import html import os @@ -13,21 +12,34 @@ class HtmlLog: - '''Output html nested lists.''' - - def __init__(self, dirpath: str, *, filename: str = 'log.html', title: typing.Optional[str] = None, htmltitle: typing.Optional[str] = None, favicon: typing.Optional[str] = None) -> None: + """Output html nested lists.""" + + def __init__( + self, + dirpath: str, + *, + filename: str = "log.html", + title: typing.Optional[str] = None, + htmltitle: typing.Optional[str] = None, + favicon: typing.Optional[str] = None, + ) -> None: self._path = makedirs(dirpath) - self.filename, self._file = non_existent(self._path, sequence(filename), lambda p: p.open('x', encoding='utf-8')) - css = self._write_hash(CSS.encode(), '.css') - js = self._write_hash(JS.encode(), '.js') + self.filename, self._file = non_existent( + self._path, sequence(filename), lambda p: p.open("x", encoding="utf-8") + ) + css = self._write_hash(CSS.encode(), ".css") + js = self._write_hash(JS.encode(), ".js") if title is None: - title = ' '.join(sys.argv) + title = " ".join(sys.argv) if htmltitle is None: htmltitle = html.escape(title) if favicon is None: favicon = FAVICON - self._file.write(HTMLHEAD.format( - title=title, htmltitle=htmltitle, css=css, js=js, favicon=favicon)) + self._file.write( + HTMLHEAD.format( + title=title, htmltitle=htmltitle, css=css, js=js, favicon=favicon + ) + ) # active contexts that are not yet opened as html elements self._unopened = [] # type: typing.List[str] @@ -46,46 +58,61 @@ def recontext(self, title: str) -> None: def write(self, msg, level: Level) -> None: for c in self._unopened: - print('
{}
'.format( - html.escape(c)), file=self._file) + print( + '
{}
'.format( + html.escape(c) + ), + file=self._file, + ) self._unopened.clear() if isinstance(msg, Data): _, ext = os.path.splitext(msg.name) filename = self._write_hash(msg.data, ext) - text = '{name}'.format(href=urllib.parse.quote(filename), name=html.escape(msg.name)) + text = '{name}'.format( + href=urllib.parse.quote(filename), name=html.escape(msg.name) + ) else: text = html.escape(msg) - print('
{}
'.format(level.value, text), file=self._file, flush=True) + print( + '
{}
'.format(level.value, text), + file=self._file, + flush=True, + ) def close(self) -> bool: - if hasattr(self, '_file') and not self._file.closed: + if hasattr(self, "_file") and not self._file.closed: self._file.write(HTMLFOOT) self._file.close() return True else: return False - def __enter__(self) -> 'HtmlLog': + def __enter__(self) -> "HtmlLog": return self - def __exit__(self, t: typing.Optional[typing.Type[BaseException]], value: typing.Optional[BaseException], traceback: typing.Optional[types.TracebackType]) -> None: + def __exit__( + self, + t: typing.Optional[typing.Type[BaseException]], + value: typing.Optional[BaseException], + traceback: typing.Optional[types.TracebackType], + ) -> None: self.close() def __del__(self) -> None: if self.close(): - warnings.warn('unclosed object {!r}'.format(self), ResourceWarning) + warnings.warn("unclosed object {!r}".format(self), ResourceWarning) def _write_hash(self, data, ext): filename = hashlib.sha1(data).hexdigest() + ext try: - with (self._path / filename).open('xb') as f: + with (self._path / filename).open("xb") as f: f.write(data) except FileExistsError: pass return filename -HTMLHEAD = '''\ +HTMLHEAD = """\ @@ -99,13 +126,13 @@ def _write_hash(self, data, ext):
-''' +""" -HTMLFOOT = '''\ +HTMLFOOT = """\
-''' +""" -CSS = '''\ +CSS = """\ body { font-family: monospace; font-size: 12px; } a, a:visited, a:hover { color: inherit; text-decoration: underline; } @@ -192,9 +219,9 @@ def _write_hash(self, data, ext): #theater.overview .plot_container3 { height: calc(100% - 20px); display: flex; align-items: center; justify-content: center; } #theater.overview .plot { background: white; max-width: 100%; max-height: 100%; } #theater.overview .label { position: absolute; width: 100%; left: 0px; right: 0px; bottom: 0px; text-align: center; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; } -''' +""" -JS = '''\ +JS = """\ 'use strict'; // LOW LEVEL UTILS @@ -750,10 +777,12 @@ def _write_hash(self, data, ext): apply_state(state); state_control = 'enabled'; }); -''' - -FAVICON = 'data:image/png;base64,' \ - 'iVBORw0KGgoAAAANSUhEUgAAANIAAADSAgMAAABC93bRAAAACVBMVEUAAGcAAAD////NzL25' \ - 'AAAAAXRSTlMAQObYZgAAAFtJREFUaN7t2SEOACEMRcEa7ofh/ldBsJJAS1bO86Ob/MZY9ViN' \ - 'TD0oiqIo6qrOURRFUVRepQ4TRVEURdXVV6MoiqKoV2UJpCiKov7+p1AURVFUWZWiKIqiqI2a' \ - '8O8qJ0n+GP4AAAAASUVORK5CYII=' +""" + +FAVICON = ( + "data:image/png;base64," + "iVBORw0KGgoAAAANSUhEUgAAANIAAADSAgMAAABC93bRAAAACVBMVEUAAGcAAAD////NzL25" + "AAAAAXRSTlMAQObYZgAAAFtJREFUaN7t2SEOACEMRcEa7ofh/ldBsJJAS1bO86Ob/MZY9ViN" + "TD0oiqIo6qrOURRFUVRepQ4TRVEURdXVV6MoiqKoV2UJpCiKov7+p1AURVFUWZWiKIqiqI2a" + "8O8qJ0n+GP4AAAAASUVORK5CYII=" +) diff --git a/treelog/_logging.py b/treelog/_logging.py index 93f6d0f..4258506 100644 --- a/treelog/_logging.py +++ b/treelog/_logging.py @@ -5,12 +5,12 @@ class LoggingLog: - '''Log to Python's built-in logging facility.''' + """Log to Python's built-in logging facility.""" # type: typing.ClassVar[typing.Tuple[int, int, int, int, int]] _levels = logging.DEBUG, logging.INFO, 25, logging.WARNING, logging.ERROR - def __init__(self, name: str = 'nutils') -> None: + def __init__(self, name: str = "nutils") -> None: self._logger = logging.getLogger(name) self.currentcontext = [] # type: typing.List[str] @@ -24,5 +24,6 @@ def recontext(self, title: str) -> None: self.currentcontext[-1] = title def write(self, msg, level: Level, data: typing.Optional[bytes] = None) -> None: - self._logger.log(self._levels[level.value], ' > '.join( - (*self.currentcontext, str(msg)))) + self._logger.log( + self._levels[level.value], " > ".join((*self.currentcontext, str(msg))) + ) diff --git a/treelog/_null.py b/treelog/_null.py index 27f048e..249f87d 100644 --- a/treelog/_null.py +++ b/treelog/_null.py @@ -2,7 +2,6 @@ class NullLog: - def pushcontext(self, title: str) -> None: pass diff --git a/treelog/_path.py b/treelog/_path.py index ee8398c..ac9da2c 100644 --- a/treelog/_path.py +++ b/treelog/_path.py @@ -17,13 +17,13 @@ def makedirs(*pathsegments): def sequence(filename: str) -> typing.Generator[str, None, None]: - '''Generate file names a.b, a-1.b, a-2.b, etc.''' + """Generate file names a.b, a-1.b, a-2.b, etc.""" yield filename splitext = os.path.splitext(filename) i = 1 while True: - yield '-{}'.format(i).join(splitext) + yield "-{}".format(i).join(splitext) i += 1 @@ -41,11 +41,10 @@ def non_existent(path, names, f): # continue to the next name only if the path indeed exists. if not isinstance(path, pathlib.Path) or not (path / name).exists(): raise - raise Exception('names exhausted') + raise Exception("names exhausted") class _FDDirPath: - def __init__(self, dir_fd: int) -> None: self._opener = functools.partial(os.open, dir_fd=dir_fd) self._close = functools.partial(os.close, dir_fd) @@ -59,10 +58,13 @@ def __del__(self) -> None: class _FDFilePath: - def __init__(self, directory, filename): self._directory = directory self._filename = filename - def open(self, mode: str, *, encoding: typing.Optional[str] = None) -> typing.IO[typing.Any]: - return open(self._filename, mode, encoding=encoding, opener=self._directory._opener) + def open( + self, mode: str, *, encoding: typing.Optional[str] = None + ) -> typing.IO[typing.Any]: + return open( + self._filename, mode, encoding=encoding, opener=self._directory._opener + ) diff --git a/treelog/_record.py b/treelog/_record.py index 399ce21..07d9fd7 100644 --- a/treelog/_record.py +++ b/treelog/_record.py @@ -1,12 +1,10 @@ -import contextlib -import tempfile import typing from .proto import Level, Log class RecordLog: - '''Record log messages. + """Record log messages. The recorded messages can be replayed to the logs that are currently active by :meth:`replay`. Typical usage is caching expensive operations: @@ -26,7 +24,7 @@ class RecordLog: .. Note:: Exceptions raised while in a :meth:`Log.context` are not recorded. - ''' + """ def __init__(self, simplify: bool = True): # Replayable log messages. Each entry is a tuple of `(cmd, *args)`, where @@ -37,41 +35,50 @@ def __init__(self, simplify: bool = True): self._fid = 0 # internal file counter def pushcontext(self, title: str) -> None: - if self._simplify and self._messages and self._messages[-1][0] == 'popcontext': - self._messages[-1] = 'recontext', title + if self._simplify and self._messages and self._messages[-1][0] == "popcontext": + self._messages[-1] = "recontext", title else: - self._messages.append(('pushcontext', title)) + self._messages.append(("pushcontext", title)) def recontext(self, title: str) -> None: - if self._simplify and self._messages and self._messages[-1][0] in ('pushcontext', 'recontext'): + if ( + self._simplify + and self._messages + and self._messages[-1][0] in ("pushcontext", "recontext") + ): self._messages[-1] = self._messages[-1][0], title else: - self._messages.append(('recontext', title)) + self._messages.append(("recontext", title)) def popcontext(self) -> None: - if not self._simplify or not self._messages or self._messages[-1][0] not in ('pushcontext', 'recontext') or self._messages.pop()[0] == 'recontext': - self._messages.append(('popcontext',)) + if ( + not self._simplify + or not self._messages + or self._messages[-1][0] not in ("pushcontext", "recontext") + or self._messages.pop()[0] == "recontext" + ): + self._messages.append(("popcontext",)) def write(self, msg, level: Level) -> None: - self._messages.append(('write', msg, level)) + self._messages.append(("write", msg, level)) def replay(self, log: typing.Optional[Log] = None) -> None: - '''Replay this recorded log. + """Replay this recorded log. All recorded messages and files will be written to the log that is either - directly specified or currently active.''' + directly specified or currently active.""" if log is None: from ._state import current as log for cmd, *args in self._messages: - if cmd == 'pushcontext': - title, = args + if cmd == "pushcontext": + (title,) = args log.pushcontext(title) - elif cmd == 'recontext': - title, = args + elif cmd == "recontext": + (title,) = args log.recontext(title) - elif cmd == 'popcontext': + elif cmd == "popcontext": log.popcontext() - elif cmd == 'write': + elif cmd == "write": msg, level = args log.write(msg, level) diff --git a/treelog/_richoutput.py b/treelog/_richoutput.py index a0fd774..f82697f 100644 --- a/treelog/_richoutput.py +++ b/treelog/_richoutput.py @@ -5,17 +5,18 @@ class RichOutputLog: - '''Output rich (colored,unicode) text to stream.''' + """Output rich (colored,unicode) text to stream.""" _cmap = ( - '\033[1;30m', # debug: bold gray - '\033[1m', # info: bold - '\033[1;34m', # user: bold blue - '\033[1;35m', # warning: bold purple - '\033[1;31m') # error: bold red + "\033[1;30m", # debug: bold gray + "\033[1m", # info: bold + "\033[1;34m", # user: bold blue + "\033[1;35m", # warning: bold purple + "\033[1;31m", + ) # error: bold red def __init__(self, file=sys.stdout) -> None: - self._current = '' # currently printed context + self._current = "" # currently printed context self.file = file set_ansi_console() self.currentcontext = [] # type: typing.List[str] @@ -33,33 +34,37 @@ def recontext(self, title: str) -> None: self.contextchangedhook() def contextchangedhook(self) -> None: - _current = ''.join(item + ' > ' for item in self.currentcontext) + _current = "".join(item + " > " for item in self.currentcontext) if _current == self._current: return n = first(c1 != c2 for c1, c2 in zip(_current, self._current)) items = [] if n == 0 and self._current: - items.append('\r') + items.append("\r") elif n < len(self._current): - items.append('\033[{}D'.format(len(self._current)-n)) + items.append("\033[{}D".format(len(self._current) - n)) if n < len(_current): items.append(_current[n:]) if len(_current) < len(self._current): - items.append('\033[K') - self.file.write(''.join(items)) + items.append("\033[K") + self.file.write("".join(items)) self.file.flush() self._current = _current def write(self, msg, level: Level) -> None: msg = str(msg) - if self._current and '\n' in msg: - msg = msg.replace('\n', '\033[0m\n' + ' > '.rjust(len(self._current)) + self._cmap[level.value]) + if self._current and "\n" in msg: + msg = msg.replace( + "\n", + "\033[0m\n" + " > ".rjust(len(self._current)) + self._cmap[level.value], + ) self.file.write( - ''.join([self._cmap[level.value], msg, '\033[0m\n', self._current])) + "".join([self._cmap[level.value], msg, "\033[0m\n", self._current]) + ) def first(items: typing.Iterable[bool]) -> int: - 'return index of first truthy item, or len(items) of all items are falsy' + "return index of first truthy item, or len(items) of all items are falsy" i = 0 for item in items: if item: @@ -71,10 +76,15 @@ def first(items: typing.Iterable[bool]) -> int: def set_ansi_console() -> None: if sys.platform == "win32": import platform - if platform.version() < '10.': + + if platform.version() < "10.": raise RuntimeError( - 'ANSI console mode requires Windows 10 or higher, detected {}'.format(platform.version())) + "ANSI console mode requires Windows 10 or higher, detected {}".format( + platform.version() + ) + ) import ctypes + # https://docs.microsoft.com/en-us/windows/console/getstdhandle handle = ctypes.windll.kernel32.GetStdHandle(-11) # https://docs.microsoft.com/en-us/windows/desktop/WinProg/windows-data-types#lpdword diff --git a/treelog/_state.py b/treelog/_state.py index 444d86a..82bcde5 100644 --- a/treelog/_state.py +++ b/treelog/_state.py @@ -16,7 +16,7 @@ @contextlib.contextmanager def set(logger: Log) -> typing.Generator[Log, None, None]: - '''Set logger as current.''' + """Set logger as current.""" global current old = current @@ -28,32 +28,35 @@ def set(logger: Log) -> typing.Generator[Log, None, None]: def add(logger: Log) -> typing.ContextManager[Log]: - '''Add logger to current.''' + """Add logger to current.""" return set(TeeLog(current, logger)) def disable() -> typing.ContextManager[Log]: - '''Disable logger.''' + """Disable logger.""" return set(NullLog()) @contextlib.contextmanager -def context(title: str, *initargs: typing.Any, **initkwargs: typing.Any) -> typing.Generator[typing.Optional[typing.Callable[..., None]], None, None]: - '''Enterable context. +def context( + title: str, *initargs: typing.Any, **initkwargs: typing.Any +) -> typing.Generator[typing.Optional[typing.Callable[..., None]], None, None]: + """Enterable context. Returns an enterable object which upon enter creates a context with a given title, to be automatically closed upon exit. In case additional arguments are given the title is used as a format string, and a callable is returned that - allows for recontextualization from within the current with-block.''' + allows for recontextualization from within the current with-block.""" log = current if initargs or initkwargs: format = title.format - # type: typing.Optional[typing.Callable[..., None]] - reformat = lambda *args, **kwargs: log.recontext( - format(*args, **kwargs)) + + def reformat(*args, **kwargs): + log.recontext(format(*args, **kwargs)) + title = title.format(*initargs, **initkwargs) else: reformat = None @@ -64,20 +67,22 @@ def context(title: str, *initargs: typing.Any, **initkwargs: typing.Any) -> typi log.popcontext() -T = typing.TypeVar('T') +T = typing.TypeVar("T") + def withcontext(f: typing.Callable[..., T]) -> typing.Callable[..., T]: - '''Decorator; executes the wrapped function in its own logging context.''' + """Decorator; executes the wrapped function in its own logging context.""" @functools.wraps(f) def wrapped(*args: typing.Any, **kwargs: typing.Any) -> T: with context(f.__name__): return f(*args, **kwargs) + return wrapped -def write(level: Level, *args: typing.Any, sep: str = ' ') -> None: - '''Write message to log. +def write(level: Level, *args: typing.Any, sep: str = " ") -> None: + """Write message to log. Args ---- @@ -85,28 +90,27 @@ def write(level: Level, *args: typing.Any, sep: str = ' ') -> None: Values to be printed to the log. sep : :class:`str` String inserted between values, default a space. - ''' + """ current.write(sep.join(map(str, args)), level) @contextlib.contextmanager def file(level: Level, name: str, mode: str, type: typing.Optional[str] = None): - '''Open file in logger-controlled directory. + """Open file in logger-controlled directory. Args ---- filename : :class:`str` mode : :class:`str` Should be either ``'w'`` (text) or ``'wb'`` (binary data). - ''' + """ - if mode == 'wb': + if mode == "wb": binary = True - elif mode == 'w': + elif mode == "w": binary = False else: - raise ValueError(f'invalid mode {mode!r}') - logger = current + raise ValueError(f"invalid mode {mode!r}") with tempfile.TemporaryFile() as f, context(name): yield f if binary else io.TextIOWrapper(f, write_through=True) f.seek(0) @@ -119,10 +123,10 @@ def data(level: Level, name: str, data: bytes, type: typing.Optional[str] = None def partial(attr): - if attr.endswith('file'): + if attr.endswith("file"): f = file level = attr[:-4] - elif attr.endswith('data'): + elif attr.endswith("data"): f = data level = attr[:-4] else: diff --git a/treelog/_stdout.py b/treelog/_stdout.py index fcc846c..b10ae06 100644 --- a/treelog/_stdout.py +++ b/treelog/_stdout.py @@ -4,23 +4,23 @@ class StdoutLog: - '''Output plain text to stream.''' + """Output plain text to stream.""" def __init__(self, file=sys.stdout): self.file = file self.currentcontext = [] # type: typing.List[str] def pushcontext(self, title: str) -> None: - self.currentcontext.append(title + ' > ') + self.currentcontext.append(title + " > ") def popcontext(self) -> None: self.currentcontext.pop() def recontext(self, title: str) -> None: - self.currentcontext[-1] = title + ' > ' + self.currentcontext[-1] = title + " > " def write(self, msg, level: proto.Level) -> None: if self.currentcontext: - prefix = ''.join(self.currentcontext) - msg = prefix + str(msg).replace('\n', '\n' + ' > '.rjust(len(prefix))) + prefix = "".join(self.currentcontext) + msg = prefix + str(msg).replace("\n", "\n" + " > ".rjust(len(prefix))) print(msg, file=self.file) diff --git a/treelog/_tee.py b/treelog/_tee.py index 8080a3d..b237ce3 100644 --- a/treelog/_tee.py +++ b/treelog/_tee.py @@ -1,12 +1,8 @@ -import contextlib -import os -import tempfile - from .proto import Level, Log class TeeLog: - '''Forward messages to two underlying loggers.''' + """Forward messages to two underlying loggers.""" def __init__(self, baselog1: Log, baselog2: Log) -> None: self._baselog1 = baselog1 diff --git a/treelog/iter.py b/treelog/iter.py index d867bab..654830d 100644 --- a/treelog/iter.py +++ b/treelog/iter.py @@ -1,33 +1,36 @@ import itertools -import functools import warnings import inspect import typing import types -from . import proto, _state +from . import _state -T = typing.TypeVar('T') -T0 = typing.TypeVar('T0') -T1 = typing.TypeVar('T1') -T2 = typing.TypeVar('T2') -T3 = typing.TypeVar('T3') -T4 = typing.TypeVar('T4') -T5 = typing.TypeVar('T5') -T6 = typing.TypeVar('T6') -T7 = typing.TypeVar('T7') -T8 = typing.TypeVar('T8') -T9 = typing.TypeVar('T9') +T = typing.TypeVar("T") +T0 = typing.TypeVar("T0") +T1 = typing.TypeVar("T1") +T2 = typing.TypeVar("T2") +T3 = typing.TypeVar("T3") +T4 = typing.TypeVar("T4") +T5 = typing.TypeVar("T5") +T6 = typing.TypeVar("T6") +T7 = typing.TypeVar("T7") +T8 = typing.TypeVar("T8") +T9 = typing.TypeVar("T9") class wrap(typing.Generic[T]): - '''Wrap iterable in consecutive title contexts. + """Wrap iterable in consecutive title contexts. The wrapped iterable is identical to the original, except that prior to every next item a new log context is opened taken from the ``titles`` iterable. The wrapped object should be entered before use in order to ensure that this - context is properly closed in case the iterator is prematurely abandoned.''' + context is properly closed in case the iterator is prematurely abandoned.""" - def __init__(self, titles: typing.Union[typing.Iterable[str], typing.Generator[str, T, None]], iterable: typing.Iterable[T]) -> None: + def __init__( + self, + titles: typing.Union[typing.Iterable[str], typing.Generator[str, T, None]], + iterable: typing.Iterable[T], + ) -> None: self._titles = iter(titles) self._iterable = iter(iterable) self._log = None # type: typing.Optional[proto.Log] @@ -35,7 +38,7 @@ def __init__(self, titles: typing.Union[typing.Iterable[str], typing.Generator[s def __enter__(self) -> typing.Iterator[T]: if self._log is not None: - raise Exception('iter.wrap is not reentrant') + raise Exception("iter.wrap is not reentrant") self._log = _state.current self._log.pushcontext(next(self._titles)) return iter(self) @@ -44,19 +47,29 @@ def __iter__(self) -> typing.Generator[T, None, None]: if self._log is not None: cansend = inspect.isgenerator(self._titles) for value in self._iterable: - self._log.recontext(typing.cast(typing.Generator[str, T, None], self._titles).send( - value) if cansend else next(self._titles)) + self._log.recontext( + typing.cast(typing.Generator[str, T, None], self._titles).send( + value + ) + if cansend + else next(self._titles) + ) yield value else: with self: self._warn = True yield from self - def __exit__(self, exctype: typing.Optional[typing.Type[BaseException]], excvalue: typing.Optional[BaseException], tb: typing.Optional[types.TracebackType]) -> None: + def __exit__( + self, + exctype: typing.Optional[typing.Type[BaseException]], + excvalue: typing.Optional[BaseException], + tb: typing.Optional[types.TracebackType], + ) -> None: if self._log is None: - raise Exception('iter.wrap has not yet been entered') + raise Exception("iter.wrap has not yet been entered") if self._warn and exctype is GeneratorExit: - warnings.warn('unclosed iter.wrap', ResourceWarning) + warnings.warn("unclosed iter.wrap", ResourceWarning) self._log.popcontext() self._log = None @@ -66,48 +79,109 @@ def plain(title: str, __arg0: typing.Iterable[T0]) -> wrap[T0]: ... @typing.overload -def plain(title: str, __arg0: typing.Iterable[T0], - __arg1: typing.Iterable[T1]) -> wrap[typing.Tuple[T0, T1]]: ... +def plain( + title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1] +) -> wrap[typing.Tuple[T0, T1]]: ... @typing.overload -def plain(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], - __arg2: typing.Iterable[T2]) -> wrap[typing.Tuple[T0, T1, T2]]: ... +def plain( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], +) -> wrap[typing.Tuple[T0, T1, T2]]: ... @typing.overload -def plain(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], - __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3]) -> wrap[typing.Tuple[T0, T1, T2, T3]]: ... +def plain( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], +) -> wrap[typing.Tuple[T0, T1, T2, T3]]: ... @typing.overload -def plain(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], - __arg3: typing.Iterable[T3], __arg4: typing.Iterable[T4]) -> wrap[typing.Tuple[T0, T1, T2, T3, T4]]: ... +def plain( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4]]: ... @typing.overload -def plain(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], - __arg4: typing.Iterable[T4], __arg5: typing.Iterable[T5]) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5]]: ... +def plain( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + __arg5: typing.Iterable[T5], +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5]]: ... @typing.overload -def plain(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], - __arg4: typing.Iterable[T4], __arg5: typing.Iterable[T5], __arg6: typing.Iterable[T6]) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6]]: ... +def plain( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + __arg5: typing.Iterable[T5], + __arg6: typing.Iterable[T6], +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6]]: ... @typing.overload -def plain(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], __arg4: typing.Iterable[T4], - __arg5: typing.Iterable[T5], __arg6: typing.Iterable[T6], __arg7: typing.Iterable[T7]) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7]]: ... +def plain( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + __arg5: typing.Iterable[T5], + __arg6: typing.Iterable[T6], + __arg7: typing.Iterable[T7], +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7]]: ... @typing.overload -def plain(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], __arg4: typing.Iterable[T4], - __arg5: typing.Iterable[T5], __arg6: typing.Iterable[T6], __arg7: typing.Iterable[T7], __arg8: typing.Iterable[T8]) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7, T8]]: ... +def plain( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + __arg5: typing.Iterable[T5], + __arg6: typing.Iterable[T6], + __arg7: typing.Iterable[T7], + __arg8: typing.Iterable[T8], +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7, T8]]: ... @typing.overload -def plain(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], __arg4: typing.Iterable[T4], __arg5: typing.Iterable[T5], - __arg6: typing.Iterable[T6], __arg7: typing.Iterable[T7], __arg8: typing.Iterable[T8], __arg9: typing.Iterable[T9]) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9]]: ... +def plain( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + __arg5: typing.Iterable[T5], + __arg6: typing.Iterable[T6], + __arg7: typing.Iterable[T7], + __arg8: typing.Iterable[T8], + __arg9: typing.Iterable[T9], +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9]]: ... @typing.overload @@ -115,154 +189,324 @@ def plain(title: str, *args: typing.Any) -> wrap[typing.Any]: ... def plain(title: str, *args: typing.Any) -> wrap[typing.Any]: - '''Wrap arguments in simple enumerated contexts. + """Wrap arguments in simple enumerated contexts. Example: my context 1, my context 2, etc. - ''' + """ - titles = map((_escape(title) + ' {}').format, itertools.count()) + titles = map((_escape(title) + " {}").format, itertools.count()) return wrap(titles, zip(*args) if len(args) > 1 else args[0]) @typing.overload -def fraction(title: str, __arg0: typing.Iterable[T0], *, - length: typing.Optional[int] = ...) -> wrap[T0]: ... +def fraction( + title: str, __arg0: typing.Iterable[T0], *, length: typing.Optional[int] = ... +) -> wrap[T0]: ... @typing.overload -def fraction(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], - *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1]]: ... +def fraction( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1]]: ... @typing.overload -def fraction(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], - __arg2: typing.Iterable[T2], *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1, T2]]: ... +def fraction( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1, T2]]: ... + + +@typing.overload +def fraction( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1, T2, T3]]: ... @typing.overload -def fraction(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], - __arg3: typing.Iterable[T3], *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1, T2, T3]]: ... +def fraction( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4]]: ... @typing.overload -def fraction(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], - __arg4: typing.Iterable[T4], *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1, T2, T3, T4]]: ... +def fraction( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + __arg5: typing.Iterable[T5], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5]]: ... @typing.overload -def fraction(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], - __arg4: typing.Iterable[T4], __arg5: typing.Iterable[T5], *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5]]: ... +def fraction( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + __arg5: typing.Iterable[T5], + __arg6: typing.Iterable[T6], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6]]: ... @typing.overload -def fraction(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], __arg4: typing.Iterable[T4], - __arg5: typing.Iterable[T5], __arg6: typing.Iterable[T6], *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6]]: ... +def fraction( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + __arg5: typing.Iterable[T5], + __arg6: typing.Iterable[T6], + __arg7: typing.Iterable[T7], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7]]: ... @typing.overload -def fraction(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], __arg4: typing.Iterable[T4], - __arg5: typing.Iterable[T5], __arg6: typing.Iterable[T6], __arg7: typing.Iterable[T7], *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7]]: ... - - -@typing.overload -def fraction(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], __arg4: typing.Iterable[T4], __arg5: typing.Iterable[T5], - __arg6: typing.Iterable[T6], __arg7: typing.Iterable[T7], __arg8: typing.Iterable[T8], *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7, T8]]: ... - - -@typing.overload -def fraction(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], __arg4: typing.Iterable[T4], __arg5: typing.Iterable[T5], - __arg6: typing.Iterable[T6], __arg7: typing.Iterable[T7], __arg8: typing.Iterable[T8], __arg9: typing.Iterable[T9], *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9]]: ... - - -@typing.overload -def fraction(title: str, *args: typing.Any, - length: typing.Optional[int] = ...) -> wrap[typing.Any]: ... - - -def fraction(title: str, *args: typing.Any, length: typing.Optional[int] = None) -> wrap[typing.Any]: - '''Wrap arguments in enumerated contexts with length. +def fraction( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + __arg5: typing.Iterable[T5], + __arg6: typing.Iterable[T6], + __arg7: typing.Iterable[T7], + __arg8: typing.Iterable[T8], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7, T8]]: ... + + +@typing.overload +def fraction( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + __arg5: typing.Iterable[T5], + __arg6: typing.Iterable[T6], + __arg7: typing.Iterable[T7], + __arg8: typing.Iterable[T8], + __arg9: typing.Iterable[T9], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9]]: ... + + +@typing.overload +def fraction( + title: str, *args: typing.Any, length: typing.Optional[int] = ... +) -> wrap[typing.Any]: ... + + +def fraction( + title: str, *args: typing.Any, length: typing.Optional[int] = None +) -> wrap[typing.Any]: + """Wrap arguments in enumerated contexts with length. Example: my context 1/5, my context 2/5, etc. - ''' + """ if length is None: length = min(len(arg) for arg in args) - titles = map((_escape(title) + ' {}/' + str(length)).format, - itertools.count()) + titles = map((_escape(title) + " {}/" + str(length)).format, itertools.count()) return wrap(titles, zip(*args) if len(args) > 1 else args[0]) @typing.overload -def percentage(title: str, __arg0: typing.Iterable[T0], - *, length: typing.Optional[int] = ...) -> wrap[T0]: ... +def percentage( + title: str, __arg0: typing.Iterable[T0], *, length: typing.Optional[int] = ... +) -> wrap[T0]: ... @typing.overload -def percentage(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], - *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1]]: ... +def percentage( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1]]: ... @typing.overload -def percentage(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], - __arg2: typing.Iterable[T2], *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1, T2]]: ... +def percentage( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1, T2]]: ... + + +@typing.overload +def percentage( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1, T2, T3]]: ... @typing.overload -def percentage(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], - __arg3: typing.Iterable[T3], *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1, T2, T3]]: ... +def percentage( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4]]: ... @typing.overload -def percentage(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], - __arg4: typing.Iterable[T4], *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1, T2, T3, T4]]: ... +def percentage( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + __arg5: typing.Iterable[T5], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5]]: ... @typing.overload -def percentage(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], - __arg4: typing.Iterable[T4], __arg5: typing.Iterable[T5], *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5]]: ... +def percentage( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + __arg5: typing.Iterable[T5], + __arg6: typing.Iterable[T6], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6]]: ... @typing.overload -def percentage(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], __arg4: typing.Iterable[T4], - __arg5: typing.Iterable[T5], __arg6: typing.Iterable[T6], *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6]]: ... +def percentage( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + __arg5: typing.Iterable[T5], + __arg6: typing.Iterable[T6], + __arg7: typing.Iterable[T7], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7]]: ... @typing.overload -def percentage(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], __arg4: typing.Iterable[T4], - __arg5: typing.Iterable[T5], __arg6: typing.Iterable[T6], __arg7: typing.Iterable[T7], *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7]]: ... - - -@typing.overload -def percentage(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], __arg4: typing.Iterable[T4], __arg5: typing.Iterable[T5], - __arg6: typing.Iterable[T6], __arg7: typing.Iterable[T7], __arg8: typing.Iterable[T8], *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7, T8]]: ... - - -@typing.overload -def percentage(title: str, __arg0: typing.Iterable[T0], __arg1: typing.Iterable[T1], __arg2: typing.Iterable[T2], __arg3: typing.Iterable[T3], __arg4: typing.Iterable[T4], __arg5: typing.Iterable[T5], - __arg6: typing.Iterable[T6], __arg7: typing.Iterable[T7], __arg8: typing.Iterable[T8], __arg9: typing.Iterable[T9], *, length: typing.Optional[int] = ...) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9]]: ... - - -@typing.overload -def percentage(title: str, *args: typing.Any, - length: typing.Optional[int] = ...) -> wrap[typing.Any]: ... - - -def percentage(title: str, *args: typing.Any, length: typing.Optional[int] = None) -> wrap[typing.Any]: - '''Wrap arguments in contexts with percentage counter. +def percentage( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + __arg5: typing.Iterable[T5], + __arg6: typing.Iterable[T6], + __arg7: typing.Iterable[T7], + __arg8: typing.Iterable[T8], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7, T8]]: ... + + +@typing.overload +def percentage( + title: str, + __arg0: typing.Iterable[T0], + __arg1: typing.Iterable[T1], + __arg2: typing.Iterable[T2], + __arg3: typing.Iterable[T3], + __arg4: typing.Iterable[T4], + __arg5: typing.Iterable[T5], + __arg6: typing.Iterable[T6], + __arg7: typing.Iterable[T7], + __arg8: typing.Iterable[T8], + __arg9: typing.Iterable[T9], + *, + length: typing.Optional[int] = ..., +) -> wrap[typing.Tuple[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9]]: ... + + +@typing.overload +def percentage( + title: str, *args: typing.Any, length: typing.Optional[int] = ... +) -> wrap[typing.Any]: ... + + +def percentage( + title: str, *args: typing.Any, length: typing.Optional[int] = None +) -> wrap[typing.Any]: + """Wrap arguments in contexts with percentage counter. Example: my context 5%, my context 10%, etc. - ''' + """ if length is None: length = min(len(arg) for arg in args) if length: # type: typing.Iterable[str] titles = map( - (_escape(title) + ' {:.0f}%').format, itertools.count(step=100/length)) + (_escape(title) + " {:.0f}%").format, itertools.count(step=100 / length) + ) else: - titles = title + ' 100%', + titles = (title + " 100%",) return wrap(titles, zip(*args) if len(args) > 1 else args[0]) def _escape(s: str) -> str: - return s.replace('{', '{{').replace('}', '}}') + return s.replace("{", "{{").replace("}", "}}") diff --git a/treelog/proto.py b/treelog/proto.py index be066d5..0fe9f27 100644 --- a/treelog/proto.py +++ b/treelog/proto.py @@ -4,7 +4,6 @@ class Level(Enum): - debug = 0 info = 1 user = 2 @@ -19,14 +18,13 @@ class Data: type: Optional[str] = None def __str__(self): - info = f'{len(self.data)} bytes' + info = f"{len(self.data)} bytes" if self.type: - info = f'{self.type}; {info}' - return f'{self.name} [{info}]' + info = f"{self.type}; {info}" + return f"{self.name} [{info}]" class Log(Protocol): - def pushcontext(self, title: str) -> None: ... def popcontext(self) -> None: ... def recontext(self, title: str) -> None: ... From 05b1010a91f95f85e632bed54c951f1396315a4c Mon Sep 17 00:00:00 2001 From: Gertjan van Zwieten Date: Tue, 17 Mar 2026 10:24:44 +0100 Subject: [PATCH 2/3] Add info attribute to Data object --- treelog/proto.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/treelog/proto.py b/treelog/proto.py index 0fe9f27..1339c27 100644 --- a/treelog/proto.py +++ b/treelog/proto.py @@ -17,11 +17,15 @@ class Data: data: bytes type: Optional[str] = None - def __str__(self): + @property + def info(self): info = f"{len(self.data)} bytes" if self.type: info = f"{self.type}; {info}" - return f"{self.name} [{info}]" + return info + + def __str__(self): + return f"{self.name} [{self.info}]" class Log(Protocol): From 0020a532907fb1302b9f541d6e74400e9c1a2dc7 Mon Sep 17 00:00:00 2001 From: Gertjan van Zwieten Date: Tue, 17 Mar 2026 10:25:09 +0100 Subject: [PATCH 3/3] Remove colour from data info in RichOutputLog --- tests.py | 10 +++++----- treelog/_richoutput.py | 12 +++++++++--- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/tests.py b/tests.py index b858e12..7d677f4 100644 --- a/tests.py +++ b/tests.py @@ -109,7 +109,7 @@ def check_output(self, f): "\x1b[1;34mmy message\x1b[0m\n" "test.dat > " "\r\x1b[K" - "\x1b[1mtest.dat [5 bytes]\x1b[0m\n" + "\x1b[1mtest.dat\x1b[0m [5 bytes]\n" "my context > " "iter 0 " "> \x1b[4D1 > " @@ -124,10 +124,10 @@ def check_output(self, f): "\x1b[1;31mmultiple..\x1b[0m\n > \x1b[1;31m ..lines\x1b[0m\nmy context > test.dat > " "\x1b[1mgenerating\x1b[0m\nmy context > test.dat > " "\x1b[11D\x1b[K" - "\x1b[1;34mtest.dat [5 bytes]\x1b[0m\nmy context > " + "\x1b[1;34mtest.dat\x1b[0m [5 bytes]\nmy context > " "\r\x1b[Kgenerate_test > test.dat > " "\x1b[11D\x1b[K" - "\x1b[1;35mtest.dat [5 bytes]\x1b[0m\ngenerate_test > " + "\x1b[1;35mtest.dat\x1b[0m [5 bytes]\ngenerate_test > " "\r\x1b[K" "context step=0 > " "\x1b[1mfoo\x1b[0m\n" @@ -136,10 +136,10 @@ def check_output(self, f): "\x1b[1mbar\x1b[0m\n" "context step=1 > " "\r\x1b[K" - "\x1b[1;31msame.dat [5 bytes]\x1b[0m\n" + "\x1b[1;31msame.dat\x1b[0m [5 bytes]\n" "dbg.jpg > " "\r\x1b[K" - "\x1b[1;30mdbg.jpg [image/jpg; 5 bytes]\x1b[0m\n" + "\x1b[1;30mdbg.jpg\x1b[0m [image/jpg; 5 bytes]\n" "\x1b[1;30mdbg\x1b[0m\n" "\x1b[1;35mwarn\x1b[0m\n", ) diff --git a/treelog/_richoutput.py b/treelog/_richoutput.py index f82697f..71c6348 100644 --- a/treelog/_richoutput.py +++ b/treelog/_richoutput.py @@ -1,7 +1,7 @@ import sys import typing -from .proto import Level +from .proto import Level, Data class RichOutputLog: @@ -52,14 +52,20 @@ def contextchangedhook(self) -> None: self._current = _current def write(self, msg, level: Level) -> None: - msg = str(msg) + if isinstance(msg, Data): + info = f" [{msg.info}]" + msg = msg.name + else: + info = "" if self._current and "\n" in msg: msg = msg.replace( "\n", "\033[0m\n" + " > ".rjust(len(self._current)) + self._cmap[level.value], ) self.file.write( - "".join([self._cmap[level.value], msg, "\033[0m\n", self._current]) + "".join( + [self._cmap[level.value], msg, "\033[0m", info, "\n", self._current] + ) )