BPD uses new VFS as a backend (#131)

This commit is contained in:
Adrian Sampson 2011-05-05 17:00:05 -07:00
parent 926032fd07
commit 3e90579a6c
6 changed files with 107 additions and 224 deletions

5
NEWS
View file

@ -12,6 +12,11 @@
beets-lyrics.
* Fixed a problem where duplicate albums or items imported at the same
time would fail to be detected.
* BPD now uses a persistent "virtual filesystem" in order to fake a
directory structure. This means that your path format settings are
respected in BPD's browsing hierarchy. This may come at a performance
cost, however. The virtual filesystem used by BPD is available for
reuse by plugins (e.g., the FUSE plugin).
* Fix crash when autotagging files with no metadata.
1.0b8

View file

@ -757,12 +757,13 @@ class Library(BaseLibrary):
self.conn.executescript(setup_sql)
self.conn.commit()
def destination(self, item, pathmod=None, in_album=False, noroot=False):
def destination(self, item, pathmod=None, in_album=False, fragment=False):
"""Returns the path in the library directory designated for item
item (i.e., where the file ought to be). in_album forces the
item to be treated as part of an album. noroot makes this
item to be treated as part of an album. fragment makes this
method return just the path fragment underneath the root library
directory.
directory; the path is also returned as Unicode instead of
encoded as a bytestring.
"""
pathmod = pathmod or os.path
@ -808,7 +809,7 @@ class Library(BaseLibrary):
subpath = subpath_tmpl.substitute(mapping)
# Encode for the filesystem, dropping unencodable characters.
if isinstance(subpath, unicode):
if isinstance(subpath, unicode) and not fragment:
encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
subpath = subpath.encode(encoding, 'replace')
@ -819,7 +820,7 @@ class Library(BaseLibrary):
_, extension = pathmod.splitext(item.path)
subpath += extension
if noroot:
if fragment:
return subpath
else:
return normpath(os.path.join(self.directory, subpath))

View file

@ -209,17 +209,17 @@ def sanitize_path(path, pathmod=None):
def sanitize_for_path(value, pathmod, key=None):
"""Sanitize the value for inclusion in a path: replace separators
with _, etc. Doesn't guarantee that the whole path will be valid;
you should still call _sanitize_path on the complete path.
you should still call sanitize_path on the complete path.
"""
if isinstance(value, basestring):
for sep in (pathmod.sep, pathmod.altsep):
if sep:
value = value.replace(sep, '_')
value = value.replace(sep, u'_')
elif key in ('track', 'tracktotal', 'disc', 'disctotal'):
# pad with zeros
value = '%02i' % value
value = u'%02i' % value
else:
value = str(value)
value = unicode(value)
return value
def str2bool(value):

View file

@ -42,7 +42,7 @@ def libtree(lib):
"""
root = Node({}, {})
for item in lib.items():
dest = lib.destination(item, noroot=True)
dest = lib.destination(item, fragment=True)
parts = util.components(dest)
_insert(root, parts, item.id)
return root

View file

@ -27,6 +27,7 @@ import time
import beets
from beets.plugins import BeetsPlugin
import beets.ui
from beets import vfs
DEFAULT_PORT = 6600
@ -136,63 +137,6 @@ class BPDClose(Exception):
"""
# Path-like encoding of string sequences. We use this to simulate the
# directory structure required by the MPD protocol to browse music in
# the library.
def seq_to_path(seq, placeholder=u''):
"""Encodes a sequence of strings as a path-like string. The
sequence can be recovered exactly using path_to_list. If
`placeholder` is provided, it is used in place of empty path
components.
"""
out = []
for s in seq:
if placeholder and s == u'':
out.append(placeholder)
else:
out.append(s.replace(u'\\', u'\\\\') # preserve backslashes
.replace(u'_', u'\\_') # preserve _s
.replace(u'/', u'_') # hide /s as _s
)
return u'/'.join(out)
def path_to_list(path, placeholder=u''):
"""Takes a path-like string (probably encoded by seq_to_path) and
returns the list of strings it represents. If `placeholder` is
provided, it is interpreted to represent an empty path component.
Also, when given a `placeholder`, this function ignores empty
path components.
"""
def repl(m):
# This function maps "escaped" characters to original
# characters. Because the regex is in the right order, the
# sequences are replaced top-to-bottom.
return {u'\\\\': u'\\',
u'\\_': u'_',
u'_': u'/',
}[m.group(0)]
components = [re.sub(ur'\\\\|\\_|_', repl, component)
for component in path.split(u'/')]
if placeholder:
new_components = []
for c in components:
if c == u'':
# Drop empty path components.
continue
if c == placeholder:
new_components.append(u'')
else:
new_components.append(c)
components = new_components
return components
PATH_PH = u'(unknown)'
# Generic server infrastructure, implementing the basic protocol.
class BaseServer(object):
@ -754,6 +698,7 @@ class Server(BaseServer):
super(Server, self).__init__(host, port, password)
self.lib = library
self.player = gstplayer.GstPlayer(self.play_finished)
self.cmd_update(None)
def run(self):
self.player.run()
@ -768,12 +713,8 @@ class Server(BaseServer):
# Metadata helper functions.
def _item_path(self, item):
"""Returns the item's "virtual path."""
return seq_to_path((item.artist, item.album, item.title), PATH_PH)
def _item_info(self, item):
info_lines = [u'file: ' + self._item_path(item),
info_lines = [u'file: ' + self.lib.destination(item, fragment=True),
u'Time: ' + unicode(int(item.length)),
u'Title: ' + item.title,
u'Artist: ' + item.artist,
@ -803,114 +744,116 @@ class Server(BaseServer):
return item.id
# Path (directory tree) browsing.
def _parse_path(self, path=u"/"):
"""Take an artist/album/track path and return its components.
"""
if len(path) >= 1 and path[0] == u'/': # Remove leading slash.
path = path[1:]
items = path_to_list(path, PATH_PH)
# Database updating.
dirs = [None, None, None]
for i in range(len(dirs)):
if items:
# Take a directory if it exists. Otherwise, leave as "none".
# This way, we ensure that we always return 3 elements.
dirs[i] = items.pop(0)
return dirs
def cmd_update(self, conn, path=u'/'):
"""Updates the catalog to reflect the current database state.
"""
# Path is ignored. Also, the real MPD does this asynchronously;
# this is done inline.
self.tree = vfs.libtree(self.lib)
self.updated_time = time.time()
# Path (directory tree) browsing.
def _resolve_path(self, path):
"""Returns a VFS node or an item ID located at the path given.
If the path does not exist, raises a
"""
components = path.split(u'/')
node = self.tree
for component in components:
if not component:
continue
if isinstance(node, int):
# We're trying to descend into a file node.
raise ArgumentNotFoundError()
if component in node.files:
node = node.files[component]
elif component in node.dirs:
node = node.dirs[component]
else:
raise ArgumentNotFoundError()
return node
def _path_join(self, p1, p2):
"""Smashes together two BPD paths."""
out = p1 + u'/' + p2
return out.replace(u'//', u'/').replace(u'//', u'/')
def cmd_lsinfo(self, conn, path=u"/"):
"""Sends info on all the items in the path."""
artist, album, track = self._parse_path(path)
if artist is None: # List all artists.
artists = set()
for album in self.lib.albums():
artists.add(album.albumartist)
for artist in sorted(artists):
yield u'directory: ' + seq_to_path((artist,), PATH_PH)
elif album is None: # List all albums for an artist.
for album in self.lib.albums(artist):
parts = (album.albumartist, album.album)
yield u'directory: ' + seq_to_path(parts, PATH_PH)
elif track is None: # List all tracks on an album.
for item in self.lib.items(artist, album):
yield self._item_info(item)
else: # List a track. This isn't a directory.
node = self._resolve_path(path)
if isinstance(node, int):
# Trying to list a track.
raise BPDError(ERROR_ARG, 'this is not a directory')
else:
for name, itemid in node.files.iteritems():
item = self.lib.get_item(itemid)
yield self._item_info(item)
for name, _ in node.dirs.iteritems():
yield u'directory: ' + self._path_join(path, name)
def _listall(self, path=u"/", info=False):
def _listall(self, basepath, node, info=False):
"""Helper function for recursive listing. If info, show
tracks' complete info; otherwise, just show items' paths.
"""
artist, album, track = self._parse_path(path)
# artists
if not artist:
for a in self.lib.artists():
yield u'directory: ' + a
# albums
if not album:
for a in self.lib.albums(artist or None):
parts = a.albumartist, a.album
yield u'directory: ' + seq_to_path(parts, PATH_PH)
# tracks
items = self.lib.items(artist or None, album or None)
if info:
for item in items:
if isinstance(node, int):
# List a single file.
if info:
item = self.lib.get_item(node)
yield self._item_info(item)
else:
yield u'file: ' + basepath
else:
for item in items:
yield u'file: ' + self._item_path(i)
# List a directory. Recurse into both directories and files.
for name, itemid in sorted(node.files.iteritems()):
newpath = self._path_join(basepath, name)
# "yield from"
for v in self._listall(newpath, itemid, info): yield v
for name, subdir in sorted(node.dirs.iteritems()):
newpath = self._path_join(basepath, name)
yield u'directory: ' + newpath
for v in self._listall(newpath, subdir, info): yield v
def cmd_listall(self, conn, path=u"/"):
"""Send the paths all items in the directory, recursively."""
return self._listall(path, False)
return self._listall(path, self._resolve_path(path), False)
def cmd_listallinfo(self, conn, path=u"/"):
"""Send info on all the items in the directory, recursively."""
return self._listall(path, True)
return self._listall(path, self._resolve_path(path), True)
# Playlist manipulation.
def _all_items(self, node):
"""Generator yielding all items under a VFS node.
"""
if isinstance(node, int):
# Could be more efficient if we built up all the IDs and
# then issued a single SELECT.
yield self.lib.get_item(node)
else:
# Recurse into a directory.
for name, itemid in sorted(node.files.iteritems()):
# "yield from"
for v in self._all_items(itemid): yield v
for name, subdir in sorted(node.files.iteritems()):
for v in self._all_items(subdir): yield v
def _add(self, path, send_id=False):
"""Adds a track or directory to the playlist, specified by a
"""Adds a track or directory to the playlist, specified by the
path. If `send_id`, write each item's id to the client.
"""
components = path_to_list(path, PATH_PH)
if len(components) <= 3:
items = []
if len(components) <= 2:
# Whole artist or album.
for album in self.lib.albums(*components[:2]):
items += album.items()
else:
# Single item.
albums = list(self.lib.albums(*components[:2]))
if albums:
for item in albums[0].items():
if item.title == components[2]:
items.append(item)
break
for item in items:
self.playlist.append(item)
if send_id:
yield u'Id: ' + unicode(item.id)
if not items:
# No items matched.
raise ArgumentNotFoundError()
self.playlist_version += 1
else:
# More than three path components: invalid pathname.
raise ArgumentNotFoundError()
for item in self._all_items(self._resolve_path(path)):
self.playlist.append(item)
if send_id:
yield u'Id: ' + unicode(item.id)
def cmd_add(self, conn, path):
"""Adds a track or directory to the playlist, specified by a
@ -956,7 +899,7 @@ class Server(BaseServer):
u'uptime: ' + unicode(int(time.time() - self.startup_time)),
u'playtime: ' + u'0', #fixme
u'db_playtime: ' + unicode(int(totaltime)),
u'db_update: ' + unicode(int(self.startup_time)), #fixme
u'db_update: ' + unicode(int(self.updated_time)),
)

View file

@ -20,72 +20,6 @@ import unittest
import _common
from beetsplug import bpd
class FauxPathTest(unittest.TestCase):
def test_single_element_preserved(self):
seq = ['hello']
self.assertEqual(bpd.path_to_list(bpd.seq_to_path(seq)), seq)
def test_multiple_elements_preserved(self):
seq = ['hello', 'there', 'how', 'are', 'you']
self.assertEqual(bpd.path_to_list(bpd.seq_to_path(seq)), seq)
def test_spaces_preserved(self):
seq = ['hel lo', 'what', 'is up']
self.assertEqual(bpd.path_to_list(bpd.seq_to_path(seq)), seq)
def test_empty_string_preserved_in_middle(self):
seq = ['hello', '', 'sup']
self.assertEqual(bpd.path_to_list(bpd.seq_to_path(seq)), seq)
def test_empty_strings_preserved_on_ends(self):
seq = ['', 'whatever', '']
self.assertEqual(bpd.path_to_list(bpd.seq_to_path(seq)), seq)
def test_empty_strings_only(self):
seq = ['', '', '']
self.assertEqual(bpd.path_to_list(bpd.seq_to_path(seq)), seq)
def test_slashes_preserved(self):
seq = ['hel/lo', 'what', 'is', 'up']
self.assertEqual(bpd.path_to_list(bpd.seq_to_path(seq)), seq)
def test_backslashes_preserved(self):
seq = ['hel\\lo', 'what', 'is', 'up']
self.assertEqual(bpd.path_to_list(bpd.seq_to_path(seq)), seq)
def test_unicode_preserved(self):
seq = [u'hello', u'what \x99 is', u'up']
self.assertEqual(bpd.path_to_list(bpd.seq_to_path(seq)), seq)
def test_slashes_not_encoded_as_slashes(self):
no_slashes = bpd.seq_to_path(['goodday', 'sir'])
with_slashes = bpd.seq_to_path(['good/day', 'sir'])
self.assertEqual(no_slashes.count('/'), with_slashes.count('/'))
def test_empty_seq_preserved_with_placeholder(self):
seq = []
self.assertEqual(bpd.path_to_list(bpd.seq_to_path(seq, 'PH'), 'PH'),
seq)
def test_empty_strings_preserved_with_placeholder(self):
seq = ['hello', '', 'sup']
self.assertEqual(bpd.path_to_list(bpd.seq_to_path(seq, 'PH'), 'PH'),
seq)
def test_empty_strings_only_preserved_with_placeholder(self):
seq = ['', '', '']
self.assertEqual(bpd.path_to_list(bpd.seq_to_path(seq, 'PH'), 'PH'),
seq)
def test_placeholder_does_replace(self):
seq = ['hello', '', 'sup']
self.assertFalse('//' in bpd.seq_to_path(seq, 'PH'))
# Note that the path encodes doesn't currently try to distinguish
# between the placeholder and strings identical to the placeholder.
# This might be a nice feature but is not currently essential.
class CommandParseTest(unittest.TestCase):
def test_no_args(self):
s = ur'command'