Merge pull request #3151 from Holzhaus/playlist-plugin-additions

playlist: Add auto-update functionality and more tests
This commit is contained in:
Adrian Sampson 2019-02-18 08:40:55 -05:00 committed by GitHub
commit 81c5ae3fdf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 307 additions and 16 deletions

View file

@ -16,6 +16,7 @@ from __future__ import division, absolute_import, print_function
import os import os
import fnmatch import fnmatch
import tempfile
import beets import beets
@ -82,6 +83,99 @@ class PlaylistPlugin(beets.plugins.BeetsPlugin):
def __init__(self): def __init__(self):
super(PlaylistPlugin, self).__init__() super(PlaylistPlugin, self).__init__()
self.config.add({ self.config.add({
'auto': False,
'playlist_dir': '.', 'playlist_dir': '.',
'relative_to': 'library', 'relative_to': 'library',
}) })
self.playlist_dir = self.config['playlist_dir'].as_filename()
self.changes = {}
if self.config['relative_to'].get() == 'library':
self.relative_to = beets.util.bytestring_path(
beets.config['directory'].as_filename())
elif self.config['relative_to'].get() != 'playlist':
self.relative_to = beets.util.bytestring_path(
self.config['relative_to'].as_filename())
else:
self.relative_to = None
if self.config['auto']:
self.register_listener('item_moved', self.item_moved)
self.register_listener('item_removed', self.item_removed)
self.register_listener('cli_exit', self.cli_exit)
def item_moved(self, item, source, destination):
self.changes[source] = destination
def item_removed(self, item):
if not os.path.exists(beets.util.syspath(item.path)):
self.changes[item.path] = None
def cli_exit(self, lib):
for playlist in self.find_playlists():
self._log.info('Updating playlist: {0}'.format(playlist))
base_dir = beets.util.bytestring_path(
self.relative_to if self.relative_to
else os.path.dirname(playlist)
)
try:
self.update_playlist(playlist, base_dir)
except beets.util.FilesystemError:
self._log.error('Failed to update playlist: {0}'.format(
beets.util.displayable_path(playlist)))
def find_playlists(self):
"""Find M3U playlists in the playlist directory."""
try:
dir_contents = os.listdir(beets.util.syspath(self.playlist_dir))
except OSError:
self._log.warning('Unable to open playlist directory {0}'.format(
beets.util.displayable_path(self.playlist_dir)))
return
for filename in dir_contents:
if fnmatch.fnmatch(filename, '*.[mM]3[uU]'):
yield os.path.join(self.playlist_dir, filename)
def update_playlist(self, filename, base_dir):
"""Find M3U playlists in the specified directory."""
changes = 0
deletions = 0
with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as tempfp:
new_playlist = tempfp.name
with open(filename, mode='rb') as fp:
for line in fp:
original_path = line.rstrip(b'\r\n')
# Ensure that path from playlist is absolute
is_relative = not os.path.isabs(line)
if is_relative:
lookup = os.path.join(base_dir, original_path)
else:
lookup = original_path
try:
new_path = self.changes[beets.util.normpath(lookup)]
except KeyError:
tempfp.write(line)
else:
if new_path is None:
# Item has been deleted
deletions += 1
continue
changes += 1
if is_relative:
new_path = os.path.relpath(new_path, base_dir)
tempfp.write(line.replace(original_path, new_path))
if changes or deletions:
self._log.info(
'Updated playlist {0} ({1} changes, {2} deletions)'.format(
filename, changes, deletions))
beets.util.copy(new_playlist, filename, replace=True)
beets.util.remove(new_playlist)

View file

@ -8,6 +8,7 @@ To use it, enable the ``playlist`` plugin in your configuration
Then configure your playlists like this:: Then configure your playlists like this::
playlist: playlist:
auto: no
relative_to: ~/Music relative_to: ~/Music
playlist_dir: ~/.mpd/playlists playlist_dir: ~/.mpd/playlists
@ -22,6 +23,10 @@ name::
$ beet ls playlist:anotherplaylist $ beet ls playlist:anotherplaylist
The plugin can also update playlists in the playlist directory automatically
every time an item is moved or deleted. This can be controlled by the ``auto``
configuration option.
Configuration Configuration
------------- -------------
@ -29,6 +34,10 @@ To configure the plugin, make a ``smartplaylist:`` section in your
configuration file. In addition to the ``playlists`` described above, the configuration file. In addition to the ``playlists`` described above, the
other configuration options are: other configuration options are:
- **auto**: If this is set to ``yes``, then anytime an item in the library is
moved or removed, the plugin will update all playlists in the
``playlist_dir`` directory that contain that item to reflect the change.
Default: ``no``
- **playlist_dir**: Where to read playlist files from. - **playlist_dir**: Where to read playlist files from.
Default: The current working directory (i.e., ``'.'``). Default: The current working directory (i.e., ``'.'``).
- **relative_to**: Interpret paths in the playlist files relative to a base - **relative_to**: Interpret paths in the playlist files relative to a base

