Merge branch 'master' of https://github.com/beetbox/beets into 2826-deprecation-warnings

This commit is contained in:
Vladimir Zhelezov 2019-03-04 21:01:19 +01:00
commit 3eb5907da8
43 changed files with 1767 additions and 382 deletions

View file

@ -51,7 +51,8 @@ def get_art(log, item):
def embed_item(log, item, imagepath, maxwidth=None, itempath=None,
compare_threshold=0, ifempty=False, as_album=False):
compare_threshold=0, ifempty=False, as_album=False,
id3v23=None):
"""Embed an image into the item's media file.
"""
# Conditions and filters.
@ -60,8 +61,8 @@ def embed_item(log, item, imagepath, maxwidth=None, itempath=None,
log.info(u'Image not similar; skipping.')
return
if ifempty and get_art(log, item):
log.info(u'media file already contained art')
return
log.info(u'media file already contained art')
return
if maxwidth and not as_album:
imagepath = resize_image(log, imagepath, maxwidth)
@ -80,7 +81,7 @@ def embed_item(log, item, imagepath, maxwidth=None, itempath=None,
image.mime_type)
return
item.try_write(path=itempath, tags={'images': [image]})
item.try_write(path=itempath, tags={'images': [image]}, id3v23=id3v23)
def embed_album(log, album, maxwidth=None, quiet=False,

View file

@ -142,34 +142,46 @@ def apply_metadata(album_info, mapping):
# Compilation flag.
item.comp = album_info.va
# Miscellaneous metadata.
for field in ('albumtype',
'label',
'asin',
'catalognum',
'script',
'language',
'country',
'albumstatus',
'albumdisambig',
'releasegroupdisambig',
'data_source',):
value = getattr(album_info, field)
if value is not None:
item[field] = value
if track_info.disctitle is not None:
item.disctitle = track_info.disctitle
if track_info.media is not None:
item.media = track_info.media
if track_info.lyricist is not None:
item.lyricist = track_info.lyricist
if track_info.composer is not None:
item.composer = track_info.composer
if track_info.composer_sort is not None:
item.composer_sort = track_info.composer_sort
if track_info.arranger is not None:
item.arranger = track_info.arranger
# Track alt.
item.track_alt = track_info.track_alt
# Miscellaneous/nullable metadata.
misc_fields = {
'album': (
'albumtype',
'label',
'asin',
'catalognum',
'script',
'language',
'country',
'albumstatus',
'albumdisambig',
'releasegroupdisambig',
'data_source',
),
'track': (
'disctitle',
'lyricist',
'media',
'composer',
'composer_sort',
'arranger',
)
}
# Don't overwrite fields with empty values unless the
# field is explicitly allowed to be overwritten
for field in misc_fields['album']:
clobber = field in config['overwrite_null']['album'].as_str_seq()
value = getattr(album_info, field)
if value is None and not clobber:
continue
item[field] = value
for field in misc_fields['track']:
clobber = field in config['overwrite_null']['track'].as_str_seq()
value = getattr(track_info, field)
if value is None and not clobber:
continue
item[field] = value

View file

@ -72,8 +72,8 @@ class AlbumInfo(object):
- ``data_source``: The original data source (MusicBrainz, Discogs, etc.)
- ``data_url``: The data source release URL.
The fields up through ``tracks`` are required. The others are
optional and may be None.
``mediums`` along with the fields up through ``tracks`` are required.
The others are optional and may be None.
"""
def __init__(self, album, album_id, artist, artist_id, tracks, asin=None,
albumtype=None, va=False, year=None, month=None, day=None,

View file

@ -53,6 +53,9 @@ aunique:
disambiguators: albumtype year label catalognum albumdisambig releasegroupdisambig
bracket: '[]'
overwrite_null:
album: []
track: []
plugins: []
pluginpath: []

View file

@ -143,6 +143,11 @@ class Model(object):
are subclasses of `Sort`.
"""
_queries = {}
"""Named queries that use a field-like `name:value` syntax but which
do not relate to any specific field.
"""
_always_dirty = False
"""By default, fields only become "dirty" when their value actually
changes. Enabling this flag marks fields as dirty even when the new

View file

@ -119,12 +119,13 @@ def construct_query_part(model_cls, prefixes, query_part):
if not query_part:
return query.TrueQuery()
# Use `model_cls` to build up a map from field names to `Query`
# classes.
# Use `model_cls` to build up a map from field (or query) names to
# `Query` classes.
query_classes = {}
for k, t in itertools.chain(model_cls._fields.items(),
model_cls._types.items()):
query_classes[k] = t.query
query_classes.update(model_cls._queries) # Non-field queries.
# Parse the string.
key, pattern, query_class, negate = \
@ -137,26 +138,27 @@ def construct_query_part(model_cls, prefixes, query_part):
# The query type matches a specific field, but none was
# specified. So we use a version of the query that matches
# any field.
q = query.AnyFieldQuery(pattern, model_cls._search_fields,
query_class)
if negate:
return query.NotQuery(q)
else:
return q
out_query = query.AnyFieldQuery(pattern, model_cls._search_fields,
query_class)
else:
# Non-field query type.
if negate:
return query.NotQuery(query_class(pattern))
else:
return query_class(pattern)
out_query = query_class(pattern)
# Otherwise, this must be a `FieldQuery`. Use the field name to
# construct the query object.
key = key.lower()
q = query_class(key.lower(), pattern, key in model_cls._fields)
# Field queries get constructed according to the name of the field
# they are querying.
elif issubclass(query_class, query.FieldQuery):
key = key.lower()
out_query = query_class(key.lower(), pattern, key in model_cls._fields)
# Non-field (named) query.
else:
out_query = query_class(pattern)
# Apply negation.
if negate:
return query.NotQuery(q)
return q
return query.NotQuery(out_query)
else:
return out_query
def query_from_strings(query_cls, model_cls, prefixes, query_parts):

View file

@ -611,7 +611,7 @@ class Item(LibModel):
self.path = read_path
def write(self, path=None, tags=None):
def write(self, path=None, tags=None, id3v23=None):
"""Write the item's metadata to a media file.
All fields in `_media_fields` are written to disk according to
@ -623,6 +623,9 @@ class Item(LibModel):
`tags` is a dictionary of additional metadata the should be
written to the file. (These tags need not be in `_media_fields`.)
`id3v23` will override the global `id3v23` config option if it is
set to something other than `None`.
Can raise either a `ReadError` or a `WriteError`.
"""
if path is None:
@ -630,6 +633,9 @@ class Item(LibModel):
else:
path = normpath(path)
if id3v23 is None:
id3v23 = beets.config['id3v23'].get(bool)
# Get the data to write to the file.
item_tags = dict(self)
item_tags = {k: v for k, v in item_tags.items()
@ -640,8 +646,7 @@ class Item(LibModel):
# Open the file.
try:
mediafile = MediaFile(syspath(path),
id3v23=beets.config['id3v23'].get(bool))
mediafile = MediaFile(syspath(path), id3v23=id3v23)
except UnreadableFileError as exc:
raise ReadError(path, exc)
@ -657,14 +662,14 @@ class Item(LibModel):
self.mtime = self.current_mtime()
plugins.send('after_write', item=self, path=path)
def try_write(self, path=None, tags=None):
def try_write(self, *args, **kwargs):
"""Calls `write()` but catches and logs `FileOperationError`
exceptions.
Returns `False` an exception was caught and `True` otherwise.
"""
try:
self.write(path, tags)
self.write(*args, **kwargs)
return True
except FileOperationError as exc:
log.error(u"{0}", exc)

View file

@ -347,6 +347,16 @@ def types(model_cls):
return types
def named_queries(model_cls):
# Gather `item_queries` and `album_queries` from the plugins.
attr_name = '{0}_queries'.format(model_cls.__name__.lower())
queries = {}
for plugin in find_plugins():
plugin_queries = getattr(plugin, attr_name, {})
queries.update(plugin_queries)
return queries
def track_distance(item, info):
"""Gets the track distance calculated by all loaded plugins.
Returns a Distance object.

View file

@ -1143,8 +1143,12 @@ def _setup(options, lib=None):
if lib is None:
lib = _open_library(config)
plugins.send("library_opened", lib=lib)
# Add types and queries defined by plugins.
library.Item._types.update(plugins.types(library.Item))
library.Album._types.update(plugins.types(library.Album))
library.Item._queries.update(plugins.named_queries(library.Item))
library.Album._queries.update(plugins.named_queries(library.Album))
return subcommands, plugins, lib

View file

@ -1490,18 +1490,24 @@ def move_items(lib, dest, query, copy, album, pretend, confirm=False,
"""
items, albums = _do_query(lib, query, album, False)
objs = albums if album else items
num_objs = len(objs)
# Filter out files that don't need to be moved.
isitemmoved = lambda item: item.path != item.destination(basedir=dest)
isalbummoved = lambda album: any(isitemmoved(i) for i in album.items())
objs = [o for o in objs if (isalbummoved if album else isitemmoved)(o)]
num_unmoved = num_objs - len(objs)
# Report unmoved files that match the query.
unmoved_msg = u''
if num_unmoved > 0:
unmoved_msg = u' ({} already in place)'.format(num_unmoved)
copy = copy or export # Exporting always copies.
action = u'Copying' if copy else u'Moving'
act = u'copy' if copy else u'move'
entity = u'album' if album else u'item'
log.info(u'{0} {1} {2}{3}.', action, len(objs), entity,
u's' if len(objs) != 1 else u'')
log.info(u'{0} {1} {2}{3}{4}.', action, len(objs), entity,
u's' if len(objs) != 1 else u'', unmoved_msg)
if not objs:
return

View file

@ -24,6 +24,7 @@ import re
import shutil
import fnmatch
from collections import Counter
from multiprocessing.pool import ThreadPool
import traceback
import subprocess
import platform
@ -1009,3 +1010,24 @@ def asciify_path(path, sep_replace):
sep_replace
)
return os.sep.join(path_components)
def par_map(transform, items):
"""Apply the function `transform` to all the elements in the
iterable `items`, like `map(transform, items)` but with no return
value. The map *might* happen in parallel: it's parallel on Python 3
and sequential on Python 2.
The parallelism uses threads (not processes), so this is only useful
for IO-bound `transform`s.
"""
if sys.version_info[0] < 3:
# multiprocessing.pool.ThreadPool does not seem to work on
# Python 2. We could consider switching to futures instead.
for item in items:
transform(item)
else:
pool = ThreadPool()
pool.map(transform, items)
pool.close()
pool.join()

View file

@ -24,9 +24,7 @@ import json
import os
import subprocess
import tempfile
import sys
from multiprocessing.pool import ThreadPool
from distutils.spawn import find_executable
import requests
@ -106,15 +104,7 @@ class AcousticBrainzSubmitPlugin(plugins.BeetsPlugin):
def command(self, lib, opts, args):
# Get items from arguments
items = lib.items(ui.decargs(args))
if sys.version_info[0] < 3:
for item in items:
self.analyze_submit(item)
else:
# Analyze in parallel using a thread pool.
pool = ThreadPool()
pool.map(self.analyze_submit, items)
pool.close()
pool.join()
util.par_map(self.analyze_submit, items)
def analyze_submit(self, item):
analysis = self._get_analysis(item)

View file

@ -18,16 +18,17 @@
from __future__ import division, absolute_import, print_function
from beets.plugins import BeetsPlugin
from beets.ui import Subcommand
from beets.util import displayable_path, confit
from beets import ui
from subprocess import check_output, CalledProcessError, list2cmdline, STDOUT
import shlex
import os
import errno
import sys
import six
from beets.plugins import BeetsPlugin
from beets.ui import Subcommand
from beets.util import displayable_path, confit, par_map
from beets import ui
class CheckerCommandException(Exception):
@ -48,6 +49,10 @@ class CheckerCommandException(Exception):
class BadFiles(BeetsPlugin):
def __init__(self):
super(BadFiles, self).__init__()
self.verbose = False
def run_command(self, cmd):
self._log.debug(u"running command: {}",
displayable_path(list2cmdline(cmd)))
@ -61,7 +66,7 @@ class BadFiles(BeetsPlugin):
status = e.returncode
except OSError as e:
raise CheckerCommandException(cmd, e)
output = output.decode(sys.getfilesystemencoding())
output = output.decode(sys.getdefaultencoding(), 'replace')
return status, errors, [line for line in output.split("\n") if line]
def check_mp3val(self, path):
@ -89,56 +94,60 @@ class BadFiles(BeetsPlugin):
command = None
if command:
return self.check_custom(command)
elif ext == "mp3":
if ext == "mp3":
return self.check_mp3val
elif ext == "flac":
if ext == "flac":
return self.check_flac
def check_bad(self, lib, opts, args):
for item in lib.items(ui.decargs(args)):
def check_item(self, item):
# First, check whether the path exists. If not, the user
# should probably run `beet update` to cleanup your library.
dpath = displayable_path(item.path)
self._log.debug(u"checking path: {}", dpath)
if not os.path.exists(item.path):
ui.print_(u"{}: file does not exist".format(
ui.colorize('text_error', dpath)))
# First, check whether the path exists. If not, the user
# should probably run `beet update` to cleanup your library.
dpath = displayable_path(item.path)
self._log.debug(u"checking path: {}", dpath)
if not os.path.exists(item.path):
ui.print_(u"{}: file does not exist".format(
ui.colorize('text_error', dpath)))
# Run the checker against the file if one is found
ext = os.path.splitext(item.path)[1][1:].decode('utf8', 'ignore')
checker = self.get_checker(ext)
if not checker:
self._log.error(u"no checker specified in the config for {}",
ext)
return
path = item.path
if not isinstance(path, six.text_type):
path = item.path.decode(sys.getfilesystemencoding())
try:
status, errors, output = checker(path)
except CheckerCommandException as e:
if e.errno == errno.ENOENT:
self._log.error(
u"command not found: {} when validating file: {}",
e.checker,
e.path
)
else:
self._log.error(u"error invoking {}: {}", e.checker, e.msg)
return
if status > 0:
ui.print_(u"{}: checker exited with status {}"
.format(ui.colorize('text_error', dpath), status))
for line in output:
ui.print_(u" {}".format(line))
elif errors > 0:
ui.print_(u"{}: checker found {} errors or warnings"
.format(ui.colorize('text_warning', dpath), errors))
for line in output:
ui.print_(u" {}".format(line))
elif self.verbose:
ui.print_(u"{}: ok".format(ui.colorize('text_success', dpath)))
# Run the checker against the file if one is found
ext = os.path.splitext(item.path)[1][1:].decode('utf8', 'ignore')
checker = self.get_checker(ext)
if not checker:
self._log.error(u"no checker specified in the config for {}",
ext)
continue
path = item.path
if not isinstance(path, six.text_type):
path = item.path.decode(sys.getfilesystemencoding())
try:
status, errors, output = checker(path)
except CheckerCommandException as e:
if e.errno == errno.ENOENT:
self._log.error(
u"command not found: {} when validating file: {}",
e.checker,
e.path
)
else:
self._log.error(u"error invoking {}: {}", e.checker, e.msg)
continue
if status > 0:
ui.print_(u"{}: checker exited with status {}"
.format(ui.colorize('text_error', dpath), status))
for line in output:
ui.print_(u" {}".format(displayable_path(line)))
elif errors > 0:
ui.print_(u"{}: checker found {} errors or warnings"
.format(ui.colorize('text_warning', dpath), errors))
for line in output:
ui.print_(u" {}".format(displayable_path(line)))
elif opts.verbose:
ui.print_(u"{}: ok".format(ui.colorize('text_success', dpath)))
def command(self, lib, opts, args):
# Get items from arguments
items = lib.items(ui.decargs(args))
self.verbose = opts.verbose
par_map(self.check_item, items)
def commands(self):
bad_command = Subcommand('bad',
@ -148,5 +157,5 @@ class BadFiles(BeetsPlugin):
action='store_true', default=False, dest='verbose',
help=u'view results for both the bad and uncorrupted files'
)
bad_command.func = self.check_bad
bad_command.func = self.command
return [bad_command]

View file

@ -93,6 +93,7 @@ def acoustid_match(log, path):
log.error(u'fingerprinting of {0} failed: {1}',
util.displayable_path(repr(path)), exc)
return None
fp = fp.decode()
_fingerprints[path] = fp
try:
res = acoustid.lookup(API_KEY, fp, duration,
@ -334,7 +335,7 @@ def fingerprint_item(log, item, write=False):
util.displayable_path(item.path))
try:
_, fp = acoustid.fingerprint_file(util.syspath(item.path))
item.acoustid_fingerprint = fp
item.acoustid_fingerprint = fp.decode()
if write:
log.info(u'{0}: writing fingerprint',
util.displayable_path(item.path))

View file

@ -116,6 +116,7 @@ class ConvertPlugin(BeetsPlugin):
u'pretend': False,
u'threads': util.cpu_count(),
u'format': u'mp3',
u'id3v23': u'inherit',
u'formats': {
u'aac': {
u'command': u'ffmpeg -i $source -y -vn -acodec aac '
@ -316,8 +317,12 @@ class ConvertPlugin(BeetsPlugin):
if pretend:
continue
id3v23 = self.config['id3v23'].as_choice([True, False, 'inherit'])
if id3v23 == 'inherit':
id3v23 = None
# Write tags from the database to the converted file.
item.try_write(path=converted)
item.try_write(path=converted, id3v23=id3v23)
if keep_new:
# If we're keeping the transcoded file, read it again (after
@ -332,7 +337,7 @@ class ConvertPlugin(BeetsPlugin):
self._log.debug(u'embedding album art from {}',
util.displayable_path(album.artpath))
art.embed_item(self._log, item, album.artpath,
itempath=converted)
itempath=converted, id3v23=id3v23)
if keep_new:
plugins.send('after_convert', item=item,

View file

@ -61,6 +61,8 @@ class DiscogsPlugin(BeetsPlugin):
self.config['user_token'].redact = True
self.discogs_client = None
self.register_listener('import_begin', self.setup)
self.rate_limit_per_minute = 25
self.last_request_timestamp = 0
def setup(self, session=None):
"""Create the `discogs_client` field. Authenticate if necessary.
@ -71,6 +73,9 @@ class DiscogsPlugin(BeetsPlugin):
# Try using a configured user token (bypassing OAuth login).
user_token = self.config['user_token'].as_str()
if user_token:
# The rate limit for authenticated users goes up to 60
# requests per minute.
self.rate_limit_per_minute = 60
self.discogs_client = Client(USER_AGENT, user_token=user_token)
return
@ -88,6 +93,26 @@ class DiscogsPlugin(BeetsPlugin):
self.discogs_client = Client(USER_AGENT, c_key, c_secret,
token, secret)
def _time_to_next_request(self):
seconds_between_requests = 60 / self.rate_limit_per_minute
seconds_since_last_request = time.time() - self.last_request_timestamp
seconds_to_wait = seconds_between_requests - seconds_since_last_request
return seconds_to_wait
def request_start(self):
"""wait for rate limit if needed
"""
time_to_next_request = self._time_to_next_request()
if time_to_next_request > 0:
self._log.debug('hit rate limit, waiting for {0} seconds',
time_to_next_request)
time.sleep(time_to_next_request)
def request_finished(self):
"""update timestamp for rate limiting
"""
self.last_request_timestamp = time.time()
def reset_auth(self):
"""Delete token file & redo the auth steps.
"""
@ -206,9 +231,13 @@ class DiscogsPlugin(BeetsPlugin):
# Strip medium information from query, Things like "CD1" and "disk 1"
# can also negate an otherwise positive result.
query = re.sub(br'(?i)\b(CD|disc)\s*\d+', b'', query)
self.request_start()
try:
releases = self.discogs_client.search(query,
type='release').page(1)
self.request_finished()
except CONNECTION_ERRORS:
self._log.debug(u"Communication error while searching for {0!r}",
query, exc_info=True)
@ -222,8 +251,11 @@ class DiscogsPlugin(BeetsPlugin):
"""
self._log.debug(u'Searching for master release {0}', master_id)
result = Master(self.discogs_client, {'id': master_id})
self.request_start()
try:
year = result.fetch('year')
self.request_finished()
return year
except DiscogsAPIError as e:
if e.status_code != 404:
@ -286,7 +318,7 @@ class DiscogsPlugin(BeetsPlugin):
if va:
artist = config['va_name'].as_str()
if catalogno == 'none':
catalogno = None
catalogno = None
# Explicitly set the `media` for the tracks, since it is expected by
# `autotag.apply_metadata`, and set `medium_total`.
for track in tracks:

View file

@ -189,7 +189,7 @@ class EmbedCoverArtPlugin(BeetsPlugin):
def remove_artfile(self, album):
"""Possibly delete the album art file for an album (if the
appropriate configuration option is enabled.
appropriate configuration option is enabled).
"""
if self.config['remove_art_file'] and album.artpath:
if os.path.isfile(album.artpath):

View file

@ -31,7 +31,7 @@ from beets import util
from beets import config
from beets.mediafile import image_mime_type
from beets.util.artresizer import ArtResizer
from beets.util import confit
from beets.util import confit, sorted_walk
from beets.util import syspath, bytestring_path, py3_path
import six
@ -365,12 +365,17 @@ class GoogleImages(RemoteArtSource):
if not (album.albumartist and album.album):
return
search_string = (album.albumartist + ',' + album.album).encode('utf-8')
response = self.request(self.URL, params={
'key': self.key,
'cx': self.cx,
'q': search_string,
'searchType': 'image'
})
try:
response = self.request(self.URL, params={
'key': self.key,
'cx': self.cx,
'q': search_string,
'searchType': 'image'
})
except requests.RequestException:
self._log.debug(u'google: error receiving response')
return
# Get results using JSON.
try:
@ -406,10 +411,14 @@ class FanartTV(RemoteArtSource):
if not album.mb_releasegroupid:
return
response = self.request(
self.API_ALBUMS + album.mb_releasegroupid,
headers={'api-key': self.PROJECT_KEY,
'client-key': self.client_key})
try:
response = self.request(
self.API_ALBUMS + album.mb_releasegroupid,
headers={'api-key': self.PROJECT_KEY,
'client-key': self.client_key})
except requests.RequestException:
self._log.debug(u'fanart.tv: error receiving response')
return
try:
data = response.json()
@ -545,16 +554,22 @@ class Wikipedia(RemoteArtSource):
# Find the name of the cover art filename on DBpedia
cover_filename, page_id = None, None
dbpedia_response = self.request(
self.DBPEDIA_URL,
params={
'format': 'application/sparql-results+json',
'timeout': 2500,
'query': self.SPARQL_QUERY.format(
artist=album.albumartist.title(), album=album.album)
},
headers={'content-type': 'application/json'},
)
try:
dbpedia_response = self.request(
self.DBPEDIA_URL,
params={
'format': 'application/sparql-results+json',
'timeout': 2500,
'query': self.SPARQL_QUERY.format(
artist=album.albumartist.title(), album=album.album)
},
headers={'content-type': 'application/json'},
)
except requests.RequestException:
self._log.debug(u'dbpedia: error receiving response')
return
try:
data = dbpedia_response.json()
results = data['results']['bindings']
@ -584,17 +599,21 @@ class Wikipedia(RemoteArtSource):
lpart, rpart = cover_filename.rsplit(' .', 1)
# Query all the images in the page
wikipedia_response = self.request(
self.WIKIPEDIA_URL,
params={
'format': 'json',
'action': 'query',
'continue': '',
'prop': 'images',
'pageids': page_id,
},
headers={'content-type': 'application/json'},
)
try:
wikipedia_response = self.request(
self.WIKIPEDIA_URL,
params={
'format': 'json',
'action': 'query',
'continue': '',
'prop': 'images',
'pageids': page_id,
},
headers={'content-type': 'application/json'},
)
except requests.RequestException:
self._log.debug(u'wikipedia: error receiving response')
return
# Try to see if one of the images on the pages matches our
# incomplete cover_filename
@ -613,18 +632,22 @@ class Wikipedia(RemoteArtSource):
return
# Find the absolute url of the cover art on Wikipedia
wikipedia_response = self.request(
self.WIKIPEDIA_URL,
params={
'format': 'json',
'action': 'query',
'continue': '',
'prop': 'imageinfo',
'iiprop': 'url',
'titles': cover_filename.encode('utf-8'),
},
headers={'content-type': 'application/json'},
)
try:
wikipedia_response = self.request(
self.WIKIPEDIA_URL,
params={
'format': 'json',
'action': 'query',
'continue': '',
'prop': 'imageinfo',
'iiprop': 'url',
'titles': cover_filename.encode('utf-8'),
},
headers={'content-type': 'application/json'},
)
except requests.RequestException:
self._log.debug(u'wikipedia: error receiving response')
return
try:
data = wikipedia_response.json()
@ -666,12 +689,16 @@ class FileSystem(LocalArtSource):
# Find all files that look like images in the directory.
images = []
for fn in os.listdir(syspath(path)):
fn = bytestring_path(fn)
for ext in IMAGE_EXTENSIONS:
if fn.lower().endswith(b'.' + ext) and \
os.path.isfile(syspath(os.path.join(path, fn))):
images.append(fn)
ignore = config['ignore'].as_str_seq()
ignore_hidden = config['ignore_hidden'].get(bool)
for _, _, files in sorted_walk(path, ignore=ignore,
ignore_hidden=ignore_hidden):
for fn in files:
fn = bytestring_path(fn)
for ext in IMAGE_EXTENSIONS:
if fn.lower().endswith(b'.' + ext) and \
os.path.isfile(syspath(os.path.join(path, fn))):
images.append(fn)
# Look for "preferred" filenames.
images = sorted(images,

View file

@ -43,8 +43,8 @@ class FileFilterPlugin(BeetsPlugin):
bytestring_path(self.config['album_path'].get()))
if 'singleton_path' in self.config:
self.path_singleton_regex = re.compile(
bytestring_path(self.config['singleton_path'].get()))
self.path_singleton_regex = re.compile(
bytestring_path(self.config['singleton_path'].get()))
def import_task_created_event(self, session, task):
if task.items and len(task.items) > 0:

View file

@ -18,7 +18,6 @@ from __future__ import division, absolute_import, print_function
import string
import subprocess
import six
from beets.plugins import BeetsPlugin
from beets.util import shlex_split, arg_encoding
@ -46,10 +45,8 @@ class CodingFormatter(string.Formatter):
See str.format and string.Formatter.format.
"""
try:
if isinstance(format_string, bytes):
format_string = format_string.decode(self._coding)
except UnicodeEncodeError:
pass
return super(CodingFormatter, self).format(format_string, *args,
**kwargs)
@ -91,28 +88,25 @@ class HookPlugin(BeetsPlugin):
def create_and_register_hook(self, event, command):
def hook_function(**kwargs):
if command is None or len(command) == 0:
self._log.error('invalid command "{0}"', command)
return
if command is None or len(command) == 0:
self._log.error('invalid command "{0}"', command)
return
# Use a string formatter that works on Unicode strings.
if six.PY2:
formatter = CodingFormatter(arg_encoding())
else:
formatter = string.Formatter()
# Use a string formatter that works on Unicode strings.
formatter = CodingFormatter(arg_encoding())
command_pieces = shlex_split(command)
command_pieces = shlex_split(command)
for i, piece in enumerate(command_pieces):
command_pieces[i] = formatter.format(piece, event=event,
**kwargs)
for i, piece in enumerate(command_pieces):
command_pieces[i] = formatter.format(piece, event=event,
**kwargs)
self._log.debug(u'running command "{0}" for event {1}',
u' '.join(command_pieces), event)
self._log.debug(u'running command "{0}" for event {1}',
u' '.join(command_pieces), event)
try:
subprocess.Popen(command_pieces).wait()
except OSError as exc:
self._log.error(u'hook for {0} failed: {1}', event, exc)
try:
subprocess.Popen(command_pieces).wait()
except OSError as exc:
self._log.error(u'hook for {0} failed: {1}', event, exc)
self.register_listener(event, hook_function)

View file

@ -32,6 +32,7 @@ class IPFSPlugin(BeetsPlugin):
super(IPFSPlugin, self).__init__()
self.config.add({
'auto': True,
'nocopy': False,
})
if self.config['auto']:
@ -116,7 +117,10 @@ class IPFSPlugin(BeetsPlugin):
self._log.info('Adding {0} to ipfs', album_dir)
cmd = "ipfs add -q -r".split()
if self.config['nocopy']:
cmd = "ipfs add --nocopy -q -r".split()
else:
cmd = "ipfs add -q -r".split()
cmd.append(album_dir)
try:
output = util.command_output(cmd).split()
@ -174,7 +178,10 @@ class IPFSPlugin(BeetsPlugin):
with tempfile.NamedTemporaryFile() as tmp:
self.ipfs_added_albums(lib, tmp.name)
try:
cmd = "ipfs add -q ".split()
if self.config['nocopy']:
cmd = "ipfs add --nocopy -q ".split()
else:
cmd = "ipfs add -q ".split()
cmd.append(tmp.name)
output = util.command_output(cmd)
except (OSError, subprocess.CalledProcessError) as err:

181
beetsplug/playlist.py Normal file
View file

@ -0,0 +1,181 @@
# -*- coding: utf-8 -*-
# This file is part of beets.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
from __future__ import division, absolute_import, print_function
import os
import fnmatch
import tempfile
import beets
class PlaylistQuery(beets.dbcore.Query):
"""Matches files listed by a playlist file.
"""
def __init__(self, pattern):
self.pattern = pattern
config = beets.config['playlist']
# Get the full path to the playlist
playlist_paths = (
pattern,
os.path.abspath(os.path.join(
config['playlist_dir'].as_filename(),
'{0}.m3u'.format(pattern),
)),
)
self.paths = []
for playlist_path in playlist_paths:
if not fnmatch.fnmatch(playlist_path, '*.[mM]3[uU]'):
# This is not am M3U playlist, skip this candidate
continue
try:
f = open(beets.util.syspath(playlist_path), mode='rb')
except (OSError, IOError):
continue
if config['relative_to'].get() == 'library':
relative_to = beets.config['directory'].as_filename()
elif config['relative_to'].get() == 'playlist':
relative_to = os.path.dirname(playlist_path)
else:
relative_to = config['relative_to'].as_filename()
relative_to = beets.util.bytestring_path(relative_to)
for line in f:
if line[0] == '#':
# ignore comments, and extm3u extension
continue
self.paths.append(beets.util.normpath(
os.path.join(relative_to, line.rstrip())
))
f.close()
break
def col_clause(self):
if not self.paths:
# Playlist is empty
return '0', ()
clause = 'path IN ({0})'.format(', '.join('?' for path in self.paths))
return clause, (beets.library.BLOB_TYPE(p) for p in self.paths)
def match(self, item):
return item.path in self.paths
class PlaylistPlugin(beets.plugins.BeetsPlugin):
item_queries = {'playlist': PlaylistQuery}
def __init__(self):
super(PlaylistPlugin, self).__init__()
self.config.add({
'auto': False,
'playlist_dir': '.',
'relative_to': 'library',
})
self.playlist_dir = self.config['playlist_dir'].as_filename()
self.changes = {}
if self.config['relative_to'].get() == 'library':
self.relative_to = beets.util.bytestring_path(
beets.config['directory'].as_filename())
elif self.config['relative_to'].get() != 'playlist':
self.relative_to = beets.util.bytestring_path(
self.config['relative_to'].as_filename())
else:
self.relative_to = None
if self.config['auto']:
self.register_listener('item_moved', self.item_moved)
self.register_listener('item_removed', self.item_removed)
self.register_listener('cli_exit', self.cli_exit)
def item_moved(self, item, source, destination):
self.changes[source] = destination
def item_removed(self, item):
if not os.path.exists(beets.util.syspath(item.path)):
self.changes[item.path] = None
def cli_exit(self, lib):
for playlist in self.find_playlists():
self._log.info('Updating playlist: {0}'.format(playlist))
base_dir = beets.util.bytestring_path(
self.relative_to if self.relative_to
else os.path.dirname(playlist)
)
try:
self.update_playlist(playlist, base_dir)
except beets.util.FilesystemError:
self._log.error('Failed to update playlist: {0}'.format(
beets.util.displayable_path(playlist)))
def find_playlists(self):
"""Find M3U playlists in the playlist directory."""
try:
dir_contents = os.listdir(beets.util.syspath(self.playlist_dir))
except OSError:
self._log.warning('Unable to open playlist directory {0}'.format(
beets.util.displayable_path(self.playlist_dir)))
return
for filename in dir_contents:
if fnmatch.fnmatch(filename, '*.[mM]3[uU]'):
yield os.path.join(self.playlist_dir, filename)
def update_playlist(self, filename, base_dir):
"""Find M3U playlists in the specified directory."""
changes = 0
deletions = 0
with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as tempfp:
new_playlist = tempfp.name
with open(filename, mode='rb') as fp:
for line in fp:
original_path = line.rstrip(b'\r\n')
# Ensure that path from playlist is absolute
is_relative = not os.path.isabs(line)
if is_relative:
lookup = os.path.join(base_dir, original_path)
else:
lookup = original_path
try:
new_path = self.changes[beets.util.normpath(lookup)]
except KeyError:
tempfp.write(line)
else:
if new_path is None:
# Item has been deleted
deletions += 1
continue
changes += 1
if is_relative:
new_path = os.path.relpath(new_path, base_dir)
tempfp.write(line.replace(original_path, new_path))
if changes or deletions:
self._log.info(
'Updated playlist {0} ({1} changes, {2} deletions)'.format(
filename, changes, deletions))
beets.util.copy(new_playlist, filename, replace=True)
beets.util.remove(new_playlist)

View file

@ -935,10 +935,10 @@ class ReplayGainPlugin(BeetsPlugin):
if (any([self.should_use_r128(item) for item in album.items()]) and not
all(([self.should_use_r128(item) for item in album.items()]))):
raise ReplayGainError(
u"Mix of ReplayGain and EBU R128 detected"
u" for some tracks in album {0}".format(album)
)
raise ReplayGainError(
u"Mix of ReplayGain and EBU R128 detected"
u" for some tracks in album {0}".format(album)
)
if any([self.should_use_r128(item) for item in album.items()]):
if self.r128_backend_instance == '':

View file

@ -3,59 +3,452 @@
from __future__ import division, absolute_import, print_function
import re
import json
import base64
import webbrowser
import collections
import six
import unidecode
import requests
from beets.plugins import BeetsPlugin
from beets.ui import decargs
from beets import ui
from requests.exceptions import HTTPError
from beets.plugins import BeetsPlugin
from beets.util import confit
from beets.autotag.hooks import AlbumInfo, TrackInfo, Distance
class SpotifyPlugin(BeetsPlugin):
# URL for the Web API of Spotify
# Documentation here: https://developer.spotify.com/web-api/search-item/
base_url = "https://api.spotify.com/v1/search"
open_url = "http://open.spotify.com/track/"
playlist_partial = "spotify:trackset:Playlist:"
# Base URLs for the Spotify API
# Documentation: https://developer.spotify.com/web-api
oauth_token_url = 'https://accounts.spotify.com/api/token'
open_track_url = 'http://open.spotify.com/track/'
search_url = 'https://api.spotify.com/v1/search'
album_url = 'https://api.spotify.com/v1/albums/'
track_url = 'https://api.spotify.com/v1/tracks/'
playlist_partial = 'spotify:trackset:Playlist:'
def __init__(self):
super(SpotifyPlugin, self).__init__()
self.config.add({
'mode': 'list',
'tiebreak': 'popularity',
'show_failures': False,
'artist_field': 'albumartist',
'album_field': 'album',
'track_field': 'title',
'region_filter': None,
'regex': []
})
self.config.add(
{
'mode': 'list',
'tiebreak': 'popularity',
'show_failures': False,
'artist_field': 'albumartist',
'album_field': 'album',
'track_field': 'title',
'region_filter': None,
'regex': [],
'client_id': '4e414367a1d14c75a5c5129a627fcab8',
'client_secret': 'f82bdc09b2254f1a8286815d02fd46dc',
'tokenfile': 'spotify_token.json',
'source_weight': 0.5,
}
)
self.config['client_secret'].redact = True
self.tokenfile = self.config['tokenfile'].get(
confit.Filename(in_app_dir=True)
) # Path to the JSON file for storing the OAuth access token.
self.setup()
def setup(self):
"""Retrieve previously saved OAuth token or generate a new one."""
try:
with open(self.tokenfile) as f:
token_data = json.load(f)
except IOError:
self._authenticate()
else:
self.access_token = token_data['access_token']
def _authenticate(self):
"""Request an access token via the Client Credentials Flow:
https://developer.spotify.com/documentation/general/guides/authorization-guide/#client-credentials-flow
"""
headers = {
'Authorization': 'Basic {}'.format(
base64.b64encode(
':'.join(
self.config[k].as_str()
for k in ('client_id', 'client_secret')
).encode()
).decode()
)
}
response = requests.post(
self.oauth_token_url,
data={'grant_type': 'client_credentials'},
headers=headers,
)
try:
response.raise_for_status()
except requests.exceptions.HTTPError as e:
raise ui.UserError(
u'Spotify authorization failed: {}\n{}'.format(
e, response.text
)
)
self.access_token = response.json()['access_token']
# Save the token for later use.
self._log.debug(u'Spotify access token: {}', self.access_token)
with open(self.tokenfile, 'w') as f:
json.dump({'access_token': self.access_token}, f)
def _handle_response(self, request_type, url, params=None):
"""Send a request, reauthenticating if necessary.
:param request_type: Type of :class:`Request` constructor,
e.g. ``requests.get``, ``requests.post``, etc.
:type request_type: function
:param url: URL for the new :class:`Request` object.
:type url: str
:param params: (optional) list of tuples or bytes to send
in the query string for the :class:`Request`.
:type params: dict
:return: JSON data for the class:`Response <Response>` object.
:rtype: dict
"""
response = request_type(
url,
headers={'Authorization': 'Bearer {}'.format(self.access_token)},
params=params,
)
if response.status_code != 200:
if u'token expired' in response.text:
self._log.debug(
'Spotify access token has expired. Reauthenticating.'
)
self._authenticate()
return self._handle_response(request_type, url, params=params)
else:
raise ui.UserError(u'Spotify API error:\n{}', response.text)
return response.json()
def _get_spotify_id(self, url_type, id_):
"""Parse a Spotify ID from its URL if necessary.
:param url_type: Type of Spotify URL, either 'album' or 'track'.
:type url_type: str
:param id_: Spotify ID or URL.
:type id_: str
:return: Spotify ID.
:rtype: str
"""
# Spotify IDs consist of 22 alphanumeric characters
# (zero-left-padded base62 representation of randomly generated UUID4)
id_regex = r'(^|open\.spotify\.com/{}/)([0-9A-Za-z]{{22}})'
self._log.debug(u'Searching for {} {}', url_type, id_)
match = re.search(id_regex.format(url_type), id_)
return match.group(2) if match else None
def album_for_id(self, album_id):
"""Fetch an album by its Spotify ID or URL and return an
AlbumInfo object or None if the album is not found.
:param album_id: Spotify ID or URL for the album
:type album_id: str
:return: AlbumInfo object for album
:rtype: beets.autotag.hooks.AlbumInfo or None
"""
spotify_id = self._get_spotify_id('album', album_id)
if spotify_id is None:
return None
response_data = self._handle_response(
requests.get, self.album_url + spotify_id
)
artist, artist_id = self._get_artist(response_data['artists'])
date_parts = [
int(part) for part in response_data['release_date'].split('-')
]
release_date_precision = response_data['release_date_precision']
if release_date_precision == 'day':
year, month, day = date_parts
elif release_date_precision == 'month':
year, month = date_parts
day = None
elif release_date_precision == 'year':
year = date_parts
month = None
day = None
else:
raise ui.UserError(
u"Invalid `release_date_precision` returned "
u"by Spotify API: '{}'".format(release_date_precision)
)
tracks = []
medium_totals = collections.defaultdict(int)
for i, track_data in enumerate(response_data['tracks']['items']):
track = self._get_track(track_data)
track.index = i + 1
medium_totals[track.medium] += 1
tracks.append(track)
for track in tracks:
track.medium_total = medium_totals[track.medium]
return AlbumInfo(
album=response_data['name'],
album_id=spotify_id,
artist=artist,
artist_id=artist_id,
tracks=tracks,
albumtype=response_data['album_type'],
va=len(response_data['artists']) == 1
and artist.lower() == 'various artists',
year=year,
month=month,
day=day,
label=response_data['label'],
mediums=max(medium_totals.keys()),
data_source='Spotify',
data_url=response_data['external_urls']['spotify'],
)
def _get_track(self, track_data):
"""Convert a Spotify track object dict to a TrackInfo object.
:param track_data: Simplified track object
(https://developer.spotify.com/documentation/web-api/reference/object-model/#track-object-simplified)
:type track_data: dict
:return: TrackInfo object for track
:rtype: beets.autotag.hooks.TrackInfo
"""
artist, artist_id = self._get_artist(track_data['artists'])
return TrackInfo(
title=track_data['name'],
track_id=track_data['id'],
artist=artist,
artist_id=artist_id,
length=track_data['duration_ms'] / 1000,
index=track_data['track_number'],
medium=track_data['disc_number'],
medium_index=track_data['track_number'],
data_source='Spotify',
data_url=track_data['external_urls']['spotify'],
)
def track_for_id(self, track_id=None, track_data=None):
"""Fetch a track by its Spotify ID or URL and return a
TrackInfo object or None if the track is not found.
:param track_id: (Optional) Spotify ID or URL for the track. Either
``track_id`` or ``track_data`` must be provided.
:type track_id: str
:param track_data: (Optional) Simplified track object dict. May be
provided instead of ``track_id`` to avoid unnecessary API calls.
:type track_data: dict
:return: TrackInfo object for track
:rtype: beets.autotag.hooks.TrackInfo or None
"""
if track_data is None:
spotify_id = self._get_spotify_id('track', track_id)
if spotify_id is None:
return None
track_data = self._handle_response(
requests.get, self.track_url + spotify_id
)
track = self._get_track(track_data)
# Get album's tracks to set `track.index` (position on the entire
# release) and `track.medium_total` (total number of tracks on
# the track's disc).
album_data = self._handle_response(
requests.get, self.album_url + track_data['album']['id']
)
medium_total = 0
for i, track_data in enumerate(album_data['tracks']['items']):
if track_data['disc_number'] == track.medium:
medium_total += 1
if track_data['id'] == track.track_id:
track.index = i + 1
track.medium_total = medium_total
return track
@staticmethod
def _get_artist(artists):
"""Returns an artist string (all artists) and an artist_id (the main
artist) for a list of Spotify artist object dicts.
:param artists: Iterable of simplified Spotify artist objects
(https://developer.spotify.com/documentation/web-api/reference/object-model/#artist-object-simplified)
:type artists: list[dict]
:return: Normalized artist string
:rtype: str
"""
artist_id = None
artist_names = []
for artist in artists:
if not artist_id:
artist_id = artist['id']
name = artist['name']
# Move articles to the front.
name = re.sub(r'^(.*?), (a|an|the)$', r'\2 \1', name, flags=re.I)
artist_names.append(name)
artist = ', '.join(artist_names).replace(' ,', ',') or None
return artist, artist_id
def album_distance(self, items, album_info, mapping):
"""Returns the Spotify source weight and the maximum source weight
for albums.
"""
dist = Distance()
if album_info.data_source == 'Spotify':
dist.add('source', self.config['source_weight'].as_number())
return dist
def track_distance(self, item, track_info):
"""Returns the Spotify source weight and the maximum source weight
for individual tracks.
"""
dist = Distance()
if track_info.data_source == 'Spotify':
dist.add('source', self.config['source_weight'].as_number())
return dist
def candidates(self, items, artist, album, va_likely):
"""Returns a list of AlbumInfo objects for Spotify Search API results
matching an ``album`` and ``artist`` (if not various).
:param items: List of items comprised by an album to be matched.
:type items: list[beets.library.Item]
:param artist: The artist of the album to be matched.
:type artist: str
:param album: The name of the album to be matched.
:type album: str
:param va_likely: True if the album to be matched likely has
Various Artists.
:type va_likely: bool
:return: Candidate AlbumInfo objects.
:rtype: list[beets.autotag.hooks.AlbumInfo]
"""
query_filters = {'album': album}
if not va_likely:
query_filters['artist'] = artist
response_data = self._search_spotify(
query_type='album', filters=query_filters
)
if response_data is None:
return []
return [
self.album_for_id(album_id=album_data['id'])
for album_data in response_data['albums']['items']
]
def item_candidates(self, item, artist, title):
"""Returns a list of TrackInfo objects for Spotify Search API results
matching ``title`` and ``artist``.
:param item: Singleton item to be matched.
:type item: beets.library.Item
:param artist: The artist of the track to be matched.
:type artist: str
:param title: The title of the track to be matched.
:type title: str
:return: Candidate TrackInfo objects.
:rtype: list[beets.autotag.hooks.TrackInfo]
"""
response_data = self._search_spotify(
query_type='track', keywords=title, filters={'artist': artist}
)
if response_data is None:
return []
return [
self.track_for_id(track_data=track_data)
for track_data in response_data['tracks']['items']
]
@staticmethod
def _construct_search_query(filters=None, keywords=''):
"""Construct a query string with the specified filters and keywords to
be provided to the Spotify Search API
(https://developer.spotify.com/documentation/web-api/reference/search/search/#writing-a-query---guidelines).
:param filters: (Optional) Field filters to apply.
:type filters: dict
:param keywords: (Optional) Query keywords to use.
:type keywords: str
:return: Query string to be provided to the Search API.
:rtype: str
"""
query_components = [
keywords,
' '.join(':'.join((k, v)) for k, v in filters.items()),
]
query = ' '.join([q for q in query_components if q])
if not isinstance(query, six.text_type):
query = query.decode('utf8')
return unidecode.unidecode(query)
def _search_spotify(self, query_type, filters=None, keywords=''):
"""Query the Spotify Search API for the specified ``keywords``, applying
the provided ``filters``.
:param query_type: A comma-separated list of item types to search
across. Valid types are: 'album', 'artist', 'playlist', and
'track'. Search results include hits from all the specified item
types.
:type query_type: str
:param filters: (Optional) Field filters to apply.
:type filters: dict
:param keywords: (Optional) Query keywords to use.
:type keywords: str
:return: JSON data for the class:`Response <Response>` object or None
if no search results are returned.
:rtype: dict or None
"""
query = self._construct_search_query(
keywords=keywords, filters=filters
)
if not query:
return None
self._log.debug(u"Searching Spotify for '{}'".format(query))
response_data = self._handle_response(
requests.get,
self.search_url,
params={'q': query, 'type': query_type},
)
num_results = 0
for result_type_data in response_data.values():
num_results += len(result_type_data['items'])
self._log.debug(
u"Found {} results from Spotify for '{}'", num_results, query
)
return response_data if num_results > 0 else None
def commands(self):
def queries(lib, opts, args):
success = self.parse_opts(opts)
success = self._parse_opts(opts)
if success:
results = self.query_spotify(lib, decargs(args))
self.output_results(results)
results = self._match_library_tracks(lib, ui.decargs(args))
self._output_match_results(results)
spotify_cmd = ui.Subcommand(
'spotify',
help=u'build a Spotify playlist'
'spotify', help=u'build a Spotify playlist'
)
spotify_cmd.parser.add_option(
u'-m', u'--mode', action='store',
u'-m',
u'--mode',
action='store',
help=u'"open" to open Spotify with playlist, '
u'"list" to print (default)'
u'"list" to print (default)',
)
spotify_cmd.parser.add_option(
u'-f', u'--show-failures',
action='store_true', dest='show_failures',
help=u'list tracks that did not match a Spotify ID'
u'-f',
u'--show-failures',
action='store_true',
dest='show_failures',
help=u'list tracks that did not match a Spotify ID',
)
spotify_cmd.func = queries
return [spotify_cmd]
def parse_opts(self, opts):
def _parse_opts(self, opts):
if opts.mode:
self.config['mode'].set(opts.mode)
@ -63,35 +456,46 @@ class SpotifyPlugin(BeetsPlugin):
self.config['show_failures'].set(True)
if self.config['mode'].get() not in ['list', 'open']:
self._log.warning(u'{0} is not a valid mode',
self.config['mode'].get())
self._log.warning(
u'{0} is not a valid mode', self.config['mode'].get()
)
return False
self.opts = opts
return True
def query_spotify(self, lib, query):
def _match_library_tracks(self, library, keywords):
"""Get a list of simplified track object dicts for library tracks
matching the specified ``keywords``.
:param library: beets library object to query.
:type library: beets.library.Library
:param keywords: Query to match library items against.
:type keywords: str
:return: List of simplified track object dicts for library items
matching the specified query.
:rtype: list[dict]
"""
results = []
failures = []
items = lib.items(query)
items = library.items(keywords)
if not items:
self._log.debug(u'Your beets query returned no items, '
u'skipping spotify')
self._log.debug(
u'Your beets query returned no items, skipping Spotify.'
)
return
self._log.info(u'Processing {0} tracks...', len(items))
self._log.info(u'Processing {} tracks...', len(items))
for item in items:
# Apply regex transformations if provided
for regex in self.config['regex'].get():
if (
not regex['field'] or
not regex['search'] or
not regex['replace']
not regex['field']
or not regex['search']
or not regex['replace']
):
continue
@ -103,73 +507,84 @@ class SpotifyPlugin(BeetsPlugin):
# Custom values can be passed in the config (just in case)
artist = item[self.config['artist_field'].get()]
album = item[self.config['album_field'].get()]
query = item[self.config['track_field'].get()]
search_url = query + " album:" + album + " artist:" + artist
keywords = item[self.config['track_field'].get()]
# Query the Web API for each track, look for the items' JSON data
r = requests.get(self.base_url, params={
"q": search_url, "type": "track"
})
self._log.debug('{}', r.url)
try:
r.raise_for_status()
except HTTPError as e:
self._log.debug(u'URL returned a {0} error',
e.response.status_code)
failures.append(search_url)
query_filters = {'artist': artist, 'album': album}
response_data = self._search_spotify(
query_type='track', keywords=keywords, filters=query_filters
)
if response_data is None:
query = self._construct_search_query(
keywords=keywords, filters=query_filters
)
failures.append(query)
continue
r_data = r.json()['tracks']['items']
response_data_tracks = response_data['tracks']['items']
# Apply market filter if requested
region_filter = self.config['region_filter'].get()
if region_filter:
r_data = [x for x in r_data if region_filter
in x['available_markets']]
response_data_tracks = [
track_data
for track_data in response_data_tracks
if region_filter in track_data['available_markets']
]
# Simplest, take the first result
chosen_result = None
if len(r_data) == 1 or self.config['tiebreak'].get() == "first":
self._log.debug(u'Spotify track(s) found, count: {0}',
len(r_data))
chosen_result = r_data[0]
elif len(r_data) > 1:
# Use the popularity filter
self._log.debug(u'Most popular track chosen, count: {0}',
len(r_data))
chosen_result = max(r_data, key=lambda x: x['popularity'])
if chosen_result:
results.append(chosen_result)
if (
len(response_data_tracks) == 1
or self.config['tiebreak'].get() == 'first'
):
self._log.debug(
u'Spotify track(s) found, count: {}',
len(response_data_tracks),
)
chosen_result = response_data_tracks[0]
else:
self._log.debug(u'No spotify track found: {0}', search_url)
failures.append(search_url)
# Use the popularity filter
self._log.debug(
u'Most popular track chosen, count: {}',
len(response_data_tracks),
)
chosen_result = max(
response_data_tracks, key=lambda x: x['popularity']
)
results.append(chosen_result)
failure_count = len(failures)
if failure_count > 0:
if self.config['show_failures'].get():
self._log.info(u'{0} track(s) did not match a Spotify ID:',
failure_count)
self._log.info(
u'{} track(s) did not match a Spotify ID:', failure_count
)
for track in failures:
self._log.info(u'track: {0}', track)
self._log.info(u'track: {}', track)
self._log.info(u'')
else:
self._log.warning(u'{0} track(s) did not match a Spotify ID;\n'
u'use --show-failures to display',
failure_count)
self._log.warning(
u'{} track(s) did not match a Spotify ID;\n'
u'use --show-failures to display',
failure_count,
)
return results
def output_results(self, results):
if results:
ids = [x['id'] for x in results]
if self.config['mode'].get() == "open":
self._log.info(u'Attempting to open Spotify with playlist')
spotify_url = self.playlist_partial + ",".join(ids)
webbrowser.open(spotify_url)
def _output_match_results(self, results):
"""Open a playlist or print Spotify URLs for the provided track
object dicts.
:param results: List of simplified track object dicts
(https://developer.spotify.com/documentation/web-api/reference/object-model/#track-object-simplified)
:type results: list[dict]
"""
if results:
spotify_ids = [track_data['id'] for track_data in results]
if self.config['mode'].get() == 'open':
self._log.info(u'Attempting to open Spotify with playlist')
spotify_url = self.playlist_partial + ",".join(spotify_ids)
webbrowser.open(spotify_url)
else:
for item in ids:
print(self.open_url + item)
for spotify_id in spotify_ids:
print(self.open_track_url + spotify_id)
else:
self._log.warning(u'No Spotify tracks found from beets query')

View file

@ -78,7 +78,7 @@ class SubsonicUpdate(BeetsPlugin):
'v': '1.15.0', # Subsonic 6.1 and newer.
'c': 'beets'
}
if contextpath is '/':
if contextpath == '/':
contextpath = ''
url = "http://{}:{}{}/rest/startScan".format(host, port, contextpath)
response = requests.post(url, params=payload)

View file

@ -14,6 +14,10 @@ New features:
issues with foobar2000 and Winamp.
Thanks to :user:`mz2212`.
:bug:`2944`
* A new :doc:`/plugins/playlist` can query the beets library using
M3U playlists.
Thanks to :user:`Holzhaus` and :user:`Xenopathic`.
:bug:`123` :bug:`3145`
* Added whitespace padding to missing tracks dialog to improve readability.
Thanks to :user:`jams2`.
:bug:`2962`
@ -37,6 +41,10 @@ New features:
relevant releases according to the :ref:`preferred` configuration options.
Thanks to :user:`archer4499`.
:bug:`3017`
* :doc:`/plugins/convert`: The plugin now has a ``id3v23`` option that allows
to override the global ``id3v23`` option.
Thanks to :user:`Holzhaus`.
:bug:`3104`
* A new ``aunique`` configuration option allows setting default options
for the :ref:`aunique` template function.
* The ``albumdisambig`` field no longer includes the MusicBrainz release group
@ -48,6 +56,27 @@ New features:
:bug:`2497`
* Modify selection can now be applied early without selecting every item.
:bug:`3083`
* :doc:`/plugins/chroma`: Fingerprint values are now properly stored as
strings, which prevents strange repeated output when running ``beet write``.
Thanks to :user:`Holzhaus`.
:bug:`3097` :bug:`2942`
* The ``move`` command now lists the number of items already in-place.
Thanks to :user:`RollingStar`.
:bug:`3117`
* :doc:`/plugins/spotify`: The plugin now uses OAuth for authentication to the
Spotify API.
Thanks to :user:`rhlahuja`.
:bug:`2694` :bug:`3123`
* :doc:`/plugins/spotify`: The plugin now works as an import metadata
provider: you can match tracks and albums using the Spotify database.
Thanks to :user:`rhlahuja`.
:bug:`3123`
* :doc:`/plugins/ipfs`: The plugin now supports a ``nocopy`` option which passes that flag to ipfs.
Thanks to :user:`wildthyme`.
* :doc:`/plugins/discogs`: The plugin has rate limiting for the discogs API now.
:bug:`3081`
* The `badfiles` plugin now works in parallel (on Python 3 only).
Thanks to :user:`bemeurer`.
Changes:
@ -63,6 +92,8 @@ Changes:
Fixes:
* On Python 2, pin the Jellyfish requirement to version 0.6.0 for
compatibility.
* A new importer option, :ref:`ignore_data_tracks`, lets you skip audio tracks
contained in data files :bug:`3021`
* Restore iTunes Store album art source, and remove the dependency on
@ -111,9 +142,27 @@ Fixes:
* The ``%title`` template function now works correctly with apostrophes.
Thanks to :user:`GuilhermeHideki`.
:bug:`3033`
* :doc:`/plugins/fetchart`: Added network connection error handling to backends
so that beets won't crash if a request fails.
Thanks to :user:`Holzhaus`.
:bug:`1579`
* Fetchart now respects the ``ignore`` and ``ignore_hidden`` settings. :bug:`1632`
* :doc:`/plugins/badfiles`: Avoid a crash when the underlying tool emits
undecodable output.
:bug:`3165`
* :doc:`/plugins/hook`: Fix byte string interpolation in hook commands.
:bug:`2967` :bug:`3167`
.. _python-itunes: https://github.com/ocelma/python-itunes
For developers:
* In addition to prefix-based field queries, plugins can now define *named
queries* that are not associated with any specific field.
For example, the new :doc:`/plugins/playlist` supports queries like
``playlist:name`` although there is no field named ``playlist``.
See :ref:`extend-query` for details.
1.4.7 (May 29, 2018)
--------------------

View file

@ -443,15 +443,24 @@ Extend the Query Syntax
^^^^^^^^^^^^^^^^^^^^^^^
You can add new kinds of queries to beets' :doc:`query syntax
</reference/query>` indicated by a prefix. As an example, beets already
</reference/query>`. There are two ways to add custom queries: using a prefix
and using a name. Prefix-based query extension can apply to *any* field, while
named queries are not associated with any field. For example, beets already
supports regular expression queries, which are indicated by a colon
prefix---plugins can do the same.
To do so, define a subclass of the ``Query`` type from the
``beets.dbcore.query`` module. Then, in the ``queries`` method of your plugin
class, return a dictionary mapping prefix strings to query classes.
For either kind of query extension, define a subclass of the ``Query`` type
from the ``beets.dbcore.query`` module. Then:
One simple kind of query you can extend is the ``FieldQuery``, which
- To define a prefix-based query, define a ``queries`` method in your plugin
class. Return from this method a dictionary mapping prefix strings to query
classes.
- To define a named query, defined dictionaries named either ``item_queries``
or ``album_queries``. These should map names to query types. So if you
use ``{ "foo": FooQuery }``, then the query ``foo:bar`` will construct a
query like ``FooQuery("bar")``.
For prefix-based queries, you will want to extend ``FieldQuery``, which
implements string comparisons on fields. To use it, create a subclass
inheriting from that class and override the ``value_match`` class method.
(Remember the ``@classmethod`` decorator!) The following example plugin

View file

@ -68,6 +68,8 @@ file. The available options are:
- **dest**: The directory where the files will be converted (or copied) to.
Default: none.
- **embed**: Embed album art in converted items. Default: ``yes``.
- **id3v23**: Can be used to override the global ``id3v23`` option. Default:
``inherit``.
- **max_bitrate**: All lossy files with a higher bitrate will be
transcoded and those with a lower bitrate will simply be copied. Note that
this does not guarantee that all converted files will have a lower

View file

@ -81,6 +81,7 @@ like this::
mpdupdate
permissions
play
playlist
plexupdate
random
replaygain
@ -158,6 +159,7 @@ Interoperability
* :doc:`mpdupdate`: Automatically notifies `MPD`_ whenever the beets library
changes.
* :doc:`play`: Play beets queries in your music player.
* :doc:`playlist`: Use M3U playlists to query the beets library.
* :doc:`plexupdate`: Automatically notifies `Plex`_ whenever the beets library
changes.
* :doc:`smartplaylist`: Generate smart playlists based on beets queries.
@ -254,6 +256,8 @@ Here are a few of the plugins written by the beets community:
* `beets-barcode`_ lets you scan or enter barcodes for physical media to
search for their metadata.
* `beets-ydl`_ download audio from youtube-dl sources and import into beets
.. _beets-barcode: https://github.com/8h2a/beets-barcode
.. _beets-check: https://github.com/geigerzaehler/beets-check
.. _copyartifacts: https://github.com/sbarakat/beets-copyartifacts
@ -273,3 +277,4 @@ Here are a few of the plugins written by the beets community:
.. _whatlastgenre: https://github.com/YetAnotherNerd/whatlastgenre/tree/master/plugin/beets
.. _beets-usertag: https://github.com/igordertigor/beets-usertag
.. _beets-popularity: https://github.com/abba23/beets-popularity
.. _beets-ydl: https://github.com/vmassuchetto/beets-ydl

View file

@ -70,3 +70,5 @@ Configuration
The ipfs plugin will automatically add imported albums to ipfs and add those
hashes to the database. This can be turned off by setting the ``auto`` option
in the ``ipfs:`` section of the config to ``no``.
If the setting ``nocopy`` is true (defaults false) then the plugin will pass the ``--nocopy`` option when adding things to ipfs. If the filestore option of ipfs is enabled this will mean files are neither removed from beets nor copied somewhere else.

47
docs/plugins/playlist.rst Normal file
View file

@ -0,0 +1,47 @@
Smart Playlist Plugin
=====================
``playlist`` is a plugin to use playlists in m3u format.
To use it, enable the ``playlist`` plugin in your configuration
(see :ref:`using-plugins`).
Then configure your playlists like this::
playlist:
auto: no
relative_to: ~/Music
playlist_dir: ~/.mpd/playlists
It is possible to query the library based on a playlist by speicifying its
absolute path::
$ beet ls playlist:/path/to/someplaylist.m3u
The plugin also supports referencing playlists by name. The playlist is then
seached in the playlist_dir and the ".m3u" extension is appended to the
name::
$ beet ls playlist:anotherplaylist
The plugin can also update playlists in the playlist directory automatically
every time an item is moved or deleted. This can be controlled by the ``auto``
configuration option.
Configuration
-------------
To configure the plugin, make a ``smartplaylist:`` section in your
configuration file. In addition to the ``playlists`` described above, the
other configuration options are:
- **auto**: If this is set to ``yes``, then anytime an item in the library is
moved or removed, the plugin will update all playlists in the
``playlist_dir`` directory that contain that item to reflect the change.
Default: ``no``
- **playlist_dir**: Where to read playlist files from.
Default: The current working directory (i.e., ``'.'``).
- **relative_to**: Interpret paths in the playlist files relative to a base
directory. Instead of setting it to a fixed path, it is also possible to
set it to ``playlist`` to use the playlist's parent directory or to
``library`` to use the library directory.
Default: ``library``

View file

@ -10,9 +10,9 @@ playback levels.
Installation
------------
This plugin can use one of four backends to compute the ReplayGain values:
GStreamer, mp3gain (and its cousin, aacgain), Python Audio Tools and bs1770gain. mp3gain
can be easier to install but GStreamer, Audio Tools and bs1770gain support more audio
This plugin can use one of three backends to compute the ReplayGain values:
GStreamer, mp3gain (and its cousin, aacgain), Python Audio Tools. mp3gain
can be easier to install but GStreamer and Audio Tools support more audio
formats.
Once installed, this plugin analyzes all files during the import process. This
@ -75,25 +75,6 @@ On OS X, most of the dependencies can be installed with `Homebrew`_::
.. _Python Audio Tools: http://audiotools.sourceforge.net
bs1770gain
``````````
To use this backend, you will need to install the `bs1770gain`_ command-line
tool, version 0.4.6 or greater. Follow the instructions at the `bs1770gain`_
Web site and ensure that the tool is on your ``$PATH``.
.. _bs1770gain: http://bs1770gain.sourceforge.net/
Then, enable the plugin (see :ref:`using-plugins`) and specify the
backend in your configuration file::
replaygain:
backend: bs1770gain
For Windows users: the tool currently has issues with long and non-ASCII path
names. You may want to use the :ref:`asciify-paths` configuration option until
this is resolved.
Configuration
-------------
@ -110,7 +91,7 @@ configuration file. The available options are:
Default: 89.
- **r128**: A space separated list of formats that will use ``R128_`` tags with
integer values instead of the common ``REPLAYGAIN_`` tags with floating point
values. Requires the "bs1770gain" backend.
values. Requires the "ffmpeg" backend.
Default: ``Opus``.
These options only work with the "command" backend:
@ -123,16 +104,6 @@ These options only work with the "command" backend:
would keep clipping from occurring.
Default: ``yes``.
These options only works with the "bs1770gain" backend:
- **method**: The loudness scanning standard: either `replaygain` for
ReplayGain 2.0, `ebu` for EBU R128, or `atsc` for ATSC A/85. This dictates
the reference level: -18, -23, or -24 LUFS respectively. Default:
`replaygain`
- **chunk_at**: Splits an album in groups of tracks of this amount.
Useful when running into memory problems when analysing albums with
an exceptionally large amount of tracks. Default:5000
Manual Analysis
---------------

View file

@ -1,10 +1,16 @@
Spotify Plugin
==============
The ``spotify`` plugin generates `Spotify`_ playlists from tracks in your library. Using the `Spotify Web API`_, any tracks that can be matched with a Spotify ID are returned, and the results can be either pasted in to a playlist or opened directly in the Spotify app.
The ``spotify`` plugin generates `Spotify`_ playlists from tracks in your
library with the ``beet spotify`` command using the `Spotify Search API`_.
Also, the plugin can use the Spotify `Album`_ and `Track`_ APIs to provide
metadata matches for the importer.
.. _Spotify: https://www.spotify.com/
.. _Spotify Web API: https://developer.spotify.com/web-api/search-item/
.. _Spotify Search API: https://developer.spotify.com/documentation/web-api/reference/search/search/
.. _Album: https://developer.spotify.com/documentation/web-api/reference/albums/get-album/
.. _Track: https://developer.spotify.com/documentation/web-api/reference/tracks/get-track/
Why Use This Plugin?
--------------------
@ -12,10 +18,10 @@ Why Use This Plugin?
* You're a Beets user and Spotify user already.
* You have playlists or albums you'd like to make available in Spotify from Beets without having to search for each artist/album/track.
* You want to check which tracks in your library are available on Spotify.
* You want to autotag music with metadata from the Spotify API.
Basic Usage
-----------
First, enable the ``spotify`` plugin (see :ref:`using-plugins`).
Then, use the ``spotify`` command with a beets query::
@ -37,6 +43,12 @@ Command-line options include:
* ``--show-failures`` or ``-f``: List the tracks that did not match a Spotify
ID.
You can enter the URL for an album or song on Spotify at the ``enter Id``
prompt during import::
Enter search, enter Id, aBort, eDit, edit Candidates, plaY? i
Enter release ID: https://open.spotify.com/album/2rFYTHFBLQN3AYlrymBPPA
Configuration
-------------
@ -67,10 +79,14 @@ in config.yaml under the ``spotify:`` section:
track/album/artist fields before sending them to Spotify. Can be useful for
changing certain abbreviations, like ft. -> feat. See the examples below.
Default: None.
- **source_weight**: Penalty applied to Spotify matches during import. Set to
0.0 to disable.
Default: ``0.5``.
Here's an example::
spotify:
source_weight: 0.7
mode: open
region_filter: US
show_failures: on

View file

@ -303,6 +303,7 @@ The defaults look like this::
See :ref:`aunique` for more details.
.. _terminal_encoding:
terminal_encoding
@ -654,8 +655,8 @@ Default: ``{}`` (empty).
MusicBrainz Options
-------------------
If you run your own `MusicBrainz`_ server, you can instruct beets to use it
instead of the main server. Use the ``host`` and ``ratelimit`` options under a
You can instruct beets to use `your own MusicBrainz database`_ instead of
the `main server`_. Use the ``host`` and ``ratelimit`` options under a
``musicbrainz:`` header, like so::
musicbrainz:
@ -663,14 +664,18 @@ instead of the main server. Use the ``host`` and ``ratelimit`` options under a
ratelimit: 100
The ``host`` key, of course, controls the Web server hostname (and port,
optionally) that will be contacted by beets (default: musicbrainz.org). The
``ratelimit`` option, an integer, controls the number of Web service requests
optionally) that will be contacted by beets (default: musicbrainz.org).
The server must have search indices enabled (see `Building search indexes`_).
The ``ratelimit`` option, an integer, controls the number of Web service requests
per second (default: 1). **Do not change the rate limit setting** if you're
using the main MusicBrainz server---on this public server, you're `limited`_
to one request per second.
.. _your own MusicBrainz database: https://musicbrainz.org/doc/MusicBrainz_Server/Setup
.. _main server: https://musicbrainz.org/
.. _limited: http://musicbrainz.org/doc/XML_Web_Service/Rate_Limiting
.. _MusicBrainz: http://musicbrainz.org/
.. _Building search indexes: https://musicbrainz.org/doc/MusicBrainz_Server/Setup#Building_search_indexes
.. _searchlimit:

View file

@ -88,13 +88,24 @@ setup(
install_requires=[
'six>=1.9',
'mutagen>=1.33',
'munkres',
'unidecode',
'musicbrainzngs>=0.4',
'pyyaml',
'jellyfish',
] + (['colorama'] if (sys.platform == 'win32') else []) +
(['enum34>=1.0.4'] if sys.version_info < (3, 4, 0) else []),
] + [
# Avoid a version of munkres incompatible with Python 3.
'munkres~=1.0.0' if sys.version_info < (3, 5, 0) else
'munkres!=1.1.0,!=1.1.1' if sys.version_info < (3, 6, 0) else
'munkres>=1.0.0',
] + (
# Use the backport of Python 3.4's `enum` module.
['enum34>=1.0.4'] if sys.version_info < (3, 4, 0) else []
) + (
# Pin a Python 2-compatible version of Jellyfish.
['jellyfish==0.6.0'] if sys.version_info < (3, 4, 0) else ['jellyfish']
) + (
# Support for ANSI console colors on Windows.
['colorama'] if (sys.platform == 'win32') else []
),
tests_require=[
'beautifulsoup4',

View file

@ -222,12 +222,19 @@ class TestHelper(object):
beets.config['plugins'] = plugins
beets.plugins.load_plugins(plugins)
beets.plugins.find_plugins()
# Take a backup of the original _types to restore when unloading
# Take a backup of the original _types and _queries to restore
# when unloading.
Item._original_types = dict(Item._types)
Album._original_types = dict(Album._types)
Item._types.update(beets.plugins.types(Item))
Album._types.update(beets.plugins.types(Album))
Item._original_queries = dict(Item._queries)
Album._original_queries = dict(Album._queries)
Item._queries.update(beets.plugins.named_queries(Item))
Album._queries.update(beets.plugins.named_queries(Album))
def unload_plugins(self):
"""Unload all plugins and remove the from the configuration.
"""
@ -237,6 +244,8 @@ class TestHelper(object):
beets.plugins._instances = {}
Item._types = Item._original_types
Album._types = Album._original_types
Item._queries = Item._original_queries
Album._queries = Album._original_queries
def create_importer(self, item_count=1, album_count=1):
"""Create files to import and return corresponding session.

View file

@ -25,6 +25,7 @@ import responses
from mock import patch
from test import _common
from test.helper import capture_log
from beetsplug import fetchart
from beets.autotag import AlbumInfo, AlbumMatch
from beets import config
@ -274,6 +275,111 @@ class AAOTest(UseThePlugin):
next(self.source.get(album, self.settings, []))
class ITunesStoreTest(UseThePlugin):
def setUp(self):
super(ITunesStoreTest, self).setUp()
self.source = fetchart.ITunesStore(logger, self.plugin.config)
self.settings = Settings()
self.album = _common.Bag(albumartist="some artist", album="some album")
@responses.activate
def run(self, *args, **kwargs):
super(ITunesStoreTest, self).run(*args, **kwargs)
def mock_response(self, url, json):
responses.add(responses.GET, url, body=json,
content_type='application/json')
def test_itunesstore_finds_image(self):
json = """{
"results":
[
{
"artistName": "some artist",
"collectionName": "some album",
"artworkUrl100": "url_to_the_image"
}
]
}"""
self.mock_response(fetchart.ITunesStore.API_URL, json)
candidate = next(self.source.get(self.album, self.settings, []))
self.assertEqual(candidate.url, 'url_to_the_image')
self.assertEqual(candidate.match, fetchart.Candidate.MATCH_EXACT)
def test_itunesstore_no_result(self):
json = '{"results": []}'
self.mock_response(fetchart.ITunesStore.API_URL, json)
expected = u"got no results"
with capture_log('beets.test_art') as logs:
with self.assertRaises(StopIteration):
next(self.source.get(self.album, self.settings, []))
self.assertIn(expected, logs[1])
def test_itunesstore_requestexception(self):
responses.add(responses.GET, fetchart.ITunesStore.API_URL,
json={'error': 'not found'}, status=404)
expected = u'iTunes search failed: 404 Client Error'
with capture_log('beets.test_art') as logs:
with self.assertRaises(StopIteration):
next(self.source.get(self.album, self.settings, []))
self.assertIn(expected, logs[1])
def test_itunesstore_fallback_match(self):
json = """{
"results":
[
{
"collectionName": "some album",
"artworkUrl100": "url_to_the_image"
}
]
}"""
self.mock_response(fetchart.ITunesStore.API_URL, json)
candidate = next(self.source.get(self.album, self.settings, []))
self.assertEqual(candidate.url, 'url_to_the_image')
self.assertEqual(candidate.match, fetchart.Candidate.MATCH_FALLBACK)
def test_itunesstore_returns_result_without_artwork(self):
json = """{
"results":
[
{
"artistName": "some artist",
"collectionName": "some album"
}
]
}"""
self.mock_response(fetchart.ITunesStore.API_URL, json)
expected = u'Malformed itunes candidate'
with capture_log('beets.test_art') as logs:
with self.assertRaises(StopIteration):
next(self.source.get(self.album, self.settings, []))
self.assertIn(expected, logs[1])
def test_itunesstore_returns_no_result_when_error_received(self):
json = '{"error": {"errors": [{"reason": "some reason"}]}}'
self.mock_response(fetchart.ITunesStore.API_URL, json)
expected = u"not found in json. Fields are"
with capture_log('beets.test_art') as logs:
with self.assertRaises(StopIteration):
next(self.source.get(self.album, self.settings, []))
self.assertIn(expected, logs[1])
def test_itunesstore_returns_no_result_with_malformed_response(self):
json = """bla blup"""
self.mock_response(fetchart.ITunesStore.API_URL, json)
expected = u"Could not decode json response:"
with capture_log('beets.test_art') as logs:
with self.assertRaises(StopIteration):
next(self.source.get(self.album, self.settings, []))
self.assertIn(expected, logs[1])
class GoogleImageTest(UseThePlugin):
def setUp(self):
super(GoogleImageTest, self).setUp()

View file

@ -36,6 +36,17 @@ class TestSort(dbcore.query.FieldSort):
pass
class TestQuery(dbcore.query.Query):
def __init__(self, pattern):
self.pattern = pattern
def clause(self):
return None, ()
def match(self):
return True
class TestModel1(dbcore.Model):
_table = 'test'
_flex_table = 'testflex'
@ -49,6 +60,9 @@ class TestModel1(dbcore.Model):
_sorts = {
'some_sort': TestSort,
}
_queries = {
'some_query': TestQuery,
}
@classmethod
def _getters(cls):
@ -519,6 +533,10 @@ class QueryFromStringsTest(unittest.TestCase):
q = self.qfs([''])
self.assertIsInstance(q.subqueries[0], dbcore.query.TrueQuery)
def test_parse_named_query(self):
q = self.qfs(['some_query:foo'])
self.assertIsInstance(q.subqueries[0], TestQuery)
class SortFromStringsTest(unittest.TestCase):
def sfs(self, strings):

View file

@ -15,7 +15,9 @@
from __future__ import division, absolute_import, print_function
import ctypes
import os
import sys
import unittest
from test.helper import TestHelper
from beets import util
@ -29,21 +31,31 @@ class FetchartCliTest(unittest.TestCase, TestHelper):
self.config['fetchart']['cover_names'] = 'c\xc3\xb6ver.jpg'
self.config['art_filename'] = 'mycover'
self.album = self.add_album()
self.cover_path = os.path.join(self.album.path, b'mycover.jpg')
def tearDown(self):
self.unload_plugins()
self.teardown_beets()
def check_cover_is_stored(self):
self.assertEqual(self.album['artpath'], self.cover_path)
with open(util.syspath(self.cover_path), 'r') as f:
self.assertEqual(f.read(), 'IMAGE')
def hide_file_windows(self):
hidden_mask = 2
success = ctypes.windll.kernel32.SetFileAttributesW(self.cover_path,
hidden_mask)
if not success:
self.skipTest("unable to set file attributes")
def test_set_art_from_folder(self):
self.touch(b'c\xc3\xb6ver.jpg', dir=self.album.path, content='IMAGE')
self.run_command('fetchart')
cover_path = os.path.join(self.album.path, b'mycover.jpg')
self.album.load()
self.assertEqual(self.album['artpath'], cover_path)
with open(util.syspath(cover_path), 'r') as f:
self.assertEqual(f.read(), 'IMAGE')
self.check_cover_is_stored()
def test_filesystem_does_not_pick_up_folder(self):
os.makedirs(os.path.join(self.album.path, b'mycover.jpg'))
@ -51,6 +63,47 @@ class FetchartCliTest(unittest.TestCase, TestHelper):
self.album.load()
self.assertEqual(self.album['artpath'], None)
def test_filesystem_does_not_pick_up_ignored_file(self):
self.touch(b'co_ver.jpg', dir=self.album.path, content='IMAGE')
self.config['ignore'] = ['*_*']
self.run_command('fetchart')
self.album.load()
self.assertEqual(self.album['artpath'], None)
def test_filesystem_picks_up_non_ignored_file(self):
self.touch(b'cover.jpg', dir=self.album.path, content='IMAGE')
self.config['ignore'] = ['*_*']
self.run_command('fetchart')
self.album.load()
self.check_cover_is_stored()
def test_filesystem_does_not_pick_up_hidden_file(self):
self.touch(b'.cover.jpg', dir=self.album.path, content='IMAGE')
if sys.platform == 'win32':
self.hide_file_windows()
self.config['ignore'] = [] # By default, ignore includes '.*'.
self.config['ignore_hidden'] = True
self.run_command('fetchart')
self.album.load()
self.assertEqual(self.album['artpath'], None)
def test_filesystem_picks_up_non_hidden_file(self):
self.touch(b'cover.jpg', dir=self.album.path, content='IMAGE')
self.config['ignore_hidden'] = True
self.run_command('fetchart')
self.album.load()
self.check_cover_is_stored()
def test_filesystem_picks_up_hidden_file(self):
self.touch(b'.cover.jpg', dir=self.album.path, content='IMAGE')
if sys.platform == 'win32':
self.hide_file_windows()
self.config['ignore'] = [] # By default, ignore includes '.*'.
self.config['ignore_hidden'] = False
self.run_command('fetchart')
self.album.load()
self.check_cover_is_stored()
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)

View file

@ -110,6 +110,25 @@ class HookTest(_common.TestCase, TestHelper):
self.assertTrue(os.path.isfile(path))
os.remove(path)
def test_hook_bytes_interpolation(self):
temporary_paths = [
get_temporary_path().encode('utf-8')
for i in range(self.TEST_HOOK_COUNT)
]
for index, path in enumerate(temporary_paths):
self._add_hook('test_bytes_event_{0}'.format(index),
'touch "{path}"')
self.load_plugins('hook')
for index, path in enumerate(temporary_paths):
plugins.send('test_bytes_event_{0}'.format(index), path=path)
for path in temporary_paths:
self.assertTrue(os.path.isfile(path))
os.remove(path)
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)

308
test/test_playlist.py Normal file
View file

@ -0,0 +1,308 @@
# -*- coding: utf-8 -*-
# This file is part of beets.
# Copyright 2016, Thomas Scholtes.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
from __future__ import division, absolute_import, print_function
from six.moves import shlex_quote
import os
import shutil
import tempfile
import unittest
from test import _common
from test import helper
import beets
class PlaylistTestHelper(helper.TestHelper):
def setUp(self):
self.setup_beets()
self.lib = beets.library.Library(':memory:')
self.music_dir = os.path.expanduser(os.path.join('~', 'Music'))
i1 = _common.item()
i1.path = beets.util.normpath(os.path.join(
self.music_dir,
'a', 'b', 'c.mp3',
))
i1.title = u'some item'
i1.album = u'some album'
self.lib.add(i1)
self.lib.add_album([i1])
i2 = _common.item()
i2.path = beets.util.normpath(os.path.join(
self.music_dir,
'd', 'e', 'f.mp3',
))
i2.title = 'another item'
i2.album = 'another album'
self.lib.add(i2)
self.lib.add_album([i2])
i3 = _common.item()
i3.path = beets.util.normpath(os.path.join(
self.music_dir,
'x', 'y', 'z.mp3',
))
i3.title = 'yet another item'
i3.album = 'yet another album'
self.lib.add(i3)
self.lib.add_album([i3])
self.playlist_dir = tempfile.mkdtemp()
self.config['directory'] = self.music_dir
self.config['playlist']['playlist_dir'] = self.playlist_dir
self.setup_test()
self.load_plugins('playlist')
def setup_test(self):
raise NotImplementedError
def tearDown(self):
self.unload_plugins()
shutil.rmtree(self.playlist_dir)
self.teardown_beets()
class PlaylistQueryTestHelper(PlaylistTestHelper):
def test_name_query_with_absolute_paths_in_playlist(self):
q = u'playlist:absolute'
results = self.lib.items(q)
self.assertEqual(set([i.title for i in results]), set([
u'some item',
u'another item',
]))
def test_path_query_with_absolute_paths_in_playlist(self):
q = u'playlist:{0}'.format(shlex_quote(os.path.join(
self.playlist_dir,
'absolute.m3u',
)))
results = self.lib.items(q)
self.assertEqual(set([i.title for i in results]), set([
u'some item',
u'another item',
]))
def test_name_query_with_relative_paths_in_playlist(self):
q = u'playlist:relative'
results = self.lib.items(q)
self.assertEqual(set([i.title for i in results]), set([
u'some item',
u'another item',
]))
def test_path_query_with_relative_paths_in_playlist(self):
q = u'playlist:{0}'.format(shlex_quote(os.path.join(
self.playlist_dir,
'relative.m3u',
)))
results = self.lib.items(q)
self.assertEqual(set([i.title for i in results]), set([
u'some item',
u'another item',
]))
def test_name_query_with_nonexisting_playlist(self):
q = u'playlist:nonexisting'.format(self.playlist_dir)
results = self.lib.items(q)
self.assertEqual(set(results), set())
def test_path_query_with_nonexisting_playlist(self):
q = u'playlist:{0}'.format(shlex_quote(os.path.join(
self.playlist_dir,
self.playlist_dir,
'nonexisting.m3u',
)))
results = self.lib.items(q)
self.assertEqual(set(results), set())
class PlaylistTestRelativeToLib(PlaylistQueryTestHelper, unittest.TestCase):
def setup_test(self):
with open(os.path.join(self.playlist_dir, 'absolute.m3u'), 'w') as f:
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'a', 'b', 'c.mp3')))
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'd', 'e', 'f.mp3')))
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'nonexisting.mp3')))
with open(os.path.join(self.playlist_dir, 'relative.m3u'), 'w') as f:
f.write('{0}\n'.format(os.path.join('a', 'b', 'c.mp3')))
f.write('{0}\n'.format(os.path.join('d', 'e', 'f.mp3')))
f.write('{0}\n'.format('nonexisting.mp3'))
self.config['playlist']['relative_to'] = 'library'
class PlaylistTestRelativeToDir(PlaylistQueryTestHelper, unittest.TestCase):
def setup_test(self):
with open(os.path.join(self.playlist_dir, 'absolute.m3u'), 'w') as f:
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'a', 'b', 'c.mp3')))
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'd', 'e', 'f.mp3')))
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'nonexisting.mp3')))
with open(os.path.join(self.playlist_dir, 'relative.m3u'), 'w') as f:
f.write('{0}\n'.format(os.path.join('a', 'b', 'c.mp3')))
f.write('{0}\n'.format(os.path.join('d', 'e', 'f.mp3')))
f.write('{0}\n'.format('nonexisting.mp3'))
self.config['playlist']['relative_to'] = self.music_dir
class PlaylistTestRelativeToPls(PlaylistQueryTestHelper, unittest.TestCase):
def setup_test(self):
with open(os.path.join(self.playlist_dir, 'absolute.m3u'), 'w') as f:
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'a', 'b', 'c.mp3')))
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'd', 'e', 'f.mp3')))
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'nonexisting.mp3')))
with open(os.path.join(self.playlist_dir, 'relative.m3u'), 'w') as f:
f.write('{0}\n'.format(os.path.relpath(
os.path.join(self.music_dir, 'a', 'b', 'c.mp3'),
start=self.playlist_dir,
)))
f.write('{0}\n'.format(os.path.relpath(
os.path.join(self.music_dir, 'd', 'e', 'f.mp3'),
start=self.playlist_dir,
)))
f.write('{0}\n'.format(os.path.relpath(
os.path.join(self.music_dir, 'nonexisting.mp3'),
start=self.playlist_dir,
)))
self.config['playlist']['relative_to'] = 'playlist'
self.config['playlist']['playlist_dir'] = self.playlist_dir
class PlaylistUpdateTestHelper(PlaylistTestHelper):
def setup_test(self):
with open(os.path.join(self.playlist_dir, 'absolute.m3u'), 'w') as f:
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'a', 'b', 'c.mp3')))
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'd', 'e', 'f.mp3')))
f.write('{0}\n'.format(os.path.join(
self.music_dir, 'nonexisting.mp3')))
with open(os.path.join(self.playlist_dir, 'relative.m3u'), 'w') as f:
f.write('{0}\n'.format(os.path.join('a', 'b', 'c.mp3')))
f.write('{0}\n'.format(os.path.join('d', 'e', 'f.mp3')))
f.write('{0}\n'.format('nonexisting.mp3'))
self.config['playlist']['auto'] = True
self.config['playlist']['relative_to'] = 'library'
class PlaylistTestItemMoved(PlaylistUpdateTestHelper, unittest.TestCase):
def test_item_moved(self):
# Emit item_moved event for an item that is in a playlist
results = self.lib.items(u'path:{0}'.format(shlex_quote(
os.path.join(self.music_dir, 'd', 'e', 'f.mp3'))))
item = results[0]
beets.plugins.send(
'item_moved', item=item, source=item.path,
destination=beets.util.bytestring_path(
os.path.join(self.music_dir, 'g', 'h', 'i.mp3')))
# Emit item_moved event for an item that is not in a playlist
results = self.lib.items(u'path:{0}'.format(shlex_quote(
os.path.join(self.music_dir, 'x', 'y', 'z.mp3'))))
item = results[0]
beets.plugins.send(
'item_moved', item=item, source=item.path,
destination=beets.util.bytestring_path(
os.path.join(self.music_dir, 'u', 'v', 'w.mp3')))
# Emit cli_exit event
beets.plugins.send('cli_exit', lib=self.lib)
# Check playlist with absolute paths
playlist_path = os.path.join(self.playlist_dir, 'absolute.m3u')
with open(playlist_path, 'r') as f:
lines = [line.strip() for line in f.readlines()]
self.assertEqual(lines, [
os.path.join(self.music_dir, 'a', 'b', 'c.mp3'),
os.path.join(self.music_dir, 'g', 'h', 'i.mp3'),
os.path.join(self.music_dir, 'nonexisting.mp3'),
])
# Check playlist with relative paths
playlist_path = os.path.join(self.playlist_dir, 'relative.m3u')
with open(playlist_path, 'r') as f:
lines = [line.strip() for line in f.readlines()]
self.assertEqual(lines, [
os.path.join('a', 'b', 'c.mp3'),
os.path.join('g', 'h', 'i.mp3'),
'nonexisting.mp3',
])
class PlaylistTestItemRemoved(PlaylistUpdateTestHelper, unittest.TestCase):
def test_item_removed(self):
# Emit item_removed event for an item that is in a playlist
results = self.lib.items(u'path:{0}'.format(shlex_quote(
os.path.join(self.music_dir, 'd', 'e', 'f.mp3'))))
item = results[0]
beets.plugins.send('item_removed', item=item)
# Emit item_removed event for an item that is not in a playlist
results = self.lib.items(u'path:{0}'.format(shlex_quote(
os.path.join(self.music_dir, 'x', 'y', 'z.mp3'))))
item = results[0]
beets.plugins.send('item_removed', item=item)
# Emit cli_exit event
beets.plugins.send('cli_exit', lib=self.lib)
# Check playlist with absolute paths
playlist_path = os.path.join(self.playlist_dir, 'absolute.m3u')
with open(playlist_path, 'r') as f:
lines = [line.strip() for line in f.readlines()]
self.assertEqual(lines, [
os.path.join(self.music_dir, 'a', 'b', 'c.mp3'),
os.path.join(self.music_dir, 'nonexisting.mp3'),
])
# Check playlist with relative paths
playlist_path = os.path.join(self.playlist_dir, 'relative.m3u')
with open(playlist_path, 'r') as f:
lines = [line.strip() for line in f.readlines()]
self.assertEqual(lines, [
os.path.join('a', 'b', 'c.mp3'),
'nonexisting.mp3',
])
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == '__main__':
unittest.main(defaultTest='suite')

View file

@ -29,87 +29,111 @@ def _params(url):
class SpotifyPluginTest(_common.TestCase, TestHelper):
@responses.activate
def setUp(self):
config.clear()
self.setup_beets()
responses.add(
responses.POST,
spotify.SpotifyPlugin.oauth_token_url,
status=200,
json={
'access_token': '3XyiC3raJySbIAV5LVYj1DaWbcocNi3LAJTNXRnYY'
'GVUl6mbbqXNhW3YcZnQgYXNWHFkVGSMlc0tMuvq8CF',
'token_type': 'Bearer',
'expires_in': 3600,
'scope': '',
},
)
self.spotify = spotify.SpotifyPlugin()
opts = ArgumentsMock("list", False)
self.spotify.parse_opts(opts)
self.spotify._parse_opts(opts)
def tearDown(self):
self.teardown_beets()
def test_args(self):
opts = ArgumentsMock("fail", True)
self.assertEqual(False, self.spotify.parse_opts(opts))
self.assertEqual(False, self.spotify._parse_opts(opts))
opts = ArgumentsMock("list", False)
self.assertEqual(True, self.spotify.parse_opts(opts))
self.assertEqual(True, self.spotify._parse_opts(opts))
def test_empty_query(self):
self.assertEqual(None, self.spotify.query_spotify(self.lib, u"1=2"))
self.assertEqual(
None, self.spotify._match_library_tracks(self.lib, u"1=2")
)
@responses.activate
def test_missing_request(self):
json_file = os.path.join(_common.RSRC, b'spotify',
b'missing_request.json')
json_file = os.path.join(
_common.RSRC, b'spotify', b'missing_request.json'
)
with open(json_file, 'rb') as f:
response_body = f.read()
responses.add(responses.GET, 'https://api.spotify.com/v1/search',
body=response_body, status=200,
content_type='application/json')
responses.add(
responses.GET,
spotify.SpotifyPlugin.search_url,
body=response_body,
status=200,
content_type='application/json',
)
item = Item(
mb_trackid=u'01234',
album=u'lkajsdflakjsd',
albumartist=u'ujydfsuihse',
title=u'duifhjslkef',
length=10
length=10,
)
item.add(self.lib)
self.assertEqual([], self.spotify.query_spotify(self.lib, u""))
self.assertEqual([], self.spotify._match_library_tracks(self.lib, u""))
params = _params(responses.calls[0].request.url)
self.assertEqual(
params['q'],
[u'duifhjslkef album:lkajsdflakjsd artist:ujydfsuihse'],
)
query = params['q'][0]
self.assertIn(u'duifhjslkef', query)
self.assertIn(u'artist:ujydfsuihse', query)
self.assertIn(u'album:lkajsdflakjsd', query)
self.assertEqual(params['type'], [u'track'])
@responses.activate
def test_track_request(self):
json_file = os.path.join(_common.RSRC, b'spotify',
b'track_request.json')
json_file = os.path.join(
_common.RSRC, b'spotify', b'track_request.json'
)
with open(json_file, 'rb') as f:
response_body = f.read()
responses.add(responses.GET, 'https://api.spotify.com/v1/search',
body=response_body, status=200,
content_type='application/json')
responses.add(
responses.GET,
spotify.SpotifyPlugin.search_url,
body=response_body,
status=200,
content_type='application/json',
)
item = Item(
mb_trackid=u'01234',
album=u'Despicable Me 2',
albumartist=u'Pharrell Williams',
title=u'Happy',
length=10
length=10,
)
item.add(self.lib)
results = self.spotify.query_spotify(self.lib, u"Happy")
results = self.spotify._match_library_tracks(self.lib, u"Happy")
self.assertEqual(1, len(results))
self.assertEqual(u"6NPVjNh8Jhru9xOmyQigds", results[0]['id'])
self.spotify.output_results(results)
self.spotify._output_match_results(results)
params = _params(responses.calls[0].request.url)
self.assertEqual(
params['q'],
[u'Happy album:Despicable Me 2 artist:Pharrell Williams'],
)
query = params['q'][0]
self.assertIn(u'Happy', query)
self.assertIn(u'artist:Pharrell Williams', query)
self.assertIn(u'album:Despicable Me 2', query)
self.assertEqual(params['type'], [u'track'])
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == '__main__':
unittest.main(defaultTest='suite')

View file

@ -31,7 +31,7 @@ deps =
flake8-coding
flake8-future-import
flake8-blind-except
pep8-naming
pep8-naming~=0.7.0
files = beets beetsplug beet test setup.py docs
[testenv]