mirror of
https://github.com/beetbox/beets.git
synced 2025-12-06 08:39:17 +01:00
Added regex pattern to strip C0/C1 control characters (excluding useful whitespace) from log messages before terminal output. This prevents disruptive/malicious control sequences from affecting terminal rendering.
353 lines
12 KiB
Python
353 lines
12 KiB
Python
"""Stupid tests that ensure logging works as expected"""
|
||
|
||
import logging as log
|
||
import sys
|
||
import threading
|
||
from types import ModuleType
|
||
from unittest.mock import patch
|
||
|
||
import pytest
|
||
|
||
import beets.logging as blog
|
||
from beets import plugins, ui
|
||
from beets.test import _common, helper
|
||
from beets.test.helper import AsIsImporterMixin, ImportTestCase, PluginMixin
|
||
|
||
|
||
class TestStrFormatLogger:
|
||
"""Tests for the custom str-formatting logger."""
|
||
|
||
def test_logger_creation(self):
|
||
l1 = log.getLogger("foo123")
|
||
l2 = blog.getLogger("foo123")
|
||
assert l1 == l2
|
||
assert l1.__class__ == log.Logger
|
||
|
||
l3 = blog.getLogger("bar123")
|
||
l4 = log.getLogger("bar123")
|
||
assert l3 == l4
|
||
assert l3.__class__ == blog.BeetsLogger
|
||
assert isinstance(
|
||
l3, (blog.StrFormatLogger, blog.ThreadLocalLevelLogger)
|
||
)
|
||
|
||
l5 = l3.getChild("shalala")
|
||
assert l5.__class__ == blog.BeetsLogger
|
||
|
||
l6 = blog.getLogger()
|
||
assert l1 != l6
|
||
|
||
@pytest.mark.parametrize(
|
||
"level", [log.DEBUG, log.INFO, log.WARNING, log.ERROR]
|
||
)
|
||
@pytest.mark.parametrize(
|
||
"msg, args, kwargs, expected",
|
||
[
|
||
("foo {} bar {}", ("oof", "baz"), {}, "foo oof bar baz"),
|
||
(
|
||
"foo {bar} baz {foo}",
|
||
(),
|
||
{"foo": "oof", "bar": "baz"},
|
||
"foo baz baz oof",
|
||
),
|
||
("no args", (), {}, "no args"),
|
||
("foo {} bar {baz}", ("oof",), {"baz": "baz"}, "foo oof bar baz"),
|
||
],
|
||
)
|
||
def test_str_format_logging(
|
||
self, level, msg, args, kwargs, expected, caplog
|
||
):
|
||
logger = blog.getLogger("test_logger")
|
||
logger.setLevel(level)
|
||
|
||
with caplog.at_level(level, logger="test_logger"):
|
||
logger.log(level, msg, *args, **kwargs)
|
||
|
||
assert caplog.records, "No log records were captured"
|
||
assert str(caplog.records[0].msg) == expected
|
||
|
||
|
||
class TestLogSanitization:
|
||
"""Log messages should have control characters removed from:
|
||
- String arguments
|
||
- Keyword argument values
|
||
- Bytes arguments (which get decoded first)
|
||
"""
|
||
|
||
@pytest.mark.parametrize(
|
||
"msg, args, kwargs, expected",
|
||
[
|
||
# Valid UTF-8 bytes are decoded and preserved
|
||
(
|
||
"foo {} bar {bar}",
|
||
(b"oof \xc3\xa9",),
|
||
{"bar": b"baz \xc3\xa9"},
|
||
"foo oof é bar baz é",
|
||
),
|
||
# Invalid UTF-8 bytes are decoded with replacement characters
|
||
(
|
||
"foo {} bar {bar}",
|
||
(b"oof \xff",),
|
||
{"bar": b"baz \xff"},
|
||
"foo oof <20> bar baz <20>",
|
||
),
|
||
# Control characters should be removed
|
||
(
|
||
"foo {} bar {bar}",
|
||
("oof \x9e",),
|
||
{"bar": "baz \x9e"},
|
||
"foo oof <20> bar baz <20>",
|
||
),
|
||
# Whitespace control characters should be preserved
|
||
(
|
||
"foo {} bar {bar}",
|
||
("foo\t\n",),
|
||
{"bar": "bar\r"},
|
||
"foo foo\t\n bar bar\r",
|
||
),
|
||
],
|
||
)
|
||
def test_sanitization(self, msg, args, kwargs, expected, caplog):
|
||
level = log.INFO
|
||
logger = blog.getLogger("test_logger")
|
||
logger.setLevel(level)
|
||
|
||
with caplog.at_level(level, logger="test_logger"):
|
||
logger.log(level, msg, *args, **kwargs)
|
||
|
||
assert caplog.records, "No log records were captured"
|
||
assert str(caplog.records[0].msg) == expected
|
||
|
||
|
||
class DummyModule(ModuleType):
|
||
class DummyPlugin(plugins.BeetsPlugin):
|
||
def __init__(self):
|
||
plugins.BeetsPlugin.__init__(self, "dummy")
|
||
self.import_stages = [self.import_stage]
|
||
self.register_listener("dummy_event", self.listener)
|
||
|
||
def log_all(self, name):
|
||
self._log.debug("debug {}", name)
|
||
self._log.info("info {}", name)
|
||
self._log.warning("warning {}", name)
|
||
|
||
def commands(self):
|
||
cmd = ui.Subcommand("dummy")
|
||
cmd.func = lambda _, __, ___: self.log_all("cmd")
|
||
return (cmd,)
|
||
|
||
def import_stage(self, session, task):
|
||
self.log_all("import_stage")
|
||
|
||
def listener(self):
|
||
self.log_all("listener")
|
||
|
||
def __init__(self, *_, **__):
|
||
module_name = "beetsplug.dummy"
|
||
super().__init__(module_name)
|
||
self.DummyPlugin.__module__ = module_name
|
||
self.DummyPlugin = self.DummyPlugin
|
||
|
||
|
||
class LoggingLevelTest(AsIsImporterMixin, PluginMixin, ImportTestCase):
|
||
plugin = "dummy"
|
||
|
||
@classmethod
|
||
def setUpClass(cls):
|
||
patcher = patch.dict(sys.modules, {"beetsplug.dummy": DummyModule()})
|
||
patcher.start()
|
||
cls.addClassCleanup(patcher.stop)
|
||
|
||
super().setUpClass()
|
||
|
||
def test_command_level0(self):
|
||
self.config["verbose"] = 0
|
||
with helper.capture_log() as logs:
|
||
self.run_command("dummy")
|
||
assert "dummy: warning cmd" in logs
|
||
assert "dummy: info cmd" in logs
|
||
assert "dummy: debug cmd" not in logs
|
||
|
||
def test_command_level1(self):
|
||
self.config["verbose"] = 1
|
||
with helper.capture_log() as logs:
|
||
self.run_command("dummy")
|
||
assert "dummy: warning cmd" in logs
|
||
assert "dummy: info cmd" in logs
|
||
assert "dummy: debug cmd" in logs
|
||
|
||
def test_command_level2(self):
|
||
self.config["verbose"] = 2
|
||
with helper.capture_log() as logs:
|
||
self.run_command("dummy")
|
||
assert "dummy: warning cmd" in logs
|
||
assert "dummy: info cmd" in logs
|
||
assert "dummy: debug cmd" in logs
|
||
|
||
def test_listener_level0(self):
|
||
self.config["verbose"] = 0
|
||
with helper.capture_log() as logs:
|
||
plugins.send("dummy_event")
|
||
assert "dummy: warning listener" in logs
|
||
assert "dummy: info listener" not in logs
|
||
assert "dummy: debug listener" not in logs
|
||
|
||
def test_listener_level1(self):
|
||
self.config["verbose"] = 1
|
||
with helper.capture_log() as logs:
|
||
plugins.send("dummy_event")
|
||
assert "dummy: warning listener" in logs
|
||
assert "dummy: info listener" in logs
|
||
assert "dummy: debug listener" not in logs
|
||
|
||
def test_listener_level2(self):
|
||
self.config["verbose"] = 2
|
||
with helper.capture_log() as logs:
|
||
plugins.send("dummy_event")
|
||
assert "dummy: warning listener" in logs
|
||
assert "dummy: info listener" in logs
|
||
assert "dummy: debug listener" in logs
|
||
|
||
def test_import_stage_level0(self):
|
||
self.config["verbose"] = 0
|
||
with helper.capture_log() as logs:
|
||
self.run_asis_importer()
|
||
assert "dummy: warning import_stage" in logs
|
||
assert "dummy: info import_stage" not in logs
|
||
assert "dummy: debug import_stage" not in logs
|
||
|
||
def test_import_stage_level1(self):
|
||
self.config["verbose"] = 1
|
||
with helper.capture_log() as logs:
|
||
self.run_asis_importer()
|
||
assert "dummy: warning import_stage" in logs
|
||
assert "dummy: info import_stage" in logs
|
||
assert "dummy: debug import_stage" not in logs
|
||
|
||
def test_import_stage_level2(self):
|
||
self.config["verbose"] = 2
|
||
with helper.capture_log() as logs:
|
||
self.run_asis_importer()
|
||
assert "dummy: warning import_stage" in logs
|
||
assert "dummy: info import_stage" in logs
|
||
assert "dummy: debug import_stage" in logs
|
||
|
||
|
||
@_common.slow_test()
|
||
class ConcurrentEventsTest(AsIsImporterMixin, ImportTestCase):
|
||
"""Similar to LoggingLevelTest but lower-level and focused on multiple
|
||
events interaction. Since this is a bit heavy we don't do it in
|
||
LoggingLevelTest.
|
||
"""
|
||
|
||
db_on_disk = True
|
||
|
||
class DummyPlugin(plugins.BeetsPlugin):
|
||
def __init__(self, test_case):
|
||
plugins.BeetsPlugin.__init__(self, "dummy")
|
||
self.register_listener("dummy_event1", self.listener1)
|
||
self.register_listener("dummy_event2", self.listener2)
|
||
self.lock1 = threading.Lock()
|
||
self.lock2 = threading.Lock()
|
||
self.test_case = test_case
|
||
self.exc = None
|
||
self.t1_step = self.t2_step = 0
|
||
|
||
def log_all(self, name):
|
||
self._log.debug("debug {}", name)
|
||
self._log.info("info {}", name)
|
||
self._log.warning("warning {}", name)
|
||
|
||
def listener1(self):
|
||
try:
|
||
assert self._log.level == log.INFO
|
||
self.t1_step = 1
|
||
self.lock1.acquire()
|
||
assert self._log.level == log.INFO
|
||
self.t1_step = 2
|
||
except Exception as e:
|
||
self.exc = e
|
||
|
||
def listener2(self):
|
||
try:
|
||
assert self._log.level == log.DEBUG
|
||
self.t2_step = 1
|
||
self.lock2.acquire()
|
||
assert self._log.level == log.DEBUG
|
||
self.t2_step = 2
|
||
except Exception as e:
|
||
self.exc = e
|
||
|
||
def test_concurrent_events(self):
|
||
dp = self.DummyPlugin(self)
|
||
|
||
def check_dp_exc():
|
||
if dp.exc:
|
||
raise dp.exc
|
||
|
||
try:
|
||
dp.lock1.acquire()
|
||
dp.lock2.acquire()
|
||
assert dp._log.level == log.NOTSET
|
||
|
||
self.config["verbose"] = 1
|
||
t1 = threading.Thread(target=dp.listeners["dummy_event1"][0])
|
||
t1.start() # blocked. t1 tested its log level
|
||
while dp.t1_step != 1:
|
||
check_dp_exc()
|
||
assert t1.is_alive()
|
||
assert dp._log.level == log.NOTSET
|
||
|
||
self.config["verbose"] = 2
|
||
t2 = threading.Thread(target=dp.listeners["dummy_event2"][0])
|
||
t2.start() # blocked. t2 tested its log level
|
||
while dp.t2_step != 1:
|
||
check_dp_exc()
|
||
assert t2.is_alive()
|
||
assert dp._log.level == log.NOTSET
|
||
|
||
dp.lock1.release() # dummy_event1 tests its log level + finishes
|
||
while dp.t1_step != 2:
|
||
check_dp_exc()
|
||
t1.join(0.1)
|
||
assert not t1.is_alive()
|
||
assert t2.is_alive()
|
||
assert dp._log.level == log.NOTSET
|
||
|
||
dp.lock2.release() # dummy_event2 tests its log level + finishes
|
||
while dp.t2_step != 2:
|
||
check_dp_exc()
|
||
t2.join(0.1)
|
||
assert not t2.is_alive()
|
||
|
||
except Exception:
|
||
print("Alive threads:", threading.enumerate())
|
||
if dp.lock1.locked():
|
||
print("Releasing lock1 after exception in test")
|
||
dp.lock1.release()
|
||
if dp.lock2.locked():
|
||
print("Releasing lock2 after exception in test")
|
||
dp.lock2.release()
|
||
print("Alive threads:", threading.enumerate())
|
||
raise
|
||
|
||
def test_root_logger_levels(self):
|
||
"""Root logger level should be shared between threads."""
|
||
self.config["threaded"] = True
|
||
|
||
blog.getLogger("beets").set_global_level(blog.WARNING)
|
||
with helper.capture_log() as logs:
|
||
self.run_asis_importer()
|
||
assert logs == []
|
||
|
||
blog.getLogger("beets").set_global_level(blog.INFO)
|
||
with helper.capture_log() as logs:
|
||
self.run_asis_importer()
|
||
for line in logs:
|
||
assert "import" in line
|
||
assert "album" in line
|
||
|
||
blog.getLogger("beets").set_global_level(blog.DEBUG)
|
||
with helper.capture_log() as logs:
|
||
self.run_asis_importer()
|
||
assert "Sending event: database_change" in logs
|