View file

@ -27,17 +27,17 @@ from test import helper
import beets import beets
class PlaylistTest(unittest.TestCase, helper.TestHelper): class PlaylistTestHelper(helper.TestHelper):
def setUp(self): def setUp(self):
self.setup_beets() self.setup_beets()
self.lib = beets.library.Library(':memory:') self.lib = beets.library.Library(':memory:')
self.music_dir = os.path.expanduser('~/Music') self.music_dir = os.path.expanduser(os.path.join('~', 'Music'))
i1 = _common.item() i1 = _common.item()
i1.path = beets.util.normpath(os.path.join( i1.path = beets.util.normpath(os.path.join(
self.music_dir, self.music_dir,
'a/b/c.mp3', 'a', 'b', 'c.mp3',
)) ))
i1.title = u'some item' i1.title = u'some item'
i1.album = u'some album' i1.album = u'some album'
@ -47,7 +47,7 @@ class PlaylistTest(unittest.TestCase, helper.TestHelper):
i2 = _common.item() i2 = _common.item()
i2.path = beets.util.normpath(os.path.join( i2.path = beets.util.normpath(os.path.join(
self.music_dir, self.music_dir,
'd/e/f.mp3', 'd', 'e', 'f.mp3',
)) ))
i2.title = 'another item' i2.title = 'another item'
i2.album = 'another album' i2.album = 'another album'
@ -57,7 +57,7 @@ class PlaylistTest(unittest.TestCase, helper.TestHelper):
i3 = _common.item() i3 = _common.item()
i3.path = beets.util.normpath(os.path.join( i3.path = beets.util.normpath(os.path.join(
self.music_dir, self.music_dir,
'x/y/z.mp3', 'x', 'y', 'z.mp3',
)) ))
i3.title = 'yet another item' i3.title = 'yet another item'
i3.album = 'yet another album' i3.album = 'yet another album'
@ -65,32 +65,34 @@ class PlaylistTest(unittest.TestCase, helper.TestHelper):
self.lib.add_album([i3]) self.lib.add_album([i3])
self.playlist_dir = tempfile.mkdtemp() self.playlist_dir = tempfile.mkdtemp()
with open(os.path.join(self.playlist_dir, 'test.m3u'), 'w') as f:
f.write('{0}\n'.format(beets.util.displayable_path(i1.path)))
f.write('{0}\n'.format(beets.util.displayable_path(i2.path)))
self.config['directory'] = self.music_dir self.config['directory'] = self.music_dir
self.config['playlist']['relative_to'] = 'library'
self.config['playlist']['playlist_dir'] = self.playlist_dir self.config['playlist']['playlist_dir'] = self.playlist_dir
self.setup_test()
self.load_plugins('playlist') self.load_plugins('playlist')
def setup_test(self):
raise NotImplementedError
def tearDown(self): def tearDown(self):
self.unload_plugins() self.unload_plugins()
shutil.rmtree(self.playlist_dir) shutil.rmtree(self.playlist_dir)
self.teardown_beets() self.teardown_beets()
def test_query_name(self):
q = u'playlist:test' class PlaylistQueryTestHelper(PlaylistTestHelper):
def test_name_query_with_absolute_paths_in_playlist(self):
q = u'playlist:absolute'
results = self.lib.items(q) results = self.lib.items(q)
self.assertEqual(set([i.title for i in results]), set([ self.assertEqual(set([i.title for i in results]), set([
u'some item', u'some item',
u'another item', u'another item',
])) ]))
def test_query_path(self): def test_path_query_with_absolute_paths_in_playlist(self):
q = u'playlist:{0}'.format(shlex_quote(os.path.join( q = u'playlist:{0}'.format(shlex_quote(os.path.join(
self.playlist_dir, self.playlist_dir,
'test.m3u', 'absolute.m3u',
))) )))
results = self.lib.items(q) results = self.lib.items(q)
self.assertEqual(set([i.title for i in results]), set([ self.assertEqual(set([i.title for i in results]), set([
@ -98,12 +100,31 @@ class PlaylistTest(unittest.TestCase, helper.TestHelper):
u'another item', u'another item',
])) ]))
def test_query_name_nonexisting(self): def test_name_query_with_relative_paths_in_playlist(self):
q = u'playlist:relative'
results = self.lib.items(q)
self.assertEqual(set([i.title for i in results]), set([
u'some item',
u'another item',
]))
def test_path_query_with_relative_paths_in_playlist(self):
q = u'playlist:{0}'.format(shlex_quote(os.path.join(
self.playlist_dir,
'relative.m3u',
)))
results = self.lib.items(q)
self.assertEqual(set([i.title for i in results]), set([
u'some item',
u'another item',
]))
def test_name_query_with_nonexisting_playlist(self):
q = u'playlist:nonexisting'.format(self.playlist_dir) q = u'playlist:nonexisting'.format(self.playlist_dir)
results = self.lib.items(q) results = self.lib.items(q)
self.assertEqual(set(results), set()) self.assertEqual(set(results), set())
def test_query_path_nonexisting(self): def test_path_query_with_nonexisting_playlist(self):
q = u'playlist:{0}'.format(shlex_quote(os.path.join( q = u'playlist:{0}'.format(shlex_quote(os.path.join(
self.playlist_dir, self.playlist_dir,
self.playlist_dir, self.playlist_dir,
@ -113,6 +134,173 @@ class PlaylistTest(unittest.TestCase, helper.TestHelper):
self.assertEqual(set(results), set()) self.assertEqual(set(results), set())
class PlaylistTestRelativeToLib(PlaylistQueryTestHelper, unittest.TestCase):
def setup_test(self):
with open(os.path.join(self.playlist_dir, 'absolute.m3u'), 'w') as f:
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'a', 'b', 'c.mp3')))
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'd', 'e', 'f.mp3')))
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'nonexisting.mp3')))
with open(os.path.join(self.playlist_dir, 'relative.m3u'), 'w') as f:
f.write('{0}\n'.format(os.path.join('a', 'b', 'c.mp3')))
f.write('{0}\n'.format(os.path.join('d', 'e', 'f.mp3')))
f.write('{0}\n'.format('nonexisting.mp3'))
self.config['playlist']['relative_to'] = 'library'
class PlaylistTestRelativeToDir(PlaylistQueryTestHelper, unittest.TestCase):
def setup_test(self):
with open(os.path.join(self.playlist_dir, 'absolute.m3u'), 'w') as f:
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'a', 'b', 'c.mp3')))
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'd', 'e', 'f.mp3')))
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'nonexisting.mp3')))
with open(os.path.join(self.playlist_dir, 'relative.m3u'), 'w') as f:
f.write('{0}\n'.format(os.path.join('a', 'b', 'c.mp3')))
f.write('{0}\n'.format(os.path.join('d', 'e', 'f.mp3')))
f.write('{0}\n'.format('nonexisting.mp3'))
self.config['playlist']['relative_to'] = self.music_dir
class PlaylistTestRelativeToPls(PlaylistQueryTestHelper, unittest.TestCase):
def setup_test(self):
with open(os.path.join(self.playlist_dir, 'absolute.m3u'), 'w') as f:
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'a', 'b', 'c.mp3')))
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'd', 'e', 'f.mp3')))
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'nonexisting.mp3')))
with open(os.path.join(self.playlist_dir, 'relative.m3u'), 'w') as f:
f.write('{0}\n'.format(os.path.relpath(
os.path.join(self.music_dir, 'a', 'b', 'c.mp3'),
start=self.playlist_dir,
)))
f.write('{0}\n'.format(os.path.relpath(
os.path.join(self.music_dir, 'd', 'e', 'f.mp3'),
start=self.playlist_dir,
)))
f.write('{0}\n'.format(os.path.relpath(
os.path.join(self.music_dir, 'nonexisting.mp3'),
start=self.playlist_dir,
)))
self.config['playlist']['relative_to'] = 'playlist'
self.config['playlist']['playlist_dir'] = self.playlist_dir
class PlaylistUpdateTestHelper(PlaylistTestHelper):
def setup_test(self):
with open(os.path.join(self.playlist_dir, 'absolute.m3u'), 'w') as f:
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'a', 'b', 'c.mp3')))
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'd', 'e', 'f.mp3')))
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'nonexisting.mp3')))
with open(os.path.join(self.playlist_dir, 'relative.m3u'), 'w') as f:
f.write('{0}\n'.format(os.path.join('a', 'b', 'c.mp3')))
f.write('{0}\n'.format(os.path.join('d', 'e', 'f.mp3')))
f.write('{0}\n'.format('nonexisting.mp3'))
self.config['playlist']['auto'] = True
self.config['playlist']['relative_to'] = 'library'
class PlaylistTestItemMoved(PlaylistUpdateTestHelper, unittest.TestCase):
def test_item_moved(self):
# Emit item_moved event for an item that is in a playlist
results = self.lib.items(u'path:{0}'.format(shlex_quote(
os.path.join(self.music_dir, 'd', 'e', 'f.mp3'))))
item = results[0]
beets.plugins.send(
'item_moved', item=item, source=item.path,
destination=beets.util.bytestring_path(
os.path.join(self.music_dir, 'g', 'h', 'i.mp3')))
# Emit item_moved event for an item that is not in a playlist
results = self.lib.items(u'path:{0}'.format(shlex_quote(
os.path.join(self.music_dir, 'x', 'y', 'z.mp3'))))
item = results[0]
beets.plugins.send(
'item_moved', item=item, source=item.path,
destination=beets.util.bytestring_path(
os.path.join(self.music_dir, 'u', 'v', 'w.mp3')))
# Emit cli_exit event
beets.plugins.send('cli_exit', lib=self.lib)
# Check playlist with absolute paths
playlist_path = os.path.join(self.playlist_dir, 'absolute.m3u')
with open(playlist_path, 'r') as f:
lines = [line.strip() for line in f.readlines()]
self.assertEqual(lines, [
os.path.join(self.music_dir, 'a', 'b', 'c.mp3'),
os.path.join(self.music_dir, 'g', 'h', 'i.mp3'),
os.path.join(self.music_dir, 'nonexisting.mp3'),
])
# Check playlist with relative paths
playlist_path = os.path.join(self.playlist_dir, 'relative.m3u')
with open(playlist_path, 'r') as f:
lines = [line.strip() for line in f.readlines()]
self.assertEqual(lines, [
os.path.join('a', 'b', 'c.mp3'),
os.path.join('g', 'h', 'i.mp3'),
'nonexisting.mp3',
])
class PlaylistTestItemRemoved(PlaylistUpdateTestHelper, unittest.TestCase):
def test_item_removed(self):
# Emit item_removed event for an item that is in a playlist
results = self.lib.items(u'path:{0}'.format(shlex_quote(
os.path.join(self.music_dir, 'd', 'e', 'f.mp3'))))
item = results[0]
beets.plugins.send('item_removed', item=item)
# Emit item_removed event for an item that is not in a playlist
results = self.lib.items(u'path:{0}'.format(shlex_quote(
os.path.join(self.music_dir, 'x', 'y', 'z.mp3'))))
item = results[0]
beets.plugins.send('item_removed', item=item)
# Emit cli_exit event
beets.plugins.send('cli_exit', lib=self.lib)
# Check playlist with absolute paths
playlist_path = os.path.join(self.playlist_dir, 'absolute.m3u')
with open(playlist_path, 'r') as f:
lines = [line.strip() for line in f.readlines()]
self.assertEqual(lines, [
os.path.join(self.music_dir, 'a', 'b', 'c.mp3'),
os.path.join(self.music_dir, 'nonexisting.mp3'),
])
# Check playlist with relative paths
playlist_path = os.path.join(self.playlist_dir, 'relative.m3u')
with open(playlist_path, 'r') as f:
lines = [line.strip() for line in f.readlines()]
self.assertEqual(lines, [
os.path.join('a', 'b', 'c.mp3'),
'nonexisting.mp3',
])
def suite(): def suite():
return unittest.TestLoader().loadTestsFromName(__name__) return unittest.TestLoader().loadTestsFromName(__name__)