initial import

--HG--
extra : convert_revision : svn%3A41726ec3-264d-0410-9c23-a9f1637257cc/trunk%402
This commit is contained in:
adrian.sampson 2008-05-14 01:42:56 +00:00
parent c1ed60af98
commit ee7bb4b9e8
11 changed files with 706 additions and 0 deletions

2
beets/__init__.py Normal file
View file

@ -0,0 +1,2 @@
#!/usr/bin/env python
from beets.library import Library

167
beets/library.py Normal file
View file

@ -0,0 +1,167 @@
#!/usr/bin/env python
import sqlite3, os, sys, operator
import beets.tag
from string import Template
# Fields in the "items" table; all the metadata available for items in the
# library. These are used directly in SQL; they are vulnerable to injection if
# accessible to the user.
metadata_fields = [
('title', 'text'),
('artist', 'text'),
('album', 'text'),
('genre', 'text'),
('composer', 'text'),
('grouping', 'text'),
('year', 'int'),
('track', 'int'),
('maxtrack', 'int'),
('disc', 'int'),
('maxdisc', 'int'),
('lyrics', 'text'),
('comments', 'text'),
('bpm', 'int'),
('comp', 'bool')
]
class Library(object):
def __init__(self, path='library.blb'):
self.path = path
self.conn = sqlite3.connect(self.path)
self.conn.row_factory = sqlite3.Row
self.setup()
def setup(self):
"Set up the schema of the library file."
# options (library data) table
setup_sql = """
create table if not exists options (
key text primary key,
value text
);"""
# items (things in the library) table
setup_sql += """create table if not exists items (
path text primary key, """
setup_sql += ', '.join(map(' '.join, metadata_fields))
setup_sql += ' );'
c = self.conn.cursor()
c.executescript(setup_sql)
c.close()
self.conn.commit()
# DATABASE UTILITY FUNCTIONS
def select(self, where='', subvars=[], columns='*'):
"Look up items in the library. Returns a cursor."
c = self.conn.cursor()
if where.strip(): # we have a where clause
where = ' where ' + where
c.execute('select ' + columns + ' from items' + where,
subvars)
return c
def selects_any(self, where, subvars):
"Returns True iff the SELECT query matches any rows."
c = self.select(where, subvars)
out = (c.fetchone() is not None)
c.close()
# FILE/DB UTILITY FUNCTIONS
def add_file(self, path):
"Adds a new file to the library."
# build query part for metadata fields
columns = ','.join(map(operator.itemgetter(0),metadata_fields))
values = ','.join(['?']*len(metadata_fields))
subvars = []
f = beets.tag.MediaFile(path)
for field, value in metadata_fields:
subvars.append(getattr(f, field))
# other fields
columns += ',path'
values += ',?'
subvars.append(path)
# issue query
c = self.conn.cursor()
query = 'insert into items (' + columns + ') values (' + values + ')'
c.execute(query, subvars)
c.close()
def update_file(self, path):
"Updates a file already in the database with the file's metadata."
# build query part for metadata fields
assignments = ','.join(['?=?']*len(metadata_fields))
subvars = []
f = beets.tag.MediaFile(path)
for field, value in metadata_fields:
subvars += [field, getattr(f, field)]
# build the full query itself
query = 'update items set ' + assignments + ' where path=?'
subvars.append(path)
c = self.conn.cursor()
c.execute(query, subvars)
c.close()
# MISC. UTILITY FUNCTIONS
def mynormpath(self, path):
"""Provide the canonical form of the path suitable for storing in the
database. In the future, options may modify the behavior of this
method."""
# force absolute paths:
# os.path.normpath(os.path.abspath(os.path.expanduser(path)))
return os.path.normpath(os.path.expanduser(path))
def log(self, msg):
"""Print a log message."""
print >>sys.stderr, msg
def pprint(self, item, form='$artist - $title'):
print Template(form).substitute(item)
def add_path(self, path, clobber=False):
"""Add a file to the library or recursively search a directory and add
all its contents."""
if os.path.isdir(path):
# recurse into all directory contents
for ent in os.listdir(path):
self.add_path(path + os.sep + ent, clobber)
elif os.path.isfile(path):
# add _if_ it's legible (otherwise ignore but say so)
if self.selects_any('path=?', (self.mynormpath(path),)):
if not clobber:
self.log(path + ' already in database, skipping')
return
else:
self.update_file(self.mynormpath(path))
else:
try:
self.add_file(self.mynormpath(path))
except beets.tag.FileTypeError:
self.log(path + ' of unknown type, skipping')
elif not os.path.exists(path):
raise IOError('file not found: ' + path)
# high-level (and command-line) interface
def add(self, *paths):
for path in paths:
self.add_path(path, clobber=False)
self.conn.commit()
def remove(self, *criteria):
raise NotImplementedError
def update(self, *criteria):
#c = self.select(criteria, [], 'path')
#for f in c:
# self.update_file(f.path)
#c.close()
raise NotImplementedError
def write(self, *criteria):
raise NotImplementedError
def list(self, *criteria):
c = self.select(' '.join(criteria), [])
for row in c:
self.pprint(row)
c.close()

