embed python-musicbrainz-ngs; beets is now on /ws/2

This commit is contained in:
Adrian Sampson 2011-10-23 18:20:01 -07:00
parent 604a7d5aa0
commit adbfd06682
6 changed files with 1396 additions and 375 deletions

View file

@ -13,27 +13,14 @@
# included in all copies or substantial portions of the Software.
"""Searches for albums in the MusicBrainz database.
This is a thin layer over the official `python-musicbrainz2` module. It
abstracts away that module's object model, the server's Lucene query
syntax, and other uninteresting parts of using musicbrainz2. The
principal interface is the function `match_album`.
"""
from __future__ import with_statement # for Python 2.5
import re
import time
import logging
import musicbrainz2.webservice as mbws
import httplib
from musicbrainz2.model import Release
from threading import Lock
from musicbrainz2.model import VARIOUS_ARTISTS_ID
from . import musicbrainz3
import beets.autotag.hooks
SEARCH_LIMIT = 5
VARIOUS_ARTISTS_ID = VARIOUS_ARTISTS_ID.rsplit('/', 1)[1]
VARIOUS_ARTISTS_ID = '89ad4ac3-39f7-470e-963a-56509c546377'
class ServerBusyError(Exception): pass
@ -46,236 +33,66 @@ SPECIAL_CASE_ARTISTS = {
'!!!': 'f26c72d3-e52c-467b-b651-679c73d8e1a7',
}
RELEASE_TYPES = [
Release.TYPE_ALBUM,
Release.TYPE_SINGLE,
Release.TYPE_EP,
Release.TYPE_COMPILATION,
Release.TYPE_SOUNDTRACK,
Release.TYPE_SPOKENWORD,
Release.TYPE_INTERVIEW,
Release.TYPE_AUDIOBOOK,
Release.TYPE_LIVE,
Release.TYPE_REMIX,
Release.TYPE_OTHER
]
RELEASE_INCLUDES = ['artists', 'media', 'recordings', 'release-groups',
'labels']
TRACK_INCLUDES = ['artists']
RELEASE_INCLUDES = mbws.ReleaseIncludes(artist=True, tracks=True,
releaseEvents=True, labels=True,
releaseGroup=True)
TRACK_INCLUDES = mbws.TrackIncludes(artist=True)
# MusicBrainz requires that a client does not query the server more
# than once a second. This function enforces that limit using a
# module-global variable to keep track of the last time a query was
# sent.
MAX_QUERY_RETRY = 8
QUERY_WAIT_TIME = 1.0
last_query_time = 0.0
mb_lock = Lock()
def _query_wrap(fun, *args, **kwargs):
"""Wait until at least `QUERY_WAIT_TIME` seconds have passed since
the last invocation of this function. Then call
fun(*args, **kwargs). If it fails due to a "server busy" message,
then try again. Tries up to `MAX_QUERY_RETRY` times before
giving up.
"""
with mb_lock:
global last_query_time
for i in range(MAX_QUERY_RETRY):
since_last_query = time.time() - last_query_time
if since_last_query < QUERY_WAIT_TIME:
time.sleep(QUERY_WAIT_TIME - since_last_query)
last_query_time = time.time()
try:
# Try the function.
res = fun(*args, **kwargs)
except mbws.ConnectionError:
# Typically a timeout.
pass
except mbws.ResponseError, exc:
# Malformed response from server.
log.error('Bad response from MusicBrainz: ' + str(exc))
raise BadResponseError()
except httplib.BadStatusLine:
log.warn('Bad HTTP status line from MusicBrainz')
except mbws.WebServiceError, e:
# Server busy. Retry.
message = str(e.reason)
for errnum in (503, 504):
if 'Error %i' % errnum in message:
break
else:
# This is not the error we're looking for.
raise
else:
# Success. Return the result.
return res
# Gave up.
raise ServerBusyError()
# FIXME exponential backoff?
def get_releases(**params):
"""Given a list of parameters to ReleaseFilter, executes the
query and yields AlbumInfo objects.
"""
# Replace special cases.
if 'artistName' in params:
artist = params['artistName']
if artist in SPECIAL_CASE_ARTISTS:
del params['artistName']
params['artistId'] = SPECIAL_CASE_ARTISTS[artist]
# Issue query.
filt = mbws.ReleaseFilter(**params)
try:
results = _query_wrap(mbws.Query().getReleases, filter=filt)
except BadResponseError:
results = ()
# Construct results.
for result in results:
release = result.release
tracks, _ = release_info(release.id)
yield album_info(release, tracks)
def release_info(release_id):
"""Given a MusicBrainz release ID, fetch a list of tracks on the
release and the release group ID. If the release is not found,
returns None.
"""
try:
release = _query_wrap(mbws.Query().getReleaseById, release_id,
RELEASE_INCLUDES)
except BadResponseError:
release = None
if release:
return release.getTracks(), release.getReleaseGroup().getId()
else:
return None
def _lucene_escape(text):
"""Escapes a string so it may be used verbatim in a Lucene query
string.
"""
# Regex stolen from MusicBrainz Picard.
out = re.sub(r'([+\-&|!(){}\[\]\^"~*?:\\])', r'\\\1', text)
return out.replace('\x00', '')
def _lucene_query(criteria):
"""Given a dictionary containing search criteria, produce a string
that may be used as a MusicBrainz search query.
"""
query_parts = []
for name, value in criteria.items():
value = _lucene_escape(value).strip().lower()
if value:
query_parts.append(u'%s:(%s)' % (name, value))
return u' '.join(query_parts)
def find_releases(criteria, limit=SEARCH_LIMIT):
"""Get a list of AlbumInfo objects from the MusicBrainz database
that match `criteria`. The latter is a dictionary whose keys are
MusicBrainz field names and whose values are search terms
for those fields.
The field names are from MusicBrainz's Lucene query syntax, which
is detailed here:
http://wiki.musicbrainz.org/Text_Search_Syntax
"""
# Replace special cases.
if 'artist' in criteria:
artist = criteria['artist']
if artist in SPECIAL_CASE_ARTISTS:
del criteria['artist']
criteria['arid'] = SPECIAL_CASE_ARTISTS[artist]
# Build the filter and send the query.
if any(criteria.itervalues()):
query = _lucene_query(criteria)
log.debug('album query: %s' % query)
return get_releases(limit=limit, query=query)
def find_tracks(criteria, limit=SEARCH_LIMIT):
"""Get a sequence of TrackInfo objects from MusicBrainz that match
`criteria`, a search term dictionary similar to the one passed to
`find_releases`.
"""
if any(criteria.itervalues()):
query = _lucene_query(criteria)
log.debug('track query: %s' % query)
filt = mbws.TrackFilter(limit=limit, query=query)
try:
results = _query_wrap(mbws.Query().getTracks, filter=filt)
except BadResponseError:
results = ()
for result in results:
track = result.track
yield track_info(track)
def track_info(track):
"""Translates a MusicBrainz ``Track`` object into a beets
def track_info(recording):
"""Translates a MusicBrainz recording result dictionary into a beets
``TrackInfo`` object.
"""
info = beets.autotag.hooks.TrackInfo(track.title,
track.id.rsplit('/', 1)[1])
if track.artist is not None:
# Track artists will only be present for releases with
# multiple artists.
info.artist = track.artist.name
info.artist_id = track.artist.id.rsplit('/', 1)[1]
if track.duration is not None:
# Duration not always present.
info.length = track.duration/(1000.0)
info = beets.autotag.hooks.TrackInfo(recording['title'],
recording['id'])
if 'artist-credit' in recording: # XXX: when is this not included?
artist = recording['artist-credit'][0]['artist']
info.artist = artist['name']
info.artist_id = artist['id']
if recording.get('length'):
info.length = int(recording['length'])/(1000.0)
return info
def album_info(release, tracks):
"""Takes a MusicBrainz ``Release`` object and returns a beets
def album_info(release):
"""Takes a MusicBrainz release result dictionary and returns a beets
AlbumInfo object containing the interesting data about that release.
``tracks`` is a list of ``Track`` objects that make up the album.
"""
# Basic info.
artist = release['artist-credit'][0]['artist']
tracks = []
for medium in release['medium-list']:
tracks.extend(i['recording'] for i in medium['track-list'])
info = beets.autotag.hooks.AlbumInfo(
release.title,
release.id.rsplit('/', 1)[1],
release.artist.name,
release.artist.id.rsplit('/', 1)[1],
release['title'],
release['id'],
artist['name'],
artist['id'],
[track_info(track) for track in tracks],
release.asin
)
info.va = info.artist_id == VARIOUS_ARTISTS_ID
if 'asin' in release:
info.asin = release['asin']
# Release type not always populated.
for releasetype in release.types:
if releasetype in RELEASE_TYPES:
info.albumtype = releasetype.split('#')[1].lower()
break
reltype = release['release-group']['type']
if reltype:
info.albumtype = reltype.lower()
# Release date and label.
try:
event = release.getEarliestReleaseEvent()
except:
# The python-musicbrainz2 module has a bug that will raise an
# exception when there is no release date to be found. In this
# case, we just skip adding a release date to the result.
pass
else:
if event:
# Release date.
date_str = event.getDate()
if date_str:
date_parts = date_str.split('-')
for key in ('year', 'month', 'day'):
if date_parts:
setattr(info, key, int(date_parts.pop(0)))
# Release date.
if 'date' in release: # XXX: when is this not included?
date_str = release['date']
if date_str:
date_parts = date_str.split('-')
for key in ('year', 'month', 'day'):
if date_parts:
setattr(info, key, int(date_parts.pop(0)))
# Label name.
label = event.getLabel()
if label:
name = label.getName()
if name and name != '[no label]':
info.label = name
# Label name.
if release.get('label-info-list'):
label = release['label-info-list'][0]['label']['name']
if label != '[no label]':
info.label = label
return info
@ -296,42 +113,39 @@ def match_album(artist, album, tracks=None, limit=SEARCH_LIMIT):
if tracks is not None:
criteria['tracks'] = str(tracks)
# Search for the release.
return find_releases(criteria, limit)
res = musicbrainz3.release_search(limit=limit, **criteria)
for release in res['release-list']:
# The search result is missing some data (namely, the tracks),
# so we just use the ID and fetch the rest of the information.
yield album_for_id(release['id'])
def match_track(artist, title):
def match_track(artist, title, limit=SEARCH_LIMIT):
"""Searches for a single track and returns an iterable of TrackInfo
objects.
"""
return find_tracks({
'artist': artist,
'track': title,
})
res = musicbrainz3.recording_search(artist=artist, recording=title,
limit=limit)
for recording in res['recording-list']:
yield track_info(recording)
def album_for_id(albumid):
"""Fetches an album by its MusicBrainz ID and returns an AlbumInfo
object or None if the album is not found.
"""
query = mbws.Query()
try:
album = _query_wrap(query.getReleaseById, albumid, RELEASE_INCLUDES)
except BadResponseError:
res = musicbrainz3.get_release_by_id(albumid, RELEASE_INCLUDES)
except musicbrainz3.ResponseError:
log.debug('Album ID match failed.')
return None
except (mbws.ResourceNotFoundError, mbws.RequestError), exc:
log.debug('Album ID match failed: ' + str(exc))
return None
return album_info(album, album.tracks)
return album_info(res['release'])
def track_for_id(trackid):
"""Fetches a track by its MusicBrainz ID. Returns a TrackInfo object
or None if no track is found.
"""
query = mbws.Query()
try:
track = _query_wrap(query.getTrackById, trackid, TRACK_INCLUDES)
except BadResponseError:
res = musicbrainz3.get_recording_by_id(trackid, TRACK_INCLUDES)
except musicbrainz3.ResponseError:
log.debug('Track ID match failed.')
return None
except (mbws.ResourceNotFoundError, mbws.RequestError), exc:
log.debug('Track ID match failed: ' + str(exc))
return None
return track_info(track)
return track_info(res['recording'])

