Merge remote-tracking branch 'upstream/master' into trunc_artist

This commit is contained in:
Alok Saboo 2024-03-01 10:16:09 -05:00
commit c556989ee7
26 changed files with 422 additions and 97 deletions

View file

@ -3,12 +3,12 @@
repos:
- repo: https://github.com/psf/black
rev: 23.11.0
rev: 24.2.0
hooks:
- id: black
- repo: https://github.com/pycqa/isort
rev: 5.12.0
rev: 5.13.2
hooks:
- id: isort
name: isort (python)

View file

@ -122,8 +122,7 @@ class NamedQuery(Query):
"""
@abstractmethod
def __init__(self, pattern):
...
def __init__(self, pattern): ...
P = TypeVar("P")

View file

@ -35,8 +35,7 @@ if TYPE_CHECKING and sys.version_info >= (3, 8):
given type.
"""
def __init__(self, value: Any = None):
...
def __init__(self, value: Any = None): ...
else:
# No structural subtyping in Python < 3.8...

View file

@ -216,12 +216,14 @@ def get_singleton_disambig_fields(info: hooks.TrackInfo) -> Sequence[str]:
calculated_values = {
"index": "Index {}".format(str(info.index)),
"track_alt": "Track {}".format(info.track_alt),
"album": "[{}]".format(info.album)
if (
config["import"]["singleton_album_disambig"].get()
and info.get("album")
)
else "",
"album": (
"[{}]".format(info.album)
if (
config["import"]["singleton_album_disambig"].get()
and info.get("album")
)
else ""
),
}
for field in chosen_fields:
@ -240,9 +242,11 @@ def get_album_disambig_fields(info: hooks.AlbumInfo) -> Sequence[str]:
out = []
chosen_fields = config["match"]["album_disambig_fields"].as_str_seq()
calculated_values = {
"media": "{}x{}".format(info.mediums, info.media)
if (info.mediums and info.mediums > 1)
else info.media,
"media": (
"{}x{}".format(info.mediums, info.media)
if (info.mediums and info.mediums > 1)
else info.media
),
}
for field in chosen_fields:
@ -1160,9 +1164,11 @@ class TerminalImportSession(importer.ImportSession):
print_(
"Old: "
+ summarize_items(
list(duplicate.items())
if task.is_album
else [duplicate],
(
list(duplicate.items())
if task.is_album
else [duplicate]
),
not task.is_album,
)
)

View file