360
beets/tag.py Normal file
View file

@ -0,0 +1,360 @@
#!/usr/bin/env python
"""Handles low-level interfacing for files' tags. Wraps mutagen to
automatically detect file types and provide a unified interface for the
specific tags Beets is interested in."""
from mutagen import mp4, mp3, id3
import os.path
__all__ = ['FileTypeError', 'MediaFile']
# Currently allowed values for type:
# mp3, mp4
class FileTypeError(IOError):
pass
###########################
#### UTILITY FUNCTIONS ####
###########################
# For dealing with the sort of weirdnesses we find in metadata fields.
def fromslashed(slashed, sep=u'/'):
"""Extract a pair of items from a slashed string. If only one
value is present, it is assumed to be the left-hand value."""
if slashed is None:
return (None, None)
items = slashed.split(sep)
if len(items) == 1:
out = (items[0], None)
else:
out = (items[0], items[1])
# represent "nothing stored" more gracefully
if out[0] == '': out[0] = None
if out[1] == '': out[1] = None
return out
def toslashed(pair_or_val, sep=u'/'):
"""Store a pair of items or a single item in a slashed string. If
only one value is provided (in a list/tuple or as a single value),
no slash is used."""
if type(pair_or_val) is list or type(pair_or_val) is tuple:
if len(pair_or_val) == 0:
out = [u'']
elif len(pair_or_val) == 1:
out = [unicode(pair_or_val[0])]
else:
out = [unicode(pair_or_val[0]), unicode(pair_or_val[1])]
else: # "scalar"
out = [unicode(pair_or_val)]
return sep.join(out)
def unpair(pair, right=False, noneval=None):
"""Return the left or right value in a pair (as selected by the "right"
parameter. If the value on that side is not available, return noneval.)"""
if right: idx = 1
else: idx = 0
try:
out = pair[idx]
except:
out = None
finally:
if out is None:
return noneval
else:
return out
def normalize_pair(pair, noneval=None):
"""Make sure the pair is a tuple that has exactly two entries. If we need
to fill anything in, we'll use noneval."""
return (unpair(pair, False, noneval),
unpair(pair, True, noneval))
###############################
#### MediaField descriptor ####
###############################
class MediaField(object):
"""A descriptor providing access to a particular (abstract) metadata
field. The various messy parameters control the translation to concrete
metadata manipulations in the language of mutagen."""
# possible types used to store the relevant data
TYPE_RAW = 0 # stored as a single object (not in a list)
TYPE_LIST = 1 << 0 # stored in the first element of a list
TYPE_UNICODE = 1 << 1 # stored as a unicode object
TYPE_INTEGER = 1 << 2 # as an int
TYPE_BOOLEAN = 1 << 3 # as a bool
# RAW and LIST are mutually exclusive, as are UNICODE, INTEGER and
# BOOLEAN. Must pick either RAW or LIST, but none of the other types
# are necessary.
# non-type aspects of data storage
STYLE_PLAIN = 0 # no filtering
STYLE_UNICODE = 1 << 0 # value is a string, stored as a string
STYLE_INTEGER = 1 << 1 # value is an integer, maybe stored as a string
STYLE_BOOLEAN = 1 << 2 # value is a boolean, probably stored as a string
STYLE_SLASHED = 1 << 3 # int stored in a string on one side of a / char
STYLE_2PLE = 1 << 4 # stored as one value in an integer 2-tuple
# The above styles are all mutually exclusive.
STYLE_LEFT = 1 << 5 # for SLASHED or 2PLE, value is in first entry
STYLE_RIGHT = 1 << 6 # likewise, in second entry
# These are mutually exclusive and relevant only with SLASHED and 2PLE.
def __init__(self, id3key, mp4key,
# in ID3 tags, use only the frame with this "desc" field
id3desc=None,
# compositions of the TYPE_ flag above
id3type=TYPE_UNICODE|TYPE_LIST, mp4type=TYPE_UNICODE|TYPE_LIST,
# compositions of STYLE_ flags
id3style=STYLE_UNICODE, mp4style=STYLE_UNICODE
):
self.keys = { 'mp3': id3key,
'mp4': mp4key }
self.types = { 'mp3': id3type,
'mp4': mp4type }
self.styles = { 'mp3': id3style,
'mp4': mp4style }
self.id3desc = id3desc
def __fetchdata(self, obj):
"""Get the value associated with this descriptor's key (and id3desc if
present) from the mutagen tag dict. Unwraps from a list if necessary."""
(mykey, mytype, mystyle) = self.__params(obj)
try:
# fetch the value, which may be a scalar or a list
if obj.type == 'mp3':
if self.id3desc is not None: # also match on 'desc' field
frames = obj.tags.tags.getall(mykey)
entry = None
for frame in frames:
if frame.desc == self.id3desc:
entry = frame.text
break
if entry is None: # no desc match
return None
else:
entry = obj.tags[mykey].text
else:
entry = obj.tags[mykey]
# possibly index the list
if mytype & self.TYPE_LIST:
return entry[0]
else:
return entry
except KeyError: # the tag isn't present
return None
def __storedata(self, obj, val):
"""Store val for this descriptor's key in the tag dictionary. Store it
as a single-item list if necessary. Uses id3desc if present."""
(mykey, mytype, mystyle) = self.__params(obj)
# wrap as a list if necessary
if mytype & self.TYPE_LIST: out = [val]
else: out = val
if obj.type == 'mp3':
if self.id3desc is not None: # match on id3desc
frames = obj.tags.tags.getall(mykey)
# try modifying in place
found = False
for frame in frames:
if frame.desc == self.id3desc:
frame.text = out
found = True
break
# need to make a new frame?
if not found:
frame = id3.Frames[mykey](encoding=3, desc=self.id3desc,
text=val)
obj.tags.tags.add(frame)
else: # no match on desc; just replace based on key
frame = id3.Frames[mykey](encoding=3, text=val)
obj.tags.tags.setall(mykey, [frame])
else:
obj.tags[mykey] = out
def __params(self, obj):
return (self.keys[obj.type],
self.types[obj.type],
self.styles[obj.type])
def __get__(self, obj, owner):
"""Retrieve the value of this metadata field."""
out = None
(mykey, mytype, mystyle) = self.__params(obj)
out = self.__fetchdata(obj)
# deal with slashed and tuple storage
if mystyle & self.STYLE_SLASHED or mystyle & self.STYLE_2PLE:
if mystyle & self.STYLE_SLASHED:
out = fromslashed(out)
out = unpair(out, mystyle & self.STYLE_RIGHT, noneval=0)
# return the appropriate type
if mystyle & self.STYLE_INTEGER or mystyle & self.STYLE_SLASHED \
or mystyle & self.STYLE_2PLE:
if out is None:
return 0
else:
try:
return int(out)
except: # in case out is not convertible directly to an int
return int(unicode(out))
elif mystyle & self.STYLE_BOOLEAN:
if out is None:
return False
else:
return bool(int(out)) # should work for strings, bools, ints
elif mystyle & self.STYLE_UNICODE:
if out is None:
return u''
else:
return unicode(out)
else:
return out
def __set__(self, obj, val):
"""Set the value of this metadata field."""
(mykey, mytype, mystyle) = self.__params(obj)
# apply style filters
if mystyle & self.STYLE_SLASHED or mystyle & self.STYLE_2PLE:
# fetch the existing value so we can preserve half of it
pair = self.__fetchdata(obj)
if mystyle & self.STYLE_SLASHED:
pair = fromslashed(pair)
pair = normalize_pair(pair, noneval=0)
# set the appropriate side of the pair
if mystyle & self.STYLE_LEFT:
pair = (val, pair[1])
else:
pair = (pair[0], val)
if mystyle & self.STYLE_SLASHED:
out = toslashed(pair)
else:
out = pair
else: # plain, integer, or boolean
out = val
# deal with Nones according to abstract type if present
if out is None:
if mystyle & self.STYLE_INTEGER:
out = 0
elif mystyle & self.STYLE_BOOLEAN:
out = False
elif mystyle & self.STYLE_UNICODE:
out = u''
# We trust that SLASHED and 2PLE are handled above.
# convert to correct storage type
if mytype & self.TYPE_UNICODE:
if out is None:
out = u''
else:
if mystyle & self.STYLE_BOOLEAN:
# store bools as 1,0 instead of True,False
out = unicode(int(out))
else:
out = unicode(out)
elif mytype & self.TYPE_INTEGER:
if out is None:
out = 0
else:
out = int(out)
elif mytype & self.TYPE_BOOLEAN:
out = bool(out)
# store the data
self.__storedata(obj, out)
#########################
#### MediaFile class ####
#########################
class MediaFile(object):
"""Represents a multimedia file on disk and provides access to its
metadata."""
def __init__(self, path):
root, ext = os.path.splitext(path)
if ext == '.mp3':
self.type = 'mp3'
self.tags = mp3.Open(path)
elif ext == '.m4a' or ext == '.mp4' or ext == '.m4b' or ext == '.m4p':
self.type = 'mp4'
self.tags = mp4.Open(path)
else:
raise FileTypeError('unsupported file extension: ' + ext)
def save_tags(self):
self.tags.save()
###########################
#### FIELD DEFINITIONS ####
###########################
title = MediaField('TIT2', "\xa9nam")
artist = MediaField('TPE1', "\xa9ART")
album = MediaField('TALB', "\xa9alb")
genre = MediaField('TCON', "\xa9gen")
composer = MediaField('TCOM', "\xa9wrt")
grouping = MediaField('TIT1', "\xa9grp")
year = MediaField('TDRC', "\xa9day",
id3style=MediaField.STYLE_INTEGER,
mp4style=MediaField.STYLE_INTEGER)
track = MediaField('TRCK', 'trkn',
id3style=MediaField.STYLE_SLASHED | MediaField.STYLE_LEFT,
mp4type=MediaField.TYPE_LIST,
mp4style=MediaField.STYLE_2PLE | MediaField.STYLE_LEFT)
maxtrack = MediaField('TRCK', 'trkn',
id3style=MediaField.STYLE_SLASHED | MediaField.STYLE_RIGHT,
mp4type=MediaField.TYPE_LIST,
mp4style=MediaField.STYLE_2PLE | MediaField.STYLE_RIGHT)
disc = MediaField('TPOS', 'disk',
id3style=MediaField.STYLE_SLASHED | MediaField.STYLE_LEFT,
mp4type=MediaField.TYPE_LIST,
mp4style=MediaField.STYLE_2PLE | MediaField.STYLE_LEFT)
maxdisc = MediaField('TPOS', 'disk',
id3style=MediaField.STYLE_SLASHED | MediaField.STYLE_RIGHT,
mp4type=MediaField.TYPE_LIST,
mp4style=MediaField.STYLE_2PLE | MediaField.STYLE_RIGHT)
lyrics = MediaField(u"USLT", "\xa9lyr", id3desc=u'',
id3type=MediaField.TYPE_UNICODE)
comments = MediaField(u"COMM", "\xa9cmt", id3desc=u'')
bpm = MediaField('TBPM', 'tmpo',
id3style=MediaField.STYLE_INTEGER,
mp4type=MediaField.TYPE_LIST | MediaField.TYPE_INTEGER,
mp4style=MediaField.STYLE_INTEGER)
comp = MediaField('TCMP', 'cpil',
id3style=MediaField.STYLE_BOOLEAN,
mp4type=MediaField.TYPE_BOOLEAN,
mp4style=MediaField.STYLE_BOOLEAN)