View file

@ -0,0 +1,744 @@
# This is a copy of changeset e60b5af77 from the python-musicbrainz-ngs
# project:
# https://github.com/alastair/python-musicbrainz-ngs/
# MIT license; by Alastair Porter and Adrian Sampson
import urlparse
import urllib2
import urllib
import re
import threading
import time
import logging
import httplib
import xml.etree.ElementTree as etree
from . import mbxml
_useragent = "pythonmusicbrainzngs-0.1"
_log = logging.getLogger("python-musicbrainz-ngs")
# Constants for validation.
VALID_INCLUDES = {
'artist': [
"recordings", "releases", "release-groups", "works", # Subqueries
"various-artists", "discids", "media",
"aliases", "tags", "user-tags", "ratings", "user-ratings", # misc
"artist-rels", "label-rels", "recording-rels", "release-rels",
"release-group-rels", "url-rels", "work-rels"
],
'label': [
"releases", # Subqueries
"discids", "media",
"aliases", "tags", "user-tags", "ratings", "user-ratings", # misc
"artist-rels", "label-rels", "recording-rels", "release-rels",
"release-group-rels", "url-rels", "work-rels"
],
'recording': [
"artists", "releases", # Subqueries
"discids", "media", "artist-credits",
"tags", "user-tags", "ratings", "user-ratings", # misc
"artist-rels", "label-rels", "recording-rels", "release-rels",
"release-group-rels", "url-rels", "work-rels"
],
'release': [
"artists", "labels", "recordings", "release-groups", "media",
"artist-credits", "discids", "puids", "echoprints", "isrcs",
"artist-rels", "label-rels", "recording-rels", "release-rels",
"release-group-rels", "url-rels", "work-rels", "recording-level-rels",
"work-level-rels"
],
'release-group': [
"artists", "releases", "discids", "media",
"artist-credits", "tags", "user-tags", "ratings", "user-ratings", # misc
"artist-rels", "label-rels", "recording-rels", "release-rels",
"release-group-rels", "url-rels", "work-rels"
],
'work': [
"artists", # Subqueries
"aliases", "tags", "user-tags", "ratings", "user-ratings", # misc
"artist-rels", "label-rels", "recording-rels", "release-rels",
"release-group-rels", "url-rels", "work-rels"
],
'discid': [
"artists", "labels", "recordings", "release-groups", "puids",
"echoprints", "isrcs"
],
'echoprint': ["artists", "releases"],
'puid': ["artists", "releases", "puids", "echoprints", "isrcs"],
'isrc': ["artists", "releases", "puids", "echoprints", "isrcs"],
'iswc': ["artists"],
}
VALID_RELEASE_TYPES = [
"nat", "album", "single", "ep", "compilation", "soundtrack", "spokenword",
"interview", "audiobook", "live", "remix", "other"
]
VALID_RELEASE_STATUSES = ["official", "promotion", "bootleg", "pseudo-release"]
VALID_SEARCH_FIELDS = {
'artist': [
'arid', 'artist', 'sortname', 'type', 'begin', 'end', 'comment',
'alias', 'country', 'gender', 'tag'
],
'release-group': [
'rgid', 'releasegroup', 'reid', 'release', 'arid', 'artist',
'artistname', 'creditname', 'type', 'tag'
],
'release': [
'reid', 'release', 'arid', 'artist', 'artistname', 'creditname',
'type', 'status', 'tracks', 'tracksmedium', 'discids',
'discidsmedium', 'mediums', 'date', 'asin', 'lang', 'script',
'country', 'date', 'label', 'catno', 'barcode', 'puid'
],
'recording': [
'rid', 'recording', 'isrc', 'arid', 'artist', 'artistname',
'creditname', 'reid', 'release', 'type', 'status', 'tracks',
'tracksrelease', 'dur', 'qdur', 'tnum', 'position', 'tag'
],
'label': [
'laid', 'label', 'sortname', 'type', 'code', 'country', 'begin',
'end', 'comment', 'alias', 'tag'
],
'work': [
'wid', 'work', 'iswc', 'type', 'arid', 'artist', 'alias', 'tag'
],
}
# Exceptions.
class MusicBrainzError(Exception):
"""Base class for all exceptions related to MusicBrainz."""
pass
class UsageError(MusicBrainzError):
"""Error related to misuse of the module API."""
pass
class InvalidSearchFieldError(UsageError):
pass
class InvalidIncludeError(UsageError):
def __init__(self, msg='Invalid Includes', reason=None):
super(InvalidIncludeError, self).__init__(self)
self.msg = msg
self.reason = reason
def __str__(self):
return self.msg
class InvalidFilterError(UsageError):
def __init__(self, msg='Invalid Includes', reason=None):
super(InvalidFilterError, self).__init__(self)
self.msg = msg
self.reason = reason
def __str__(self):
return self.msg
class WebServiceError(MusicBrainzError):
"""Error related to MusicBrainz API requests."""
def __init__(self, message=None, cause=None):
"""Pass ``cause`` if this exception was caused by another
exception.
"""
self.message = message
self.cause = cause
def __str__(self):
if self.message:
msg = "%s, " % self.message
else:
msg = ""
msg += "caused by: %s" % str(self.cause)
return msg
class NetworkError(WebServiceError):
"""Problem communicating with the MB server."""
pass
class ResponseError(WebServiceError):
"""Bad response sent by the MB server."""
pass
# Helpers for validating and formatting allowed sets.
def _check_includes_impl(includes, valid_includes):
for i in includes:
if i not in valid_includes:
raise InvalidIncludeError("Bad includes", "%s is not a valid include" % i)
def _check_includes(entity, inc):
_check_includes_impl(inc, VALID_INCLUDES[entity])
def _check_filter(values, valid):
for v in values:
if v not in valid:
raise InvalidFilterError(v)
def _check_filter_and_make_params(includes, release_status=[], release_type=[]):
"""Check that the status or type values are valid. Then, check that
the filters can be used with the given includes. Return a params
dict that can be passed to _do_mb_query.
"""
if isinstance(release_status, basestring):
release_status = [release_status]
if isinstance(release_type, basestring):
release_type = [release_type]
_check_filter(release_status, VALID_RELEASE_STATUSES)
_check_filter(release_type, VALID_RELEASE_TYPES)
if release_status and "releases" not in includes:
raise InvalidFilterError("Can't have a status with no release include")
if release_type and ("release-groups" not in includes and
"releases" not in includes):
raise InvalidFilterError("Can't have a release type with no "
"release-group include")
# Build parameters.
params = {}
if len(release_status):
params["status"] = "|".join(release_status)
if len(release_type):
params["type"] = "|".join(release_type)
return params
# Global authentication and endpoint details.
user = password = ""
hostname = "musicbrainz.org"
_client = ""
def auth(u, p):
"""Set the username and password to be used in subsequent queries to
the MusicBrainz XML API that require authentication.
"""
global user, password
user = u
password = p
def set_client(c):
""" Set the client to be used in requests. This must be set before any
data submissions are made.
"""
global _client
_client = c
# Rate limiting.
limit_interval = 1.0
limit_requests = 1
def set_rate_limit(new_interval=1.0, new_requests=1):
"""Sets the rate limiting behavior of the module. Must be invoked
before the first Web service call. Specify the number of requests
(`new_requests`) that may be made per given interval
(`new_interval`).
"""
global limit_interval
global limit_requests
limit_interval = new_interval
limit_requests = new_requests
class _rate_limit(object):
"""A decorator that limits the rate at which the function may be
called. The rate is controlled by the `limit_interval` and
`limit_requests` global variables. The limiting is thread-safe;
only one thread may be in the function at a time (acts like a
monitor in this sense). The globals must be set before the first
call to the limited function.
"""
def __init__(self, fun):
self.fun = fun
self.last_call = 0.0
self.lock = threading.Lock()
self.remaining_requests = None # Set on first invocation.
def _update_remaining(self):
"""Update remaining requests based on the elapsed time since
they were last calculated.
"""
# On first invocation, we have the maximum number of requests
# available.
if self.remaining_requests is None:
self.remaining_requests = float(limit_requests)
else:
since_last_call = time.time() - self.last_call
self.remaining_requests += since_last_call * \
(limit_requests / limit_interval)
self.remaining_requests = min(self.remaining_requests,
float(limit_requests))
self.last_call = time.time()
def __call__(self, *args, **kwargs):
with self.lock:
self._update_remaining()
# Delay if necessary.
while self.remaining_requests < 0.999:
time.sleep((1.0 - self.remaining_requests) *
(limit_requests / limit_interval))
self._update_remaining()
# Call the original function, "paying" for this call.
self.remaining_requests -= 1.0
return self.fun(*args, **kwargs)
# Generic support for making HTTP requests.
# From pymb2
class _RedirectPasswordMgr(urllib2.HTTPPasswordMgr):
def __init__(self):
self._realms = { }
def find_user_password(self, realm, uri):
# ignoring the uri parameter intentionally
try:
return self._realms[realm]
except KeyError:
return (None, None)
def add_password(self, realm, uri, username, password):
# ignoring the uri parameter intentionally
self._realms[realm] = (username, password)
class _DigestAuthHandler(urllib2.HTTPDigestAuthHandler):
def get_authorization (self, req, chal):
qop = chal.get ('qop', None)
if qop and ',' in qop and 'auth' in qop.split (','):
chal['qop'] = 'auth'
return urllib2.HTTPDigestAuthHandler.get_authorization (self, req, chal)
class _MusicbrainzHttpRequest(urllib2.Request):
""" A custom request handler that allows DELETE and PUT"""
def __init__(self, method, url, data=None):
urllib2.Request.__init__(self, url, data)
allowed_m = ["GET", "POST", "DELETE", "PUT"]
if method not in allowed_m:
raise ValueError("invalid method: %s" % method)
self.method = method
def get_method(self):
return self.method
# Core (internal) functions for calling the MB API.
def _safe_open(opener, req, body=None, max_retries=8, retry_delay_delta=2.0):
"""Open an HTTP request with a given URL opener and (optionally) a
request body. Transient errors lead to retries. Permanent errors
and repeated errors are translated into a small set of handleable
exceptions. Returns a file-like object.
"""
last_exc = None
for retry_num in range(max_retries):
if retry_num: # Not the first try: delay an increasing amount.
_log.debug("retrying after delay (#%i)" % retry_num)
time.sleep(retry_num * retry_delay_delta)
try:
if body:
f = opener.open(req, body)
else:
f = opener.open(req)
except urllib2.HTTPError, exc:
if exc.code in (400, 404):
# Bad request, not found, etc.
raise ResponseError(cause=exc)
elif exc.code in (503, 502, 500):
# Rate limiting, internal overloading...
_log.debug("HTTP error %i" % exc.code)
else:
# Other, unknown error. Should handle more cases, but
# retrying for now.
_log.debug("unknown HTTP error %i" % exc.code)
last_exc = exc
except httplib.BadStatusLine, exc:
_log.debug("bad status line")
last_exc = exc
except httplib.HTTPException, exc:
_log.debug("miscellaneous HTTP exception: %s" % str(exc))
last_exc = exc
except urllib2.URLError, exc:
raise NetworkError(cause=exc)
except IOError, exc:
raise NetworkError(cause=exc)
else:
# No exception! Yay!
return f
# Out of retries!
raise NetworkError("retried %i times" % max_retries, last_exc)
@_rate_limit
def _mb_request(path, method='GET', auth_required=False, client_required=False,
args=None, data=None, body=None):
"""Makes a request for the specified `path` (endpoint) on /ws/2 on
the globally-specified hostname. Parses the responses and returns
the resulting object. `auth_required` and `client_required` control
whether exceptions should be raised if the client and
username/password are left unspecified, respectively.
"""
args = dict(args) or {}
# Add client if required.
if client_required and _client == "":
raise UsageError("set a client name with "
"musicbrainz.set_client(\"client-version\")")
elif client_required:
args["client"] = _client
# Construct the full URL for the request, including hostname and
# query string.
url = urlparse.urlunparse((
'http',
hostname,
'/ws/2/%s' % path,
'',
urllib.urlencode(args),
''
))
_log.debug("%s request for %s" % (method, url))
# Set up HTTP request handler and URL opener.
httpHandler = urllib2.HTTPHandler(debuglevel=0)
handlers = [httpHandler]
opener = urllib2.build_opener(*handlers)
# Add credentials if required.
if auth_required:
if not user:
raise UsageError("authorization required; "
"use musicbrainz.auth(u, p) first")
passwordMgr = _RedirectPasswordMgr()
authHandler = _DigestAuthHandler(passwordMgr)
authHandler.add_password("musicbrainz.org", (), user, password)
handlers.append(authHandler)
# Make request.
req = _MusicbrainzHttpRequest(method, url, data)
req.add_header('User-Agent', _useragent)
if body:
req.add_header('Content-Type', 'application/xml; charset=UTF-8')
f = _safe_open(opener, req, body)
# Parse the response.
try:
return mbxml.parse_message(f)
except etree.ParseError, exc:
raise ResponseError(cause=exc)
except UnicodeError, exc:
raise ResponseError(cause=exc)
def _is_auth_required(entity, includes):
""" Some calls require authentication. This returns
True if a call does, False otherwise
"""
if "user-tags" in includes or "user-ratings" in includes:
return True
elif entity.startswith("collection"):
return True
else:
return False
def _do_mb_query(entity, id, includes=[], params={}):
"""Make a single GET call to the MusicBrainz XML API. `entity` is a
string indicated the type of object to be retrieved. The id may be
empty, in which case the query is a search. `includes` is a list
of strings that must be valid includes for the entity type. `params`
is a dictionary of additional parameters for the API call. The
response is parsed and returned.
"""
# Build arguments.
_check_includes(entity, includes)
auth_required = _is_auth_required(entity, includes)
args = dict(params)
if len(includes) > 0:
inc = " ".join(includes)
args["inc"] = inc
# Build the endpoint components.
path = '%s/%s' % (entity, id)
return _mb_request(path, 'GET', auth_required, args=args)
def _do_mb_search(entity, query='', fields={}, limit=None, offset=None):
"""Perform a full-text search on the MusicBrainz search server.
`query` is a free-form query string and `fields` is a dictionary
of key/value query parameters. They keys in `fields` must be valid
for the given entity type.
"""
# Encode the query terms as a Lucene query string.
query_parts = [query.replace('\x00', '').strip()]
for key, value in fields.iteritems():
# Ensure this is a valid search field.
if key not in VALID_SEARCH_FIELDS[entity]:
raise InvalidSearchFieldError(
'%s is not a valid search field for %s' % (key, entity)
)
# Escape Lucene's special characters.
value = re.sub(r'([+\-&|!(){}\[\]\^"~*?:\\])', r'\\\1', value)
value = value.replace('\x00', '').strip()
if value:
query_parts.append(u'%s:(%s)' % (key, value))
full_query = u' '.join(query_parts).strip()
if not full_query:
raise ValueError('at least one query term is required')
# Additional parameters to the search.
params = {'query': full_query}
if limit:
params['limit'] = str(limit)
if offset:
params['offset'] = str(offset)
return _do_mb_query(entity, '', [], params)
def _do_mb_delete(path):
"""Send a DELETE request for the specified object.
"""
return _mb_request(path, 'DELETE', True, True)
def _do_mb_put(path):
"""Send a PUT request for the specified object.
"""
return _mb_request(path, 'PUT', True, True)
def _do_mb_post(path, body):
"""Perform a single POST call for an endpoint with a specified
request body.
"""
return _mb_request(path, 'PUT', True, True, body=body)
# The main interface!
# Single entity by ID
def get_artist_by_id(id, includes=[], release_status=[], release_type=[]):
params = _check_filter_and_make_params(includes, release_status, release_type)
return _do_mb_query("artist", id, includes, params)
def get_label_by_id(id, includes=[], release_status=[], release_type=[]):
params = _check_filter_and_make_params(includes, release_status, release_type)
return _do_mb_query("label", id, includes, params)
def get_recording_by_id(id, includes=[], release_status=[], release_type=[]):
params = _check_filter_and_make_params(includes, release_status, release_type)
return _do_mb_query("recording", id, includes, params)
def get_release_by_id(id, includes=[], release_status=[], release_type=[]):
params = _check_filter_and_make_params(includes, release_status, release_type)
return _do_mb_query("release", id, includes, params)
def get_release_group_by_id(id, includes=[], release_status=[], release_type=[]):
params = _check_filter_and_make_params(includes, release_status, release_type)
return _do_mb_query("release-group", id, includes, params)
def get_work_by_id(id, includes=[]):
return _do_mb_query("work", id, includes)
# Searching
def artist_search(query='', limit=None, offset=None, **fields):
"""Search for artists by a free-form `query` string and/or any of
the following keyword arguments specifying field queries:
arid, artist, sortname, type, begin, end, comment, alias, country,
gender, tag
"""
return _do_mb_search('artist', query, fields, limit, offset)
def label_search(query='', limit=None, offset=None, **fields):
"""Search for labels by a free-form `query` string and/or any of
the following keyword arguments specifying field queries:
laid, label, sortname, type, code, country, begin, end, comment,
alias, tag
"""
return _do_mb_search('label', query, fields, limit, offset)
def recording_search(query='', limit=None, offset=None, **fields):
"""Search for recordings by a free-form `query` string and/or any of
the following keyword arguments specifying field queries:
rid, recording, isrc, arid, artist, artistname, creditname, reid,
release, type, status, tracks, tracksrelease, dur, qdur, tnum,
position, tag
"""
return _do_mb_search('recording', query, fields, limit, offset)
def release_search(query='', limit=None, offset=None, **fields):
"""Search for releases by a free-form `query` string and/or any of
the following keyword arguments specifying field queries:
reid, release, arid, artist, artistname, creditname, type, status,
tracks, tracksmedium, discids, discidsmedium, mediums, date, asin,
lang, script, country, date, label, catno, barcode, puid
"""
return _do_mb_search('release', query, fields, limit, offset)
def release_group_search(query='', limit=None, offset=None, **fields):
"""Search for release groups by a free-form `query` string and/or
any of the following keyword arguments specifying field queries:
rgid, releasegroup, reid, release, arid, artist, artistname,
creditname, type, tag
"""
return _do_mb_search('release-group', query, fields, limit, offset)
def work_search(query='', limit=None, offset=None, **fields):
"""Search for works by a free-form `query` string and/or any of
the following keyword arguments specifying field queries:
wid, work, iswc, type, arid, artist, alias, tag
"""
return _do_mb_search('work', query, fields, limit, offset)
# Lists of entities
def get_releases_by_discid(id, includes=[], release_type=[]):
params = _check_filter_and_make_params(includes, release_type=release_type)
return _do_mb_query("discid", id, includes, params)
def get_recordings_by_echoprint(echoprint, includes=[], release_status=[], release_type=[]):
params = _check_filter_and_make_params(includes, release_status, release_type)
return _do_mb_query("echoprint", echoprint, includes, params)
def get_recordings_by_puid(puid, includes=[], release_status=[], release_type=[]):
params = _check_filter_and_make_params(includes, release_status, release_type)
return _do_mb_query("puid", puid, includes, params)
def get_recordings_by_isrc(isrc, includes=[], release_status=[], release_type=[]):
params = _check_filter_and_make_params(includes, release_status, release_type)
return _do_mb_query("isrc", isrc, includes, params)
def get_works_by_iswc(iswc, includes=[]):
return _do_mb_query("iswc", iswc, includes)
# Browse methods
# Browse include are a subset of regular get includes, so we check them here
# and the test in _do_mb_query will pass anyway.
def browse_artist(recording=None, release=None, release_group=None, includes=[], limit=None, offset=None):
# optional parameter work?
_check_includes_impl(includes, ["aliases", "tags", "ratings", "user-tags", "user-ratings"])
p = {}
if recording: p["recording"] = recording
if release: p["release"] = release
if release_group: p["release-group"] = release_group
#if work: p["work"] = work
if len(p) > 1:
raise Exception("Can't have more than one of recording, release, release_group, work")
if limit: p["limit"] = limit
if offset: p["offset"] = offset
return _do_mb_query("artist", "", includes, p)
def browse_label(release=None, includes=[], limit=None, offset=None):
_check_includes_impl(includes, ["aliases", "tags", "ratings", "user-tags", "user-ratings"])
p = {"release": release}
if limit: p["limit"] = limit
if offset: p["offset"] = offset
return _do_mb_query("label", "", includes, p)
def browse_recording(artist=None, release=None, includes=[], limit=None, offset=None):
_check_includes_impl(includes, ["artist-credits", "tags", "ratings", "user-tags", "user-ratings"])
p = {}
if artist: p["artist"] = artist
if release: p["release"] = release
if len(p) > 1:
raise Exception("Can't have more than one of artist, release")
if limit: p["limit"] = limit
if offset: p["offset"] = offset
return _do_mb_query("recording", "", includes, p)
def browse_release(artist=None, label=None, recording=None, release_group=None, release_status=[], release_type=[], includes=[], limit=None, offset=None):
# track_artist param doesn't work yet
_check_includes_impl(includes, ["artist-credits", "labels", "recordings"])
p = {}
if artist: p["artist"] = artist
#if track_artist: p["track_artist"] = track_artist
if label: p["label"] = label
if recording: p["recording"] = recording
if release_group: p["release-group"] = release_group
if len(p) > 1:
raise Exception("Can't have more than one of artist, label, recording, release_group")
if limit: p["limit"] = limit
if offset: p["offset"] = offset
filterp = _check_filter_and_make_params("releases", release_status, release_type)
p.update(filterp)
if len(release_status) == 0 and len(release_type) == 0:
raise InvalidFilterError("Need at least one release status or type")
return _do_mb_query("release", "", includes, p)
def browse_release_group(artist=None, release=None, release_type=[], includes=[], limit=None, offset=None):
_check_includes_impl(includes, ["artist-credits", "tags", "ratings", "user-tags", "user-ratings"])
p = {}
if artist: p["artist"] = artist
if release: p["release"] = release
if len(p) > 1:
raise Exception("Can't have more than one of artist, release")
if limit: p["limit"] = limit
if offset: p["offset"] = offset
filterp = _check_filter_and_make_params("release-groups", [], release_type)
p.update(filterp)
if len(release_type) == 0:
raise InvalidFilterError("Need at least one release type")
return _do_mb_query("release-group", "", includes, p)
# browse_work is defined in the docs but has no browse criteria
# Collections
def get_all_collections():
# Missing <release-list count="n"> the count in the reply
return _do_mb_query("collection", '')
def get_releases_in_collection(collection):
return _do_mb_query("collection", "%s/releases" % collection)
# Submission methods
def submit_barcodes(barcodes):
"""
Submits a set of {release1: barcode1, release2:barcode2}
Must call auth(user, pass) first
"""
query = mbxml.make_barcode_request(barcodes)
return _do_mb_post("release", query)
def submit_puids(puids):
query = mbxml.make_puid_request(puids)
return _do_mb_post("recording", query)
def submit_echoprints(echoprints):
query = mbxml.make_echoprint_request(echoprints)
return _do_mb_post("recording", query)
def submit_isrcs(isrcs):
raise NotImplementedError
def submit_tags(artist_tags={}, recording_tags={}):
""" Submit user tags.
Artist or recording parameters are of the form:
{'entityid': [taglist]}
"""
query = mbxml.make_tag_request(artist_tags, recording_tags)
return _do_mb_post("tag", query)
def submit_ratings(artist_ratings={}, recording_ratings={}):
""" Submit user ratings.
Artist or recording parameters are of the form:
{'entityid': rating}
"""
query = mbxml.make_rating_request(artist_ratings, recording_ratings)
return _do_mb_post("rating", query)
def add_releases_to_collection(collection, releases=[]):
# XXX: Maximum URI length of 16kb means we should only allow ~400 releases
releaselist = ";".join(releases)
_do_mb_put("collection/%s/releases/%s" % (collection, releaselist))
def remove_releases_from_collection(collection, releases=[]):
releaselist = ";".join(releases)
_do_mb_delete("collection/%s/releases/%s" % (collection, releaselist))

