mirror of
https://github.com/beetbox/beets.git
synced 2026-02-17 04:43:40 +01:00
Discover attachment files
This commit is contained in:
parent
ee19abc293
commit
1a630908c9
4 changed files with 470 additions and 131 deletions
|
|
@ -29,6 +29,10 @@ from beets.util.functemplate import Template
|
|||
log = logging.getLogger('beets')
|
||||
|
||||
|
||||
AUDIO_EXTENSIONS = ['.mp3', '.ogg', '.mp4', '.m4a', '.mpc',
|
||||
'.wma', '.wv', '.flac', '.aiff', '.ape']
|
||||
|
||||
|
||||
def ref_type(entity):
|
||||
# FIXME prevents circular dependency
|
||||
from beets.library import Item, Album
|
||||
|
|
@ -103,10 +107,12 @@ class Attachment(dbcore.db.Model):
|
|||
|
||||
If `copy` is `False` (the default) then the original file is deleted.
|
||||
"""
|
||||
|
||||
if dest is None:
|
||||
dest = self.destination
|
||||
|
||||
if self.path == dest:
|
||||
return self.path
|
||||
|
||||
if os.path.exists(dest) and not overwrite:
|
||||
root, ext = os.path.splitext(dest)
|
||||
log.warn('attachment destination already exists: {0}'
|
||||
|
|
@ -316,7 +322,7 @@ class AttachmentFactory(object):
|
|||
def __init__(self, db=None):
|
||||
self._db = db
|
||||
self._libdir = db.directory
|
||||
self._discoverers = []
|
||||
self._detectors = []
|
||||
self._collectors = []
|
||||
|
||||
def find(self, attachment_query=None, entity_query=None):
|
||||
|
|
@ -335,16 +341,72 @@ class AttachmentFactory(object):
|
|||
queries.append(attachment_query)
|
||||
return self._db._fetch(Attachment, AndQuery(queries))
|
||||
|
||||
def discover(self, path, entity=None):
|
||||
def detect(self, path, entity=None):
|
||||
"""Yield a list of attachments for types registered with the path.
|
||||
|
||||
The method uses the registered type discoverer functions to get
|
||||
The method uses the registered type detector functions to get
|
||||
a list of types for `path`. For each type it yields an attachment
|
||||
through `create`.
|
||||
"""
|
||||
for type in self._discover_types(path):
|
||||
for type in self._detect_types(path):
|
||||
yield self.create(path, type, entity)
|
||||
|
||||
def discover(self, entity, local=None):
|
||||
"""Return a list of non-audio files whose path start with the
|
||||
entity prefix.
|
||||
|
||||
For albums the entity prefix is the album directory. For items it
|
||||
is the item's path, excluding the extension.
|
||||
|
||||
If the `local` argument is given the method returns a singleton
|
||||
list consisting of the path `entity_preifix + separator + local` if
|
||||
it exists. Multiple separators are tried depening on the entity
|
||||
type. For albums the only separator is the directory separator.
|
||||
For items the separtors are configured by `attachments.item_sep`
|
||||
"""
|
||||
if local is None:
|
||||
return self._discover_full(entity)
|
||||
else:
|
||||
return self._discover_local(entity, local)
|
||||
|
||||
def _discover_full(self, entity):
|
||||
if ref_type(entity) == 'album':
|
||||
entity_dir = entity.item_dir()
|
||||
entity_prefix = entity_dir
|
||||
else:
|
||||
entity_dir = os.path.dirname(entity.path)
|
||||
entity_prefix = os.path.splitext(entity.path)[0]
|
||||
|
||||
discovered = []
|
||||
for dirpath, dirnames, filenames in os.walk(entity_dir):
|
||||
for dirname in dirnames:
|
||||
path = os.path.join(dirpath, dirname)
|
||||
if not path.startswith(entity_prefix):
|
||||
dirnames.remove(dirname)
|
||||
|
||||
for filename in filenames:
|
||||
path = os.path.join(dirpath, filename)
|
||||
ext = os.path.splitext(path)[1].lower()
|
||||
if path.startswith(entity_prefix) \
|
||||
and ext not in AUDIO_EXTENSIONS:
|
||||
discovered.append(path)
|
||||
return discovered
|
||||
|
||||
def _discover_local(self, entity, local):
|
||||
if ref_type(entity) == 'album':
|
||||
seps = [os.sep]
|
||||
entity_prefix = entity.item_dir()
|
||||
else:
|
||||
# TODO make this configurable
|
||||
seps = [os.sep, ' - ', '', ' ', '-', '_', '.']
|
||||
entity_prefix = os.path.splitext(entity.path)[0]
|
||||
|
||||
for sep in seps:
|
||||
path = entity_prefix + sep + local
|
||||
if os.path.isfile(path):
|
||||
return [path]
|
||||
return []
|
||||
|
||||
def create(self, path, type, entity=None):
|
||||
"""Return a populated `Attachment` instance.
|
||||
|
||||
|
|
@ -359,12 +421,22 @@ class AttachmentFactory(object):
|
|||
attachment[key] = value
|
||||
return attachment
|
||||
|
||||
def register_discoverer(self, discover):
|
||||
"""`discover` is a callable accepting the path of an attachment
|
||||
def add(self, path, type, entity):
|
||||
"""Create an attachment, add it to the database and return it.
|
||||
|
||||
This is the same as calling `create()` and then adding the
|
||||
attachment to the database.
|
||||
"""
|
||||
attachment = self.create(path, type, entity)
|
||||
self._db.add(attachment)
|
||||
return attachment
|
||||
|
||||
def register_detector(self, detector):
|
||||
"""`detector` is a callable accepting the path of an attachment
|
||||
as its only argument. If it was able to determine the type it
|
||||
returns its name as a string. Otherwise it must return `None`
|
||||
"""
|
||||
self._discoverers.append(discover)
|
||||
self._detectors.append(detector)
|
||||
|
||||
def register_collector(self, collector):
|
||||
"""`collector` is a callable accepting the type and path of an
|
||||
|
|
@ -376,22 +448,33 @@ class AttachmentFactory(object):
|
|||
|
||||
def register_plugins(self, plugins):
|
||||
for plugin in plugins:
|
||||
if hasattr(plugin, 'attachment_discoverer'):
|
||||
self.register_discoverer(plugin.attachment_discoverer)
|
||||
if hasattr(plugin, 'attachment_detector'):
|
||||
self.register_detector(plugin.attachment_detector)
|
||||
if hasattr(plugin, 'attachment_collector'):
|
||||
self.register_collector(plugin.attachment_collector)
|
||||
|
||||
def _discover_types(self, path):
|
||||
types = []
|
||||
for discover in self._discoverers:
|
||||
def _detect_types(self, path):
|
||||
"""Yield a list of types registered for the path.
|
||||
|
||||
Uses the functions from `register_detector` and the
|
||||
`attachments.types` configuration.
|
||||
"""
|
||||
# FIXME circular dependency
|
||||
from beets import config
|
||||
for detector in self._detectors:
|
||||
try:
|
||||
type = discover(path)
|
||||
type = detector(path)
|
||||
if type:
|
||||
types.append(type)
|
||||
yield type
|
||||
except:
|
||||
# TODO logging?
|
||||
pass
|
||||
return types
|
||||
|
||||
types_config = config['attachments']['types']
|
||||
if types_config.exists():
|
||||
for matcher, type in types_config.get(dict).items():
|
||||
if re.match(matcher, path):
|
||||
yield type
|
||||
|
||||
def _collect_meta(self, type, path):
|
||||
all_meta = {}
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ from beets.util import syspath, normpath, ancestry, displayable_path
|
|||
from beets.util.functemplate import Template
|
||||
from beets import library
|
||||
from beets import config
|
||||
from beets import attachments
|
||||
from beets.attachments import AttachmentFactory
|
||||
from beets.util.confit import _package_path
|
||||
|
||||
VARIOUS_ARTISTS = u'Various Artists'
|
||||
|
|
@ -94,44 +94,59 @@ class AttachCommand(ui.Subcommand):
|
|||
'-c', '--copy', action='store_true', dest='copy',
|
||||
help='copy attachment intead of moving them'
|
||||
)
|
||||
self.parser.add_option(
|
||||
'-M', '--no-move', action='store_false', dest='move',
|
||||
help='keep the attachment in place'
|
||||
)
|
||||
self.parser.add_option(
|
||||
'--track', action='store_true', dest='track',
|
||||
help='attach path to the tracks matched by the query'
|
||||
)
|
||||
self.parser.add_option(
|
||||
'-t', '--type', dest='type',
|
||||
help='create one attachment with this type',
|
||||
help='create one attachment with this type'
|
||||
)
|
||||
self.parser.add_option(
|
||||
'-l', '--local', dest='local', action='store_true',
|
||||
help='path is local to album directory',
|
||||
help='path is local to album directory'
|
||||
)
|
||||
self.parser.add_option(
|
||||
'-d', '--discover', dest='discover', action='store_true',
|
||||
)
|
||||
|
||||
def func(self, lib, opts, args):
|
||||
factory = attachments.AttachmentFactory(lib)
|
||||
factory = AttachmentFactory(lib)
|
||||
factory.register_plugins(plugins.find_plugins())
|
||||
path = args.pop(0)
|
||||
|
||||
if opts.discover:
|
||||
path = None
|
||||
else:
|
||||
path = args.pop(0)
|
||||
|
||||
if opts.track:
|
||||
entities = lib.items(decargs(args))
|
||||
else:
|
||||
entities = lib.albums(decargs(args))
|
||||
|
||||
if opts.local and opts.track:
|
||||
raise ui.UserError('Cannot attach local files to tracks.')
|
||||
|
||||
for entity in entities:
|
||||
if opts.local:
|
||||
album_dir = entity.item_dir()
|
||||
if opts.local or opts.discover:
|
||||
paths = factory.discover(entity, path)
|
||||
else:
|
||||
album_dir = None
|
||||
abspath = self.resolve_path(path, album_dir)
|
||||
paths = [self.resolve_path(path)]
|
||||
|
||||
if opts.type:
|
||||
factory.create(abspath, opts.type, entity).add()
|
||||
else:
|
||||
for attachment in factory.discover(abspath, entity):
|
||||
attachment.add()
|
||||
for abspath in paths:
|
||||
if opts.type:
|
||||
attachments = [factory.create(abspath, opts.type, entity)]
|
||||
else:
|
||||
attachments = factory.detect(abspath, entity)
|
||||
|
||||
for a in attachments:
|
||||
a.add()
|
||||
if opts.move != False or opts.copy:
|
||||
a.move(copy=opts.copy)
|
||||
else:
|
||||
log.warn(u'unknown attachment: {0}'
|
||||
.format(displayable_path(abspath)))
|
||||
|
||||
def resolve_path(self, path, album_dir=None):
|
||||
if os.path.isabs(path):
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ import os
|
|||
import os.path
|
||||
import shutil
|
||||
import subprocess
|
||||
import logging
|
||||
from tempfile import mkdtemp, mkstemp
|
||||
from contextlib import contextmanager
|
||||
from StringIO import StringIO
|
||||
|
|
@ -88,6 +89,35 @@ def capture_stdout():
|
|||
sys.stdout = org
|
||||
|
||||
|
||||
@contextmanager
|
||||
def capture_log(logger='beets'):
|
||||
"""Collects log messages in a list.
|
||||
|
||||
>>> with capture_log('x') as logs:
|
||||
... logging.getLogger('x').info('eggs')
|
||||
...
|
||||
>>> logs
|
||||
['eggs']
|
||||
"""
|
||||
capture = LogCapture()
|
||||
log = logging.getLogger(logger)
|
||||
log.addHandler(capture)
|
||||
try:
|
||||
yield capture.messages
|
||||
finally:
|
||||
log.removeHandler(capture)
|
||||
|
||||
|
||||
class LogCapture(logging.Handler):
|
||||
|
||||
def __init__(self):
|
||||
super(LogCapture, self).__init__()
|
||||
self.messages = []
|
||||
|
||||
def emit(self, record):
|
||||
self.messages.append(str(record.msg))
|
||||
|
||||
|
||||
def has_program(cmd, args=['--version']):
|
||||
"""Returns `True` if `cmd` can be executed.
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -14,40 +14,64 @@
|
|||
|
||||
|
||||
import os
|
||||
from tempfile import mkstemp
|
||||
from _common import unittest
|
||||
from helper import TestHelper
|
||||
from helper import TestHelper, capture_log
|
||||
|
||||
import beets.ui
|
||||
from beets.plugins import BeetsPlugin
|
||||
from beets.attachments import AttachmentFactory, Attachment
|
||||
from beets.library import Library, Album, Item
|
||||
from beets.library import Album, Item
|
||||
|
||||
|
||||
class AttachmentTestHelper(TestHelper):
|
||||
# TODO Merge parts with TestHelper and refactor some stuff.
|
||||
|
||||
def mkstemp(self, suffix='', path=None, content=''):
|
||||
if path:
|
||||
path = path + suffix
|
||||
with open(path, 'a+') as f:
|
||||
f.write(content)
|
||||
else:
|
||||
(handle, path) = mkstemp(suffix)
|
||||
os.write(handle, content)
|
||||
os.close(handle)
|
||||
def setup_beets(self):
|
||||
super(AttachmentTestHelper, self).setup_beets()
|
||||
self.config['attachment']['paths'] = ['${entity_prefix}${basename}']
|
||||
|
||||
if not hasattr(self, 'tmp_files'):
|
||||
self.tmp_files = []
|
||||
self.tmp_files.append(path)
|
||||
@property
|
||||
def factory(self):
|
||||
if not hasattr(self, '_factory'):
|
||||
self._factory = AttachmentFactory(self.lib)
|
||||
return self._factory
|
||||
|
||||
def touch(self, path, dir=None, content=''):
|
||||
if dir:
|
||||
path = os.path.join(dir, path)
|
||||
|
||||
if not os.path.isabs(path):
|
||||
path = os.path.join(self.temp_dir, path)
|
||||
|
||||
parent = os.path.dirname(path)
|
||||
if not os.path.isdir(parent):
|
||||
os.makedirs(parent)
|
||||
|
||||
with open(path, 'a+') as f:
|
||||
f.write(content)
|
||||
return path
|
||||
|
||||
def remove_tmp_files(self):
|
||||
if not hasattr(self, 'tmp_files'):
|
||||
return
|
||||
def add_album(self, name='album name', touch=True):
|
||||
"""Add an album with one track to the library and create
|
||||
dummy track file.
|
||||
"""
|
||||
album = Album(album=name)
|
||||
self.lib.add(album)
|
||||
item_path = os.path.join(self.lib.directory, name, 'track.mp3')
|
||||
self.touch(item_path)
|
||||
|
||||
for p in self.tmp_files:
|
||||
if os.path.exists(p):
|
||||
os.remove(p)
|
||||
item = Item(album_id=album.id, path=item_path)
|
||||
self.lib.add(item)
|
||||
return album
|
||||
|
||||
def add_item(self, title):
|
||||
"""Add item to the library and create dummy file.
|
||||
"""
|
||||
path = os.path.join(self.libdir, '{0}.mp3'.format(title))
|
||||
self.touch(path)
|
||||
item = Item(title=title, path=path)
|
||||
self.lib.add(item)
|
||||
return item
|
||||
|
||||
def create_item_attachment(self, path, type='atype',
|
||||
track_path='/track/path.mp3'):
|
||||
|
|
@ -57,16 +81,7 @@ class AttachmentTestHelper(TestHelper):
|
|||
path=path, type=type)
|
||||
|
||||
def create_album_attachment(self, path, type='type'):
|
||||
album = Album(album='album')
|
||||
self.lib.add(album)
|
||||
album_dir = os.path.join(self.lib.directory, album.album)
|
||||
os.mkdir(album_dir)
|
||||
|
||||
# Make sure album.item_dir() returns a path
|
||||
item = Item(album_id=album.id,
|
||||
path=os.path.join(album_dir, 'track.mp3'))
|
||||
self.lib.add(item)
|
||||
|
||||
album = self.add_album()
|
||||
attachment = Attachment(db=self.lib, entity=album,
|
||||
path=path, type=type)
|
||||
self.lib.add(attachment)
|
||||
|
|
@ -184,14 +199,12 @@ class AttachmentTest(unittest.TestCase, AttachmentTestHelper):
|
|||
|
||||
def setUp(self):
|
||||
self.setup_beets()
|
||||
self.config['attachment']['paths'] = ['$entity_prefix/$basename']
|
||||
|
||||
def tearDown(self):
|
||||
self.teardown_beets()
|
||||
self.remove_tmp_files()
|
||||
|
||||
def test_move(self):
|
||||
attachment = self.create_album_attachment(self.mkstemp())
|
||||
attachment = self.create_album_attachment(self.touch('a'))
|
||||
original_path = attachment.path
|
||||
|
||||
self.assertNotEqual(attachment.destination, original_path)
|
||||
|
|
@ -203,7 +216,7 @@ class AttachmentTest(unittest.TestCase, AttachmentTestHelper):
|
|||
self.assertFalse(os.path.exists(original_path))
|
||||
|
||||
def test_copy(self):
|
||||
attachment = self.create_album_attachment(self.mkstemp())
|
||||
attachment = self.create_album_attachment(self.touch('a'))
|
||||
original_path = attachment.path
|
||||
|
||||
self.assertNotEqual(attachment.destination, original_path)
|
||||
|
|
@ -215,10 +228,10 @@ class AttachmentTest(unittest.TestCase, AttachmentTestHelper):
|
|||
self.assertTrue(os.path.isfile(original_path))
|
||||
|
||||
def test_move_dest_exists(self):
|
||||
attachment = self.create_album_attachment(self.mkstemp('.jpg'))
|
||||
attachment = self.create_album_attachment(self.touch('a.jpg'))
|
||||
dest = attachment.destination
|
||||
dest_root, dest_ext = os.path.splitext(dest)
|
||||
self.mkstemp(path=dest)
|
||||
self.touch(dest)
|
||||
|
||||
# TODO test log warning
|
||||
attachment.move()
|
||||
|
|
@ -228,9 +241,9 @@ class AttachmentTest(unittest.TestCase, AttachmentTestHelper):
|
|||
self.assertTrue(os.path.isfile(attachment.destination))
|
||||
|
||||
def test_move_overwrite(self):
|
||||
attachment_path = self.mkstemp(suffix='.jpg', content='JPEG')
|
||||
attachment_path = self.touch('a.jpg', content='JPEG')
|
||||
attachment = self.create_album_attachment(attachment_path)
|
||||
self.mkstemp(path=attachment.destination, content='NONJPEG')
|
||||
self.touch(attachment.destination, content='NONJPEG')
|
||||
|
||||
# TODO test log warning
|
||||
attachment.move(overwrite=True)
|
||||
|
|
@ -239,15 +252,22 @@ class AttachmentTest(unittest.TestCase, AttachmentTestHelper):
|
|||
self.assertEqual(f.read(), 'JPEG')
|
||||
|
||||
|
||||
class AttachmentFactoryTest(unittest.TestCase):
|
||||
class AttachmentFactoryTest(unittest.TestCase, AttachmentTestHelper):
|
||||
"""Tests the following methods of `AttachmentFactory`
|
||||
|
||||
* factory.create() and meta data collectors
|
||||
* factory.detect() and type detectors (config and plugin)
|
||||
* factory.discover()
|
||||
* factory.find()
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.lib = Library(':memory:')
|
||||
self.factory = AttachmentFactory(self.lib)
|
||||
self.setup_beets()
|
||||
|
||||
def tearDown(self):
|
||||
self.lib._connection().close()
|
||||
del self.lib._connections
|
||||
self.teardown_beets()
|
||||
|
||||
# factory.create()
|
||||
|
||||
def test_create_with_path_and_type(self):
|
||||
attachment = self.factory.create('/path/to/attachment', 'coverart')
|
||||
|
|
@ -261,15 +281,72 @@ class AttachmentFactoryTest(unittest.TestCase):
|
|||
entity=album)
|
||||
self.assertEqual(attachment.ref, album.id)
|
||||
self.assertEqual(attachment.ref_type, 'album')
|
||||
self.assertEqual(attachment.entity.id, album.id)
|
||||
|
||||
def test_create_populates_metadata(self):
|
||||
def collector(type, path):
|
||||
return {'mime': 'image/'}
|
||||
if type == 'coverart':
|
||||
return {'mime': 'image/'}
|
||||
self.factory.register_collector(collector)
|
||||
|
||||
attachment = self.factory.create('/path/to/attachment', 'coverart')
|
||||
self.assertEqual(attachment['mime'], 'image/')
|
||||
|
||||
attachment = self.factory.create('/path/to/attachment', 'noart')
|
||||
self.assertNotIn('mime', attachment)
|
||||
|
||||
# factory.detect()
|
||||
|
||||
def test_detect_plugin_types(self):
|
||||
def detector(path):
|
||||
return 'image'
|
||||
self.factory.register_detector(detector)
|
||||
|
||||
attachments = list(self.factory.detect('/path/to/attachment'))
|
||||
self.assertEqual(len(attachments), 1)
|
||||
self.assertEqual(attachments[0].type, 'image')
|
||||
|
||||
def test_detect_config_types(self):
|
||||
self.config['attachments']['types'] = {
|
||||
'.*\.jpg': 'image'
|
||||
}
|
||||
|
||||
attachments = list(self.factory.detect('/path/to/cover.jpg'))
|
||||
self.assertEqual(len(attachments), 1)
|
||||
self.assertEqual(attachments[0].type, 'image')
|
||||
|
||||
attachments = list(self.factory.detect('/path/to/cover.png'))
|
||||
self.assertEqual(len(attachments), 0)
|
||||
|
||||
def test_detect_multiple_types(self):
|
||||
self.factory.register_detector(lambda _: 'a')
|
||||
self.factory.register_detector(lambda _: 'b')
|
||||
self.config['attachments']['types'] = {
|
||||
'.*\.jpg$': 'c',
|
||||
'.*/cover.jpg': 'd'
|
||||
}
|
||||
attachments = list(self.factory.detect('/path/to/cover.jpg'))
|
||||
self.assertItemsEqual(map(lambda a: a.type, attachments), 'abcd')
|
||||
|
||||
# factory.discover(album)
|
||||
|
||||
def test_discover_album(self):
|
||||
album = self.add_album()
|
||||
attachment_path = self.touch('cover.jpg', dir=album.item_dir())
|
||||
|
||||
discovered = self.factory.discover(album)
|
||||
self.assertEqual([attachment_path], discovered)
|
||||
|
||||
def test_discover_album_local(self):
|
||||
album = self.add_album()
|
||||
attachment_path = self.touch('cover.jpg', dir=album.item_dir())
|
||||
|
||||
discovered = self.factory.discover(album, 'cover.jpg')
|
||||
self.assertEqual([attachment_path], discovered)
|
||||
|
||||
# factory.find()
|
||||
# TODO extend
|
||||
|
||||
def test_find_all_attachments(self):
|
||||
self.factory.create('/path', 'atype').add()
|
||||
self.factory.create('/another_path', 'asecondtype').add()
|
||||
|
|
@ -282,120 +359,254 @@ class AttachmentFactoryTest(unittest.TestCase):
|
|||
self.assertEqual(attachment.type, 'atype')
|
||||
|
||||
|
||||
class EntityAttachmentsTest(unittest.TestCase):
|
||||
class EntityAttachmentsTest(unittest.TestCase, AttachmentTestHelper):
|
||||
"""Test attachment queries on entities.
|
||||
|
||||
- `item.attachments()`
|
||||
- `album.attachments()`
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.lib = Library(':memory:')
|
||||
self.factory = AttachmentFactory(self.lib)
|
||||
self.setup_beets()
|
||||
|
||||
def tearDown(self):
|
||||
self.teardown_beets()
|
||||
|
||||
def test_all_item_attachments(self):
|
||||
item = Item()
|
||||
item.add(self.lib)
|
||||
|
||||
attachment = self.factory.create('/path/to/attachment',
|
||||
'coverart', item)
|
||||
attachment.add()
|
||||
attachments = [
|
||||
self.factory.add('/path/to/attachment', 'coverart', item),
|
||||
self.factory.add('/path/to/attachment', 'riplog', item)
|
||||
]
|
||||
|
||||
self.assertItemsEqual(map(lambda a: a.id, item.attachments()),
|
||||
[attachment.id])
|
||||
map(lambda a: a.id, attachments))
|
||||
|
||||
def test_all_album_attachments(self):
|
||||
album = Album()
|
||||
album.add(self.lib)
|
||||
|
||||
attachment = self.factory.create('/path/to/attachment',
|
||||
'coverart', album)
|
||||
attachment.add()
|
||||
|
||||
attachments = [
|
||||
self.factory.add('/path/to/attachment', 'coverart', album),
|
||||
self.factory.add('/path/to/attachment', 'riplog', album)
|
||||
]
|
||||
self.assertItemsEqual(map(lambda a: a.id, album.attachments()),
|
||||
[attachment.id])
|
||||
map(lambda a: a.id, attachments))
|
||||
|
||||
def test_query_album_attachments(self):
|
||||
self.skipTest('Not implemented yet')
|
||||
album = Album()
|
||||
album.add(self.lib)
|
||||
|
||||
attachments = [
|
||||
self.factory.add('/path/to/attachment', 'coverart', album),
|
||||
self.factory.add('/path/to/attachment', 'riplog', album)
|
||||
]
|
||||
queried = album.attachments('type:riplog').get()
|
||||
self.assertEqual(queried.id, attachments[1].id)
|
||||
|
||||
|
||||
class AttachCommandTest(unittest.TestCase, AttachmentTestHelper):
|
||||
"""Tests the `beet attach FILE QUERY...` command
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.setup_beets()
|
||||
self.setup_log_attachment_plugin()
|
||||
self.add_attachment_plugin('log')
|
||||
|
||||
def tearDown(self):
|
||||
self.teardown_beets()
|
||||
self.unload_plugins()
|
||||
self.remove_tmp_files()
|
||||
|
||||
def test_attach_to_album(self):
|
||||
album = self.add_album('albumtitle')
|
||||
|
||||
attachment_path = self.mkstemp('.log')
|
||||
attachment_path = self.touch('attachment.log')
|
||||
self.runcli('attach', attachment_path, 'albumtitle')
|
||||
attachment = album.attachments().get()
|
||||
self.assertEqual(attachment.type, 'log')
|
||||
|
||||
def test_attach_to_album_and_move(self):
|
||||
self.skipTest('Not implemented')
|
||||
|
||||
def test_attach_to_album_and_copy(self):
|
||||
self.skipTest('Not implemented')
|
||||
|
||||
def test_attach_to_album_and_not_move(self):
|
||||
self.skipTest('Not implemented')
|
||||
|
||||
def test_file_relative_to_album_dir(self):
|
||||
album = self.add_album('albumtitle')
|
||||
|
||||
attachment_path = os.path.join(album.item_dir(), 'inalbumdir.log')
|
||||
self.mkstemp(path=attachment_path)
|
||||
self.runcli('attach', '--local', 'inalbumdir.log', 'albumtitle')
|
||||
attachment_path = self.touch('attachment.log')
|
||||
dest = os.path.join(album.item_dir(), 'attachment.log')
|
||||
self.assertFalse(os.path.isfile(dest))
|
||||
|
||||
self.runcli('attach', attachment_path, 'albumtitle')
|
||||
attachment = album.attachments().get()
|
||||
self.assertEqual(attachment.type, 'log')
|
||||
self.assertEqual(attachment.path, dest)
|
||||
self.assertTrue(os.path.isfile(dest))
|
||||
|
||||
def test_attach_to_album_and_copy(self):
|
||||
album = self.add_album('albumtitle')
|
||||
|
||||
attachment_path = self.touch('attachment.log')
|
||||
dest = os.path.join(album.item_dir(), 'attachment.log')
|
||||
|
||||
self.runcli('attach', '--copy', attachment_path, 'albumtitle')
|
||||
attachment = album.attachments().get()
|
||||
self.assertEqual(attachment.path, dest)
|
||||
self.assertTrue(os.path.isfile(attachment_path))
|
||||
self.assertTrue(os.path.isfile(dest))
|
||||
|
||||
def test_attach_to_album_and_not_move(self):
|
||||
album = self.add_album('albumtitle')
|
||||
|
||||
attachment_path = self.touch('attachment.log')
|
||||
|
||||
self.runcli('attach', '--no-move', attachment_path, 'albumtitle')
|
||||
attachment = album.attachments().get()
|
||||
self.assertEqual(attachment.path, attachment_path)
|
||||
|
||||
def test_attach_to_item(self):
|
||||
item = Item(title='tracktitle')
|
||||
self.lib.add(item)
|
||||
item = self.add_item(title='tracktitle')
|
||||
attachment_path = self.touch('attachment.log')
|
||||
|
||||
attachment_path = self.mkstemp('.log')
|
||||
self.runcli('attach', '--track', attachment_path, 'tracktitle')
|
||||
attachment = item.attachments().get()
|
||||
self.assertEqual(attachment.type, 'log')
|
||||
|
||||
def test_attach_to_item_and_move(self):
|
||||
self.skipTest('Not implemented')
|
||||
item = self.add_item(title='tracktitle')
|
||||
attachment_path = self.touch('attachment.log')
|
||||
|
||||
dest = os.path.splitext(item.path)[0] + ' - ' + 'attachment.log'
|
||||
self.assertFalse(os.path.isfile(dest))
|
||||
|
||||
self.runcli('attach', '--track', attachment_path, 'tracktitle')
|
||||
attachment = item.attachments().get()
|
||||
self.assertEqual(attachment.path, dest)
|
||||
self.assertTrue(os.path.isfile(dest))
|
||||
|
||||
# attach --local FILE ALBUM_QUERY
|
||||
|
||||
def test_local_album_file(self):
|
||||
albums = [self.add_album('album 1'), self.add_album('album 2')]
|
||||
for album in albums:
|
||||
self.touch('inalbumdir.log', dir=album.item_dir())
|
||||
self.touch('dontinclude.log', dir=album.item_dir())
|
||||
|
||||
self.runcli('attach', '--local', 'inalbumdir.log')
|
||||
|
||||
for album in albums:
|
||||
self.assertEqual(len(list(album.attachments())), 1)
|
||||
attachment = album.attachments().get()
|
||||
self.assertEqual(attachment.type, 'log')
|
||||
|
||||
# attach --track --local FILE ITEM_QUERY
|
||||
|
||||
def test_local_track_file(self):
|
||||
item1 = self.add_item('song 1')
|
||||
prefix = os.path.splitext(item1.path)[0]
|
||||
self.touch(prefix + ' rip.log')
|
||||
|
||||
item2 = self.add_item('song 2')
|
||||
prefix = os.path.splitext(item2.path)[0]
|
||||
self.touch(prefix + ' - rip.log')
|
||||
self.touch(prefix + ' - no.log')
|
||||
|
||||
item3 = self.add_item('song 3')
|
||||
prefix = os.path.splitext(item3.path)[0]
|
||||
self.touch(prefix + ' - no.log')
|
||||
|
||||
self.runcli('attach', '--track', '--local', 'rip.log')
|
||||
|
||||
self.assertEqual(len(list(item1.attachments())), 1)
|
||||
self.assertEqual(len(list(item2.attachments())), 1)
|
||||
self.assertEqual(len(list(item3.attachments())), 0)
|
||||
|
||||
def test_local_track_file_extenstion(self):
|
||||
item = self.add_item('song')
|
||||
prefix = os.path.splitext(item.path)[0]
|
||||
self.touch(prefix + '.log')
|
||||
|
||||
self.runcli('attach', '--track', '--local', '.log')
|
||||
|
||||
self.assertEqual(len(list(item.attachments())), 1)
|
||||
|
||||
# attach --discover ALBUM_QUERY
|
||||
|
||||
def test_discover_in_album_dir(self):
|
||||
self.add_attachment_plugin('png')
|
||||
|
||||
album1 = self.add_album('album 1')
|
||||
self.touch('cover.png', dir=album1.item_dir())
|
||||
|
||||
album2 = self.add_album('album 2')
|
||||
self.touch('cover.png', dir=album2.item_dir())
|
||||
self.touch('subfolder/rip.log', dir=album2.item_dir())
|
||||
|
||||
self.runcli('attach', '--discover')
|
||||
|
||||
attachments1 = list(album1.attachments())
|
||||
self.assertEqual(len(attachments1), 1)
|
||||
self.assertEqual(attachments1[0].type, 'png')
|
||||
|
||||
attachments2 = list(album2.attachments())
|
||||
self.assertEqual(len(attachments2), 2)
|
||||
self.assertItemsEqual(map(lambda a: a.type, attachments2),
|
||||
['png', 'log'])
|
||||
|
||||
# attach --discover --track ITEM_QUERY
|
||||
|
||||
def test_discover_track_files(self):
|
||||
self.add_attachment_plugin('png')
|
||||
|
||||
tracks = [self.add_item('track 1'), self.add_item('track 2')]
|
||||
for track in tracks:
|
||||
root = os.path.splitext(track.path)[0]
|
||||
self.touch(root + '.log')
|
||||
self.touch(root + '- a.log')
|
||||
self.touch(root + ' b.png')
|
||||
self.touch(root + '-c.png')
|
||||
self.touch('d.png', dir=root)
|
||||
|
||||
self.runcli('attach', '--discover', '--track')
|
||||
|
||||
for track in tracks:
|
||||
self.assertEqual(len(list(track.attachments())), 5)
|
||||
attachment_types = map(lambda a: a.type, list(track.attachments()))
|
||||
self.assertItemsEqual(['png', 'png', 'png', 'log', 'log'],
|
||||
attachment_types)
|
||||
|
||||
# attach --type TYPE QUERY
|
||||
|
||||
def test_user_type(self):
|
||||
album = self.add_album('albumtitle')
|
||||
attachment_path = self.touch('a.custom')
|
||||
|
||||
attachment_path = self.mkstemp()
|
||||
self.runcli('attach', '-t', 'customtype',
|
||||
attachment_path, 'albumtitle')
|
||||
self.runcli('attach', '-t', 'customtype', attachment_path)
|
||||
attachment = album.attachments().get()
|
||||
self.assertEqual(attachment.type, 'customtype')
|
||||
|
||||
def test_unknown_warning(self):
|
||||
self.skipTest('Not implemented')
|
||||
def test_unknown_type_warning(self):
|
||||
album = self.add_album('albumtitle')
|
||||
attachment_path = self.touch('unkown')
|
||||
with capture_log() as logs:
|
||||
self.runcli('attach', attachment_path)
|
||||
|
||||
self.assertIn('unknown attachment: {0}'.format(attachment_path), logs)
|
||||
self.assertIsNone(album.attachments().get())
|
||||
|
||||
def test_interactive_type(self):
|
||||
self.skipTest('not implemented yet')
|
||||
|
||||
# Helpers
|
||||
|
||||
def runcli(self, *args):
|
||||
beets.ui._raw_main(list(args), self.lib)
|
||||
|
||||
def setup_log_attachment_plugin(self):
|
||||
def log_discoverer(path):
|
||||
if path.endswith('.log'):
|
||||
return 'log'
|
||||
def add_attachment_plugin(self, ext):
|
||||
def ext_detector(path):
|
||||
if path.endswith('.' + ext):
|
||||
return ext
|
||||
log_plugin = BeetsPlugin()
|
||||
log_plugin.attachment_discoverer = log_discoverer
|
||||
log_plugin.attachment_detector = ext_detector
|
||||
self.add_plugin(log_plugin)
|
||||
|
||||
def add_album(self, name):
|
||||
album = Album(album=name)
|
||||
self.lib.add(album)
|
||||
album_dir = os.path.join(self.lib.directory, name)
|
||||
os.mkdir(album_dir)
|
||||
|
||||
item = Item(album_id=album.id,
|
||||
path=os.path.join(album_dir, 'track.mp3'))
|
||||
self.lib.add(item)
|
||||
return album
|
||||
|
||||
|
||||
def suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
|
|
|||
Loading…
Reference in a new issue