42
bts.py Executable file
View file

@ -0,0 +1,42 @@
#!/usr/bin/env python
from optparse import OptionParser
from beets import Library
if __name__ == "__main__":
# parse options
usage = """usage: %prog [options] command
command is one of: add, remove, update, write, list, help"""
op = OptionParser(usage=usage)
op.add_option('-l', '--library', dest='libpath', metavar='PATH',
default='library.blb',
help='work on the specified library file')
op.remove_option('--help')
opts, args = op.parse_args()
# make sure we have a command
if len(args) < 1:
op.error('no command specified')
cmd = args.pop(0)
lib = Library(opts.libpath)
# make a "help" command
def help(*args): op.print_help()
# choose which command to invoke
avail_commands = [
(lib.add, ['add']),
(lib.remove, ['remove', 'rm']),
(lib.update, ['update', 'up']),
(lib.write, ['write', 'wr', 'w']),
(lib.list, ['list', 'ls']),
(help, ['help', 'h'])
]
for test_command in avail_commands:
if cmd in test_command[1]:
(test_command[0])(*args)
op.exit()
# no command matched
op.error('invalid command "' + cmd + '"')

BIN
test/rsrc/full.m4a Executable file

Binary file not shown.

BIN
test/rsrc/full.mp3 Executable file