View file

@ -0,0 +1,545 @@
import xml.etree.ElementTree as ET
import string
import StringIO
import logging
try:
from ET import fixtag
except:
# Python < 2.7
def fixtag(tag, namespaces):
# given a decorated tag (of the form {uri}tag), return prefixed
# tag and namespace declaration, if any
if isinstance(tag, ET.QName):
tag = tag.text
namespace_uri, tag = string.split(tag[1:], "}", 1)
prefix = namespaces.get(namespace_uri)
if prefix is None:
prefix = "ns%d" % len(namespaces)
namespaces[namespace_uri] = prefix
if prefix == "xml":
xmlns = None
else:
xmlns = ("xmlns:%s" % prefix, namespace_uri)
else:
xmlns = None
return "%s:%s" % (prefix, tag), xmlns
NS_MAP = {"http://musicbrainz.org/ns/mmd-2.0#": "ws2"}
def make_artist_credit(artists):
names = []
for artist in artists:
if isinstance(artist, dict):
names.append(artist.get("artist", {}).get("name", ""))
else:
names.append(artist)
return "".join(names)
def parse_elements(valid_els, element):
""" Extract single level subelements from an element.
For example, given the element:
<element>
<subelement>Text</subelement>
</element>
and a list valid_els that contains "subelement",
return a dict {'subelement': 'Text'}
"""
result = {}
for sub in element:
t = fixtag(sub.tag, NS_MAP)[0]
if ":" in t:
t = t.split(":")[1]
if t in valid_els:
result[t] = sub.text
else:
logging.debug("in <%s>, uncaught <%s>", fixtag(element.tag, NS_MAP)[0], t)
return result
def parse_attributes(attributes, element):
""" Extract attributes from an element.
For example, given the element:
<element type="Group" />
and a list attributes that contains "type",
return a dict {'type': 'Group'}
"""
result = {}
for attr in attributes:
if attr in element.attrib:
result[attr] = element.attrib[attr]
else:
logging.debug("in <%s>, uncaught attribute %s", fixtag(element.tag, NS_MAP)[0], attr)
return result
def parse_inner(inner_els, element):
""" Delegate the parsing of a subelement to another function.
For example, given the element:
<element>
<subelement>
<a>Foo</a><b>Bar</b>
</subelement>
</element>
and a dictionary {'subelement': parse_subelement},
call parse_subelement(<subelement>) and
return a dict {'subelement': <result>}
if parse_subelement returns a tuple of the form
('subelement-key', <result>) then return a dict
{'subelement-key': <result>} instead
"""
result = {}
for sub in element:
t = fixtag(sub.tag, NS_MAP)[0]
if ":" in t:
t = t.split(":")[1]
if t in inner_els.keys():
inner_result = inner_els[t](sub)
if isinstance(inner_result, tuple):
result[inner_result[0]] = inner_result[1]
else:
result[t] = inner_result
else:
logging.debug("in <%s>, not delegating <%s>", fixtag(element.tag, NS_MAP)[0], t)
return result
def parse_message(message):
s = message.read()
f = StringIO.StringIO(s)
tree = ET.ElementTree(file=f)
root = tree.getroot()
result = {}
valid_elements = {"artist": parse_artist,
"label": parse_label,
"release": parse_release,
"release-group": parse_release_group,
"recording": parse_recording,
"work": parse_work,
"disc": parse_disc,
"puid": parse_puid,
"echoprint": parse_puid,
"artist-list": parse_artist_list,
"label-list": parse_label_list,
"release-list": parse_release_list,
"release-group-list": parse_release_group_list,
"recording-list": parse_recording_list,
"work-list": parse_work_list,
"collection-list": parse_collection_list,
"collection": parse_collection,
"message": parse_response_message
}
result.update(parse_inner(valid_elements, root))
return result
def parse_response_message(message):
return parse_elements(["text"], message)
def parse_collection_list(cl):
return [parse_collection(c) for c in cl]
def parse_collection(collection):
result = {}
attribs = ["id"]
elements = ["name", "editor"]
inner_els = {"release-list": parse_release_list}
result.update(parse_attributes(attribs, collection))
result.update(parse_elements(elements, collection))
result.update(parse_inner(inner_els, collection))
return result
def parse_collection_release_list(rl):
attribs = ["count"]
return parse_attributes(attribs, rl)
def parse_artist_lifespan(lifespan):
parts = parse_elements(["begin", "end"], lifespan)
beginval = parts.get("begin", "")
endval = parts.get("end", "")
return (beginval, endval)
def parse_artist_list(al):
return [parse_artist(a) for a in al]
def parse_artist(artist):
result = {}
attribs = ["id", "type"]
elements = ["name", "sort-name", "country", "user-rating"]
inner_els = {"life-span": parse_artist_lifespan,
"recording-list": parse_recording_list,
"release-list": parse_release_list,
"release-group-list": parse_release_group_list,
"work-list": parse_work_list,
"tag-list": parse_tag_list,
"user-tag-list": parse_tag_list,
"rating": parse_rating,
"alias-list": parse_alias_list}
result.update(parse_attributes(attribs, artist))
result.update(parse_elements(elements, artist))
result.update(parse_inner(inner_els, artist))
return result
def parse_label_list(ll):
return [parse_label(l) for l in ll]
def parse_label(label):
result = {}
attribs = ["id", "type"]
elements = ["name", "sort-name", "country", "label-code", "user-rating"]
inner_els = {"life-span": parse_artist_lifespan,
"release-list": parse_release_list,
"tag-list": parse_tag_list,
"user-tag-list": parse_tag_list,
"rating": parse_rating,
"alias-list": parse_alias_list}
result.update(parse_attributes(attribs, label))
result.update(parse_elements(elements, label))
result.update(parse_inner(inner_els, label))
return result
def parse_attribute_list(al):
return [parse_attribute_tag(a) for a in al]
def parse_attribute_tag(attribute):
return attribute.text
def parse_relation_list(rl):
attribs = ["target-type"]
ttype = parse_attributes(attribs, rl)
key = "%s-relation-list" % ttype["target-type"]
return (key, [parse_relation(r) for r in rl])
def parse_relation(relation):
result = {}
attribs = ["type"]
elements = ["target", "direction"]
inner_els = {"artist": parse_artist,
"label": parse_label,
"recording": parse_recording,
"release": parse_release,
"release-group": parse_release_group,
"attribute-list": parse_attribute_list,
"work": parse_work
}
result.update(parse_attributes(attribs, relation))
result.update(parse_elements(elements, relation))
result.update(parse_inner(inner_els, relation))
return result
def parse_release(release):
result = {}
attribs = ["id"]
elements = ["title", "status", "disambiguation", "quality", "country", "barcode", "date", "packaging", "asin"]
inner_els = {"text-representation": parse_text_representation,
"artist-credit": parse_artist_credit,
"label-info-list": parse_label_info_list,
"medium-list": parse_medium_list,
"release-group": parse_release_group,
"relation-list": parse_relation_list}
result.update(parse_attributes(attribs, release))
result.update(parse_elements(elements, release))
result.update(parse_inner(inner_els, release))
if "artist-credit" in result:
result["artist-credit-phrase"] = make_artist_credit(result["artist-credit"])
return result
def parse_medium_list(ml):
return [parse_medium(m) for m in ml]
def parse_medium(medium):
result = {}
elements = ["position", "format", "title"]
inner_els = {"disc-list": parse_disc_list,
"track-list": parse_track_list}
result.update(parse_elements(elements, medium))
result.update(parse_inner(inner_els, medium))
return result
def parse_disc_list(dl):
return [parse_disc(d) for d in dl]
def parse_text_representation(textr):
return parse_elements(["language", "script"], textr)
def parse_release_group(rg):
result = {}
attribs = ["id", "type"]
elements = ["title", "user-rating", "first-release-date"]
inner_els = {"artist-credit": parse_artist_credit,
"release-list": parse_release_list,
"tag-list": parse_tag_list,
"user-tag-list": parse_tag_list,
"rating": parse_rating}
result.update(parse_attributes(attribs, rg))
result.update(parse_elements(elements, rg))
result.update(parse_inner(inner_els, rg))
if "artist-credit" in result:
result["artist-credit-phrase"] = make_artist_credit(result["artist-credit"])
return result
def parse_recording(recording):
result = {}
attribs = ["id"]
elements = ["title", "length", "user-rating"]
inner_els = {"artist-credit": parse_artist_credit,
"release-list": parse_release_list,
"tag-list": parse_tag_list,
"user-tag-list": parse_tag_list,
"rating": parse_rating,
"puid-list": parse_external_id_list,
"isrc-list": parse_external_id_list,
"echoprint-list": parse_external_id_list}
result.update(parse_attributes(attribs, recording))
result.update(parse_elements(elements, recording))
result.update(parse_inner(inner_els, recording))
if "artist-credit" in result:
result["artist-credit-phrase"] = make_artist_credit(result["artist-credit"])
return result
def parse_external_id_list(pl):
return [parse_attributes(["id"], p)["id"] for p in pl]
def parse_work_list(wl):
result = []
for w in wl:
result.append(parse_work(w))
return result
def parse_work(work):
result = {}
attribs = ["id"]
elements = ["title", "user-rating"]
inner_els = {"tag-list": parse_tag_list,
"user-tag-list": parse_tag_list,
"rating": parse_rating,
"alias-list": parse_alias_list}
result.update(parse_attributes(attribs, work))
result.update(parse_elements(elements, work))
result.update(parse_inner(inner_els, work))
return result
def parse_disc(disc):
result = {}
attribs = ["id"]
elements = ["sectors"]
inner_els = {"release-list": parse_release_list}
result.update(parse_attributes(attribs, disc))
result.update(parse_elements(elements, disc))
result.update(parse_inner(inner_els, disc))
return result
def parse_release_list(rl):
result = []
for r in rl:
result.append(parse_release(r))
return result
def parse_release_group_list(rgl):
result = []
for rg in rgl:
result.append(parse_release_group(rg))
return result
def parse_puid(puid):
result = {}
attribs = ["id"]
inner_els = {"recording-list": parse_recording_list}
result.update(parse_attributes(attribs, puid))
result.update(parse_inner(inner_els, puid))
return result
def parse_recording_list(recs):
result = []
for r in recs:
result.append(parse_recording(r))
return result
def parse_artist_credit(ac):
result = []
for namecredit in ac:
result.append(parse_name_credit(namecredit))
join = parse_attributes(["joinphrase"], namecredit)
if "joinphrase" in join:
result.append(join["joinphrase"])
return result
def parse_name_credit(nc):
result = {}
elements = ["name"]
inner_els = {"artist": parse_artist}
result.update(parse_elements(elements, nc))
result.update(parse_inner(inner_els, nc))
return result
def parse_label_info_list(lil):
result = []
for li in lil:
result.append(parse_label_info(li))
return result
def parse_label_info(li):
result = {}
elements = ["catalog-number"]
inner_els = {"label": parse_label}
result.update(parse_elements(elements, li))
result.update(parse_inner(inner_els, li))
return result
def parse_track_list(tl):
result = []
for t in tl:
result.append(parse_track(t))
return result
def parse_track(track):
result = {}
elements = ["position"]
inner_els = {"recording": parse_recording}
result.update(parse_elements(elements, track))
result.update(parse_inner(inner_els, track))
return result
def parse_tag_list(tl):
result = []
for t in tl:
result.append(parse_tag(t))
return result
def parse_tag(tag):
result = {}
attribs = ["count"]
elements = ["name"]
result.update(parse_attributes(attribs, tag))
result.update(parse_elements(elements, tag))
return result
def parse_rating(rating):
result = {}
attribs = ["votes-count"]
result.update(parse_attributes(attribs, rating))
result["rating"] = rating.text
return result
def parse_alias_list(al):
result = []
for a in al:
result.append(a.text)
return result
###
def make_barcode_request(barcodes):
NS = "http://musicbrainz.org/ns/mmd-2.0#"
root = ET.Element("{%s}metadata" % NS)
rel_list = ET.SubElement(root, "{%s}release-list" % NS)
for release, barcode in barcodes.items():
rel_xml = ET.SubElement(rel_list, "{%s}release" % NS)
bar_xml = ET.SubElement(rel_xml, "{%s}barcode" % NS)
rel_xml.set("{%s}id" % NS, release)
bar_xml.text = barcode
return ET.tostring(root, "utf-8")
def make_puid_request(puids):
NS = "http://musicbrainz.org/ns/mmd-2.0#"
root = ET.Element("{%s}metadata" % NS)
rec_list = ET.SubElement(root, "{%s}recording-list" % NS)
for recording, puid_list in puids.items():
rec_xml = ET.SubElement(rec_list, "{%s}recording" % NS)
rec_xml.set("id", recording)
p_list_xml = ET.SubElement(rec_xml, "{%s}puid-list" % NS)
l = puid_list if isinstance(puid_list, list) else [puid_list]
for p in l:
p_xml = ET.SubElement(p_list_xml, "{%s}puid" % NS)
p_xml.set("id", p)
return ET.tostring(root, "utf-8")
def make_echoprint_request(echoprints):
NS = "http://musicbrainz.org/ns/mmd-2.0#"
root = ET.Element("{%s}metadata" % NS)
rec_list = ET.SubElement(root, "{%s}recording-list" % NS)
for recording, echoprint_list in echoprints.items():
rec_xml = ET.SubElement(rec_list, "{%s}recording" % NS)
rec_xml.set("id", recording)
e_list_xml = ET.SubElement(rec_xml, "{%s}echoprint-list" % NS)
l = echoprint_list if isinstance(echoprint_list, list) else [echoprint_list]
for e in l:
e_xml = ET.SubElement(e_list_xml, "{%s}echoprint" % NS)
e_xml.set("id", e)
return ET.tostring(root, "utf-8")
def make_tag_request(artist_tags, recording_tags):
NS = "http://musicbrainz.org/ns/mmd-2.0#"
root = ET.Element("{%s}metadata" % NS)
rec_list = ET.SubElement(root, "{%s}recording-list" % NS)
for rec, tags in recording_tags.items():
rec_xml = ET.SubElement(rec_list, "{%s}recording" % NS)
rec_xml.set("{%s}id" % NS, rec)
taglist = ET.SubElement(rec_xml, "{%s}user-tag-list" % NS)
for t in tags:
usertag_xml = ET.SubElement(taglist, "{%s}user-tag" % NS)
name_xml = ET.SubElement(usertag_xml, "{%s}name" % NS)
name_xml.text = t
art_list = ET.SubElement(root, "{%s}artist-list" % NS)
for art, tags in artist_tags.items():
art_xml = ET.SubElement(art_list, "{%s}artist" % NS)
art_xml.set("{%s}id" % NS, art)
taglist = ET.SubElement(art_xml, "{%s}user-tag-list" % NS)
for t in tags:
usertag_xml = ET.SubElement(taglist, "{%s}user-tag" % NS)
name_xml = ET.SubElement(usertag_xml, "{%s}name" % NS)
name_xml.text = t
return ET.tostring(root, "utf-8")
def make_rating_request(artist_ratings, recording_ratings):
NS = "http://musicbrainz.org/ns/mmd-2.0#"
root = ET.Element("{%s}metadata" % NS)
rec_list = ET.SubElement(root, "{%s}recording-list" % NS)
for rec, rating in recording_ratings.items():
rec_xml = ET.SubElement(rec_list, "{%s}recording" % NS)
rec_xml.set("{%s}id" % NS, rec)
rating_xml = ET.SubElement(rec_xml, "{%s}user-rating" % NS)
if isinstance(rating, int):
rating = "%d" % rating
rating_xml.text = rating
art_list = ET.SubElement(root, "{%s}artist-list" % NS)
for art, rating in artist_ratings.items():
art_xml = ET.SubElement(art_list, "{%s}artist" % NS)
art_xml.set("{%s}id" % NS, art)
rating_xml = ET.SubElement(rec_xml, "{%s}user-rating" % NS)
if isinstance(rating, int):
rating = "%d" % rating
rating_xml.text = rating
return ET.tostring(root, "utf-8")

