mirror of
https://github.com/beetbox/beets.git
synced 2026-02-17 12:56:05 +01:00
Implement and test attachment.move()
This commit is contained in:
parent
d4d4c4a788
commit
ee19abc293
2 changed files with 151 additions and 47 deletions
|
|
@ -16,14 +16,19 @@
|
|||
import re
|
||||
import os.path
|
||||
import collections
|
||||
import logging
|
||||
from argparse import ArgumentParser
|
||||
|
||||
from beets import dbcore
|
||||
from beets.dbcore.query import Query, AndQuery, MatchQuery
|
||||
from beets.util import normpath
|
||||
from beets import util
|
||||
from beets.util import normpath, displayable_path
|
||||
from beets.util.functemplate import Template
|
||||
|
||||
|
||||
log = logging.getLogger('beets')
|
||||
|
||||
|
||||
def ref_type(entity):
|
||||
# FIXME prevents circular dependency
|
||||
from beets.library import Item, Album
|
||||
|
|
@ -83,23 +88,46 @@ class Attachment(dbcore.db.Model):
|
|||
raise ValueError('{} must have an id', format(entity))
|
||||
self.ref = entity.id
|
||||
|
||||
def move(self, destination=None, copy=False, force=False):
|
||||
"""Moves the attachment from its original `path` to
|
||||
`destination` and updates `self.path`.
|
||||
def move(self, dest=None, copy=False, overwrite=False):
|
||||
"""Moves the attachment from its original `path` to `dest` and
|
||||
updates `self.path`.
|
||||
|
||||
If `destination` is given it must be a path. If the path is
|
||||
relative, it is treated relative to the `libdir`.
|
||||
The `dest` parameter defaults to the `destination property`. If
|
||||
it specified, it must be an absolute path.
|
||||
|
||||
If the destination is `None` it is set to the `destionation`
|
||||
property.
|
||||
|
||||
If the destination already exists and `force` is `False` it
|
||||
raises an error.
|
||||
If the destination already exists and `overwrite` is `False` an
|
||||
alternative is chosen. For example if the destination is
|
||||
`/path/to/file.ext` then the alternative is
|
||||
`/path/to/file.1.ext`. If the alternative exists, too, the
|
||||
integer is increased until the path does not exist.
|
||||
|
||||
If `copy` is `False` (the default) then the original file is deleted.
|
||||
"""
|
||||
# TODO implement
|
||||
raise NotImplementedError
|
||||
|
||||
if dest is None:
|
||||
dest = self.destination
|
||||
|
||||
if os.path.exists(dest) and not overwrite:
|
||||
root, ext = os.path.splitext(dest)
|
||||
log.warn('attachment destination already exists: {0}'
|
||||
.format(displayable_path(dest)))
|
||||
|
||||
alternative = 0
|
||||
while os.path.exists(dest):
|
||||
alternative += 1
|
||||
dest = root + ".{0}".format(alternative) + ext
|
||||
|
||||
if copy:
|
||||
util.copy(self.path, dest, overwrite)
|
||||
log.warn('copy attachment to {0}'
|
||||
.format(displayable_path(dest)))
|
||||
else:
|
||||
util.move(self.path, dest, overwrite)
|
||||
log.warn('move attachment to {0}'
|
||||
.format(displayable_path(dest)))
|
||||
self.path = dest
|
||||
self.store()
|
||||
return self.path
|
||||
|
||||
@property
|
||||
def path(self):
|
||||
|
|
@ -218,7 +246,7 @@ class DestinationTemplateMapping(collections.Mapping):
|
|||
if self.attachment.ref_type == 'album':
|
||||
return self.entity.item_dir() + os.sep
|
||||
elif self.attachment.ref_type == 'item':
|
||||
return os.path.split(self.entity.path)[0] + os.sep
|
||||
return os.path.dirname(self.entity.path) + os.sep
|
||||
|
||||
@property
|
||||
def track_base(self):
|
||||
|
|
@ -227,7 +255,6 @@ class DestinationTemplateMapping(collections.Mapping):
|
|||
if self.attachment.ref_type == 'item':
|
||||
return os.path.splitext(self.entity.path)[0]
|
||||
|
||||
|
||||
@property
|
||||
def basename(self):
|
||||
"""Filename of the attachment's path in its parent directory.
|
||||
|
|
|
|||
|
|
@ -24,7 +24,56 @@ from beets.attachments import AttachmentFactory, Attachment
|
|||
from beets.library import Library, Album, Item
|
||||
|
||||
|
||||
class AttachmentDestinationTest(unittest.TestCase, TestHelper):
|
||||
class AttachmentTestHelper(TestHelper):
|
||||
|
||||
def mkstemp(self, suffix='', path=None, content=''):
|
||||
if path:
|
||||
path = path + suffix
|
||||
with open(path, 'a+') as f:
|
||||
f.write(content)
|
||||
else:
|
||||
(handle, path) = mkstemp(suffix)
|
||||
os.write(handle, content)
|
||||
os.close(handle)
|
||||
|
||||
if not hasattr(self, 'tmp_files'):
|
||||
self.tmp_files = []
|
||||
self.tmp_files.append(path)
|
||||
return path
|
||||
|
||||
def remove_tmp_files(self):
|
||||
if not hasattr(self, 'tmp_files'):
|
||||
return
|
||||
|
||||
for p in self.tmp_files:
|
||||
if os.path.exists(p):
|
||||
os.remove(p)
|
||||
|
||||
def create_item_attachment(self, path, type='atype',
|
||||
track_path='/track/path.mp3'):
|
||||
item = Item(path=track_path)
|
||||
self.lib.add(item)
|
||||
return Attachment(db=self.lib, entity=item,
|
||||
path=path, type=type)
|
||||
|
||||
def create_album_attachment(self, path, type='type'):
|
||||
album = Album(album='album')
|
||||
self.lib.add(album)
|
||||
album_dir = os.path.join(self.lib.directory, album.album)
|
||||
os.mkdir(album_dir)
|
||||
|
||||
# Make sure album.item_dir() returns a path
|
||||
item = Item(album_id=album.id,
|
||||
path=os.path.join(album_dir, 'track.mp3'))
|
||||
self.lib.add(item)
|
||||
|
||||
attachment = Attachment(db=self.lib, entity=album,
|
||||
path=path, type=type)
|
||||
self.lib.add(attachment)
|
||||
return attachment
|
||||
|
||||
|
||||
class AttachmentDestinationTest(unittest.TestCase, AttachmentTestHelper):
|
||||
"""Test the `attachment.destination` property.
|
||||
"""
|
||||
|
||||
|
|
@ -128,25 +177,66 @@ class AttachmentDestinationTest(unittest.TestCase, TestHelper):
|
|||
def set_path_template(self, *templates):
|
||||
self.config['attachment']['paths'] = templates
|
||||
|
||||
def create_item_attachment(self, path, type='atype',
|
||||
track_path='/track/path.mp3'):
|
||||
item = Item(path='/the/track/path.mp3')
|
||||
self.lib.add(item)
|
||||
return Attachment(db=self.lib, entity=item,
|
||||
path=path, type=type)
|
||||
|
||||
def create_album_attachment(self, path, type='type'):
|
||||
album = Album(album='album')
|
||||
self.lib.add(album)
|
||||
album_dir = os.path.join(self.lib.directory, album.album)
|
||||
os.mkdir(album_dir)
|
||||
class AttachmentTest(unittest.TestCase, AttachmentTestHelper):
|
||||
"""Test `attachment.move()`.
|
||||
"""
|
||||
|
||||
# Make sure album.item_dir() returns a path
|
||||
item = Item(album_id=album.id, path=os.path.join(album_dir,
|
||||
'track.mp3'))
|
||||
self.lib.add(item)
|
||||
def setUp(self):
|
||||
self.setup_beets()
|
||||
self.config['attachment']['paths'] = ['$entity_prefix/$basename']
|
||||
|
||||
return Attachment(db=self.lib, entity=album, path=path, type=type)
|
||||
def tearDown(self):
|
||||
self.teardown_beets()
|
||||
self.remove_tmp_files()
|
||||
|
||||
def test_move(self):
|
||||
attachment = self.create_album_attachment(self.mkstemp())
|
||||
original_path = attachment.path
|
||||
|
||||
self.assertNotEqual(attachment.destination, original_path)
|
||||
self.assertTrue(os.path.isfile(original_path))
|
||||
attachment.move()
|
||||
|
||||
self.assertEqual(attachment.destination, attachment.path)
|
||||
self.assertTrue(os.path.isfile(attachment.path))
|
||||
self.assertFalse(os.path.exists(original_path))
|
||||
|
||||
def test_copy(self):
|
||||
attachment = self.create_album_attachment(self.mkstemp())
|
||||
original_path = attachment.path
|
||||
|
||||
self.assertNotEqual(attachment.destination, original_path)
|
||||
self.assertTrue(os.path.isfile(original_path))
|
||||
attachment.move(copy=True)
|
||||
|
||||
self.assertEqual(attachment.destination, attachment.path)
|
||||
self.assertTrue(os.path.isfile(attachment.path))
|
||||
self.assertTrue(os.path.isfile(original_path))
|
||||
|
||||
def test_move_dest_exists(self):
|
||||
attachment = self.create_album_attachment(self.mkstemp('.jpg'))
|
||||
dest = attachment.destination
|
||||
dest_root, dest_ext = os.path.splitext(dest)
|
||||
self.mkstemp(path=dest)
|
||||
|
||||
# TODO test log warning
|
||||
attachment.move()
|
||||
|
||||
self.assertEqual(dest_root + '.1' + dest_ext, attachment.path)
|
||||
self.assertTrue(os.path.isfile(attachment.path))
|
||||
self.assertTrue(os.path.isfile(attachment.destination))
|
||||
|
||||
def test_move_overwrite(self):
|
||||
attachment_path = self.mkstemp(suffix='.jpg', content='JPEG')
|
||||
attachment = self.create_album_attachment(attachment_path)
|
||||
self.mkstemp(path=attachment.destination, content='NONJPEG')
|
||||
|
||||
# TODO test log warning
|
||||
attachment.move(overwrite=True)
|
||||
|
||||
with open(attachment.destination, 'r') as f:
|
||||
self.assertEqual(f.read(), 'JPEG')
|
||||
|
||||
|
||||
class AttachmentFactoryTest(unittest.TestCase):
|
||||
|
|
@ -221,18 +311,16 @@ class EntityAttachmentsTest(unittest.TestCase):
|
|||
[attachment.id])
|
||||
|
||||
|
||||
class AttachCommandTest(unittest.TestCase, TestHelper):
|
||||
class AttachCommandTest(unittest.TestCase, AttachmentTestHelper):
|
||||
|
||||
def setUp(self):
|
||||
self.setup_beets()
|
||||
self.setup_log_attachment_plugin()
|
||||
self.tmp_files = []
|
||||
|
||||
def tearDown(self):
|
||||
for p in self.tmp_files:
|
||||
os.remove(p)
|
||||
self.teardown_beets()
|
||||
self.unload_plugins()
|
||||
self.remove_tmp_files()
|
||||
|
||||
def test_attach_to_album(self):
|
||||
album = self.add_album('albumtitle')
|
||||
|
|
@ -289,17 +377,6 @@ class AttachCommandTest(unittest.TestCase, TestHelper):
|
|||
def runcli(self, *args):
|
||||
beets.ui._raw_main(list(args), self.lib)
|
||||
|
||||
def mkstemp(self, suffix='', path=None):
|
||||
if path:
|
||||
path = path + suffix
|
||||
with open(path, 'a+') as f:
|
||||
f.write('')
|
||||
else:
|
||||
(handle, path) = mkstemp(suffix)
|
||||
os.close(handle)
|
||||
self.tmp_files.append(path)
|
||||
return path
|
||||
|
||||
def setup_log_attachment_plugin(self):
|
||||
def log_discoverer(path):
|
||||
if path.endswith('.log'):
|
||||
|
|
|
|||
Loading…
Reference in a new issue