mirror of
https://github.com/beetbox/beets.git
synced 2025-12-28 19:42:42 +01:00
properly sandbox all tests
All filesystem writes are now to temporary directories. (Long ago, the policy was to put temporary stuff in _RSRC, but that was a pretty bad idea.)
This commit is contained in:
parent
5477a5d039
commit
77eddaa2d1
4 changed files with 75 additions and 24 deletions
|
|
@ -39,7 +39,7 @@ import beetsplug
|
|||
beetsplug.__path__ = [ os.path.abspath(
|
||||
os.path.join(__file__, '..', '..', 'beetsplug')) ]
|
||||
|
||||
# Test resources/sandbox path.
|
||||
# Test resources path.
|
||||
RSRC = os.path.join(os.path.dirname(__file__), 'rsrc')
|
||||
|
||||
# Propagate to root loger so nosetest can capture it
|
||||
|
|
@ -184,29 +184,41 @@ class Timecop(object):
|
|||
class InputException(Exception):
|
||||
def __init__(self, output=None):
|
||||
self.output = output
|
||||
|
||||
def __str__(self):
|
||||
msg = "Attempt to read with no input provided."
|
||||
if self.output is not None:
|
||||
msg += " Output: %s" % self.output
|
||||
return msg
|
||||
|
||||
|
||||
class DummyOut(object):
|
||||
encoding = 'utf8'
|
||||
|
||||
def __init__(self):
|
||||
self.buf = []
|
||||
|
||||
def write(self, s):
|
||||
self.buf.append(s)
|
||||
|
||||
def get(self):
|
||||
return ''.join(self.buf)
|
||||
|
||||
def clear(self):
|
||||
self.buf = []
|
||||
|
||||
|
||||
class DummyIn(object):
|
||||
encoding = 'utf8'
|
||||
|
||||
def __init__(self, out=None):
|
||||
self.buf = []
|
||||
self.reads = 0
|
||||
self.out = out
|
||||
|
||||
def add(self, s):
|
||||
self.buf.append(s + '\n')
|
||||
|
||||
def readline(self):
|
||||
if not self.buf:
|
||||
if self.out:
|
||||
|
|
@ -215,6 +227,8 @@ class DummyIn(object):
|
|||
raise InputException()
|
||||
self.reads += 1
|
||||
return self.buf.pop(0)
|
||||
|
||||
|
||||
class DummyIO(object):
|
||||
"""Mocks input and output streams for testing UI code."""
|
||||
def __init__(self):
|
||||
|
|
@ -246,6 +260,7 @@ class DummyIO(object):
|
|||
def touch(path):
|
||||
open(path, 'a').close()
|
||||
|
||||
|
||||
class Bag(object):
|
||||
"""An object that exposes a set of fields given as keyword
|
||||
arguments. Any field not found in the dictionary appears to be None.
|
||||
|
|
@ -270,6 +285,7 @@ def platform_windows():
|
|||
finally:
|
||||
os.path = old_path
|
||||
|
||||
|
||||
@contextmanager
|
||||
def platform_posix():
|
||||
import posixpath
|
||||
|
|
@ -280,6 +296,7 @@ def platform_posix():
|
|||
finally:
|
||||
os.path = old_path
|
||||
|
||||
|
||||
@contextmanager
|
||||
def system_mock(name):
|
||||
import platform
|
||||
|
|
|
|||
|
|
@ -97,7 +97,7 @@ class TestHelper(object):
|
|||
|
||||
Make sure you call ``teardown_beets()`` afterwards.
|
||||
"""
|
||||
self.temp_dir = mkdtemp()
|
||||
self.create_temp_dir()
|
||||
os.environ['BEETSDIR'] = self.temp_dir
|
||||
|
||||
self.config = beets.config
|
||||
|
|
@ -122,7 +122,7 @@ class TestHelper(object):
|
|||
def teardown_beets(self):
|
||||
del os.environ['BEETSDIR']
|
||||
# FIXME somehow close all open fd to the ilbrary
|
||||
shutil.rmtree(self.temp_dir)
|
||||
self.remove_temp_dir()
|
||||
self.config.clear()
|
||||
|
||||
def load_plugins(self, *plugins):
|
||||
|
|
@ -208,3 +208,14 @@ class TestHelper(object):
|
|||
else:
|
||||
lib = Library(':memory:')
|
||||
beets.ui._raw_main(list(args), lib)
|
||||
|
||||
def create_temp_dir(self):
|
||||
"""Create a temporary directory and assign it into
|
||||
`self.temp_dir`. Call `remove_temp_dir` later to delete it.
|
||||
"""
|
||||
self.temp_dir = mkdtemp()
|
||||
|
||||
def remove_temp_dir(self):
|
||||
"""Delete the temporary directory created by `create_temp_dir`.
|
||||
"""
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
|
|
|||
|
|
@ -31,8 +31,6 @@ from beets import plugins
|
|||
from beets import config
|
||||
from beets.mediafile import MediaFile
|
||||
|
||||
TEMP_LIB = os.path.join(_common.RSRC, 'test_copy.blb')
|
||||
|
||||
# Shortcut to path normalization.
|
||||
np = util.normpath
|
||||
|
||||
|
|
@ -62,7 +60,7 @@ class StoreTest(_common.LibTestCase):
|
|||
|
||||
def test_store_only_writes_dirty_fields(self):
|
||||
original_genre = self.i.genre
|
||||
self.i._values_fixed['genre'] = 'beatboxing' # change w/o dirtying
|
||||
self.i._values_fixed['genre'] = 'beatboxing' # change w/o dirtying
|
||||
self.i.store()
|
||||
new_genre = self.lib._connection().execute(
|
||||
'select genre from items where '
|
||||
|
|
@ -132,6 +130,7 @@ class DestinationTest(_common.TestCase):
|
|||
super(DestinationTest, self).setUp()
|
||||
self.lib = beets.library.Library(':memory:')
|
||||
self.i = item(self.lib)
|
||||
|
||||
def tearDown(self):
|
||||
super(DestinationTest, self).tearDown()
|
||||
self.lib._connection().close()
|
||||
|
|
@ -452,6 +451,7 @@ class PathFormattingMixin(object):
|
|||
"""Utilities for testing path formatting."""
|
||||
def _setf(self, fmt):
|
||||
self.lib.path_formats.insert(0, ('default', fmt))
|
||||
|
||||
def _assert_dest(self, dest, i=None):
|
||||
if i is None:
|
||||
i = self.i
|
||||
|
|
@ -467,6 +467,7 @@ class DestinationFunctionTest(_common.TestCase, PathFormattingMixin):
|
|||
self.lib.directory = '/base'
|
||||
self.lib.path_formats = [('default', u'path')]
|
||||
self.i = item(self.lib)
|
||||
|
||||
def tearDown(self):
|
||||
super(DestinationFunctionTest, self).tearDown()
|
||||
self.lib._connection().close()
|
||||
|
|
@ -613,11 +614,13 @@ class PluginDestinationTest(_common.TestCase):
|
|||
|
||||
# Mock beets.plugins.item_field_getters.
|
||||
self._tv_map = {}
|
||||
|
||||
def field_getters():
|
||||
getters = {}
|
||||
for key, value in self._tv_map.items():
|
||||
getters[key] = lambda _: value
|
||||
return getters
|
||||
|
||||
self.old_field_getters = plugins.item_field_getters
|
||||
plugins.item_field_getters = field_getters
|
||||
|
||||
|
|
@ -876,7 +879,7 @@ class PathTruncationTest(_common.TestCase):
|
|||
class MtimeTest(_common.TestCase):
|
||||
def setUp(self):
|
||||
super(MtimeTest, self).setUp()
|
||||
self.ipath = os.path.join(_common.RSRC, 'testfile.mp3')
|
||||
self.ipath = os.path.join(self.temp_dir, 'testfile.mp3')
|
||||
shutil.copy(os.path.join(_common.RSRC, 'full.mp3'), self.ipath)
|
||||
self.i = beets.library.Item.from_path(self.ipath)
|
||||
self.lib = beets.library.Library(':memory:')
|
||||
|
|
@ -945,8 +948,8 @@ class TemplateTest(_common.LibTestCase):
|
|||
self.album.store()
|
||||
self.assertEqual(self.i.evaluate_template('$foo'), 'baz')
|
||||
|
||||
class WriteTest(_common.LibTestCase):
|
||||
|
||||
class WriteTest(_common.LibTestCase):
|
||||
def test_write_nonexistant(self):
|
||||
self.i.path = '/path/does/not/exist'
|
||||
self.assertRaises(beets.library.ReadError, self.i.write)
|
||||
|
|
@ -956,8 +959,13 @@ class WriteTest(_common.LibTestCase):
|
|||
shutil.copy(os.path.join(_common.RSRC, 'empty.mp3'), path)
|
||||
os.chmod(path, stat.S_IRUSR)
|
||||
|
||||
self.i.path = path
|
||||
self.assertRaises(beets.library.WriteError, self.i.write)
|
||||
try:
|
||||
self.i.path = path
|
||||
self.assertRaises(beets.library.WriteError, self.i.write)
|
||||
|
||||
finally:
|
||||
# Restore write permissions so the file can be cleaned up.
|
||||
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
|
||||
|
||||
def test_write_with_custom_path(self):
|
||||
custom_path = os.path.join(self.temp_dir, 'file.mp3')
|
||||
|
|
|
|||
|
|
@ -19,16 +19,21 @@ import shutil
|
|||
|
||||
import _common
|
||||
from _common import unittest
|
||||
from helper import TestHelper
|
||||
import beets.mediafile
|
||||
|
||||
|
||||
_sc = beets.mediafile._safe_cast
|
||||
|
||||
|
||||
class EdgeTest(unittest.TestCase):
|
||||
def test_emptylist(self):
|
||||
# Some files have an ID3 frame that has a list with no elements.
|
||||
# This is very hard to produce, so this is just the first 8192
|
||||
# bytes of a file found "in the wild".
|
||||
emptylist = beets.mediafile.MediaFile(
|
||||
os.path.join(_common.RSRC, 'emptylist.mp3'))
|
||||
os.path.join(_common.RSRC, 'emptylist.mp3')
|
||||
)
|
||||
genre = emptylist.genre
|
||||
self.assertEqual(genre, None)
|
||||
|
||||
|
|
@ -36,7 +41,8 @@ class EdgeTest(unittest.TestCase):
|
|||
# Ensures that release times delimited by spaces are ignored.
|
||||
# Amie Street produces such files.
|
||||
space_time = beets.mediafile.MediaFile(
|
||||
os.path.join(_common.RSRC, 'space_time.mp3'))
|
||||
os.path.join(_common.RSRC, 'space_time.mp3')
|
||||
)
|
||||
self.assertEqual(space_time.year, 2009)
|
||||
self.assertEqual(space_time.month, 9)
|
||||
self.assertEqual(space_time.day, 4)
|
||||
|
|
@ -45,7 +51,8 @@ class EdgeTest(unittest.TestCase):
|
|||
# Ensures that release times delimited by Ts are ignored.
|
||||
# The iTunes Store produces such files.
|
||||
t_time = beets.mediafile.MediaFile(
|
||||
os.path.join(_common.RSRC, 't_time.m4a'))
|
||||
os.path.join(_common.RSRC, 't_time.m4a')
|
||||
)
|
||||
self.assertEqual(t_time.year, 1987)
|
||||
self.assertEqual(t_time.month, 3)
|
||||
self.assertEqual(t_time.day, 31)
|
||||
|
|
@ -69,7 +76,6 @@ class EdgeTest(unittest.TestCase):
|
|||
self.assertEqual(f.bitrate, 0)
|
||||
|
||||
|
||||
_sc = beets.mediafile._safe_cast
|
||||
class InvalidValueToleranceTest(unittest.TestCase):
|
||||
|
||||
def test_safe_cast_string_to_int(self):
|
||||
|
|
@ -102,15 +108,21 @@ class InvalidValueToleranceTest(unittest.TestCase):
|
|||
self.assertTrue(us.startswith(u'caf'))
|
||||
|
||||
|
||||
class SafetyTest(unittest.TestCase):
|
||||
class SafetyTest(unittest.TestCase, TestHelper):
|
||||
def setUp(self):
|
||||
self.create_temp_dir()
|
||||
|
||||
def tearDown(self):
|
||||
self.remove_temp_dir()
|
||||
|
||||
def _exccheck(self, fn, exc, data=''):
|
||||
fn = os.path.join(_common.RSRC, fn)
|
||||
fn = os.path.join(self.temp_dir, fn)
|
||||
with open(fn, 'w') as f:
|
||||
f.write(data)
|
||||
try:
|
||||
self.assertRaises(exc, beets.mediafile.MediaFile, fn)
|
||||
finally:
|
||||
os.unlink(fn) # delete the temporary file
|
||||
os.unlink(fn) # delete the temporary file
|
||||
|
||||
def test_corrupt_mp3_raises_unreadablefileerror(self):
|
||||
# Make sure we catch Mutagen reading errors appropriately.
|
||||
|
|
@ -139,6 +151,7 @@ class SafetyTest(unittest.TestCase):
|
|||
self._exccheck('nothing.xml', beets.mediafile.UnreadableFileError,
|
||||
"ftyp")
|
||||
|
||||
@unittest.skipIf(not hasattr(os, 'symlink'), 'platform lacks symlink')
|
||||
def test_broken_symlink(self):
|
||||
fn = os.path.join(_common.RSRC, 'brokenlink')
|
||||
os.symlink('does_not_exist', fn)
|
||||
|
|
@ -160,16 +173,17 @@ class SideEffectsTest(unittest.TestCase):
|
|||
self.assertEqual(old_mtime, new_mtime)
|
||||
|
||||
|
||||
class EncodingTest(unittest.TestCase):
|
||||
class EncodingTest(unittest.TestCase, TestHelper):
|
||||
def setUp(self):
|
||||
self.create_temp_dir()
|
||||
src = os.path.join(_common.RSRC, 'full.m4a')
|
||||
self.path = os.path.join(_common.RSRC, 'test.m4a')
|
||||
self.path = os.path.join(self.temp_dir, 'test.m4a')
|
||||
shutil.copy(src, self.path)
|
||||
|
||||
self.mf = beets.mediafile.MediaFile(self.path)
|
||||
|
||||
def tearDown(self):
|
||||
os.remove(self.path)
|
||||
self.remove_temp_dir()
|
||||
|
||||
def test_unicode_label_in_m4a(self):
|
||||
self.mf.label = u'foo\xe8bar'
|
||||
|
|
@ -191,7 +205,7 @@ class MissingAudioDataTest(unittest.TestCase):
|
|||
self.mf = ZeroLengthMediaFile(path)
|
||||
|
||||
def test_bitrate_with_zero_length(self):
|
||||
del self.mf.mgfile.info.bitrate # Not available directly.
|
||||
del self.mf.mgfile.info.bitrate # Not available directly.
|
||||
self.assertEqual(self.mf.bitrate, 0)
|
||||
|
||||
|
||||
|
|
@ -242,15 +256,16 @@ class SoundCheckTest(unittest.TestCase):
|
|||
self.assertEqual(peak, 0.0)
|
||||
|
||||
|
||||
class ID3v23Test(unittest.TestCase):
|
||||
class ID3v23Test(unittest.TestCase, TestHelper):
|
||||
def _make_test(self, ext='mp3'):
|
||||
self.create_temp_dir()
|
||||
src = os.path.join(_common.RSRC, 'full.{0}'.format(ext))
|
||||
self.path = os.path.join(_common.RSRC, 'test.{0}'.format(ext))
|
||||
self.path = os.path.join(self.temp_dir, 'test.{0}'.format(ext))
|
||||
shutil.copy(src, self.path)
|
||||
return beets.mediafile.MediaFile(self.path)
|
||||
|
||||
def _delete_test(self):
|
||||
os.remove(self.path)
|
||||
self.remove_temp_dir()
|
||||
|
||||
def test_v24_year_tag(self):
|
||||
mf = self._make_test()
|
||||
|
|
|
|||
Loading…
Reference in a new issue