# This file is part of beets.
# Copyright 2013, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.

"""Adds Discogs album search support to the autotagger. Requires the
discogs-client library.
"""
from beets.autotag.hooks import AlbumInfo, TrackInfo, Distance
from beets.plugins import BeetsPlugin
from discogs_client import DiscogsAPIError, Release, Search
import beets
import discogs_client
import logging
import re
import time

log = logging.getLogger('beets')

# Silence spurious INFO log lines generated by urllib3.
urllib3_logger = logging.getLogger('requests.packages.urllib3')
urllib3_logger.setLevel(logging.CRITICAL)

# Set user-agent for discogs client.
discogs_client.user_agent = 'beets/%s +http://beets.radbox.org/' % \
    beets.__version__


class DiscogsPlugin(BeetsPlugin):
    """Autotagger plugin that offers Discogs releases as album candidates.
    """

    def __init__(self):
        super(DiscogsPlugin, self).__init__()
        # Default weight of the 'source' penalty added in `album_distance`.
        self.config.add({
            'source_weight': 0.5,
        })

    def album_distance(self, items, album_info, mapping):
        """Returns the album distance.

        A 'source' penalty weighted by the `source_weight` config option is
        added only when `album_info` came from Discogs; `items` and `mapping`
        are not used.
        """
        dist = Distance()
        if album_info.data_source == 'Discogs':
            dist.add('source', self.config['source_weight'].as_number())
        return dist

    def candidates(self, items, artist, album, va_likely):
        """Returns a list of AlbumInfo objects for discogs search results
        matching an album and artist (if not various).

        Returns an empty list when the Discogs API reports an error.
        """
        if va_likely:
            query = album
        else:
            query = '%s %s' % (artist, album)
        try:
            return self.get_albums(query)
        except DiscogsAPIError as e:
            log.debug('Discogs API Error: %s (query: %s)', e, query)
            return []

    def album_for_id(self, album_id):
        """Fetches an album by its Discogs ID and returns an AlbumInfo object
        or None if the album is not found.

        `album_id` is searched as a string: a bare integer, an `r123`/`[r123]`
        style ID or a discogs.com release URL.
        """
        log.debug('Searching discogs for release %s', album_id)
        # Discogs-IDs are simple integers. We only look for those at the end
        # of an input string as to avoid confusion with other metadata plugins.
        # An optional bracket can follow the integer, as this is how discogs
        # displays the release ID on its webpage.
        match = re.search(r'(^|\[*r|discogs\.com/.+/release/)(\d+)($|\])',
                          album_id)
        if not match:
            return None
        result = Release(match.group(2))
        # Try to obtain title to verify that we indeed have a valid Release
        try:
            getattr(result, 'title')
        except DiscogsAPIError as e:
            # A 404 just means "no such release"; anything else is worth
            # logging. `str(e)` replaces the deprecated `e.message`.
            if str(e) != '404 Not Found':
                log.debug('Discogs API Error: %s (query: %s)',
                          e, result._uri)
            return None
        return self.get_album_info(result)

    def get_albums(self, query):
        """Returns a list of AlbumInfo objects for a discogs search query.

        At most five releases are returned; search results that are not
        `Release` objects are skipped.
        """
        # Strip non-word characters from query. Things like "!" and "-" can
        # cause a query to return no results, even if they match the artist or
        # album title. Use `re.UNICODE` flag to avoid stripping non-english
        # word characters.
        query = re.sub(r'(?u)\W+', ' ', query)
        # Strip medium information from query, Things like "CD1" and "disk 1"
        # can also negate an otherwise positive result. Both the "disc" and
        # "disk" spellings are removed.
        query = re.sub(r'(?i)\b(CD|dis[ck])\s*\d+', '', query)
        albums = []
        for result in Search(query).results():
            if isinstance(result, Release):
                albums.append(self.get_album_info(result))
            if len(albums) >= 5:
                break
        return albums

    def get_album_info(self, result):
        """Returns an AlbumInfo object for a discogs Release object.

        Optional release fields (artists, labels, formats, year, uri,
        country) that are missing or empty yield `None` for the corresponding
        AlbumInfo value instead of raising.
        """
        album = re.sub(r' +', ' ', result.title)
        data = result.data
        album_id = data['id']
        artists = data.get('artists') or []
        artist, artist_id = self.get_artist(artists)
        # Use `.data` to access the tracklist directly instead of the
        # convenient `.tracklist` property, which will strip out useful artist
        # information and leave us with skeleton `Artist` objects that will
        # each make an API call just to get the same data back.
        tracks = self.get_tracks(data['tracklist'])
        # Only the first format and the first label are consulted. Fall back
        # to an empty entry so releases without them do not raise.
        first_format = (data.get('formats') or [{}])[0]
        first_label = (data.get('labels') or [{}])[0]
        albumtype = ', '.join(first_format.get('descriptions') or []) or None
        va = bool(artists) and artists[0]['name'].lower() == 'various'
        year = data.get('year')
        label = first_label.get('name')
        mediums = len(set(t.medium for t in tracks))
        catalogno = first_label.get('catno')
        # The literal string 'none' is treated as "no catalogue number".
        if catalogno == 'none':
            catalogno = None
        country = data.get('country')
        media = first_format.get('name')
        data_url = data.get('uri')
        return AlbumInfo(album, album_id, artist, artist_id, tracks, asin=None,
                         albumtype=albumtype, va=va, year=year, month=None,
                         day=None, label=label, mediums=mediums,
                         artist_sort=None, releasegroup_id=None,
                         catalognum=catalogno, script=None, language=None,
                         country=country, albumstatus=None, media=media,
                         albumdisambig=None, artist_credit=None,
                         original_year=None, original_month=None,
                         original_day=None, data_source='Discogs',
                         data_url=data_url)

    def get_artist(self, artists):
        """Returns an artist string (all artists) and an artist_id (the main
        artist) for a list of discogs album or track artists.

        Both values are None for an empty list. The artist_id is taken from
        the first entry that has a truthy `id`.
        """
        artist_id = None
        bits = []
        for artist in artists:
            if not artist_id:
                artist_id = artist['id']
            name = artist['name']
            # Strip disambiguation number.
            name = re.sub(r' \(\d+\)$', '', name)
            # Move articles to the front.
            name = re.sub(r'(?i)^(.*?), (a|an|the)$', r'\2 \1', name)
            bits.append(name)
            # The join phrase is optional; tolerate entries without the key.
            join = artist.get('join')
            if join:
                bits.append(join)
        artist = ' '.join(bits).replace(' ,', ',') or None
        return artist, artist_id

    def get_tracks(self, tracklist):
        """Returns a list of TrackInfo objects for a discogs tracklist.

        Entries without a `position` are treated as index tracks (headings):
        they produce no TrackInfo, but their title is remembered and used as
        `disctitle` when they directly precede the first track of a medium.
        """
        tracks = []
        # Maps the index of the track that follows an index track to the
        # index track's title.
        index_tracks = {}
        index = 0
        for track in tracklist:
            # Only real tracks have `position`. Otherwise, it's an index track.
            if track['position']:
                index += 1
                tracks.append(self.get_track_info(track, index))
            else:
                index_tracks[index + 1] = track['title']

        # Fix up medium and medium_index for each track. Discogs position is
        # unreliable, but tracks are in order.
        medium = None
        medium_count, index_count = 0, 0
        for track in tracks:
            # Handle special case where a different medium does not indicate a
            # new disc, when there is no medium_index and the ordinal of medium
            # is not sequential. For example, I, II, III, IV, V. Assume these
            # are the track index, not the medium.
            # (`ord(...) - 64` maps 'A' to 1, 'B' to 2, and so on.)
            medium_is_index = track.medium and not track.medium_index and (
                len(track.medium) != 1 or
                ord(track.medium) - 64 != medium_count + 1)

            if not medium_is_index and medium != track.medium:
                # Increment medium_count and reset index_count when medium
                # changes.
                medium = track.medium
                medium_count += 1
                index_count = 0
            index_count += 1
            track.medium, track.medium_index = medium_count, index_count

        # Get `disctitle` from Discogs index tracks. Assume that an index track
        # before the first track of each medium is a disc title.
        disctitle = None
        for track in tracks:
            if track.medium_index == 1:
                if track.index in index_tracks:
                    disctitle = index_tracks[track.index]
                else:
                    disctitle = None
            # Every track of a medium inherits the title found at its start.
            track.disctitle = disctitle

        return tracks

    def get_track_info(self, track, index):
        """Returns a TrackInfo object for a discogs track.

        `track` is one tracklist entry (a dict) and `index` is the track's
        position in the release, counting from 1.
        """
        title = track['title']
        track_id = None
        medium, medium_index = self.get_track_index(track['position'])
        artist, artist_id = self.get_artist(track.get('artists', []))
        length = self.get_track_length(track['duration'])
        return TrackInfo(title, track_id, artist, artist_id, length, index,
                         medium, medium_index, artist_sort=None,
                         disctitle=None, artist_credit=None)

    def get_track_index(self, position):
        """Returns the medium and medium index for a discogs track position.

        Both values are upper-cased strings, or None when the respective part
        of `position` is empty.
        """
        # medium_index is a number at the end of position. medium is everything
        # else. E.g. (A)(1), (Side A, Track )(1), (A)(), ()(1), etc.
        match = re.match(r'^(.*?)(\d*)$', position.upper())
        if match:
            medium, index = match.groups()
        else:
            log.debug('Invalid discogs position: %s', position)
            medium = index = None
        return medium or None, index or None

    def get_track_length(self, duration):
        """Returns the track length in seconds for a discogs duration.

        Accepts `M:SS` as well as durations of an hour or more, written
        either as `MMM:SS` (e.g. "75:30") or `H:MM:SS`. Returns None when
        `duration` is empty, None or cannot be parsed.
        """
        if not duration:
            return None
        try:
            length = time.strptime(duration, '%M:%S')
        except ValueError:
            # `%M` only accepts 0-59, so tracks of an hour or more end up
            # here. Parse them by hand instead of dropping the length.
            match = re.match(r'^(?:(\d+):)?(\d+):([0-5]?\d)$', duration)
            if not match:
                return None
            hours, minutes, seconds = match.groups()
            return (int(hours or 0) * 60 + int(minutes)) * 60 + int(seconds)
        return length.tm_min * 60 + length.tm_sec