Extend attachment tests

This commit is contained in:
Thomas Scholtes 2014-08-14 15:14:04 +02:00
parent f6e6af08d2
commit 2c05a874ec
4 changed files with 161 additions and 114 deletions

View file

@ -40,6 +40,9 @@ def config(key):
return config['attachments'][key]
def track_separators():
return config('track separators').get(list) + [os.sep]
def ref_type(entity):
# FIXME prevents circular dependency
from beets.library import Item, Album
@ -122,23 +125,27 @@ class Attachment(dbcore.db.Model):
if self.path == dest:
return self.path
if os.path.exists(dest) and not overwrite:
root, ext = os.path.splitext(dest)
log.warn('attachment destination already exists: {0}'
.format(displayable_path(dest)))
if os.path.exists(dest):
if overwrite:
log.warn('overwrite attachment destination {0}'
.format(displayable_path(dest)))
else:
log.warn('attachment destination already exists: {0}'
.format(displayable_path(dest)))
root, ext = os.path.splitext(dest)
alternative = 0
while os.path.exists(dest):
alternative += 1
dest = root + ".{0}".format(alternative) + ext
alternative = 0
while os.path.exists(dest):
alternative += 1
dest = root + ".{0}".format(alternative) + ext
if copy:
util.copy(self.path, dest, overwrite)
log.warn('copy attachment to {0}'
log.info('copy attachment to {0}'
.format(displayable_path(dest)))
else:
util.move(self.path, dest, overwrite)
log.warn('move attachment to {0}'
log.info('move attachment to {0}'
.format(displayable_path(dest)))
self.path = dest
self.store()
@ -247,8 +254,7 @@ class DestinationTemplateMapping(collections.Mapping):
if ref_type(self.entity) == 'album':
return self['entity_dir']
elif ref_type(self.entity) == 'item':
separator = config('track separators').get(list)[0]
return self['track_base'] + separator
return self['track_base'] + track_separators()[0]
@property
def ext_prefix(self):
@ -349,6 +355,37 @@ class AttachmentFactory(object):
self._detectors = []
self._collectors = []
def create(self, path, type, entity=None):
"""Return a populated `Attachment` instance.
The `path`, `type`, and `entity` properties of the attachment
are set corresponding to the arguments. In addition the method
set retrieves meta data from registered collectors and and adds
it as flexible attributes.
Also sets the attachments's basename to the value returned by
`Attachment.basename()`. Therefore, if the entity is moved
later, we retain the basename instead of recalculating it
through `attachment.basename`.
"""
# TODO entity should not be optional
attachment = Attachment(db=self._db, path=path,
entity=entity, type=type)
attachment.basename = self.basename(path, entity)
for key, value in self._collect_meta(type, attachment.path).items():
attachment[key] = value
return attachment
def add(self, path, type, entity):
"""Create an attachment, add it to the database and return it.
This is the same as calling `create()` and then adding the
attachment to the database.
"""
attachment = self.create(path, type, entity)
self._db.add(attachment)
return attachment
def find(self, attachment_query=None, entity_query=None):
"""Yield all attachments in the library matching
`attachment_query` and their associated items matching
@ -365,16 +402,6 @@ class AttachmentFactory(object):
queries.append(attachment_query)
return self._db._fetch(Attachment, AndQuery(queries))
def detect(self, path, entity=None):
"""Yield a list of attachments for types registered with the path.
The method uses the registered type detector functions to get
a list of types for `path`. For each type it yields an attachment
through `create`.
"""
for type in self._detect_types(path):
yield self.create(path, type, entity)
def discover(self, entity_or_prefix, local=None):
"""Return a list of non-audio files whose path start with the
entity prefix.
@ -410,7 +437,7 @@ class AttachmentFactory(object):
return discovered
def _discover_local(self, prefix, local):
seps = config('track separators').get(list)
seps = track_separators()
if local[0] == '.':
seps.append('')
for sep in seps:
@ -419,28 +446,15 @@ class AttachmentFactory(object):
return [path]
return []
@classmethod
def path_prefix(cls, entity_or_prefix):
# TODO doc
if isinstance(entity_or_prefix, basestring):
if os.path.isdir(entity_or_prefix):
dir = entity_or_prefix
prefix = dir
else:
prefix = os.path.splitext(entity_or_prefix)[0]
dir = os.path.dirname(prefix)
elif ref_type(entity_or_prefix) == 'album':
try:
dir = entity_or_prefix.item_dir()
prefix = dir
except ValueError:
raise ValueError('Could not determine album directory')
else: # entity is track
if entity_or_prefix.path is None:
raise ValueError('Item has no path')
prefix = os.path.splitext(entity_or_prefix.path)[0]
dir = os.path.dirname(prefix)
return (prefix, dir)
def detect(self, path, entity=None):
"""Yield a list of attachments for types registered with the path.
The method uses the registered type detector functions to get
a list of types for `path`. For each type it yields an attachment
through `create`.
"""
for type in self._detect_types(path):
yield self.create(path, type, entity)
@classmethod
def basename(cls, path, entity_or_prefix):
@ -478,7 +492,7 @@ class AttachmentFactory(object):
if ref_type(entity_or_prefix) == 'album':
separators = [os.sep]
else:
separators = config('track separators').get(list)
separators = track_separators()
prefix, _ = cls.path_prefix(entity_or_prefix)
for sep in separators:
@ -486,36 +500,28 @@ class AttachmentFactory(object):
return path[(len(prefix) + len(sep)):]
return os.path.basename(path)
def create(self, path, type, entity=None):
"""Return a populated `Attachment` instance.
The `path`, `type`, and `entity` properties of the attachment
are set corresponding to the arguments. In addition the method
set retrieves meta data from registered collectors and and adds
it as flexible attributes.
Also sets the attachments's basename to the value returned by
`Attachment.basename()`. Therefore, if the entity is moved
later, we retain the basename instead of recalculating it
through `attachment.basename`.
"""
# TODO entity should not be optional
attachment = Attachment(db=self._db, path=path,
entity=entity, type=type)
attachment.basename = self.basename(path, entity)
for key, value in self._collect_meta(type, attachment.path).items():
attachment[key] = value
return attachment
def add(self, path, type, entity):
"""Create an attachment, add it to the database and return it.
This is the same as calling `create()` and then adding the
attachment to the database.
"""
attachment = self.create(path, type, entity)
self._db.add(attachment)
return attachment
@classmethod
def path_prefix(cls, entity_or_prefix):
# TODO doc
if isinstance(entity_or_prefix, basestring):
if os.path.isdir(entity_or_prefix):
dir = entity_or_prefix
prefix = dir
else:
prefix = os.path.splitext(entity_or_prefix)[0]
dir = os.path.dirname(prefix)
elif ref_type(entity_or_prefix) == 'album':
try:
dir = entity_or_prefix.item_dir()
prefix = dir
except ValueError:
raise ValueError('Could not determine album directory')
else: # entity is track
if entity_or_prefix.path is None:
raise ValueError('Item has no path')
prefix = os.path.splitext(entity_or_prefix.path)[0]
dir = os.path.dirname(prefix)
return (prefix, dir)
def register_detector(self, detector):
"""`detector` is a callable accepting the path of an attachment

View file

@ -20,6 +20,7 @@ import:
detail: no
flat: no
group_albums: no
attachments: yes
clutter: ["Thumbs.DB", ".DS_Store"]
ignore: [".*", "*~", "System Volume Information"]

View file

@ -1129,7 +1129,8 @@ def apply_choices(session, task):
if task.is_album:
task.infer_album_fields()
task.discover_attachments(session.attachment_factory)
if session.config['attachments']:
task.discover_attachments(session.attachment_factory)
task.add(session.lib)

View file

@ -28,10 +28,8 @@ class AttachmentTestHelper(TestHelper):
def setup_beets(self):
super(AttachmentTestHelper, self).setup_beets()
self.config['attachments'] = {
'paths': ['${entity_prefix}${basename}'],
'track separators': [' - ', ' ', '-', '_', '.', os.sep]
}
self.set_path_template('${entity_prefix}${basename}')
self.set_track_separator(' - ', ' ', '.')
@property
def factory(self):
@ -104,6 +102,12 @@ class AttachmentTestHelper(TestHelper):
log_plugin.attachment_collector = collector
self.add_plugin(log_plugin)
def set_path_template(self, *templates):
self.config['attachments']['paths'] = templates
def set_track_separator(self, *separators):
self.config['attachments']['track separators'] = list(separators)
def runcli(self, *args):
beets.ui._raw_main(list(args), self.lib)
@ -218,11 +222,6 @@ class AttachmentDestinationTest(unittest.TestCase, AttachmentTestHelper):
attachment.path = attachment.destination
self.assertEqual('/track--a.ext', attachment.destination)
# Helper
def set_path_template(self, *templates):
self.config['attachments']['paths'] = templates
class AttachmentTest(unittest.TestCase, AttachmentTestHelper):
"""Test `attachment.move()`.
@ -261,27 +260,41 @@ class AttachmentTest(unittest.TestCase, AttachmentTestHelper):
def test_move_dest_exists(self):
attachment = self.create_album_attachment(self.touch('a.jpg'))
dest = attachment.destination
dest_root, dest_ext = os.path.splitext(dest)
self.touch(dest)
# TODO test log warning
attachment.move()
dest_root, dest_ext = os.path.splitext(dest)
dest_alternative = dest_root + '.1' + dest_ext
self.assertEqual(dest_root + '.1' + dest_ext, attachment.path)
with capture_log() as logs:
attachment.move()
self.assertEqual(dest_alternative, attachment.path)
self.assertTrue(os.path.isfile(attachment.path))
self.assertTrue(os.path.isfile(attachment.destination))
self.assertIn(
'attachment destination already exists: {0}'.format(dest), logs
)
self.assertIn(
'move attachment to {0}'.format(dest_alternative), logs
)
def test_move_overwrite(self):
attachment_path = self.touch('a.jpg', content='JPEG')
attachment = self.create_album_attachment(attachment_path)
self.touch(attachment.destination, content='NONJPEG')
dest = attachment.destination
self.touch(dest, content='NONJPEG')
# TODO test log warning
attachment.move(overwrite=True)
with capture_log() as logs:
attachment.move(overwrite=True)
with open(attachment.destination, 'r') as f:
with open(dest, 'r') as f:
self.assertEqual(f.read(), 'JPEG')
self.assertIn(
'overwrite attachment destination {0}'.format(dest), logs
)
class AttachmentFactoryTest(unittest.TestCase, AttachmentTestHelper):
"""Tests the following methods of `AttachmentFactory`
@ -290,6 +303,7 @@ class AttachmentFactoryTest(unittest.TestCase, AttachmentTestHelper):
* factory.detect() and type detectors (config and plugin)
* factory.discover()
* factory.find()
* factory.basename()
"""
def setUp(self):
@ -479,7 +493,9 @@ class EntityAttachmentsTest(unittest.TestCase, AttachmentTestHelper):
self.assertEqual(queried.id, attachments[1].id)
class AttachImportTest(unittest.TestCase, AttachmentTestHelper):
class AttachmentImportTest(unittest.TestCase, AttachmentTestHelper):
"""Attachments should be created in the importer.
"""
def setUp(self):
self.setup_beets()
@ -503,6 +519,14 @@ class AttachImportTest(unittest.TestCase, AttachmentTestHelper):
self.assertEqual(attachment.path,
os.path.join(album.item_dir(), 'cover.jpg'))
def test_config_disable(self):
self.config['import']['attachments'] = False
album_dir = os.path.join(self.importer.paths[0], 'album 0')
self.touch('cover.jpg', dir=album_dir)
self.importer.run()
album = self.lib.albums().get()
self.assertEqual(len(album.attachments()), 0)
def test_add_singleton_track_attachment(self):
self.config['import']['singletons'] = True
track_prefix = \
@ -533,6 +557,8 @@ class AttachCommandTest(unittest.TestCase, AttachmentTestHelper):
self.teardown_beets()
self.unload_plugins()
# attach FILE ALBUM_QUERY
def test_attach_to_album(self):
album = self.add_album('albumtitle')
@ -574,6 +600,20 @@ class AttachCommandTest(unittest.TestCase, AttachmentTestHelper):
attachment = album.attachments().get()
self.assertEqual(attachment.path, attachment_path)
def test_unknown_type_warning(self):
album = self.add_album('albumtitle')
attachment_path = self.touch('unkown')
with capture_log() as logs:
self.runcli('attach', attachment_path)
self.assertIn('unknown attachment: {0}'.format(attachment_path), logs)
self.assertIsNone(album.attachments().get())
def test_interactive_type(self):
self.skipTest('not implemented yet')
# attach --track FILE ITEM_QUERY
def test_attach_to_item(self):
item = self.add_item(title='tracktitle')
attachment_path = self.touch('attachment.log')
@ -595,6 +635,7 @@ class AttachCommandTest(unittest.TestCase, AttachmentTestHelper):
self.assertTrue(os.path.isfile(dest))
# attach --local FILE ALBUM_QUERY
# TODO globs
def test_local_album_file(self):
albums = [self.add_album('album 1'), self.add_album('album 2')]
@ -607,7 +648,10 @@ class AttachCommandTest(unittest.TestCase, AttachmentTestHelper):
for album in albums:
self.assertEqual(len(list(album.attachments())), 1)
attachment = album.attachments().get()
self.assertEqual(attachment.type, 'log')
self.assertEqual(attachment.path,
os.path.join(album.item_dir(), 'inalbumdir.log'))
# attach --track --local FILE ITEM_QUERY
@ -640,6 +684,16 @@ class AttachCommandTest(unittest.TestCase, AttachmentTestHelper):
self.assertEqual(len(list(item.attachments())), 1)
def test_local_track_file_with_custom_separator(self):
self.set_track_separator('XX', '||')
item = self.add_item('song')
prefix = os.path.splitext(item.path)[0]
self.touch(prefix + '||rip.log')
self.runcli('attach', '--track', '--local', 'rip.log')
attachment = item.attachments().get()
self.assertEqual(prefix + 'XXrip.log', attachment.path)
# attach --discover ALBUM_QUERY
def test_discover_in_album_dir(self):
@ -658,10 +712,8 @@ class AttachCommandTest(unittest.TestCase, AttachmentTestHelper):
self.assertEqual(len(attachments1), 1)
self.assertEqual(attachments1[0].type, 'png')
attachments2 = list(album2.attachments())
self.assertEqual(len(attachments2), 2)
self.assertItemsEqual(map(lambda a: a.type, attachments2),
['png', 'log'])
attachment_types = map(lambda a: a.type, (album2.attachments()))
self.assertItemsEqual(attachment_types, ['png', 'log'])
# attach --discover --track ITEM_QUERY
@ -680,12 +732,11 @@ class AttachCommandTest(unittest.TestCase, AttachmentTestHelper):
self.runcli('attach', '--discover', '--track')
for track in tracks:
self.assertEqual(len(list(track.attachments())), 5)
attachment_types = map(lambda a: a.type, list(track.attachments()))
attachment_types = map(lambda a: a.type, track.attachments())
self.assertItemsEqual(['png', 'png', 'png', 'log', 'log'],
attachment_types)
# attach --type TYPE QUERY
# attach --type TYPE FILE QUERY
def test_user_type(self):
album = self.add_album('albumtitle')
@ -695,18 +746,6 @@ class AttachCommandTest(unittest.TestCase, AttachmentTestHelper):
attachment = album.attachments().get()
self.assertEqual(attachment.type, 'customtype')
def test_unknown_type_warning(self):
album = self.add_album('albumtitle')
attachment_path = self.touch('unkown')
with capture_log() as logs:
self.runcli('attach', attachment_path)
self.assertIn('unknown attachment: {0}'.format(attachment_path), logs)
self.assertIsNone(album.attachments().get())
def test_interactive_type(self):
self.skipTest('not implemented yet')
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)