mirror of
https://github.com/beetbox/beets.git
synced 2025-12-06 08:39:17 +01:00
parent
4ff7ac5712
commit
199844671e
1 changed files with 77 additions and 0 deletions
|
|
@ -3,6 +3,8 @@ from __future__ import (division, absolute_import, print_function,
|
|||
unicode_literals)
|
||||
|
||||
import sys
|
||||
import time
|
||||
import threading
|
||||
import logging as log
|
||||
from StringIO import StringIO
|
||||
|
||||
|
|
@ -157,6 +159,81 @@ class LoggingLevelTest(unittest.TestCase, helper.TestHelper):
|
|||
self.assertIn('dummy: debug import_stage', logs)
|
||||
|
||||
|
||||
class ConcurrentEventsTest(TestCase, helper.TestHelper):
|
||||
"""Similar to LoggingLevelTest but lower-level and focused on multiple
|
||||
events interaction. Since this is a bit heavy we don't do it in
|
||||
LoggingLevelTest.
|
||||
"""
|
||||
class DummyPlugin(plugins.BeetsPlugin):
|
||||
def __init__(self, test_case):
|
||||
plugins.BeetsPlugin.__init__(self, 'dummy')
|
||||
# self.import_stages = [self.import_stage]
|
||||
self.register_listener('dummy_event1', self.listener1)
|
||||
self.register_listener('dummy_event2', self.listener2)
|
||||
self.lock1 = threading.Lock()
|
||||
self.lock2 = threading.Lock()
|
||||
self.test_case = test_case
|
||||
|
||||
def log_all(self, name):
|
||||
self._log.debug('debug ' + name)
|
||||
self._log.info('info ' + name)
|
||||
self._log.warning('warning ' + name)
|
||||
|
||||
def listener1(self):
|
||||
self.test_case.assertEqual(self._log.level, log.INFO)
|
||||
self.lock1.acquire()
|
||||
self.test_case.assertEqual(self._log.level, log.INFO)
|
||||
|
||||
def listener2(self):
|
||||
self.test_case.assertEqual(self._log.level, log.DEBUG)
|
||||
self.lock2.acquire()
|
||||
self.test_case.assertEqual(self._log.level, log.DEBUG)
|
||||
|
||||
def setUp(self):
|
||||
self.setup_beets()
|
||||
|
||||
def tearDown(self):
|
||||
self.teardown_beets()
|
||||
|
||||
def test_concurrent_events(self):
|
||||
dp = self.DummyPlugin(self)
|
||||
try:
|
||||
dp.lock1.acquire()
|
||||
dp.lock2.acquire()
|
||||
self.assertEqual(dp._log.level, log.NOTSET)
|
||||
|
||||
t1 = threading.Thread(target=dp.listeners['dummy_event1'][0])
|
||||
t1.start() # blocked. t1 tested its log level
|
||||
self.assertTrue(t1.is_alive())
|
||||
self.assertEqual(dp._log.level, log.NOTSET)
|
||||
|
||||
self.config['verbose'] = 2
|
||||
t2 = threading.Thread(target=dp.listeners['dummy_event2'][0])
|
||||
t2.start() # blocked. t2 tested its log level
|
||||
self.assertTrue(t2.is_alive())
|
||||
self.assertEqual(dp._log.level, log.NOTSET)
|
||||
|
||||
dp.lock1.release()
|
||||
time.sleep(.1) # dummy_event1 tests its log level + finishes
|
||||
self.assertFalse(t1.is_alive())
|
||||
self.assertTrue(t2.is_alive())
|
||||
self.assertEqual(dp._log.level, log.NOTSET)
|
||||
dp.lock2.release()
|
||||
time.sleep(.1) # dummy_event2 tests its log level + finishes
|
||||
self.assertFalse(t2.is_alive())
|
||||
|
||||
except:
|
||||
print("Alive threads:", threading.enumerate())
|
||||
if dp.lock1.locked():
|
||||
print("Releasing lock1 after exception in test")
|
||||
dp.lock1.release()
|
||||
if dp.lock2.locked():
|
||||
print("Releasing lock2 after exception in test")
|
||||
dp.lock2.release()
|
||||
print("Alive threads:", threading.enumerate())
|
||||
raise
|
||||
|
||||
|
||||
def suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue