move a bunch of functions to util

This commit is contained in:
Adrian Sampson 2011-04-10 21:48:05 -07:00
parent 2746f1ff3f
commit e669868896
8 changed files with 66 additions and 260 deletions

View file

@ -21,6 +21,7 @@ from beets.autotag import mb
import re
from munkres import Munkres
from beets import library, mediafile, plugins
from beets.util import syspath, bytestring_path, levenshtein
import logging
# Try 5 releases. In the future, this should be more dynamic: let the
@ -89,14 +90,14 @@ def _sorted_walk(path):
order.
"""
# Make sure the path isn't a Unicode string.
path = library._bytestring_path(path)
path = bytestring_path(path)
# Get all the directories and files at this level.
dirs = []
files = []
for base in os.listdir(path):
cur = os.path.join(path, base)
if os.path.isdir(library._syspath(cur)):
if os.path.isdir(syspath(cur)):
dirs.append(base)
else:
files.append(base)
@ -136,28 +137,6 @@ def albums_in_dir(path):
if items:
yield root, items
def _levenshtein(s1, s2):
"""A nice DP edit distance implementation from Wikibooks:
http://en.wikibooks.org/wiki/Algorithm_implementation/Strings/
Levenshtein_distance#Python
"""
if len(s1) < len(s2):
return _levenshtein(s2, s1)
if not s1:
return len(s2)
previous_row = xrange(len(s2) + 1)
for i, c1 in enumerate(s1):
current_row = [i + 1]
for j, c2 in enumerate(s2):
insertions = previous_row[j + 1] + 1
deletions = current_row[j] + 1
substitutions = previous_row[j] + (c1 != c2)
current_row.append(min(insertions, deletions, substitutions))
previous_row = current_row
return previous_row[-1]
def _string_dist_basic(str1, str2):
"""Basic edit distance between two strings, ignoring
non-alphanumeric characters and case. Normalized by string length.
@ -166,7 +145,7 @@ def _string_dist_basic(str1, str2):
str2 = re.sub(r'[^a-z0-9]', '', str2.lower())
if not str1 and not str2:
return 0.0
return _levenshtein(str1, str2) / float(max(len(str1), len(str2)))
return levenshtein(str1, str2) / float(max(len(str1), len(str2)))
def string_dist(str1, str2):
"""Gives an "intuitive" edit distance between two strings. This is

View file

@ -25,6 +25,7 @@ from beets import library
import beets.autotag.art
from beets import plugins
from beets.util import pipeline
from beets.util import syspath, normpath
CHOICE_SKIP = 'CHOICE_SKIP'
CHOICE_ASIS = 'CHOICE_ASIS'
@ -238,7 +239,7 @@ def read_albums(config):
possible), False (never resume), or None (ask).
"""
# Use absolute paths.
paths = [library._normpath(path) for path in config.paths]
paths = [normpath(path) for path in config.paths]
# Look for saved progress.
progress = config.resume is not False
@ -394,7 +395,7 @@ def apply_choices(config):
for old_path in old_paths:
# Only delete files that were actually moved.
if old_path not in new_paths:
os.remove(library._syspath(old_path))
os.remove(syspath(old_path))
# Update progress.
if config.resume is not False:
@ -433,7 +434,7 @@ def simple_import(config):
for old_path in old_paths:
# Only delete files that were actually moved.
if old_path not in new_paths:
os.remove(library._syspath(old_path))
os.remove(syspath(old_path))
log.info('added album: %s - %s' % (album.albumartist, album.album))

View file

@ -1,5 +1,5 @@
# This file is part of beets.
# Copyright 2010, Adrian Sampson.
# Copyright 2011, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
@ -21,6 +21,8 @@ from string import Template
import logging
from beets.mediafile import MediaFile
from beets import plugins
from beets import util
from beets.util import bytestring_path, syspath, normpath
MAX_FILENAME_LENGTH = 200
@ -109,184 +111,6 @@ class InvalidFieldError(Exception):
pass
# Utility functions.
def _normpath(path):
"""Provide the canonical form of the path suitable for storing in
the database.
"""
return os.path.normpath(os.path.abspath(os.path.expanduser(path)))
def _ancestry(path, pathmod=None):
"""Return a list consisting of path's parent directory, its
grandparent, and so on. For instance:
>>> _ancestry('/a/b/c')
['/', '/a', '/a/b']
"""
pathmod = pathmod or os.path
out = []
last_path = None
while path:
path = pathmod.dirname(path)
if path == last_path:
break
last_path = path
if path: # don't yield ''
out.insert(0, path)
return out
def _mkdirall(path):
"""Make all the enclosing directories of path (like mkdir -p on the
parent).
"""
for ancestor in _ancestry(path):
if not os.path.isdir(_syspath(ancestor)):
os.mkdir(_syspath(ancestor))
def _prune_dirs(path, root):
"""If path is an empty directory, then remove it. Recursively
remove path's ancestry up to root (which is never removed) where
there are empty directories. If path is not contained in root, then
nothing is removed.
"""
path = _normpath(path)
root = _normpath(root)
ancestors = _ancestry(path)
if root in ancestors:
# Only remove directories below the root.
ancestors = ancestors[ancestors.index(root)+1:]
# Traverse upward from path.
ancestors.append(path)
ancestors.reverse()
for directory in ancestors:
try:
os.rmdir(_syspath(directory))
except OSError:
break
def _components(path, pathmod=None):
"""Return a list of the path components in path. For instance:
>>> _components('/a/b/c')
['a', 'b', 'c']
"""
pathmod = pathmod or os.path
comps = []
ances = _ancestry(path, pathmod)
for anc in ances:
comp = pathmod.basename(anc)
if comp:
comps.append(comp)
else: # root
comps.append(anc)
last = pathmod.basename(path)
if last:
comps.append(last)
return comps
def _bytestring_path(path):
"""Given a path, which is either a str or a unicode, returns a str
path (ensuring that we never deal with Unicode pathnames).
"""
# Pass through bytestrings.
if isinstance(path, str):
return path
# Try to encode with default encodings, but fall back to UTF8.
encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
try:
return path.encode(encoding)
except UnicodeError:
return path.encode('utf8')
def _syspath(path, pathmod=None):
"""Convert a path for use by the operating system. In particular,
paths on Windows must receive a magic prefix and must be converted
to unicode before they are sent to the OS.
"""
pathmod = pathmod or os.path
windows = pathmod.__name__ == 'ntpath'
# Don't do anything if we're not on windows
if not windows:
return path
if not isinstance(path, unicode):
# Try to decode with default encodings, but fall back to UTF8.
encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
try:
path = path.decode(encoding, 'replace')
except UnicodeError:
path = path.decode('utf8', 'replace')
# Add the magic prefix if it isn't already there
if not path.startswith(u'\\\\?\\'):
path = u'\\\\?\\' + path
return path
# Note: POSIX actually supports \ and : -- I just think they're
# a pain. And ? has caused problems for some.
CHAR_REPLACE = [
(re.compile(r'[\\/\?]|^\.'), '_'),
(re.compile(r':'), '-'),
]
CHAR_REPLACE_WINDOWS = re.compile('["\*<>\|]|^\.|\.$| +$'), '_'
def _sanitize_path(path, pathmod=None):
"""Takes a path and makes sure that it is legal. Returns a new path.
Only works with fragments; won't work reliably on Windows when a
path begins with a drive letter. Path separators (including altsep!)
should already be cleaned from the path components.
"""
pathmod = pathmod or os.path
windows = pathmod.__name__ == 'ntpath'
comps = _components(path, pathmod)
if not comps:
return ''
for i, comp in enumerate(comps):
# Replace special characters.
for regex, repl in CHAR_REPLACE:
comp = regex.sub(repl, comp)
if windows:
regex, repl = CHAR_REPLACE_WINDOWS
comp = regex.sub(repl, comp)
# Truncate each component.
comp = comp[:MAX_FILENAME_LENGTH]
comps[i] = comp
return pathmod.join(*comps)
def _sanitize_for_path(value, pathmod, key=None):
"""Sanitize the value for inclusion in a path: replace separators
with _, etc. Doesn't guarantee that the whole path will be valid;
you should still call _sanitize_path on the complete path.
"""
if isinstance(value, basestring):
for sep in (pathmod.sep, pathmod.altsep):
if sep:
value = value.replace(sep, '_')
elif key in ('track', 'tracktotal', 'disc', 'disctotal'):
# pad with zeros
value = '%02i' % value
else:
value = str(value)
return value
def _bool(value):
"""Returns a boolean reflecting a human-entered string."""
if value.lower() in ('yes', '1', 'true', 't', 'y'):
return True
else:
return False
# Library items (songs).
class Item(object):
@ -345,7 +169,7 @@ class Item(object):
# Encode unicode paths and read buffers.
if key == 'path':
if isinstance(value, unicode):
value = _bytestring_path(value)
value = bytestring_path(value)
elif isinstance(value, buffer):
value = str(value)
@ -367,8 +191,8 @@ class Item(object):
if read_path is None:
read_path = self.path
else:
read_path = _normpath(read_path)
f = MediaFile(_syspath(read_path))
read_path = normpath(read_path)
f = MediaFile(syspath(read_path))
for key in ITEM_KEYS_META:
setattr(self, key, getattr(f, key))
@ -377,7 +201,7 @@ class Item(object):
def write(self):
"""Writes the item's metadata to the associated file.
"""
f = MediaFile(_syspath(self.path))
f = MediaFile(syspath(self.path))
for key in ITEM_KEYS_WRITABLE:
setattr(f, key, getattr(self, key))
f.save()
@ -408,16 +232,16 @@ class Item(object):
dest = library.destination(self, in_album=in_album)
# Create necessary ancestry for the move.
_mkdirall(dest)
util.mkdirall(dest)
if not shutil._samefile(_syspath(self.path), _syspath(dest)):
if not shutil._samefile(syspath(self.path), syspath(dest)):
if copy:
# copyfile rather than copy will not copy permissions
# bits, thus possibly making the copy writable even when
# the original is read-only.
shutil.copyfile(_syspath(self.path), _syspath(dest))
shutil.copyfile(syspath(self.path), syspath(dest))
else:
shutil.move(_syspath(self.path), _syspath(dest))
shutil.move(syspath(self.path), syspath(dest))
# Either copying or moving succeeded, so update the stored path.
self.path = dest
@ -588,7 +412,7 @@ class CollectionQuery(Query):
elif key.lower() in ITEM_KEYS: # ignore unrecognized keys
subqueries.append(SubstringQuery(key.lower(), pattern))
elif key.lower() == 'singleton':
subqueries.append(SingletonQuery(_bool(pattern)))
subqueries.append(SingletonQuery(util.str2bool(pattern)))
if not subqueries: # no terms in query
subqueries = [TrueQuery()]
return cls(subqueries)
@ -854,14 +678,14 @@ class Library(BaseLibrary):
art_filename='cover',
item_fields=ITEM_FIELDS,
album_fields=ALBUM_FIELDS):
self.path = _bytestring_path(path)
self.directory = _bytestring_path(directory)
self.path = bytestring_path(path)
self.directory = bytestring_path(directory)
if path_formats is None:
path_formats = {'default': '$artist/$album/$track $title'}
elif isinstance(path_formats, basestring):
path_formats = {'default': path_formats}
self.path_formats = path_formats
self.art_filename = _bytestring_path(art_filename)
self.art_filename = bytestring_path(art_filename)
self.conn = sqlite3.connect(self.path)
self.conn.row_factory = sqlite3.Row
@ -949,7 +773,7 @@ class Library(BaseLibrary):
else:
# From Item.
value = getattr(item, key)
mapping[key] = _sanitize_for_path(value, pathmod, key)
mapping[key] = util.sanitize_for_path(value, pathmod, key)
# Use the album artist if the track artist is not set and
# vice-versa.
@ -967,13 +791,13 @@ class Library(BaseLibrary):
subpath = subpath.encode(encoding, 'replace')
# Truncate components and remove forbidden characters.
subpath = _sanitize_path(subpath)
subpath = util.sanitize_path(subpath)
# Preserve extension.
_, extension = pathmod.splitext(item.path)
subpath += extension
return _normpath(os.path.join(self.directory, subpath))
return normpath(os.path.join(self.directory, subpath))
# Main interface.
@ -1074,8 +898,8 @@ class Library(BaseLibrary):
album.remove(delete, False)
if delete:
os.unlink(_syspath(item.path))
_prune_dirs(os.path.dirname(item.path), self.directory)
os.unlink(syspath(item.path))
util.prune_dirs(os.path.dirname(item.path), self.directory)
# Browsing.
@ -1207,7 +1031,7 @@ class Album(BaseAlbum):
def __init__(self, lib, record):
# Decode Unicode paths in database.
if 'artpath' in record and isinstance(record['artpath'], unicode):
record['artpath'] = _bytestring_path(record['artpath'])
record['artpath'] = bytestring_path(record['artpath'])
super(Album, self).__init__(lib, record)
def __setattr__(self, key, value):
@ -1218,7 +1042,7 @@ class Album(BaseAlbum):
elif key in ALBUM_KEYS:
# Make sure paths are bytestrings.
if key == 'artpath' and isinstance(value, unicode):
value = _bytestring_path(value)
value = bytestring_path(value)
# Reflect change in this object.
self._record[key] = value
@ -1275,7 +1099,7 @@ class Album(BaseAlbum):
# Delete art file.
artpath = self.artpath
if artpath:
os.unlink(_syspath(artpath))
os.unlink(syspath(artpath))
# Remove album from database.
self._library.conn.execute(
@ -1299,9 +1123,9 @@ class Album(BaseAlbum):
new_art = self.art_destination(old_art, newdir)
if new_art != old_art:
if copy:
shutil.copy(_syspath(old_art), _syspath(new_art))
shutil.copy(syspath(old_art), syspath(new_art))
else:
shutil.move(_syspath(old_art), _syspath(new_art))
shutil.move(syspath(old_art), syspath(new_art))
self.artpath = new_art
# Store new item paths. We do this at the end to avoid
@ -1318,7 +1142,7 @@ class Album(BaseAlbum):
items, so the album must contain at least one item or
item_dir must be provided.
"""
image = _bytestring_path(image)
image = bytestring_path(image)
if item_dir is None:
item = self.items().next()
item_dir = os.path.dirname(item.path)
@ -1330,11 +1154,11 @@ class Album(BaseAlbum):
"""Sets the album's cover art to the image at the given path.
The image is copied into place, replacing any existing art.
"""
path = _bytestring_path(path)
path = bytestring_path(path)
oldart = self.artpath
artdest = self.art_destination(path)
if oldart == artdest:
os.unlink(_syspath(oldart))
os.unlink(syspath(oldart))
shutil.copyfile(_syspath(path), _syspath(artdest))
shutil.copyfile(syspath(path), syspath(artdest))
self.artpath = artdest

View file

@ -202,7 +202,6 @@ class Packed(object):
return _safe_cast(self.out_type, out)
def __setitem__(self, index, value):
if self.packstyle in (packing.SLASHED, packing.TUPLE):
# SLASHED and TUPLE are always two-item packings
length = 2

View file

@ -26,7 +26,7 @@ from beets import autotag
import beets.autotag.art
from beets import plugins
from beets import importer
from beets import library
from beets.util import syspath
# Global logger.
log = logging.getLogger('beets')
@ -304,7 +304,7 @@ def import_files(lib, paths, copy, write, autot, logpath, art, threaded,
"""
# Check the user-specified directories.
for path in paths:
if not os.path.isdir(library._syspath(path)):
if not os.path.isdir(syspath(path)):
raise ui.UserError('not a directory: ' + path)
# Open the log.

View file

@ -19,6 +19,7 @@ from beets.plugins import BeetsPlugin
from beets import library
from beets import ui
from beets import mediafile
from beets import util
def info(paths):
# Set up fields to output.
@ -34,7 +35,7 @@ def info(paths):
if not first:
ui.print_()
path = library._normpath(path)
path = util.normpath(path)
ui.print_(path)
try:
mf = mediafile.MediaFile(path)

View file

@ -24,11 +24,12 @@ import posixpath
import _common
from _common import item
import beets.library
from beets import util
def lib(): return beets.library.Library('rsrc' + os.sep + 'test.blb')
def boracay(l): return beets.library.Item(l.conn.execute('select * from items '
'where id=3').fetchone())
np = beets.library._normpath
np = util.normpath
class LoadTest(unittest.TestCase):
def setUp(self):
@ -212,15 +213,15 @@ class DestinationTest(unittest.TestCase):
self.assertFalse('two / three' in p)
def test_sanitize_unix_replaces_leading_dot(self):
p = beets.library._sanitize_path('one/.two/three', posixpath)
p = util.sanitize_path('one/.two/three', posixpath)
self.assertFalse('.' in p)
def test_sanitize_windows_replaces_trailing_dot(self):
p = beets.library._sanitize_path('one/two./three', ntpath)
p = util.sanitize_path('one/two./three', ntpath)
self.assertFalse('.' in p)
def test_sanitize_windows_replaces_illegal_chars(self):
p = beets.library._sanitize_path(':*?"<>|', ntpath)
p = util.sanitize_path(':*?"<>|', ntpath)
self.assertFalse(':' in p)
self.assertFalse('*' in p)
self.assertFalse('?' in p)
@ -230,7 +231,7 @@ class DestinationTest(unittest.TestCase):
self.assertFalse('|' in p)
def test_sanitize_replaces_colon_with_dash(self):
p = beets.library._sanitize_path(u':', posixpath)
p = util.sanitize_path(u':', posixpath)
self.assertEqual(p, u'-')
def test_path_with_format(self):
@ -302,26 +303,26 @@ class DestinationTest(unittest.TestCase):
def test_syspath_windows_format(self):
path = ntpath.join('a', 'b', 'c')
outpath = beets.library._syspath(path, ntpath)
outpath = util.syspath(path, ntpath)
self.assertTrue(isinstance(outpath, unicode))
self.assertTrue(outpath.startswith(u'\\\\?\\'))
def test_syspath_posix_unchanged(self):
path = posixpath.join('a', 'b', 'c')
outpath = beets.library._syspath(path, posixpath)
outpath = util.syspath(path, posixpath)
self.assertEqual(path, outpath)
def test_sanitize_windows_replaces_trailing_space(self):
p = beets.library._sanitize_path('one/two /three', ntpath)
p = util.sanitize_path('one/two /three', ntpath)
self.assertFalse(' ' in p)
def test_component_sanitize_replaces_separators(self):
name = posixpath.join('a', 'b')
newname = beets.library._sanitize_for_path(name, posixpath)
newname = util.sanitize_for_path(name, posixpath)
self.assertNotEqual(name, newname)
def test_component_sanitize_pads_with_zero(self):
name = beets.library._sanitize_for_path(1, posixpath, 'track')
name = util.sanitize_for_path(1, posixpath, 'track')
self.assertTrue(name.startswith('0'))
def test_artist_falls_back_to_albumartist(self):
@ -353,7 +354,7 @@ class DestinationTest(unittest.TestCase):
self.assertEqual(p.rsplit(os.path.sep, 1)[1], 'something')
def test_sanitize_path_works_on_empty_string(self):
p = beets.library._sanitize_path('', posixpath)
p = util.sanitize_path('', posixpath)
self.assertEqual(p, '')
class MigrationTest(unittest.TestCase):
@ -646,12 +647,12 @@ class PathStringTest(unittest.TestCase):
def test_sanitize_path_with_special_chars(self):
path = 'b\xe1r?'
new_path = beets.library._sanitize_path(path)
new_path = util.sanitize_path(path)
self.assert_(new_path.startswith('b\xe1r'))
def test_sanitize_path_returns_bytestring(self):
path = 'b\xe1r?'
new_path = beets.library._sanitize_path(path)
new_path = util.sanitize_path(path)
self.assert_(isinstance(new_path, str))
def test_unicode_artpath_becomes_bytestring(self):

View file

@ -24,6 +24,7 @@ from os.path import join
import _common
from _common import item
import beets.library
from beets import util
def touch(path):
open(path, 'a').close()
@ -72,7 +73,7 @@ class MoveTest(unittest.TestCase):
def test_move_changes_path(self):
self.i.move(self.lib)
self.assertEqual(self.i.path, beets.library._normpath(self.dest))
self.assertEqual(self.i.path, util.normpath(self.dest))
def test_copy_already_at_destination(self):
self.i.move(self.lib)
@ -102,28 +103,28 @@ class HelperTest(unittest.TestCase):
def test_ancestry_works_on_file(self):
p = '/a/b/c'
a = ['/','/a','/a/b']
self.assertEqual(beets.library._ancestry(p), a)
self.assertEqual(util.ancestry(p), a)
def test_ancestry_works_on_dir(self):
p = '/a/b/c/'
a = ['/', '/a', '/a/b', '/a/b/c']
self.assertEqual(beets.library._ancestry(p), a)
self.assertEqual(util.ancestry(p), a)
def test_ancestry_works_on_relative(self):
p = 'a/b/c'
a = ['a', 'a/b']
self.assertEqual(beets.library._ancestry(p), a)
self.assertEqual(util.ancestry(p), a)
def test_components_works_on_file(self):
p = '/a/b/c'
a = ['/', 'a', 'b', 'c']
self.assertEqual(beets.library._components(p), a)
self.assertEqual(util.components(p), a)
def test_components_works_on_dir(self):
p = '/a/b/c/'
a = ['/', 'a', 'b', 'c']
self.assertEqual(beets.library._components(p), a)
self.assertEqual(util.components(p), a)
def test_components_works_on_relative(self):
p = 'a/b/c'
a = ['a', 'b', 'c']
self.assertEqual(beets.library._components(p), a)
self.assertEqual(util.components(p), a)
class AlbumFileTest(unittest.TestCase):
def setUp(self):
@ -136,7 +137,7 @@ class AlbumFileTest(unittest.TestCase):
self.i = item()
# Make a file for the item.
self.i.path = self.lib.destination(self.i)
beets.library._mkdirall(self.i.path)
util.mkdirall(self.i.path)
touch(self.i.path)
# Make an album.
self.ai = self.lib.add_album((self.i,))
@ -178,7 +179,7 @@ class ArtFileTest(unittest.TestCase):
self.i = item()
self.i.path = self.lib.destination(self.i)
# Make a music file.
beets.library._mkdirall(self.i.path)
util.mkdirall(self.i.path)
touch(self.i.path)
# Make an album.
self.ai = self.lib.add_album((self.i,))
@ -251,7 +252,7 @@ class RemoveTest(unittest.TestCase):
self.i = item()
self.i.path = self.lib.destination(self.i)
# Make a music file.
beets.library._mkdirall(self.i.path)
util.mkdirall(self.i.path)
touch(self.i.path)
# Make an album with the item.
self.ai = self.lib.add_album((self.i,))