"""Stupid tests that ensure logging works as expected""" import logging as log import sys import threading from types import ModuleType import pytest import beets.logging as blog from beets import plugins, ui from beets.test import _common, helper from beets.test.helper import AsIsImporterMixin, ImportTestCase, PluginMixin class TestStrFormatLogger: """Tests for the custom str-formatting logger.""" def test_logger_creation(self): l1 = log.getLogger("foo123") l2 = blog.getLogger("foo123") assert l1 == l2 assert l1.__class__ == log.Logger l3 = blog.getLogger("bar123") l4 = log.getLogger("bar123") assert l3 == l4 assert l3.__class__ == blog.BeetsLogger assert isinstance( l3, (blog.StrFormatLogger, blog.ThreadLocalLevelLogger) ) l5 = l3.getChild("shalala") assert l5.__class__ == blog.BeetsLogger l6 = blog.getLogger() assert l1 != l6 @pytest.mark.parametrize( "level", [log.DEBUG, log.INFO, log.WARNING, log.ERROR] ) @pytest.mark.parametrize( "msg, args, kwargs, expected", [ ("foo {} bar {}", ("oof", "baz"), {}, "foo oof bar baz"), ( "foo {bar} baz {foo}", (), {"foo": "oof", "bar": "baz"}, "foo baz baz oof", ), ("no args", (), {}, "no args"), ("foo {} bar {baz}", ("oof",), {"baz": "baz"}, "foo oof bar baz"), ], ) def test_str_format_logging( self, level, msg, args, kwargs, expected, caplog ): logger = blog.getLogger("test_logger") logger.setLevel(level) with caplog.at_level(level, logger="test_logger"): logger.log(level, msg, *args, **kwargs) assert caplog.records, "No log records were captured" assert str(caplog.records[0].msg) == expected class DummyModule(ModuleType): class DummyPlugin(plugins.BeetsPlugin): def __init__(self): plugins.BeetsPlugin.__init__(self, "dummy") self.import_stages = [self.import_stage] self.register_listener("dummy_event", self.listener) def log_all(self, name): self._log.debug("debug {}", name) self._log.info("info {}", name) self._log.warning("warning {}", name) def commands(self): cmd = ui.Subcommand("dummy") cmd.func = lambda _, __, ___: self.log_all("cmd") return (cmd,) def import_stage(self, session, task): self.log_all("import_stage") def listener(self): self.log_all("listener") def __init__(self, *_, **__): module_name = "beetsplug.dummy" super().__init__(module_name) self.DummyPlugin.__module__ = module_name self.DummyPlugin = self.DummyPlugin class LoggingLevelTest(AsIsImporterMixin, PluginMixin, ImportTestCase): plugin = "dummy" @classmethod def setUpClass(cls): patcher = patch.dict(sys.modules, {"beetsplug.dummy": DummyModule()}) patcher.start() cls.addClassCleanup(patcher.stop) super().setUpClass() def test_command_level0(self): self.config["verbose"] = 0 with helper.capture_log() as logs: self.run_command("dummy") assert "dummy: warning cmd" in logs assert "dummy: info cmd" in logs assert "dummy: debug cmd" not in logs def test_command_level1(self): self.config["verbose"] = 1 with helper.capture_log() as logs: self.run_command("dummy") assert "dummy: warning cmd" in logs assert "dummy: info cmd" in logs assert "dummy: debug cmd" in logs def test_command_level2(self): self.config["verbose"] = 2 with helper.capture_log() as logs: self.run_command("dummy") assert "dummy: warning cmd" in logs assert "dummy: info cmd" in logs assert "dummy: debug cmd" in logs def test_listener_level0(self): self.config["verbose"] = 0 with helper.capture_log() as logs: plugins.send("dummy_event") assert "dummy: warning listener" in logs assert "dummy: info listener" not in logs assert "dummy: debug listener" not in logs def test_listener_level1(self): self.config["verbose"] = 1 with helper.capture_log() as logs: plugins.send("dummy_event") assert "dummy: warning listener" in logs assert "dummy: info listener" in logs assert "dummy: debug listener" not in logs def test_listener_level2(self): self.config["verbose"] = 2 with helper.capture_log() as logs: plugins.send("dummy_event") assert "dummy: warning listener" in logs assert "dummy: info listener" in logs assert "dummy: debug listener" in logs def test_import_stage_level0(self): self.config["verbose"] = 0 with helper.capture_log() as logs: self.run_asis_importer() assert "dummy: warning import_stage" in logs assert "dummy: info import_stage" not in logs assert "dummy: debug import_stage" not in logs def test_import_stage_level1(self): self.config["verbose"] = 1 with helper.capture_log() as logs: self.run_asis_importer() assert "dummy: warning import_stage" in logs assert "dummy: info import_stage" in logs assert "dummy: debug import_stage" not in logs def test_import_stage_level2(self): self.config["verbose"] = 2 with helper.capture_log() as logs: self.run_asis_importer() assert "dummy: warning import_stage" in logs assert "dummy: info import_stage" in logs assert "dummy: debug import_stage" in logs @_common.slow_test() class ConcurrentEventsTest(AsIsImporterMixin, ImportTestCase): """Similar to LoggingLevelTest but lower-level and focused on multiple events interaction. Since this is a bit heavy we don't do it in LoggingLevelTest. """ db_on_disk = True class DummyPlugin(plugins.BeetsPlugin): def __init__(self, test_case): plugins.BeetsPlugin.__init__(self, "dummy") self.register_listener("dummy_event1", self.listener1) self.register_listener("dummy_event2", self.listener2) self.lock1 = threading.Lock() self.lock2 = threading.Lock() self.test_case = test_case self.exc = None self.t1_step = self.t2_step = 0 def log_all(self, name): self._log.debug("debug {}", name) self._log.info("info {}", name) self._log.warning("warning {}", name) def listener1(self): try: assert self._log.level == log.INFO self.t1_step = 1 self.lock1.acquire() assert self._log.level == log.INFO self.t1_step = 2 except Exception as e: self.exc = e def listener2(self): try: assert self._log.level == log.DEBUG self.t2_step = 1 self.lock2.acquire() assert self._log.level == log.DEBUG self.t2_step = 2 except Exception as e: self.exc = e def test_concurrent_events(self): dp = self.DummyPlugin(self) def check_dp_exc(): if dp.exc: raise dp.exc try: dp.lock1.acquire() dp.lock2.acquire() assert dp._log.level == log.NOTSET self.config["verbose"] = 1 t1 = threading.Thread(target=dp.listeners["dummy_event1"][0]) t1.start() # blocked. t1 tested its log level while dp.t1_step != 1: check_dp_exc() assert t1.is_alive() assert dp._log.level == log.NOTSET self.config["verbose"] = 2 t2 = threading.Thread(target=dp.listeners["dummy_event2"][0]) t2.start() # blocked. t2 tested its log level while dp.t2_step != 1: check_dp_exc() assert t2.is_alive() assert dp._log.level == log.NOTSET dp.lock1.release() # dummy_event1 tests its log level + finishes while dp.t1_step != 2: check_dp_exc() t1.join(0.1) assert not t1.is_alive() assert t2.is_alive() assert dp._log.level == log.NOTSET dp.lock2.release() # dummy_event2 tests its log level + finishes while dp.t2_step != 2: check_dp_exc() t2.join(0.1) assert not t2.is_alive() except Exception: print("Alive threads:", threading.enumerate()) if dp.lock1.locked(): print("Releasing lock1 after exception in test") dp.lock1.release() if dp.lock2.locked(): print("Releasing lock2 after exception in test") dp.lock2.release() print("Alive threads:", threading.enumerate()) raise def test_root_logger_levels(self): """Root logger level should be shared between threads.""" self.config["threaded"] = True blog.getLogger("beets").set_global_level(blog.WARNING) with helper.capture_log() as logs: self.run_asis_importer() assert logs == [] blog.getLogger("beets").set_global_level(blog.INFO) with helper.capture_log() as logs: self.run_asis_importer() for line in logs: assert "import" in line assert "album" in line blog.getLogger("beets").set_global_level(blog.DEBUG) with helper.capture_log() as logs: self.run_asis_importer() assert "Sending event: database_change" in logs