Refactor get/set into StorageStyle

This commit is contained in:
Thomas Scholtes 2014-01-30 20:02:28 +01:00
parent 861b50d556
commit da13cb1825
2 changed files with 235 additions and 192 deletions

View file

@ -304,6 +304,182 @@ class StorageStyle(object):
if self.suffix and self.as_type in (str, unicode):
self.suffix = self.as_type(self.suffix)
def fetch(self, mediafile):
if mediafile.type == 'mp3':
if self.id3_desc is not None: # also match on 'desc' field
frames = mediafile.mgfile.tags.getall(self.key)
entry = None
for frame in frames:
if frame.desc.lower() == self.id3_desc.lower():
entry = getattr(frame, self.id3_frame_field)
break
if entry is None: # no desc match
return None
else:
# Get the metadata frame mediafileect.
try:
frame = mediafile.mgfile[self.key]
except KeyError:
return None
entry = getattr(frame, self.id3_frame_field)
else: # Not MP3.
try:
entry = mediafile.mgfile[self.key]
except KeyError:
return None
# Possibly index the list.
if self.list_elem:
if entry: # List must have at least one value.
# Handle Mutagen bugs when reading values (#356).
try:
return entry[0]
except:
log.error('Mutagen exception when reading field: %s' %
traceback.format_exc)
return None
else:
return None
else:
return entry
def get(self, mediafile):
out = self.fetch(mediafile)
if self.packing:
p = Packed(out, self.packing, out_type=self.pack_type)
out = p[self.pack_pos]
if self.suffix and isinstance(out, (str, unicode)):
if out.endswith(self.suffix):
out = out[:-len(self.suffix)]
if mediafile.type in MP4_TYPES and self.key.startswith('----:') and \
isinstance(out, str):
out = out.decode('utf8')
return out
def store(self, mediafile, val):
"""Store val for this descriptor's field in the tag dictionary
according to the provided StorageStyle. Store it as a
single-item list if necessary.
"""
# Wrap as a list if necessary.
if self.list_elem:
out = [val]
else:
out = val
if mediafile.type == 'mp3':
# Try to match on "desc" field.
if self.id3_desc is not None:
frames = mediafile.mgfile.tags.getall(self.key)
# try modifying in place
found = False
for frame in frames:
if frame.desc.lower() == self.id3_desc.lower():
setattr(frame, self.id3_frame_field, out)
found = True
break
# need to make a new frame?
if not found:
assert isinstance(self.id3_frame_field, str) # Keyword.
args = {
'encoding': 3,
'desc': self.id3_desc,
self.id3_frame_field: val,
}
if self.id3_lang:
args['lang'] = self.id3_lang
mediafile.mgfile.tags.add(mutagen.id3.Frames[self.key](**args))
# Try to match on "owner" field.
elif self.key.startswith('UFID:'):
owner = self.key.split(':', 1)[1]
frames = mediafile.mgfile.tags.getall(self.key)
for frame in frames:
# Replace existing frame data.
if frame.owner == owner:
setattr(frame, self.id3_frame_field, val)
else:
# New frame.
assert isinstance(self.id3_frame_field, str) # Keyword.
frame = mutagen.id3.UFID(owner=owner,
**{self.id3_frame_field: val})
mediafile.mgfile.tags.setall('UFID', [frame])
# Just replace based on key.
else:
assert isinstance(self.id3_frame_field, str) # Keyword.
frame = mutagen.id3.Frames[self.key](encoding=3,
**{self.id3_frame_field: val})
mediafile.mgfile.tags.setall(self.key, [frame])
else: # Not MP3.
mediafile.mgfile[self.key] = out
def set(self, mediafile, val):
if self.packing:
p = Packed(self.fetch(mediafile), self.packing,
out_type=self.pack_type)
p[self.pack_pos] = val
out = p.items
else: # Unicode, integer, boolean, or float scalar.
out = val
# deal with Nones according to abstract type if present
if out is None:
if self.out_type == int:
out = 0
elif self.out_type == float:
out = 0.0
elif self.out_type == bool:
out = False
elif self.out_type == unicode:
out = u''
# We trust that packed values are handled above.
# Convert to correct storage type (irrelevant for
# packed values).
if self.out_type == float and self.as_type in (str, unicode):
# Special case for float-valued data.
out = u'{0:.{1}f}'.format(out, self.float_places)
out = self.as_type(out)
elif self.as_type == unicode:
if out is None:
out = u''
else:
if self.out_type == bool:
# Store bools as 1/0 instead of True/False.
out = unicode(int(bool(out)))
elif isinstance(out, str):
out = out.decode('utf8', 'ignore')
else:
out = unicode(out)
elif self.as_type == int:
if out is None:
out = 0
else:
out = int(out)
elif self.as_type in (bool, str):
out = self.as_type(out)
# Add a suffix to string storage.
if self.as_type in (str, unicode) and self.suffix:
out += self.suffix
# MPEG-4 "freeform" (----) frames must be encoded as UTF-8
# byte strings.
if mediafile.type in MP4_TYPES and self.key.startswith('----:') and \
isinstance(out, unicode):
out = out.encode('utf8')
# Store the data.
self.store(mediafile, out)
# Dealing with packings.
@ -425,113 +601,6 @@ class MediaField(object):
'arguments mp3, mp4, asf, and etc')
self.styles = kwargs
def _fetchdata(self, obj, style):
"""Get the value associated with this descriptor's field stored
with the given StorageStyle. Unwraps from a list if necessary.
"""
# fetch the value, which may be a scalar or a list
if obj.type == 'mp3':
if style.id3_desc is not None: # also match on 'desc' field
frames = obj.mgfile.tags.getall(style.key)
entry = None
for frame in frames:
if frame.desc.lower() == style.id3_desc.lower():
entry = getattr(frame, style.id3_frame_field)
break
if entry is None: # no desc match
return None
else:
# Get the metadata frame object.
try:
frame = obj.mgfile[style.key]
except KeyError:
return None
entry = getattr(frame, style.id3_frame_field)
else: # Not MP3.
try:
entry = obj.mgfile[style.key]
except KeyError:
return None
# Possibly index the list.
if style.list_elem:
if entry: # List must have at least one value.
# Handle Mutagen bugs when reading values (#356).
try:
return entry[0]
except:
log.error('Mutagen exception when reading field: %s' %
traceback.format_exc)
return None
else:
return None
else:
return entry
def _storedata(self, obj, val, style):
"""Store val for this descriptor's field in the tag dictionary
according to the provided StorageStyle. Store it as a
single-item list if necessary.
"""
# Wrap as a list if necessary.
if style.list_elem:
out = [val]
else:
out = val
if obj.type == 'mp3':
# Try to match on "desc" field.
if style.id3_desc is not None:
frames = obj.mgfile.tags.getall(style.key)
# try modifying in place
found = False
for frame in frames:
if frame.desc.lower() == style.id3_desc.lower():
setattr(frame, style.id3_frame_field, out)
found = True
break
# need to make a new frame?
if not found:
assert isinstance(style.id3_frame_field, str) # Keyword.
args = {
'encoding': 3,
'desc': style.id3_desc,
style.id3_frame_field: val,
}
if style.id3_lang:
args['lang'] = style.id3_lang
obj.mgfile.tags.add(mutagen.id3.Frames[style.key](**args))
# Try to match on "owner" field.
elif style.key.startswith('UFID:'):
owner = style.key.split(':', 1)[1]
frames = obj.mgfile.tags.getall(style.key)
for frame in frames:
# Replace existing frame data.
if frame.owner == owner:
setattr(frame, style.id3_frame_field, val)
else:
# New frame.
assert isinstance(style.id3_frame_field, str) # Keyword.
frame = mutagen.id3.UFID(owner=owner,
**{style.id3_frame_field: val})
obj.mgfile.tags.setall('UFID', [frame])
# Just replace based on key.
else:
assert isinstance(style.id3_frame_field, str) # Keyword.
frame = mutagen.id3.Frames[style.key](encoding=3,
**{style.id3_frame_field: val})
obj.mgfile.tags.setall(style.key, [frame])
else: # Not MP3.
obj.mgfile[style.key] = out
def _styles(self, obj):
if obj.type in ('mp3', 'asf'):
styles = self.styles[obj.type]
@ -542,100 +611,23 @@ class MediaField(object):
# Make sure we always return a list of styles, even when given
# a single style for convenience.
if isinstance(styles, StorageStyle):
return [styles]
if hasattr(styles, '__iter__'):
return styles
return []
if not hasattr(styles, '__iter__'):
styles = [styles]
for style in styles:
style.out_type = self.out_type
yield style
def __get__(self, obj, owner):
"""Retrieve the value of this metadata field.
"""
for style in self._styles(obj):
# Use the first style that returns a reasonable value.
out = self._fetchdata(obj, style)
out = style.get(obj)
if out:
break
if style.packing:
p = Packed(out, style.packing, out_type=style.pack_type)
out = p[style.pack_pos]
# Remove suffix.
if style.suffix and isinstance(out, (str, unicode)):
if out.endswith(style.suffix):
out = out[:-len(style.suffix)]
# MPEG-4 freeform frames are (should be?) encoded as UTF-8.
if obj.type in MP4_TYPES and style.key.startswith('----:') and \
isinstance(out, str):
out = out.decode('utf8')
return _safe_cast(self.out_type, out)
def __set__(self, obj, val):
"""Set the value of this metadata field.
"""
for style in self._styles(obj):
if style.packing:
p = Packed(self._fetchdata(obj, style), style.packing,
out_type=style.pack_type)
p[style.pack_pos] = val
out = p.items
else: # Unicode, integer, boolean, or float scalar.
out = val
# deal with Nones according to abstract type if present
if out is None:
if self.out_type == int:
out = 0
elif self.out_type == float:
out = 0.0
elif self.out_type == bool:
out = False
elif self.out_type == unicode:
out = u''
# We trust that packed values are handled above.
# Convert to correct storage type (irrelevant for
# packed values).
if self.out_type == float and style.as_type in (str, unicode):
# Special case for float-valued data.
out = u'{0:.{1}f}'.format(out, style.float_places)
out = style.as_type(out)
elif style.as_type == unicode:
if out is None:
out = u''
else:
if self.out_type == bool:
# Store bools as 1/0 instead of True/False.
out = unicode(int(bool(out)))
elif isinstance(out, str):
out = out.decode('utf8', 'ignore')
else:
out = unicode(out)
elif style.as_type == int:
if out is None:
out = 0
else:
out = int(out)
elif style.as_type in (bool, str):
out = style.as_type(out)
# Add a suffix to string storage.
if style.as_type in (str, unicode) and style.suffix:
out += style.suffix
# MPEG-4 "freeform" (----) frames must be encoded as UTF-8
# byte strings.
if obj.type in MP4_TYPES and style.key.startswith('----:') and \
isinstance(out, unicode):
out = out.encode('utf8')
# Store the data.
self._storedata(obj, out, style)
style.set(obj, val)
class CompositeDateField(object):
"""A MediaFile field for conveniently accessing the year, month, and

View file

@ -294,6 +294,57 @@ class ID3v23Test(unittest.TestCase):
self._delete_test()
class ReadWriteTest(unittest.TestCase):
"""Test writing and reading tags
"""
extensions = ['mp3', 'm4a', 'alac.m4a', 'mpc',
'flac', 'ape', 'ogg', 'wma', 'wv']
def test_read_common(self):
for ext in self.extensions:
mediafile = full_mediafile_fixture()
self.assertEqual(mediafile.title, 'full')
self.assertEqual(mediafile.album, 'the album')
self.assertEqual(mediafile.artist, 'the artist')
self.assertEqual(mediafile.year, 2001)
self.assertEqual(mediafile.track, 2)
def test_read_write_original_year(self):
for ext in self.extensions:
mediafile = full_mediafile_fixture()
mediafile.original_year = 1999
mediafile.save()
mediafile = beets.mediafile.MediaFile(mediafile.path)
self.assertEqual(mediafile.original_year, 1999)
def test_write_common(self):
for ext in self.extensions:
mediafile = full_mediafile_fixture()
mediafile.title = 'empty'
mediafile.album = 'another album'
mediafile.artist = 'another artist'
mediafile.year = 2002
mediafile.track = 3
mediafile.save()
mediafile = beets.mediafile.MediaFile(mediafile.path)
self.assertEqual(mediafile.title, 'empty')
self.assertEqual(mediafile.album, 'another album')
self.assertEqual(mediafile.artist, 'another artist')
self.assertEqual(mediafile.year, 2002)
self.assertEqual(mediafile.track, 3)
def full_mediafile_fixture(ext='mp3'):
"""Returns a Mediafile with a lot of tags already set.
"""
src = os.path.join(_common.RSRC, 'full.{0}'.format(ext))
path = os.path.join(_common.RSRC, 'test.{0}'.format(ext))
shutil.copy(src, path)
return beets.mediafile.MediaFile(path)
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)