View file

@ -4,6 +4,10 @@ Changelog
1.0b11 (In Development)
-----------------------
* Beets now communicates with MusicBrainz via the new `Next Generation Schema`_
(NGS) service via `python-musicbrainz-ngs`_. The bindings are included with
this version of beets, but a future version will make them an external
dependency.
* The new :doc:`/plugins/lastgenre` automatically assigns genres to imported
albums and items based on Last.fm tags and an internal whitelist. (Thanks to
`KraYmer`_.)
@ -13,6 +17,8 @@ Changelog
* Fix a crash after using the "as Tracks" option during import.
.. _KraYmer: https://github.com/KraYmer
.. _Next Generation Schema: http://musicbrainz.org/doc/XML_Web_Service/Version_2
.. _python-musicbrainz-ngs: https://github.com/alastair/python-musicbrainz-ngs
1.0b10 (September 22, 2011)
---------------------------

View file

@ -52,7 +52,6 @@ setup(name='beets',
install_requires=[
'mutagen',
'python-musicbrainz2 >= 0.7.2',
'munkres',
'unidecode',
],

View file

@ -14,121 +14,41 @@
"""Tests for MusicBrainz API wrapper.
"""
import unittest
import time
import musicbrainz2.model
import musicbrainz2.webservice as mbws
import httplib
import _common
from beets.autotag import mb
def nullfun(): pass
class MBQueryWaitTest(unittest.TestCase):
def setUp(self):
# simulate startup
mb.last_query_time = 0.0
self.cop = _common.Timecop()
self.cop.install()
def tearDown(self):
self.cop.restore()
def test_do_not_wait_initially(self):
time1 = time.time()
mb._query_wrap(nullfun)
time2 = time.time()
self.assertTrue(time2 - time1 < 1.0)
def test_second_rapid_query_waits(self):
mb._query_wrap(nullfun)
time1 = time.time()
mb._query_wrap(nullfun)
time2 = time.time()
self.assertTrue(time2 - time1 >= 1.0)
def test_second_distant_query_does_not_wait(self):
mb._query_wrap(nullfun)
time.sleep(1.0)
time1 = time.time()
mb._query_wrap(nullfun)
time2 = time.time()
self.assertTrue(time2 - time1 < 1.0)
def raise_once_func(exc):
count = [0] # use a list to get a reference (avoid need for nonlocal)
def fun():
count[0] += 1
if count[0] == 1:
raise exc
else:
return 1
return fun
def raise_func(exc):
def fun():
raise exc
return fun
class MBQueryErrorTest(unittest.TestCase):
def setUp(self):
mb.last_query_time = 0.0
self.cop = _common.Timecop()
self.cop.install()
def tearDown(self):
self.cop.restore()
def test_503_error_retries(self):
exc = mbws.WebServiceError(reason=Exception('Error 503'))
mb._query_wrap(raise_once_func(exc))
def test_504_error_retries(self):
exc = mbws.WebServiceError(reason=Exception('Error 504'))
mb._query_wrap(raise_once_func(exc))
def test_status_line_error_retries(self):
exc = httplib.BadStatusLine('dummy')
mb._query_wrap(raise_once_func(exc))
def test_999_error_passes_through(self):
exc = mbws.WebServiceError(reason=Exception('Error 999'))
with self.assertRaises(mbws.WebServiceError):
mb._query_wrap(raise_once_func(exc))
def test_repeated_error_raises_busy(self):
exc = mbws.WebServiceError(reason=Exception('Error 503'))
with self.assertRaises(mb.ServerBusyError):
mb._query_wrap(raise_func(exc))
class MBAlbumInfoTest(unittest.TestCase):
def _make_release(self, date_str='2009'):
release = musicbrainz2.model.Release()
release.title = 'ALBUM TITLE'
release.id = 'domain/ALBUM ID'
release.addType(musicbrainz2.model.Release.TYPE_ALBUM)
release.addType(musicbrainz2.model.Release.TYPE_OFFICIAL)
release.artist = musicbrainz2.model.Artist()
release.artist.name = 'ARTIST NAME'
release.artist.id = 'domain/ARTIST ID'
event = musicbrainz2.model.ReleaseEvent()
if date_str is not None:
event.date = date_str
release.releaseEvents.append(event)
def _make_release(self, date_str='2009', tracks=None):
release = {
'title': 'ALBUM TITLE',
'id': 'ALBUM ID',
'release-group': {'type': 'Album'},
'artist-credit': [
{'artist': {'name': 'ARTIST NAME', 'id': 'ARTIST ID'}}
],
'date': date_str,
'medium-list': [],
}
if tracks:
release['medium-list'].append({
'track-list': [{'recording': track} for track in tracks]
})
return release
def _make_track(self, title, tr_id, duration):
track = musicbrainz2.model.Track()
track.title = title
track.id = tr_id
track = {
'title': title,
'id': tr_id,
}
if duration is not None:
track.duration = duration
track['length'] = duration
return track
def test_parse_release_with_year(self):
release = self._make_release('1984')
d = mb.album_info(release, [])
d = mb.album_info(release)
self.assertEqual(d.album, 'ALBUM TITLE')
self.assertEqual(d.album_id, 'ALBUM ID')
self.assertEqual(d.artist, 'ARTIST NAME')
@ -137,21 +57,22 @@ class MBAlbumInfoTest(unittest.TestCase):
def test_parse_release_type(self):
release = self._make_release('1984')
d = mb.album_info(release, [])
d = mb.album_info(release)
self.assertEqual(d.albumtype, 'album')
def test_parse_release_full_date(self):
release = self._make_release('1987-03-31')
d = mb.album_info(release, [])
d = mb.album_info(release)
self.assertEqual(d.year, 1987)
self.assertEqual(d.month, 3)
self.assertEqual(d.day, 31)
def test_parse_tracks(self):
release = self._make_release()
tracks = [self._make_track('TITLE ONE', 'dom/ID ONE', 100.0 * 1000.0),
self._make_track('TITLE TWO', 'dom/ID TWO', 200.0 * 1000.0)]
d = mb.album_info(release, tracks)
tracks = [self._make_track('TITLE ONE', 'ID ONE', 100.0 * 1000.0),
self._make_track('TITLE TWO', 'ID TWO', 200.0 * 1000.0)]
release = self._make_release(tracks=tracks)
d = mb.album_info(release)
t = d.tracks
self.assertEqual(len(t), 2)
self.assertEqual(t[0].title, 'TITLE ONE')
@ -163,43 +84,35 @@ class MBAlbumInfoTest(unittest.TestCase):
def test_parse_release_year_month_only(self):
release = self._make_release('1987-03')
d = mb.album_info(release, [])
d = mb.album_info(release)
self.assertEqual(d.year, 1987)
self.assertEqual(d.month, 3)
def test_no_durations(self):
release = self._make_release()
tracks = [self._make_track('TITLE', 'dom/ID', None)]
d = mb.album_info(release, tracks)
tracks = [self._make_track('TITLE', 'ID', None)]
release = self._make_release(tracks=tracks)
d = mb.album_info(release)
self.assertEqual(d.tracks[0].length, None)
def test_no_release_date(self):
release = self._make_release(None)
d = mb.album_info(release, [])
d = mb.album_info(release)
self.assertFalse(d.year)
self.assertFalse(d.month)
self.assertFalse(d.day)
def test_various_artists_defaults_false(self):
release = self._make_release(None)
d = mb.album_info(release, [])
d = mb.album_info(release)
self.assertFalse(d.va)
def test_detect_various_artists(self):
release = self._make_release(None)
release.artist.id = musicbrainz2.model.VARIOUS_ARTISTS_ID
d = mb.album_info(release, [])
release['artist-credit'][0]['artist']['id'] = \
mb.VARIOUS_ARTISTS_ID
d = mb.album_info(release)
self.assertTrue(d.va)
class QuerySanitationTest(unittest.TestCase):
def test_special_char_escaped(self):
res = mb._lucene_escape('!')
self.assertEqual(res, '\\!')
def test_null_character_removed(self):
res = mb._lucene_escape('\0')
self.assertEqual(res, '')
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)