Binary file not shown.

BIN
test/rsrc/min.m4a Executable file

Binary file not shown.

BIN
test/rsrc/min.mp3 Executable file

Binary file not shown.

BIN
test/rsrc/partial.m4a Executable file

Binary file not shown.

BIN
test/rsrc/partial.mp3 Executable file

Binary file not shown.

135
test/tagtest.py Executable file
View file

@ -0,0 +1,135 @@
#!/usr/bin/env python
import unittest, sys, os, shutil
sys.path.append('..')
import beets.tag
def MakeReadingTest(path, correct_dict, field):
class ReadingTest(unittest.TestCase):
def setUp(self):
self.f = beets.tag.MediaFile(path)
def runTest(self):
got = getattr(self.f, field)
correct = correct_dict[field]
self.assertEqual(got, correct,
field + ' incorrect (expected ' + repr(correct) + ', got ' + \
repr(got) + ') when testing ' + os.path.basename(path))
return ReadingTest
def MakeWritingTest(path, correct_dict, field, testsuffix='_test'):
class WritingTest(unittest.TestCase):
def setUp(self):
# make a copy of the file we'll work on
root, ext = os.path.splitext(path)
self.tpath = root + testsuffix + ext
shutil.copy(path, self.tpath)
# generate the new value we'll try storing
if type(correct_dict[field]) is unicode:
self.value = u'TestValue: ' + field
elif type(correct_dict[field]) is int:
self.value = correct_dict[field] + 42
elif type(correct_dict[field]) is bool:
self.value = not correct_dict[field]
else:
raise ValueError('unknown field type ' + \
str(type(correct_dict[field])))
def runTest(self):
# write new tag
a = beets.tag.MediaFile(self.tpath)
setattr(a, field, self.value)
a.save_tags()
# verify ALL tags are correct with modification
b = beets.tag.MediaFile(self.tpath)
for readfield in correct_dict.keys():
got = getattr(b, readfield)
if readfield is field:
self.assertEqual(got, self.value,
field + ' modified incorrectly (changed to ' + \
repr(self.value) + ' but read ' + repr(got) + \
') when testing ' + os.path.basename(path))
else:
correct = getattr(a, readfield)
self.assertEqual(got, correct,
readfield + ' changed when it should not have (expected'
' ' + repr(correct) + ', got ' + repr(got) + ') when '
'modifying ' + field + ' in ' + os.path.basename(path))
def tearDown(self):
os.remove(self.tpath)
return WritingTest
correct_dicts = {
'full': {
'title': u'full',
'artist': u'the artist',
'album': u'the album',
'genre': u'the genre',
'composer': u'the composer',
'grouping': u'the grouping',
'year': 2001,
'track': 2,
'maxtrack': 3,
'disc': 4,
'maxdisc': 5,
'lyrics': u'the lyrics',
'comments': u'the comments',
'bpm': 6,
'comp': True
},
'partial': {
'title': u'partial',
'artist': u'the artist',
'album': u'the album',
'genre': u'',
'composer': u'',
'grouping': u'',
'year': 0,
'track': 2,
'maxtrack': 0,
'disc': 4,
'maxdisc': 0,
'lyrics': u'',
'comments': u'',
'bpm': 0,
'comp': False
},
'min': {
'title': u'min',
'artist': u'',
'album': u'',
'genre': u'',
'composer': u'',
'grouping': u'',
'year': 0,
'track': 0,
'maxtrack': 0,
'disc': 0,
'maxdisc': 0,
'lyrics': u'',
'comments': u'',
'bpm': 0,
'comp': False
}
}
def suite():
s = unittest.TestSuite()
for kind in ('m4a', 'mp3'):
for tagset in ('full', 'partial', 'min'):
path = 'rsrc' + os.sep + tagset + '.' + kind
correct_dict = correct_dicts[tagset]
for field in correct_dict.keys():
s.addTest(MakeReadingTest(path, correct_dict, field)())
s.addTest(MakeWritingTest(path, correct_dict, field)())
return s
if __name__ == '__main__':
unittest.TextTestRunner(verbosity=2).run(suite())