@ -58,11 +58,6 @@ class Environment:
# Code generation helpers.
def ex_lvalue(name):
"""A variable load expression."""
return ast.Name(name, ast.Store())
def ex_rvalue(name):
"""A variable store expression."""
return ast.Name(name, ast.Load())
@ -75,15 +70,6 @@ def ex_literal(val):
return ast.Constant(val)
def ex_varassign(name, expr):
"""Assign an expression into a single variable. The expression may
either be an `ast.expr` object or a value to be used as a literal.
"""
if not isinstance(expr, ast.expr):
expr = ex_literal(expr)
return ast.Assign([ex_lvalue(name)], expr)
def ex_call(func, args):
"""A function-call expression with only positional parameters. The
function may be an expression or the name of a function. Each

View file

@ -27,37 +27,22 @@ from beets.plugins import BeetsPlugin
from beets.ui import UserError
def simple_rewriter(field, rules):
def rewriter(field, simple_rules, advanced_rules):
"""Template field function factory.
Create a template field function that rewrites the given field
with the given rewriting rules.
``rules`` must be a list of (pattern, replacement) pairs.
``simple_rules`` must be a list of (pattern, replacement) pairs.
``advanced_rules`` must be a list of (query, replacement) pairs.
"""
def fieldfunc(item):
value = item._values_fixed[field]
for pattern, replacement in rules:
for pattern, replacement in simple_rules:
if pattern.match(value.lower()):
# Rewrite activated.
return replacement
# Not activated; return original value.
return value
return fieldfunc
def advanced_rewriter(field, rules):
"""Template field function factory.
Create a template field function that rewrites the given field
with the given rewriting rules.
``rules`` must be a list of (query, replacement) pairs.
"""
def fieldfunc(item):
value = item._values_fixed[field]
for query, replacement in rules:
for query, replacement in advanced_rules:
if query.match(item):
# Rewrite activated.
return replacement
@ -97,8 +82,12 @@ class AdvancedRewritePlugin(BeetsPlugin):
}
# Gather all the rewrite rules for each field.
simple_rules = defaultdict(list)
advanced_rules = defaultdict(list)
class RulesContainer:
def __init__(self):
self.simple = []
self.advanced = []
rules = defaultdict(RulesContainer)
for rule in self.config.get(template):
if "match" not in rule:
# Simple syntax
@ -124,12 +113,12 @@ class AdvancedRewritePlugin(BeetsPlugin):
f"for field {fieldname}"
)
pattern = re.compile(pattern.lower())
simple_rules[fieldname].append((pattern, value))
rules[fieldname].simple.append((pattern, value))
# Apply the same rewrite to the corresponding album field.
if fieldname in corresponding_album_fields:
album_fieldname = corresponding_album_fields[fieldname]
simple_rules[album_fieldname].append((pattern, value))
rules[album_fieldname].simple.append((pattern, value))
else:
# Advanced syntax
match = rule["match"]
@ -168,24 +157,18 @@ class AdvancedRewritePlugin(BeetsPlugin):
f"for field {fieldname}"
)
advanced_rules[fieldname].append((query, replacement))
rules[fieldname].advanced.append((query, replacement))
# Apply the same rewrite to the corresponding album field.
if fieldname in corresponding_album_fields:
album_fieldname = corresponding_album_fields[fieldname]
advanced_rules[album_fieldname].append(
rules[album_fieldname].advanced.append(
(query, replacement)
)
# Replace each template field with the new rewriter function.
for fieldname, fieldrules in simple_rules.items():
getter = simple_rewriter(fieldname, fieldrules)
self.template_fields[fieldname] = getter
if fieldname in Album._fields:
self.album_template_fields[fieldname] = getter
for fieldname, fieldrules in advanced_rules.items():
getter = advanced_rewriter(fieldname, fieldrules)
for fieldname, fieldrules in rules.items():
getter = rewriter(fieldname, fieldrules.simple, fieldrules.advanced)
self.template_fields[fieldname] = getter
if fieldname in Album._fields:
self.album_template_fields[fieldname] = getter

View file

@ -74,7 +74,7 @@ class ExportPlugin(BeetsPlugin):
"xml": {
# XML module formatting options.
"formatting": {}
}
},
# TODO: Use something like the edit plugin
# 'item_fields': []
}

View file

@ -204,12 +204,20 @@ def process_tracks(lib, tracks, log):
for num in range(0, total):
song = None
trackid = tracks[num]["mbid"].strip()
artist = tracks[num]["artist"].get("name", "").strip()
title = tracks[num]["name"].strip()
trackid = tracks[num]["mbid"].strip() if tracks[num]["mbid"] else None
artist = (
tracks[num]["artist"].get("name", "").strip()
if tracks[num]["artist"].get("name", "")
else None
)
title = tracks[num]["name"].strip() if tracks[num]["name"] else None
album = ""
if "album" in tracks[num]:
album = tracks[num]["album"].get("name", "").strip()
album = (
tracks[num]["album"].get("name", "").strip()
if tracks[num]["album"]
else None
)
log.debug("query: {0} - {1} ({2})", artist, title, album)
@ -219,6 +227,19 @@ def process_tracks(lib, tracks, log):
dbcore.query.MatchQuery("mb_trackid", trackid)
).get()
# If not, try just album/title
if song is None:
log.debug(
"no album match, trying by album/title: {0} - {1}", album, title
)
query = dbcore.AndQuery(
[
dbcore.query.SubstringQuery("album", album),
dbcore.query.SubstringQuery("title", title),
]
)
song = lib.items(query).get()
# If not, try just artist/title
if song is None:
log.debug("no album match, trying by artist/title")
@ -244,7 +265,7 @@ def process_tracks(lib, tracks, log):
if song is not None:
count = int(song.get("play_count", 0))
new_count = int(tracks[num]["playcount"])
new_count = int(tracks[num].get("playcount", 1))
log.debug(
"match: {0} - {1} ({2}) " "updating: play_count {3} => {4}",
song.artist,

266
beetsplug/listenbrainz.py Normal file
View file

@ -0,0 +1,266 @@
"""Adds Listenbrainz support to Beets."""
import datetime
import musicbrainzngs
import requests
from beets import config, ui
from beets.plugins import BeetsPlugin
from beetsplug.lastimport import process_tracks
class ListenBrainzPlugin(BeetsPlugin):
"""A Beets plugin for interacting with ListenBrainz."""
data_source = "ListenBrainz"
ROOT = "http://api.listenbrainz.org/1/"
def __init__(self):
"""Initialize the plugin."""
super().__init__()
self.token = self.config["token"].get()
self.username = self.config["username"].get()
self.AUTH_HEADER = {"Authorization": f"Token {self.token}"}
config["listenbrainz"]["token"].redact = True
def commands(self):
"""Add beet UI commands to interact with ListenBrainz."""
lbupdate_cmd = ui.Subcommand(
"lbimport", help=f"Import {self.data_source} history"
)
def func(lib, opts, args):
self._lbupdate(lib, self._log)
lbupdate_cmd.func = func
return [lbupdate_cmd]
def _lbupdate(self, lib, log):
"""Obtain view count from Listenbrainz."""
found_total = 0
unknown_total = 0
ls = self.get_listens()
tracks = self.get_tracks_from_listens(ls)
log.info(f"Found {len(ls)} listens")
if tracks:
found, unknown = process_tracks(lib, tracks, log)
found_total += found
unknown_total += unknown
log.info("... done!")
log.info("{0} unknown play-counts", unknown_total)
log.info("{0} play-counts imported", found_total)
def _make_request(self, url, params=None):
"""Makes a request to the ListenBrainz API."""
try:
response = requests.get(
url=url,
headers=self.AUTH_HEADER,
timeout=10,
params=params,
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
self._log.debug(f"Invalid Search Error: {e}")
return None
def get_listens(self, min_ts=None, max_ts=None, count=None):
"""Gets the listen history of a given user.
Args:
username: User to get listen history of.
min_ts: History before this timestamp will not be returned.
DO NOT USE WITH max_ts.
max_ts: History after this timestamp will not be returned.
DO NOT USE WITH min_ts.
count: How many listens to return. If not specified,
uses a default from the server.
Returns:
A list of listen info dictionaries if there's an OK status.
Raises:
An HTTPError if there's a failure.
A ValueError if the JSON in the response is invalid.
An IndexError if the JSON is not structured as expected.
"""
url = f"{self.ROOT}/user/{self.username}/listens"
params = {
k: v
for k, v in {
"min_ts": min_ts,
"max_ts": max_ts,
"count": count,
}.items()
if v is not None
}
response = self._make_request(url, params)
if response is not None:
return response["payload"]["listens"]
else:
return None
def get_tracks_from_listens(self, listens):
"""Returns a list of tracks from a list of listens."""
tracks = []
for track in listens:
if track["track_metadata"].get("release_name") is None:
continue
mbid_mapping = track["track_metadata"].get("mbid_mapping", {})
# print(json.dumps(track, indent=4, sort_keys=True))
if mbid_mapping.get("recording_mbid") is None:
# search for the track using title and release
mbid = self.get_mb_recording_id(track)
tracks.append(
{
"album": {
"name": track["track_metadata"].get("release_name")
},
"name": track["track_metadata"].get("track_name"),
"artist": {
"name": track["track_metadata"].get("artist_name")
},
"mbid": mbid,
"release_mbid": mbid_mapping.get("release_mbid"),
"listened_at": track.get("listened_at"),
}
)
return tracks
def get_mb_recording_id(self, track):
"""Returns the MusicBrainz recording ID for a track."""
resp = musicbrainzngs.search_recordings(
query=track["track_metadata"].get("track_name"),
release=track["track_metadata"].get("release_name"),
strict=True,
)
if resp.get("recording-count") == "1":
return resp.get("recording-list")[0].get("id")
else:
return None
def get_playlists_createdfor(self, username):
"""Returns a list of playlists created by a user."""
url = f"{self.ROOT}/user/{username}/playlists/createdfor"
return self._make_request(url)
def get_listenbrainz_playlists(self):
"""Returns a list of playlists created by ListenBrainz."""
import re
resp = self.get_playlists_createdfor(self.username)
playlists = resp.get("playlists")
listenbrainz_playlists = []
for playlist in playlists:
playlist_info = playlist.get("playlist")
if playlist_info.get("creator") == "listenbrainz":
title = playlist_info.get("title")
match = re.search(
r"(Missed Recordings of \d{4}|Discoveries of \d{4})", title
)
if "Exploration" in title:
playlist_type = "Exploration"
elif "Jams" in title:
playlist_type = "Jams"
elif match:
playlist_type = match.group(1)
else:
playlist_type = None
if "week of " in title:
date_str = title.split("week of ")[1].split(" ")[0]
date = datetime.datetime.strptime(
date_str, "%Y-%m-%d"
).date()
else:
date = None
identifier = playlist_info.get("identifier")
id = identifier.split("/")[-1]
if playlist_type in ["Jams", "Exploration"]:
listenbrainz_playlists.append(
{
"type": playlist_type,
"date": date,
"identifier": id,
"title": title,
}
)
return listenbrainz_playlists
def get_playlist(self, identifier):
"""Returns a playlist."""
url = f"{self.ROOT}/playlist/{identifier}"
return self._make_request(url)
def get_tracks_from_playlist(self, playlist):
"""This function returns a list of tracks in the playlist."""
tracks = []
for track in playlist.get("playlist").get("track"):
tracks.append(
{
"artist": track.get("creator"),
"identifier": track.get("identifier").split("/")[-1],
"title": track.get("title"),
}
)
return self.get_track_info(tracks)
def get_track_info(self, tracks):
"""Returns a list of track info."""
track_info = []
for track in tracks:
identifier = track.get("identifier")
resp = musicbrainzngs.get_recording_by_id(
identifier, includes=["releases", "artist-credits"]
)
recording = resp.get("recording")
title = recording.get("title")
artist_credit = recording.get("artist-credit", [])
if artist_credit:
artist = artist_credit[0].get("artist", {}).get("name")
else:
artist = None
releases = recording.get("release-list", [])
if releases:
album = releases[0].get("title")
date = releases[0].get("date")
year = date.split("-")[0] if date else None
else:
album = None
year = None
track_info.append(
{
"identifier": identifier,
"title": title,
"artist": artist,
"album": album,
"year": year,
}
)
return track_info
def get_weekly_playlist(self, index):
"""Returns a list of weekly playlists based on the index."""
playlists = self.get_listenbrainz_playlists()
playlist = self.get_playlist(playlists[index].get("identifier"))
self._log.info(f"Getting {playlist.get('playlist').get('title')}")
return self.get_tracks_from_playlist(playlist)
def get_weekly_exploration(self):
"""Returns a list of weekly exploration."""
return self.get_weekly_playlist(0)
def get_weekly_jams(self):
"""Returns a list of weekly jams."""
return self.get_weekly_playlist(1)
def get_last_weekly_exploration(self):
"""Returns a list of weekly exploration."""
return self.get_weekly_playlist(3)
def get_last_weekly_jams(self):
"""Returns a list of weekly jams."""
return self.get_weekly_playlist(3)

View file

@ -5,6 +5,7 @@ like the following in your config.yaml to configure:
file: 644
dir: 755
"""
import os
import stat

