diff --git a/beets/__init__.py b/beets/__init__.py new file mode 100644 index 000000000..b7227d752 --- /dev/null +++ b/beets/__init__.py @@ -0,0 +1,2 @@ +#!/usr/bin/env python +from beets.library import Library \ No newline at end of file diff --git a/beets/library.py b/beets/library.py new file mode 100644 index 000000000..386dff15b --- /dev/null +++ b/beets/library.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python +import sqlite3, os, sys, operator +import beets.tag +from string import Template + +# Fields in the "items" table; all the metadata available for items in the +# library. These are used directly in SQL; they are vulnerable to injection if +# accessible to the user. +metadata_fields = [ + ('title', 'text'), + ('artist', 'text'), + ('album', 'text'), + ('genre', 'text'), + ('composer', 'text'), + ('grouping', 'text'), + ('year', 'int'), + ('track', 'int'), + ('maxtrack', 'int'), + ('disc', 'int'), + ('maxdisc', 'int'), + ('lyrics', 'text'), + ('comments', 'text'), + ('bpm', 'int'), + ('comp', 'bool') +] + +class Library(object): + def __init__(self, path='library.blb'): + self.path = path + self.conn = sqlite3.connect(self.path) + self.conn.row_factory = sqlite3.Row + self.setup() + + def setup(self): + "Set up the schema of the library file." + + # options (library data) table + setup_sql = """ + create table if not exists options ( + key text primary key, + value text + );""" + + # items (things in the library) table + setup_sql += """create table if not exists items ( + path text primary key, """ + setup_sql += ', '.join(map(' '.join, metadata_fields)) + setup_sql += ' );' + + c = self.conn.cursor() + c.executescript(setup_sql) + c.close() + self.conn.commit() + + # DATABASE UTILITY FUNCTIONS + def select(self, where='', subvars=[], columns='*'): + "Look up items in the library. Returns a cursor." + c = self.conn.cursor() + if where.strip(): # we have a where clause + where = ' where ' + where + c.execute('select ' + columns + ' from items' + where, + subvars) + return c + def selects_any(self, where, subvars): + "Returns True iff the SELECT query matches any rows." + c = self.select(where, subvars) + out = (c.fetchone() is not None) + c.close() + + # FILE/DB UTILITY FUNCTIONS + def add_file(self, path): + "Adds a new file to the library." + + # build query part for metadata fields + columns = ','.join(map(operator.itemgetter(0),metadata_fields)) + values = ','.join(['?']*len(metadata_fields)) + subvars = [] + f = beets.tag.MediaFile(path) + for field, value in metadata_fields: + subvars.append(getattr(f, field)) + + # other fields + columns += ',path' + values += ',?' + subvars.append(path) + + # issue query + c = self.conn.cursor() + query = 'insert into items (' + columns + ') values (' + values + ')' + c.execute(query, subvars) + c.close() + + def update_file(self, path): + "Updates a file already in the database with the file's metadata." + + # build query part for metadata fields + assignments = ','.join(['?=?']*len(metadata_fields)) + subvars = [] + f = beets.tag.MediaFile(path) + for field, value in metadata_fields: + subvars += [field, getattr(f, field)] + + # build the full query itself + query = 'update items set ' + assignments + ' where path=?' + subvars.append(path) + + c = self.conn.cursor() + c.execute(query, subvars) + c.close() + + # MISC. UTILITY FUNCTIONS + def mynormpath(self, path): + """Provide the canonical form of the path suitable for storing in the + database. In the future, options may modify the behavior of this + method.""" + # force absolute paths: + # os.path.normpath(os.path.abspath(os.path.expanduser(path))) + return os.path.normpath(os.path.expanduser(path)) + def log(self, msg): + """Print a log message.""" + print >>sys.stderr, msg + def pprint(self, item, form='$artist - $title'): + print Template(form).substitute(item) + + def add_path(self, path, clobber=False): + """Add a file to the library or recursively search a directory and add + all its contents.""" + if os.path.isdir(path): + # recurse into all directory contents + for ent in os.listdir(path): + self.add_path(path + os.sep + ent, clobber) + elif os.path.isfile(path): + # add _if_ it's legible (otherwise ignore but say so) + if self.selects_any('path=?', (self.mynormpath(path),)): + if not clobber: + self.log(path + ' already in database, skipping') + return + else: + self.update_file(self.mynormpath(path)) + else: + try: + self.add_file(self.mynormpath(path)) + except beets.tag.FileTypeError: + self.log(path + ' of unknown type, skipping') + elif not os.path.exists(path): + raise IOError('file not found: ' + path) + + # high-level (and command-line) interface + def add(self, *paths): + for path in paths: + self.add_path(path, clobber=False) + self.conn.commit() + def remove(self, *criteria): + raise NotImplementedError + def update(self, *criteria): + #c = self.select(criteria, [], 'path') + #for f in c: + # self.update_file(f.path) + #c.close() + raise NotImplementedError + def write(self, *criteria): + raise NotImplementedError + def list(self, *criteria): + c = self.select(' '.join(criteria), []) + for row in c: + self.pprint(row) + c.close() \ No newline at end of file diff --git a/beets/tag.py b/beets/tag.py new file mode 100644 index 000000000..624fcb7c6 --- /dev/null +++ b/beets/tag.py @@ -0,0 +1,360 @@ +#!/usr/bin/env python +"""Handles low-level interfacing for files' tags. Wraps mutagen to +automatically detect file types and provide a unified interface for the +specific tags Beets is interested in.""" + +from mutagen import mp4, mp3, id3 +import os.path + +__all__ = ['FileTypeError', 'MediaFile'] + +# Currently allowed values for type: +# mp3, mp4 +class FileTypeError(IOError): + pass + + + + + + +########################### +#### UTILITY FUNCTIONS #### +########################### +# For dealing with the sort of weirdnesses we find in metadata fields. + +def fromslashed(slashed, sep=u'/'): + """Extract a pair of items from a slashed string. If only one + value is present, it is assumed to be the left-hand value.""" + + if slashed is None: + return (None, None) + + items = slashed.split(sep) + + if len(items) == 1: + out = (items[0], None) + else: + out = (items[0], items[1]) + + # represent "nothing stored" more gracefully + if out[0] == '': out[0] = None + if out[1] == '': out[1] = None + + return out + +def toslashed(pair_or_val, sep=u'/'): + """Store a pair of items or a single item in a slashed string. If + only one value is provided (in a list/tuple or as a single value), + no slash is used.""" + if type(pair_or_val) is list or type(pair_or_val) is tuple: + if len(pair_or_val) == 0: + out = [u''] + elif len(pair_or_val) == 1: + out = [unicode(pair_or_val[0])] + else: + out = [unicode(pair_or_val[0]), unicode(pair_or_val[1])] + else: # "scalar" + out = [unicode(pair_or_val)] + return sep.join(out) + +def unpair(pair, right=False, noneval=None): + """Return the left or right value in a pair (as selected by the "right" + parameter. If the value on that side is not available, return noneval.)""" + if right: idx = 1 + else: idx = 0 + + try: + out = pair[idx] + except: + out = None + finally: + if out is None: + return noneval + else: + return out + +def normalize_pair(pair, noneval=None): + """Make sure the pair is a tuple that has exactly two entries. If we need + to fill anything in, we'll use noneval.""" + return (unpair(pair, False, noneval), + unpair(pair, True, noneval)) + + + + + +############################### +#### MediaField descriptor #### +############################### + +class MediaField(object): + """A descriptor providing access to a particular (abstract) metadata + field. The various messy parameters control the translation to concrete + metadata manipulations in the language of mutagen.""" + + # possible types used to store the relevant data + TYPE_RAW = 0 # stored as a single object (not in a list) + TYPE_LIST = 1 << 0 # stored in the first element of a list + TYPE_UNICODE = 1 << 1 # stored as a unicode object + TYPE_INTEGER = 1 << 2 # as an int + TYPE_BOOLEAN = 1 << 3 # as a bool + # RAW and LIST are mutually exclusive, as are UNICODE, INTEGER and + # BOOLEAN. Must pick either RAW or LIST, but none of the other types + # are necessary. + + # non-type aspects of data storage + STYLE_PLAIN = 0 # no filtering + STYLE_UNICODE = 1 << 0 # value is a string, stored as a string + STYLE_INTEGER = 1 << 1 # value is an integer, maybe stored as a string + STYLE_BOOLEAN = 1 << 2 # value is a boolean, probably stored as a string + STYLE_SLASHED = 1 << 3 # int stored in a string on one side of a / char + STYLE_2PLE = 1 << 4 # stored as one value in an integer 2-tuple + # The above styles are all mutually exclusive. + STYLE_LEFT = 1 << 5 # for SLASHED or 2PLE, value is in first entry + STYLE_RIGHT = 1 << 6 # likewise, in second entry + # These are mutually exclusive and relevant only with SLASHED and 2PLE. + + def __init__(self, id3key, mp4key, + # in ID3 tags, use only the frame with this "desc" field + id3desc=None, + # compositions of the TYPE_ flag above + id3type=TYPE_UNICODE|TYPE_LIST, mp4type=TYPE_UNICODE|TYPE_LIST, + # compositions of STYLE_ flags + id3style=STYLE_UNICODE, mp4style=STYLE_UNICODE + ): + + self.keys = { 'mp3': id3key, + 'mp4': mp4key } + self.types = { 'mp3': id3type, + 'mp4': mp4type } + self.styles = { 'mp3': id3style, + 'mp4': mp4style } + self.id3desc = id3desc + + def __fetchdata(self, obj): + """Get the value associated with this descriptor's key (and id3desc if + present) from the mutagen tag dict. Unwraps from a list if necessary.""" + (mykey, mytype, mystyle) = self.__params(obj) + + try: + # fetch the value, which may be a scalar or a list + if obj.type == 'mp3': + if self.id3desc is not None: # also match on 'desc' field + frames = obj.tags.tags.getall(mykey) + entry = None + for frame in frames: + if frame.desc == self.id3desc: + entry = frame.text + break + if entry is None: # no desc match + return None + else: + entry = obj.tags[mykey].text + else: + entry = obj.tags[mykey] + + # possibly index the list + if mytype & self.TYPE_LIST: + return entry[0] + else: + return entry + except KeyError: # the tag isn't present + return None + + def __storedata(self, obj, val): + """Store val for this descriptor's key in the tag dictionary. Store it + as a single-item list if necessary. Uses id3desc if present.""" + (mykey, mytype, mystyle) = self.__params(obj) + + # wrap as a list if necessary + if mytype & self.TYPE_LIST: out = [val] + else: out = val + + if obj.type == 'mp3': + if self.id3desc is not None: # match on id3desc + frames = obj.tags.tags.getall(mykey) + + # try modifying in place + found = False + for frame in frames: + if frame.desc == self.id3desc: + frame.text = out + found = True + break + + # need to make a new frame? + if not found: + frame = id3.Frames[mykey](encoding=3, desc=self.id3desc, + text=val) + obj.tags.tags.add(frame) + + else: # no match on desc; just replace based on key + frame = id3.Frames[mykey](encoding=3, text=val) + obj.tags.tags.setall(mykey, [frame]) + else: + obj.tags[mykey] = out + + def __params(self, obj): + return (self.keys[obj.type], + self.types[obj.type], + self.styles[obj.type]) + + def __get__(self, obj, owner): + """Retrieve the value of this metadata field.""" + out = None + (mykey, mytype, mystyle) = self.__params(obj) + + out = self.__fetchdata(obj) + + # deal with slashed and tuple storage + if mystyle & self.STYLE_SLASHED or mystyle & self.STYLE_2PLE: + if mystyle & self.STYLE_SLASHED: + out = fromslashed(out) + out = unpair(out, mystyle & self.STYLE_RIGHT, noneval=0) + + # return the appropriate type + if mystyle & self.STYLE_INTEGER or mystyle & self.STYLE_SLASHED \ + or mystyle & self.STYLE_2PLE: + if out is None: + return 0 + else: + try: + return int(out) + except: # in case out is not convertible directly to an int + return int(unicode(out)) + elif mystyle & self.STYLE_BOOLEAN: + if out is None: + return False + else: + return bool(int(out)) # should work for strings, bools, ints + elif mystyle & self.STYLE_UNICODE: + if out is None: + return u'' + else: + return unicode(out) + else: + return out + + def __set__(self, obj, val): + """Set the value of this metadata field.""" + (mykey, mytype, mystyle) = self.__params(obj) + + # apply style filters + if mystyle & self.STYLE_SLASHED or mystyle & self.STYLE_2PLE: + # fetch the existing value so we can preserve half of it + pair = self.__fetchdata(obj) + if mystyle & self.STYLE_SLASHED: + pair = fromslashed(pair) + pair = normalize_pair(pair, noneval=0) + + # set the appropriate side of the pair + if mystyle & self.STYLE_LEFT: + pair = (val, pair[1]) + else: + pair = (pair[0], val) + + if mystyle & self.STYLE_SLASHED: + out = toslashed(pair) + else: + out = pair + else: # plain, integer, or boolean + out = val + + # deal with Nones according to abstract type if present + if out is None: + if mystyle & self.STYLE_INTEGER: + out = 0 + elif mystyle & self.STYLE_BOOLEAN: + out = False + elif mystyle & self.STYLE_UNICODE: + out = u'' + # We trust that SLASHED and 2PLE are handled above. + + # convert to correct storage type + if mytype & self.TYPE_UNICODE: + if out is None: + out = u'' + else: + if mystyle & self.STYLE_BOOLEAN: + # store bools as 1,0 instead of True,False + out = unicode(int(out)) + else: + out = unicode(out) + elif mytype & self.TYPE_INTEGER: + if out is None: + out = 0 + else: + out = int(out) + elif mytype & self.TYPE_BOOLEAN: + out = bool(out) + + # store the data + self.__storedata(obj, out) + + + + + +######################### +#### MediaFile class #### +######################### + +class MediaFile(object): + """Represents a multimedia file on disk and provides access to its + metadata.""" + + def __init__(self, path): + root, ext = os.path.splitext(path) + if ext == '.mp3': + self.type = 'mp3' + self.tags = mp3.Open(path) + elif ext == '.m4a' or ext == '.mp4' or ext == '.m4b' or ext == '.m4p': + self.type = 'mp4' + self.tags = mp4.Open(path) + else: + raise FileTypeError('unsupported file extension: ' + ext) + + def save_tags(self): + self.tags.save() + + ########################### + #### FIELD DEFINITIONS #### + ########################### + + title = MediaField('TIT2', "\xa9nam") + artist = MediaField('TPE1', "\xa9ART") + album = MediaField('TALB', "\xa9alb") + genre = MediaField('TCON', "\xa9gen") + composer = MediaField('TCOM', "\xa9wrt") + grouping = MediaField('TIT1', "\xa9grp") + year = MediaField('TDRC', "\xa9day", + id3style=MediaField.STYLE_INTEGER, + mp4style=MediaField.STYLE_INTEGER) + track = MediaField('TRCK', 'trkn', + id3style=MediaField.STYLE_SLASHED | MediaField.STYLE_LEFT, + mp4type=MediaField.TYPE_LIST, + mp4style=MediaField.STYLE_2PLE | MediaField.STYLE_LEFT) + maxtrack = MediaField('TRCK', 'trkn', + id3style=MediaField.STYLE_SLASHED | MediaField.STYLE_RIGHT, + mp4type=MediaField.TYPE_LIST, + mp4style=MediaField.STYLE_2PLE | MediaField.STYLE_RIGHT) + disc = MediaField('TPOS', 'disk', + id3style=MediaField.STYLE_SLASHED | MediaField.STYLE_LEFT, + mp4type=MediaField.TYPE_LIST, + mp4style=MediaField.STYLE_2PLE | MediaField.STYLE_LEFT) + maxdisc = MediaField('TPOS', 'disk', + id3style=MediaField.STYLE_SLASHED | MediaField.STYLE_RIGHT, + mp4type=MediaField.TYPE_LIST, + mp4style=MediaField.STYLE_2PLE | MediaField.STYLE_RIGHT) + lyrics = MediaField(u"USLT", "\xa9lyr", id3desc=u'', + id3type=MediaField.TYPE_UNICODE) + comments = MediaField(u"COMM", "\xa9cmt", id3desc=u'') + bpm = MediaField('TBPM', 'tmpo', + id3style=MediaField.STYLE_INTEGER, + mp4type=MediaField.TYPE_LIST | MediaField.TYPE_INTEGER, + mp4style=MediaField.STYLE_INTEGER) + comp = MediaField('TCMP', 'cpil', + id3style=MediaField.STYLE_BOOLEAN, + mp4type=MediaField.TYPE_BOOLEAN, + mp4style=MediaField.STYLE_BOOLEAN) \ No newline at end of file diff --git a/bts.py b/bts.py new file mode 100755 index 000000000..9a581cfa2 --- /dev/null +++ b/bts.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python +from optparse import OptionParser +from beets import Library + +if __name__ == "__main__": + # parse options + usage = """usage: %prog [options] command +command is one of: add, remove, update, write, list, help""" + op = OptionParser(usage=usage) + op.add_option('-l', '--library', dest='libpath', metavar='PATH', + default='library.blb', + help='work on the specified library file') + op.remove_option('--help') + opts, args = op.parse_args() + + # make sure we have a command + if len(args) < 1: + op.error('no command specified') + cmd = args.pop(0) + + lib = Library(opts.libpath) + + # make a "help" command + def help(*args): op.print_help() + + # choose which command to invoke + avail_commands = [ + (lib.add, ['add']), + (lib.remove, ['remove', 'rm']), + (lib.update, ['update', 'up']), + (lib.write, ['write', 'wr', 'w']), + (lib.list, ['list', 'ls']), + (help, ['help', 'h']) + ] + for test_command in avail_commands: + if cmd in test_command[1]: + (test_command[0])(*args) + op.exit() + + # no command matched + op.error('invalid command "' + cmd + '"') + \ No newline at end of file diff --git a/test/rsrc/full.m4a b/test/rsrc/full.m4a new file mode 100755 index 000000000..e2a952f12 Binary files /dev/null and b/test/rsrc/full.m4a differ diff --git a/test/rsrc/full.mp3 b/test/rsrc/full.mp3 new file mode 100755 index 000000000..06313f20e Binary files /dev/null and b/test/rsrc/full.mp3 differ diff --git a/test/rsrc/min.m4a b/test/rsrc/min.m4a new file mode 100755 index 000000000..6580e7b66 Binary files /dev/null and b/test/rsrc/min.m4a differ diff --git a/test/rsrc/min.mp3 b/test/rsrc/min.mp3 new file mode 100755 index 000000000..7a8006b36 Binary files /dev/null and b/test/rsrc/min.mp3 differ diff --git a/test/rsrc/partial.m4a b/test/rsrc/partial.m4a new file mode 100755 index 000000000..3bf491e16 Binary files /dev/null and b/test/rsrc/partial.m4a differ diff --git a/test/rsrc/partial.mp3 b/test/rsrc/partial.mp3 new file mode 100755 index 000000000..bed36a5b4 Binary files /dev/null and b/test/rsrc/partial.mp3 differ diff --git a/test/tagtest.py b/test/tagtest.py new file mode 100755 index 000000000..bad5e7c62 --- /dev/null +++ b/test/tagtest.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python +import unittest, sys, os, shutil +sys.path.append('..') +import beets.tag + +def MakeReadingTest(path, correct_dict, field): + class ReadingTest(unittest.TestCase): + def setUp(self): + self.f = beets.tag.MediaFile(path) + def runTest(self): + got = getattr(self.f, field) + correct = correct_dict[field] + self.assertEqual(got, correct, + field + ' incorrect (expected ' + repr(correct) + ', got ' + \ + repr(got) + ') when testing ' + os.path.basename(path)) + return ReadingTest + +def MakeWritingTest(path, correct_dict, field, testsuffix='_test'): + + class WritingTest(unittest.TestCase): + def setUp(self): + # make a copy of the file we'll work on + root, ext = os.path.splitext(path) + self.tpath = root + testsuffix + ext + shutil.copy(path, self.tpath) + + # generate the new value we'll try storing + if type(correct_dict[field]) is unicode: + self.value = u'TestValue: ' + field + elif type(correct_dict[field]) is int: + self.value = correct_dict[field] + 42 + elif type(correct_dict[field]) is bool: + self.value = not correct_dict[field] + else: + raise ValueError('unknown field type ' + \ + str(type(correct_dict[field]))) + + def runTest(self): + # write new tag + a = beets.tag.MediaFile(self.tpath) + setattr(a, field, self.value) + a.save_tags() + + # verify ALL tags are correct with modification + b = beets.tag.MediaFile(self.tpath) + for readfield in correct_dict.keys(): + got = getattr(b, readfield) + if readfield is field: + self.assertEqual(got, self.value, + field + ' modified incorrectly (changed to ' + \ + repr(self.value) + ' but read ' + repr(got) + \ + ') when testing ' + os.path.basename(path)) + else: + correct = getattr(a, readfield) + self.assertEqual(got, correct, + readfield + ' changed when it should not have (expected' + ' ' + repr(correct) + ', got ' + repr(got) + ') when ' + 'modifying ' + field + ' in ' + os.path.basename(path)) + + def tearDown(self): + os.remove(self.tpath) + + return WritingTest + +correct_dicts = { + + 'full': { + 'title': u'full', + 'artist': u'the artist', + 'album': u'the album', + 'genre': u'the genre', + 'composer': u'the composer', + 'grouping': u'the grouping', + 'year': 2001, + 'track': 2, + 'maxtrack': 3, + 'disc': 4, + 'maxdisc': 5, + 'lyrics': u'the lyrics', + 'comments': u'the comments', + 'bpm': 6, + 'comp': True + }, + + 'partial': { + 'title': u'partial', + 'artist': u'the artist', + 'album': u'the album', + 'genre': u'', + 'composer': u'', + 'grouping': u'', + 'year': 0, + 'track': 2, + 'maxtrack': 0, + 'disc': 4, + 'maxdisc': 0, + 'lyrics': u'', + 'comments': u'', + 'bpm': 0, + 'comp': False + }, + + 'min': { + 'title': u'min', + 'artist': u'', + 'album': u'', + 'genre': u'', + 'composer': u'', + 'grouping': u'', + 'year': 0, + 'track': 0, + 'maxtrack': 0, + 'disc': 0, + 'maxdisc': 0, + 'lyrics': u'', + 'comments': u'', + 'bpm': 0, + 'comp': False + } + +} + +def suite(): + s = unittest.TestSuite() + for kind in ('m4a', 'mp3'): + for tagset in ('full', 'partial', 'min'): + path = 'rsrc' + os.sep + tagset + '.' + kind + correct_dict = correct_dicts[tagset] + for field in correct_dict.keys(): + s.addTest(MakeReadingTest(path, correct_dict, field)()) + s.addTest(MakeWritingTest(path, correct_dict, field)()) + return s + +if __name__ == '__main__': + unittest.TextTestRunner(verbosity=2).run(suite()) \ No newline at end of file