diff --git a/test/test_logging.py b/test/test_logging.py index ae22def46..a579836c3 100644 --- a/test/test_logging.py +++ b/test/test_logging.py @@ -3,6 +3,8 @@ from __future__ import (division, absolute_import, print_function, unicode_literals) import sys +import time +import threading import logging as log from StringIO import StringIO @@ -157,6 +159,81 @@ class LoggingLevelTest(unittest.TestCase, helper.TestHelper): self.assertIn('dummy: debug import_stage', logs) +class ConcurrentEventsTest(TestCase, helper.TestHelper): + """Similar to LoggingLevelTest but lower-level and focused on multiple + events interaction. Since this is a bit heavy we don't do it in + LoggingLevelTest. + """ + class DummyPlugin(plugins.BeetsPlugin): + def __init__(self, test_case): + plugins.BeetsPlugin.__init__(self, 'dummy') + # self.import_stages = [self.import_stage] + self.register_listener('dummy_event1', self.listener1) + self.register_listener('dummy_event2', self.listener2) + self.lock1 = threading.Lock() + self.lock2 = threading.Lock() + self.test_case = test_case + + def log_all(self, name): + self._log.debug('debug ' + name) + self._log.info('info ' + name) + self._log.warning('warning ' + name) + + def listener1(self): + self.test_case.assertEqual(self._log.level, log.INFO) + self.lock1.acquire() + self.test_case.assertEqual(self._log.level, log.INFO) + + def listener2(self): + self.test_case.assertEqual(self._log.level, log.DEBUG) + self.lock2.acquire() + self.test_case.assertEqual(self._log.level, log.DEBUG) + + def setUp(self): + self.setup_beets() + + def tearDown(self): + self.teardown_beets() + + def test_concurrent_events(self): + dp = self.DummyPlugin(self) + try: + dp.lock1.acquire() + dp.lock2.acquire() + self.assertEqual(dp._log.level, log.NOTSET) + + t1 = threading.Thread(target=dp.listeners['dummy_event1'][0]) + t1.start() # blocked. t1 tested its log level + self.assertTrue(t1.is_alive()) + self.assertEqual(dp._log.level, log.NOTSET) + + self.config['verbose'] = 2 + t2 = threading.Thread(target=dp.listeners['dummy_event2'][0]) + t2.start() # blocked. t2 tested its log level + self.assertTrue(t2.is_alive()) + self.assertEqual(dp._log.level, log.NOTSET) + + dp.lock1.release() + time.sleep(.1) # dummy_event1 tests its log level + finishes + self.assertFalse(t1.is_alive()) + self.assertTrue(t2.is_alive()) + self.assertEqual(dp._log.level, log.NOTSET) + dp.lock2.release() + time.sleep(.1) # dummy_event2 tests its log level + finishes + self.assertFalse(t2.is_alive()) + + except: + print("Alive threads:", threading.enumerate()) + if dp.lock1.locked(): + print("Releasing lock1 after exception in test") + dp.lock1.release() + if dp.lock2.locked(): + print("Releasing lock2 after exception in test") + dp.lock2.release() + print("Alive threads:", threading.enumerate()) + raise + + def suite(): return unittest.TestLoader().loadTestsFromName(__name__)