View file

@ -819,9 +819,9 @@ class GStreamerBackend(Backend):
self._files = [i.path for i in items]
# FIXME: Turn this into DefaultDict[bytes, Gain]
self._file_tags: DefaultDict[
bytes, Dict[str, float]
] = collections.defaultdict(dict)
self._file_tags: DefaultDict[bytes, Dict[str, float]] = (
collections.defaultdict(dict)
)
self._rg.set_property("reference-level", target_level)
@ -930,9 +930,9 @@ class GStreamerBackend(Backend):
tag
)[1]
elif tag == self.Gst.TAG_REFERENCE_LEVEL:
self._file_tags[self._file][
"REFERENCE_LEVEL"
] = taglist.get_double(tag)[1]
self._file_tags[self._file]["REFERENCE_LEVEL"] = (
taglist.get_double(tag)[1]
)
tags.foreach(handle_tag, None)

View file

@ -17,6 +17,8 @@ Major new features:
New features:
* :doc:`/plugins/listenbrainz`: Add initial support for importing history and playlists from `ListenBrainz`
:bug:`1719`
* :doc:`plugins/mbsubmit`: add new prompt choices helping further to submit unmatched tracks to MusicBrainz faster.
* :doc:`plugins/spotify`: We now fetch track's ISRC, EAN, and UPC identifiers from Spotify when using the ``spotifysync`` command.
:bug:`4992`
@ -158,6 +160,7 @@ Bug fixes:
* :doc:`/plugins/spotify`: Prevent Spotify errors caused by long artist search strings.
:bug:`4893`
* :doc:`/plugins/lastimport`: Improve error handling in the `process_tracks` function and enable it to be used with other plugins.
* :doc:`/plugins/spotify`: Improve handling of ConnectionError.
* :doc:`/plugins/deezer`: Improve Deezer plugin error handling and set requests timeout to 10 seconds.
:bug:`4983`
@ -287,6 +290,7 @@ Bug fixes:
variant of `awk` installed and required specific settings for `sqlite3`
and caching in `zsh`.
:bug:`3546`
* Remove unused functions :bug:`5103`
For plugin developers:

View file

@ -30,6 +30,9 @@ Beets works on Python 3.7 or later.
beets``. (There's also a bleeding-edge `dev package <AUR_>`_ in the AUR, which will
probably set your computer on fire.)
* On **Alpine Linux**, `beets is in the community repository <Alpine package_>`_
and can be installed with ``apk add beets``.
* For **Gentoo Linux**, beets is in Portage as ``media-sound/beets``. Just run
``emerge beets`` to install. There are several USE flags available for
optional plugin dependencies.
@ -54,6 +57,7 @@ Beets works on Python 3.7 or later.
.. _Ubuntu details: https://launchpad.net/ubuntu/+source/beets
.. _OpenBSD: http://openports.se/audio/beets
.. _Arch community: https://www.archlinux.org/packages/community/any/beets/
.. _Alpine package: https://pkgs.alpinelinux.org/package/edge/community/x86_64/beets
.. _NixOS: https://github.com/NixOS/nixpkgs/tree/master/pkgs/tools/audio/beets
.. _MacPorts: https://www.macports.org

View file

@ -101,6 +101,7 @@ following to your configuration::
lastgenre
lastimport
limit
listenbrainz
loadext
lyrics
mbcollection

View file

@ -0,0 +1,31 @@
.. _listenbrainz:
ListenBrainz Plugin
===================
The ListenBrainz plugin for beets allows you to interact with the ListenBrainz service.
Installation
------------
To enable the ListenBrainz plugin, add the following to your beets configuration file (`config.yaml`):
.. code-block:: yaml
plugins:
- listenbrainz
You can then configure the plugin by providing your Listenbrainz token (see intructions `here`_`)and username::
listenbrainz:
token: TOKEN
username: LISTENBRAINZ_USERNAME
Usage
-----
Once the plugin is enabled, you can import the listening history using the `lbimport` command in beets.
.. _here: https://listenbrainz.readthedocs.io/en/latest/users/api/index.html#get-the-user-token

View file

@ -27,8 +27,9 @@ ignore =
C901,
# Exception subclasses should be named with an Error suffix
N818,
# Exclude rule for black compatibility
# Exclude rules for black compatibility
E203,
E704,
per-file-ignores =
./beet:D
./docs/conf.py:D

View file

@ -133,6 +133,31 @@ class AdvancedRewritePluginTest(unittest.TestCase, TestHelper):
):
self.load_plugins(PLUGIN_NAME)
def test_combined_rewrite_example(self):
self.config[PLUGIN_NAME] = [
{"artist A": "B"},
{
"match": "album:'C'",
"replacements": {
"artist": "D",
},
},
]
self.load_plugins(PLUGIN_NAME)
item = self.add_item(
artist="A",
albumartist="A",
)
self.assertEqual(item.artist, "B")
item = self.add_item(
artist="C",
albumartist="C",
album="C",
)
self.assertEqual(item.artist, "D")
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)

View file

@ -1,6 +1,5 @@
"""Tests for the 'ihate' plugin"""
import unittest
from beets import importer

View file

@ -89,12 +89,16 @@ class LyricsPluginTest(unittest.TestCase):
("CHVRCHΞS", ["song"]), list(lyrics.search_pairs(item))[0]
)
item = Item(artist="横山克", title="song", artist_sort="Masaru Yokoyama")
item = Item(
artist="横山克", title="song", artist_sort="Masaru Yokoyama"
)
self.assertIn(("横山克", ["song"]), lyrics.search_pairs(item))
self.assertIn(("Masaru Yokoyama", ["song"]), lyrics.search_pairs(item))
# Make sure that the original artist name is still the first entry
self.assertEqual(("横山克", ["song"]), list(lyrics.search_pairs(item))[0])
self.assertEqual(
("横山克", ["song"]), list(lyrics.search_pairs(item))[0]
)
def test_search_pairs_multi_titles(self):
item = Item(title="1 / 2", artist="A")

View file

@ -1,6 +1,5 @@
"""Tests for the 'spotify' plugin"""
import os
import unittest
from urllib.parse import parse_qs, urlparse

View file

@ -1,6 +1,5 @@
"""Tests for the 'subsonic' plugin."""
import unittest
from urllib.parse import parse_qs, urlparse

View file

@ -1,6 +1,5 @@
"""Tests for the 'the' plugin"""
import unittest
from beets import config

View file

@ -1,6 +1,5 @@
"""Tests for the 'web' plugin"""
import json
import os.path
import platform

View file

@ -1,6 +1,5 @@
"""Tests for the 'zero' plugin"""
import unittest
from mediafile import MediaFile

View file

@ -71,12 +71,12 @@ class MetaSyncTest(_common.TestCase, TestHelper):
items[1].album = "An Awesome Wave"
if _is_windows():
items[
0
].path = "G:\\Music\\Alt-J\\An Awesome Wave\\03 Tessellate.mp3"
items[
1
].path = "G:\\Music\\Alt-J\\An Awesome Wave\\04 Breezeblocks.mp3"
items[0].path = (
"G:\\Music\\Alt-J\\An Awesome Wave\\03 Tessellate.mp3"
)
items[1].path = (
"G:\\Music\\Alt-J\\An Awesome Wave\\04 Breezeblocks.mp3"
)
else:
items[0].path = "/Music/Alt-J/An Awesome Wave/03 Tessellate.mp3"
items[1].path = "/Music/Alt-J/An Awesome Wave/04 Breezeblocks.mp3"

View file

@ -52,8 +52,8 @@ commands = python -bb -m pytest {posargs}
[testenv:format]
deps =
isort
black
isort==5.13.2
black==24.2.0
skip_install = True
commands =
isort beets beetsplug test
@ -61,8 +61,8 @@ commands =
[testenv:format_check]
deps =
isort
black
isort==5.13.2
black==24.2.0
skip_install = True
commands =
isort beets beetsplug test --check