Merge branch 'master' into fix-permission-error

Dr.Blank 2025-03-25 12:25:32 +05:30 committed by GitHub
commit 9972041e4b
25 changed files with 954 additions and 531 deletions

View file

@ -74,22 +74,38 @@ class MusicBrainzAPIError(util.HumanReadableError):
log = logging.getLogger("beets")
RELEASE_INCLUDES = [
"artists",
"media",
"recordings",
"release-groups",
"labels",
"artist-credits",
"aliases",
"recording-level-rels",
"work-rels",
"work-level-rels",
"artist-rels",
"isrcs",
"url-rels",
"release-rels",
]
RELEASE_INCLUDES = list(
{
"artists",
"media",
"recordings",
"release-groups",
"labels",
"artist-credits",
"aliases",
"recording-level-rels",
"work-rels",
"work-level-rels",
"artist-rels",
"isrcs",
"url-rels",
"release-rels",
"tags",
}
& set(musicbrainzngs.VALID_INCLUDES["release"])
)
TRACK_INCLUDES = list(
{
"artists",
"aliases",
"isrcs",
"work-level-rels",
"artist-rels",
}
& set(musicbrainzngs.VALID_INCLUDES["recording"])
)
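Editorial note: the rewritten include lists above are intersected with musicbrainzngs' VALID_INCLUDES, so flags the installed library version does not support (such as "genres" on older releases) are silently dropped; this replaces the conditional appends removed a little further down in this file's diff. A minimal sketch of the pattern, with a hypothetical wish list:

```
import musicbrainzngs

# Keep only the include flags this musicbrainzngs version actually accepts.
wanted = {"artists", "recordings", "genres", "tags"}  # hypothetical example
valid = set(musicbrainzngs.VALID_INCLUDES["release"])
print(sorted(wanted & valid))  # unsupported flags simply disappear
```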
BROWSE_INCLUDES = [
"artist-credits",
"work-rels",
@ -101,11 +117,6 @@ if "work-level-rels" in musicbrainzngs.VALID_BROWSE_INCLUDES["recording"]:
BROWSE_INCLUDES.append("work-level-rels")
BROWSE_CHUNKSIZE = 100
BROWSE_MAXTRACKS = 500
TRACK_INCLUDES = ["artists", "aliases", "isrcs"]
if "work-level-rels" in musicbrainzngs.VALID_INCLUDES["recording"]:
TRACK_INCLUDES += ["work-level-rels", "artist-rels"]
if "genres" in musicbrainzngs.VALID_INCLUDES["recording"]:
RELEASE_INCLUDES += ["genres"]
def track_url(trackid: str) -> str:
@ -607,8 +618,8 @@ def album_info(release: dict) -> beets.autotag.hooks.AlbumInfo:
if config["musicbrainz"]["genres"]:
sources = [
release["release-group"].get("genre-list", []),
release.get("genre-list", []),
release["release-group"].get("tag-list", []),
release.get("tag-list", []),
]
genres: Counter[str] = Counter()
for source in sources:

View file

@ -12,11 +12,12 @@
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
"""Provides the basic, interface-agnostic workflow for importing and
autotagging music files.
"""
from __future__ import annotations
import itertools
import os
import pickle
@ -25,9 +26,10 @@ import shutil
import time
from bisect import bisect_left, insort
from collections import defaultdict
from contextlib import contextmanager
from dataclasses import dataclass
from enum import Enum
from tempfile import mkdtemp
from typing import Callable, Iterable, Sequence
import mediafile
@ -49,8 +51,7 @@ action = Enum("action", ["SKIP", "ASIS", "TRACKS", "APPLY", "ALBUMS", "RETAG"])
QUEUE_SIZE = 128
SINGLE_ARTIST_THRESH = 0.25
PROGRESS_KEY = "tagprogress"
HISTORY_KEY = "taghistory"
# Usually flexible attributes are preserved (i.e., not updated) during
# reimports. The following two lists (globally) change this behaviour for
# certain fields. To alter these lists only when a specific plugin is in use,
@ -73,6 +74,10 @@ REIMPORT_FRESH_FIELDS_ITEM = list(REIMPORT_FRESH_FIELDS_ALBUM)
# Global logger.
log = logging.getLogger("beets")
# Here for now to allow for an easy replace later on
# once we can move to a PathLike
PathBytes = bytes
class ImportAbortError(Exception):
"""Raised when the user aborts the tagging operation."""
@ -80,117 +85,115 @@ class ImportAbortError(Exception):
pass
# Utilities.
@dataclass
class ImportState:
"""Representing the progress of an import task.
Opens the state file on creation of the class. If you want
to ensure the state is written to disk, you should use the
context manager protocol.
def _open_state():
"""Reads the state file, returning a dictionary."""
try:
with open(config["statefile"].as_filename(), "rb") as f:
return pickle.load(f)
except Exception as exc:
# The `pickle` module can emit all sorts of exceptions during
# unpickling, including ImportError. We use a catch-all
# exception to avoid enumerating them all (the docs don't even have a
# full list!).
log.debug("state file could not be read: {0}", exc)
return {}
Tagprogress allows long tagging tasks to be resumed when they pause.
Taghistory is a utility for manipulating the "incremental" import log.
This keeps track of all directories that were ever imported, which
allows the importer to only import new stuff.
def _save_state(state):
"""Writes the state dictionary out to disk."""
try:
with open(config["statefile"].as_filename(), "wb") as f:
pickle.dump(state, f)
except OSError as exc:
log.error("state file could not be written: {0}", exc)
Usage
-----
```
# Readonly
progress = ImportState().tagprogress
# Utilities for reading and writing the beets progress file, which
# allows long tagging tasks to be resumed when they pause (or crash).
def progress_read():
state = _open_state()
return state.setdefault(PROGRESS_KEY, {})
@contextmanager
def progress_write():
state = _open_state()
progress = state.setdefault(PROGRESS_KEY, {})
yield progress
_save_state(state)
def progress_add(toppath, *paths):
"""Record that the files under all of the `paths` have been imported
under `toppath`.
# Read and write
with ImportState() as state:
state["key"] = "value"
```
"""
with progress_write() as state:
imported = state.setdefault(toppath, [])
for path in paths:
# Normally `progress_add` will be called with the path
# argument increasing. This is because of the ordering in
# `albums_in_dir`. We take advantage of that to make the
# code faster
if imported and imported[len(imported) - 1] <= path:
imported.append(path)
else:
insort(imported, path)
tagprogress: dict[PathBytes, list[PathBytes]]
taghistory: set[tuple[PathBytes, ...]]
path: PathBytes
def progress_element(toppath, path):
"""Return whether `path` has been imported in `toppath`."""
state = progress_read()
if toppath not in state:
return False
imported = state[toppath]
i = bisect_left(imported, path)
return i != len(imported) and imported[i] == path
def __init__(self, readonly=False, path: PathBytes | None = None):
self.path = path or os.fsencode(config["statefile"].as_filename())
self.tagprogress = {}
self.taghistory = set()
self._open()
def __enter__(self):
return self
def has_progress(toppath):
"""Return `True` if there exist paths that have already been
imported under `toppath`.
"""
state = progress_read()
return toppath in state
def __exit__(self, exc_type, exc_val, exc_tb):
self._save()
def _open(
self,
):
try:
with open(self.path, "rb") as f:
state = pickle.load(f)
# Read the states
self.tagprogress = state.get("tagprogress", {})
self.taghistory = state.get("taghistory", set())
except Exception as exc:
# The `pickle` module can emit all sorts of exceptions during
# unpickling, including ImportError. We use a catch-all
# exception to avoid enumerating them all (the docs don't even have a
# full list!).
log.debug("state file could not be read: {0}", exc)
def progress_reset(toppath):
with progress_write() as state:
if toppath in state:
del state[toppath]
def _save(self):
try:
with open(self.path, "wb") as f:
pickle.dump(
{
"tagprogress": self.tagprogress,
"taghistory": self.taghistory,
},
f,
)
except OSError as exc:
log.error("state file could not be written: {0}", exc)
# -------------------------------- Tagprogress ------------------------------- #
# Similarly, utilities for manipulating the "incremental" import log.
# This keeps track of all directories that were ever imported, which
# allows the importer to only import new stuff.
def progress_add(self, toppath: PathBytes, *paths: PathBytes):
"""Record that the files under all of the `paths` have been imported
under `toppath`.
"""
with self as state:
imported = state.tagprogress.setdefault(toppath, [])
for path in paths:
if imported and imported[-1] <= path:
imported.append(path)
else:
insort(imported, path)
def progress_has_element(self, toppath: PathBytes, path: PathBytes) -> bool:
"""Return whether `path` has been imported in `toppath`."""
imported = self.tagprogress.get(toppath, [])
i = bisect_left(imported, path)
return i != len(imported) and imported[i] == path
def history_add(paths):
"""Indicate that the import of the album in `paths` is completed and
should not be repeated in incremental imports.
"""
state = _open_state()
if HISTORY_KEY not in state:
state[HISTORY_KEY] = set()
def progress_has(self, toppath: PathBytes) -> bool:
"""Return `True` if there exist paths that have already been
imported under `toppath`.
"""
return toppath in self.tagprogress
state[HISTORY_KEY].add(tuple(paths))
def progress_reset(self, toppath: PathBytes | None):
"""Reset the progress for `toppath`."""
with self as state:
if toppath in state.tagprogress:
del state.tagprogress[toppath]
_save_state(state)
# -------------------------------- Taghistory -------------------------------- #
def history_get():
"""Get the set of completed path tuples in incremental imports."""
state = _open_state()
if HISTORY_KEY not in state:
return set()
return state[HISTORY_KEY]
# Abstract session class.
def history_add(self, paths: list[PathBytes]):
"""Add the paths to the history."""
with self as state:
state.taghistory.add(tuple(paths))
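Editorial note: the ImportState dataclass above replaces the old module-level progress/history helpers (_open_state, progress_add, history_add, ...). A minimal sketch of the new API, assuming it is importable from beets.importer as in this diff; the paths are hypothetical and must be bytes (PathBytes):

```
from beets.importer import ImportState

top, song = b"/music/incoming", b"/music/incoming/track.flac"  # hypothetical

state = ImportState()                         # reads the statefile on creation
state.progress_add(top, song)                 # records progress and saves to disk
assert state.progress_has(top)
assert state.progress_has_element(top, song)
state.history_add([top])                      # mark this album directory as done
state.progress_reset(top)                     # clear progress for the top path
```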
class ImportSession:
@ -198,24 +201,46 @@ class ImportSession:
communicate with the user or otherwise make decisions.
"""
def __init__(self, lib, loghandler, paths, query):
"""Create a session. `lib` is a Library object. `loghandler` is a
logging.Handler. Either `paths` or `query` is non-null and indicates
the source of files to be imported.
logger: logging.Logger
paths: list[PathBytes]
lib: library.Library
_is_resuming: dict[bytes, bool]
_merged_items: set[PathBytes]
_merged_dirs: set[PathBytes]
def __init__(
self,
lib: library.Library,
loghandler: logging.Handler | None,
paths: Sequence[PathBytes] | None,
query: dbcore.Query | None,
):
"""Create a session.
Parameters
----------
lib : library.Library
The library instance to which items will be imported.
loghandler : logging.Handler or None
A logging handler to use for the session's logger. If None, a
NullHandler will be used.
paths : os.PathLike or None
The paths to be imported.
query : dbcore.Query or None
A query to filter items for import.
"""
self.lib = lib
self.logger = self._setup_logging(loghandler)
self.paths = paths
self.query = query
self._is_resuming = {}
self._merged_items = set()
self._merged_dirs = set()
# Normalize the paths.
if self.paths:
self.paths = list(map(normpath, self.paths))
self.paths = list(map(normpath, paths or []))
def _setup_logging(self, loghandler):
def _setup_logging(self, loghandler: logging.Handler | None):
logger = logging.getLogger(__name__)
logger.propagate = False
if not loghandler:
@ -275,13 +300,13 @@ class ImportSession:
self.want_resume = config["resume"].as_choice([True, False, "ask"])
def tag_log(self, status, paths):
def tag_log(self, status, paths: Sequence[PathBytes]):
"""Log a message about a given album to the importer log. The status
should reflect the reason the album couldn't be tagged.
"""
self.logger.info("{0} {1}", status, displayable_path(paths))
def log_choice(self, task, duplicate=False):
def log_choice(self, task: ImportTask, duplicate=False):
"""Logs the task's current choice if it should be logged. If
``duplicate``, then this is a secondary choice after a duplicate was
detected and a decision was made.
@ -302,16 +327,16 @@ class ImportSession:
elif task.choice_flag is action.SKIP:
self.tag_log("skip", paths)
def should_resume(self, path):
def should_resume(self, path: PathBytes):
raise NotImplementedError
def choose_match(self, task):
def choose_match(self, task: ImportTask):
raise NotImplementedError
def resolve_duplicate(self, task, found_duplicates):
def resolve_duplicate(self, task: ImportTask, found_duplicates):
raise NotImplementedError
def choose_item(self, task):
def choose_item(self, task: ImportTask):
raise NotImplementedError
def run(self):
@ -366,12 +391,12 @@ class ImportSession:
# Incremental and resumed imports
def already_imported(self, toppath, paths):
def already_imported(self, toppath: PathBytes, paths: Sequence[PathBytes]):
"""Returns true if the files belonging to this task have already
been imported in a previous session.
"""
if self.is_resuming(toppath) and all(
[progress_element(toppath, p) for p in paths]
[ImportState().progress_has_element(toppath, p) for p in paths]
):
return True
if self.config["incremental"] and tuple(paths) in self.history_dirs:
@ -379,13 +404,16 @@ class ImportSession:
return False
_history_dirs = None
@property
def history_dirs(self):
if not hasattr(self, "_history_dirs"):
self._history_dirs = history_get()
def history_dirs(self) -> set[tuple[PathBytes, ...]]:
# FIXME: This could be simplified to a cached property
if self._history_dirs is None:
self._history_dirs = ImportState().taghistory
return self._history_dirs
def already_merged(self, paths):
def already_merged(self, paths: Sequence[PathBytes]):
"""Returns true if all the paths being imported were part of a merge
during previous tasks.
"""
@ -394,7 +422,7 @@ class ImportSession:
return False
return True
def mark_merged(self, paths):
def mark_merged(self, paths: Sequence[PathBytes]):
"""Mark paths and directories as merged for future reimport tasks."""
self._merged_items.update(paths)
dirs = {
@ -403,20 +431,20 @@ class ImportSession:
}
self._merged_dirs.update(dirs)
def is_resuming(self, toppath):
def is_resuming(self, toppath: PathBytes):
"""Return `True` if user wants to resume import of this path.
You have to call `ask_resume` first to determine the return value.
"""
return self._is_resuming.get(toppath, False)
def ask_resume(self, toppath):
def ask_resume(self, toppath: PathBytes):
"""If import of `toppath` was aborted in an earlier session, ask
user if they want to resume the import.
Determines the return value of `is_resuming(toppath)`.
"""
if self.want_resume and has_progress(toppath):
if self.want_resume and ImportState().progress_has(toppath):
# Either accept immediately or prompt for input to decide.
if self.want_resume is True or self.should_resume(toppath):
log.warning(
@ -426,7 +454,7 @@ class ImportSession:
self._is_resuming[toppath] = True
else:
# Clear progress; we're starting from the top.
progress_reset(toppath)
ImportState().progress_reset(toppath)
# The importer task class.
@ -438,7 +466,16 @@ class BaseImportTask:
Tasks flow through the importer pipeline. Each stage can update
them."""
def __init__(self, toppath, paths, items):
toppath: PathBytes | None
paths: list[PathBytes]
items: list[library.Item]
def __init__(
self,
toppath: PathBytes | None,
paths: Iterable[PathBytes] | None,
items: Iterable[library.Item] | None,
):
"""Create a task. The primary fields that define a task are:
* `toppath`: The user-specified base directory that contains the
@ -456,8 +493,8 @@ class BaseImportTask:
These fields should not change after initialization.
"""
self.toppath = toppath
self.paths = paths
self.items = items
self.paths = list(paths) if paths is not None else []
self.items = list(items) if items is not None else []
class ImportTask(BaseImportTask):
@ -492,24 +529,39 @@ class ImportTask(BaseImportTask):
system.
"""
def __init__(self, toppath, paths, items):
choice_flag: action | None = None
match: autotag.AlbumMatch | autotag.TrackMatch | None = None
# Keep track of the current task item
cur_album: str | None = None
cur_artist: str | None = None
candidates: Sequence[autotag.AlbumMatch | autotag.TrackMatch] = []
def __init__(
self,
toppath: PathBytes | None,
paths: Iterable[PathBytes] | None,
items: Iterable[library.Item] | None,
):
super().__init__(toppath, paths, items)
self.choice_flag = None
self.cur_album = None
self.cur_artist = None
self.candidates = []
self.rec = None
self.should_remove_duplicates = False
self.should_merge_duplicates = False
self.is_album = True
self.search_ids = [] # user-supplied candidate IDs.
def set_choice(self, choice):
def set_choice(
self, choice: action | autotag.AlbumMatch | autotag.TrackMatch
):
"""Given an AlbumMatch or TrackMatch object or an action constant,
indicates that an action has been selected for this task.
AlbumMatch and TrackMatch are implemented as tuples, so we can't
use isinstance to check for them.
"""
# Not part of the task structure:
assert choice != action.APPLY # Only used internally.
if choice in (
action.SKIP,
action.ASIS,
@ -517,23 +569,23 @@ class ImportTask(BaseImportTask):
action.ALBUMS,
action.RETAG,
):
self.choice_flag = choice
# TODO: redesign to stricten the type
self.choice_flag = choice # type: ignore[assignment]
self.match = None
else:
self.choice_flag = action.APPLY # Implicit choice.
self.match = choice
self.match = choice # type: ignore[assignment]
def save_progress(self):
"""Updates the progress state to indicate that this album has
finished.
"""
if self.toppath:
progress_add(self.toppath, *self.paths)
ImportState().progress_add(self.toppath, *self.paths)
def save_history(self):
"""Save the directory in the history for incremental imports."""
if self.paths:
history_add(self.paths)
ImportState().history_add(self.paths)
# Logical decisions.
@ -556,7 +608,7 @@ class ImportTask(BaseImportTask):
if self.choice_flag in (action.ASIS, action.RETAG):
likelies, consensus = autotag.current_metadata(self.items)
return likelies
elif self.choice_flag is action.APPLY:
elif self.choice_flag is action.APPLY and self.match:
return self.match.info.copy()
assert False
@ -568,7 +620,9 @@ class ImportTask(BaseImportTask):
"""
if self.choice_flag in (action.ASIS, action.RETAG):
return list(self.items)
elif self.choice_flag == action.APPLY:
elif self.choice_flag == action.APPLY and isinstance(
self.match, autotag.AlbumMatch
):
return list(self.match.mapping.keys())
else:
assert False
@ -581,13 +635,13 @@ class ImportTask(BaseImportTask):
autotag.apply_metadata(self.match.info, self.match.mapping)
def duplicate_items(self, lib):
def duplicate_items(self, lib: library.Library):
duplicate_items = []
for album in self.find_duplicates(lib):
duplicate_items += album.items()
return duplicate_items
def remove_duplicates(self, lib):
def remove_duplicates(self, lib: library.Library):
duplicate_items = self.duplicate_items(lib)
log.debug("removing {0} old duplicated items", len(duplicate_items))
for item in duplicate_items:
@ -599,7 +653,7 @@ class ImportTask(BaseImportTask):
util.remove(item.path)
util.prune_dirs(os.path.dirname(item.path), lib.directory)
def set_fields(self, lib):
def set_fields(self, lib: library.Library):
"""Sets the fields given at CLI or configuration to the specified
values, for both the album and all its items.
"""
@ -620,7 +674,7 @@ class ImportTask(BaseImportTask):
item.store()
self.album.store()
def finalize(self, session):
def finalize(self, session: ImportSession):
"""Save progress, clean up files, and emit plugin event."""
# Update progress.
if session.want_resume:
@ -654,7 +708,7 @@ class ImportTask(BaseImportTask):
for old_path in self.old_paths:
# Only delete files that were actually copied.
if old_path not in new_paths:
util.remove(syspath(old_path), False)
util.remove(old_path, False)
self.prune(old_path)
# When moving, prune empty directories containing the original files.
@ -662,10 +716,10 @@ class ImportTask(BaseImportTask):
for old_path in self.old_paths:
self.prune(old_path)
def _emit_imported(self, lib):
def _emit_imported(self, lib: library.Library):
plugins.send("album_imported", lib=lib, album=self.album)
def handle_created(self, session):
def handle_created(self, session: ImportSession):
"""Send the `import_task_created` event for this task. Return a list of
tasks that should continue through the pipeline. By default, this is a
list containing only the task itself, but plugins can replace the task
@ -692,7 +746,7 @@ class ImportTask(BaseImportTask):
self.candidates = prop.candidates
self.rec = prop.recommendation
def find_duplicates(self, lib):
def find_duplicates(self, lib: library.Library):
"""Return a list of albums from `lib` with the same artist and
album name as the task.
"""
@ -706,7 +760,9 @@ class ImportTask(BaseImportTask):
# Construct a query to find duplicates with this metadata. We
# use a temporary Album object to generate any computed fields.
tmp_album = library.Album(lib, **info)
keys = config["import"]["duplicate_keys"]["album"].as_str_seq()
keys: list[str] = config["import"]["duplicate_keys"][
"album"
].as_str_seq()
dup_query = tmp_album.duplicates_query(keys)
# Don't count albums with the same files as duplicates.
@ -764,19 +820,25 @@ class ImportTask(BaseImportTask):
for item in self.items:
item.update(changes)
def manipulate_files(self, operation=None, write=False, session=None):
def manipulate_files(
self,
session: ImportSession,
operation: MoveOperation | None = None,
write=False,
):
"""Copy, move, link, hardlink or reflink (depending on `operation`)
the files as well as write metadata.
`operation` should be an instance of `util.MoveOperation`.
If `write` is `True` metadata is written to the files.
# TODO: Introduce a MoveOperation.NONE or SKIP
"""
items = self.imported_items()
# Save the original paths of all items for deletion and pruning
# in the next step (finalization).
self.old_paths = [item.path for item in items]
self.old_paths: list[PathBytes] = [item.path for item in items]
for item in items:
if operation is not None:
# In copy and link modes, treat re-imports specially:
@ -806,7 +868,7 @@ class ImportTask(BaseImportTask):
plugins.send("import_task_files", session=session, task=self)
def add(self, lib):
def add(self, lib: library.Library):
"""Add the items as an album to the library and remove replaced items."""
self.align_album_level_fields()
with lib.transaction():
@ -814,7 +876,9 @@ class ImportTask(BaseImportTask):
self.remove_replaced(lib)
self.album = lib.add_album(self.imported_items())
if self.choice_flag == action.APPLY:
if self.choice_flag == action.APPLY and isinstance(
self.match, autotag.AlbumMatch
):
# Copy album flexible fields to the DB
# TODO: change the flow so we create the `Album` object earlier,
# and we can move this into `self.apply_metadata`, just like
@ -824,12 +888,12 @@ class ImportTask(BaseImportTask):
self.reimport_metadata(lib)
def record_replaced(self, lib):
def record_replaced(self, lib: library.Library):
"""Records the replaced items and albums in the `replaced_items`
and `replaced_albums` dictionaries.
"""
self.replaced_items = defaultdict(list)
self.replaced_albums = defaultdict(list)
self.replaced_albums: dict[PathBytes, library.Album] = defaultdict()
replaced_album_ids = set()
for item in self.imported_items():
dup_items = list(
@ -847,7 +911,7 @@ class ImportTask(BaseImportTask):
replaced_album_ids.add(dup_item.album_id)
self.replaced_albums[replaced_album.path] = replaced_album
def reimport_metadata(self, lib):
def reimport_metadata(self, lib: library.Library):
"""For reimports, preserves metadata for reimported items and
albums.
"""
@ -980,7 +1044,7 @@ class ImportTask(BaseImportTask):
class SingletonImportTask(ImportTask):
"""ImportTask for a single track that is not associated to an album."""
def __init__(self, toppath, item):
def __init__(self, toppath: PathBytes | None, item: library.Item):
super().__init__(toppath, [item.path], [item])
self.item = item
self.is_album = False
@ -1022,7 +1086,9 @@ class SingletonImportTask(ImportTask):
# Query for existing items using the same metadata. We use a
# temporary `Item` object to generate any computed fields.
tmp_item = library.Item(lib, **info)
keys = config["import"]["duplicate_keys"]["item"].as_str_seq()
keys: list[str] = config["import"]["duplicate_keys"][
"item"
].as_str_seq()
dup_query = tmp_item.duplicates_query(keys)
found_items = []
@ -1044,7 +1110,7 @@ class SingletonImportTask(ImportTask):
def infer_album_fields(self):
raise NotImplementedError
def choose_match(self, session):
def choose_match(self, session: ImportSession):
"""Ask the session which match should apply and apply it."""
choice = session.choose_item(self)
self.set_choice(choice)
@ -1092,23 +1158,24 @@ class SentinelImportTask(ImportTask):
pass
def save_progress(self):
if self.paths is None:
if not self.paths:
# "Done" sentinel.
progress_reset(self.toppath)
else:
ImportState().progress_reset(self.toppath)
elif self.toppath:
# "Directory progress" sentinel for singletons
progress_add(self.toppath, *self.paths)
super().save_progress()
def skip(self):
@property
def skip(self) -> bool:
return True
def set_choice(self, choice):
raise NotImplementedError
def cleanup(self, **kwargs):
def cleanup(self, copy=False, delete=False, move=False):
pass
def _emit_imported(self, session):
def _emit_imported(self, lib):
pass
@ -1152,7 +1219,7 @@ class ArchiveImportTask(SentinelImportTask):
implements the same interface as `tarfile.TarFile`.
"""
if not hasattr(cls, "_handlers"):
cls._handlers = []
cls._handlers: list[tuple[Callable, ...]] = []
from zipfile import ZipFile, is_zipfile
cls._handlers.append((is_zipfile, ZipFile))
@ -1174,9 +1241,9 @@ class ArchiveImportTask(SentinelImportTask):
return cls._handlers
def cleanup(self, **kwargs):
def cleanup(self, copy=False, delete=False, move=False):
"""Removes the temporary directory the archive was extracted to."""
if self.extracted:
if self.extracted and self.toppath:
log.debug(
"Removing extracted directory: {0}",
displayable_path(self.toppath),
@ -1187,10 +1254,13 @@ class ArchiveImportTask(SentinelImportTask):
"""Extracts the archive to a temporary directory and sets
`toppath` to that directory.
"""
assert self.toppath is not None, "toppath must be set"
for path_test, handler_class in self.handlers():
if path_test(os.fsdecode(self.toppath)):
break
else:
raise ValueError(f"No handler found for archive: {self.toppath}")
extract_to = mkdtemp()
archive = handler_class(os.fsdecode(self.toppath), mode="r")
try:
@ -1219,7 +1289,7 @@ class ImportTaskFactory:
indicated by a path.
"""
def __init__(self, toppath, session):
def __init__(self, toppath: PathBytes, session: ImportSession):
"""Create a new task factory.
`toppath` is the user-specified path to search for music to
@ -1246,6 +1316,7 @@ class ImportTaskFactory:
extracted data.
"""
# Check whether this is an archive.
archive_task: ArchiveImportTask | None = None
if self.is_archive:
archive_task = self.unarchive()
if not archive_task:
@ -1267,12 +1338,9 @@ class ImportTaskFactory:
# it is finished. This is usually just a SentinelImportTask, but
# for archive imports, send the archive task instead (to remove
# the extracted directory).
if self.is_archive:
yield archive_task
else:
yield self.sentinel()
yield archive_task or self.sentinel()
def _create(self, task):
def _create(self, task: ImportTask | None):
"""Handle a new task to be emitted by the factory.
Emit the `import_task_created` event and increment the
@ -1305,7 +1373,7 @@ class ImportTaskFactory:
for dirs, paths in albums_in_dir(self.toppath):
yield dirs, paths
def singleton(self, path):
def singleton(self, path: PathBytes):
"""Return a `SingletonImportTask` for the music file."""
if self.session.already_imported(self.toppath, [path]):
log.debug(
@ -1320,14 +1388,12 @@ class ImportTaskFactory:
else:
return None
def album(self, paths, dirs=None):
def album(self, paths: Iterable[PathBytes], dirs=None):
"""Return a `ImportTask` with all media files from paths.
`dirs` is a list of parent directories used to record already
imported albums.
"""
if not paths:
return None
if dirs is None:
dirs = list({os.path.dirname(p) for p in paths})
@ -1339,15 +1405,16 @@ class ImportTaskFactory:
self.skipped += 1
return None
items = map(self.read_item, paths)
items = [item for item in items if item]
items: list[library.Item] = [
item for item in map(self.read_item, paths) if item
]
if items:
if len(items) > 0:
return ImportTask(self.toppath, dirs, items)
else:
return None
def sentinel(self, paths=None):
def sentinel(self, paths: Iterable[PathBytes] | None = None):
"""Return a `SentinelImportTask` indicating the end of a
top-level directory import.
"""
@ -1382,7 +1449,7 @@ class ImportTaskFactory:
log.debug("Archive extracted to: {0}", self.toppath)
return archive_task
def read_item(self, path):
def read_item(self, path: PathBytes):
"""Return an `Item` read from the path.
If an item cannot be read, return `None` instead and log an
@ -1425,12 +1492,13 @@ def _extend_pipeline(tasks, *stages):
# Full-album pipeline stages.
def read_tasks(session):
def read_tasks(session: ImportSession):
"""A generator yielding all the albums (as ImportTask objects) found
in the user-specified list of paths. In the case of a singleton
import, yields single-item tasks instead.
"""
skipped = 0
for toppath in session.paths:
# Check whether we need to resume the import.
session.ask_resume(toppath)
@ -1448,7 +1516,7 @@ def read_tasks(session):
log.info("Skipped {0} paths.", skipped)
def query_tasks(session):
def query_tasks(session: ImportSession):
"""A generator that works as a drop-in-replacement for read_tasks.
Instead of finding files from the filesystem, a query is used to
match items from the library.
@ -1478,7 +1546,7 @@ def query_tasks(session):
@pipeline.mutator_stage
def lookup_candidates(session, task):
def lookup_candidates(session: ImportSession, task: ImportTask):
"""A coroutine for performing the initial MusicBrainz lookup for an
album. It accepts lists of Items and yields
(items, cur_artist, cur_album, candidates, rec) tuples. If no match
@ -1500,7 +1568,7 @@ def lookup_candidates(session, task):
@pipeline.stage
def user_query(session, task):
def user_query(session: ImportSession, task: ImportTask):
"""A coroutine for interfacing with the user about the tagging
process.
@ -1571,7 +1639,7 @@ def user_query(session, task):
return task
def resolve_duplicates(session, task):
def resolve_duplicates(session: ImportSession, task: ImportTask):
"""Check if a task conflicts with items or albums already imported
and ask the session to resolve this.
"""
@ -1614,7 +1682,7 @@ def resolve_duplicates(session, task):
@pipeline.mutator_stage
def import_asis(session, task):
def import_asis(session: ImportSession, task: ImportTask):
"""Select the `action.ASIS` choice for all tasks.
This stage replaces the initial_lookup and user_query stages
@ -1628,7 +1696,7 @@ def import_asis(session, task):
apply_choice(session, task)
def apply_choice(session, task):
def apply_choice(session: ImportSession, task: ImportTask):
"""Apply the task's choice to the Album or Item it contains and add
it to the library.
"""
@ -1652,7 +1720,11 @@ def apply_choice(session, task):
@pipeline.mutator_stage
def plugin_stage(session, func, task):
def plugin_stage(
session: ImportSession,
func: Callable[[ImportSession, ImportTask], None],
task: ImportTask,
):
"""A coroutine (pipeline stage) that calls the given function with
each non-skipped import task. These stages occur between applying
metadata changes and moving/copying/writing files.
@ -1669,7 +1741,7 @@ def plugin_stage(session, func, task):
@pipeline.stage
def manipulate_files(session, task):
def manipulate_files(session: ImportSession, task: ImportTask):
"""A coroutine (pipeline stage) that performs necessary file
manipulations *after* items have been added to the library and
finalizes each task.
@ -1694,9 +1766,9 @@ def manipulate_files(session, task):
operation = None
task.manipulate_files(
operation,
write=session.config["write"],
session=session,
operation=operation,
write=session.config["write"],
)
# Progress, cleanup, and event.
@ -1704,7 +1776,7 @@ def manipulate_files(session, task):
@pipeline.stage
def log_files(session, task):
def log_files(session: ImportSession, task: ImportTask):
"""A coroutine (pipeline stage) to log each file to be imported."""
if isinstance(task, SingletonImportTask):
log.info("Singleton: {0}", displayable_path(task.item["path"]))
@ -1714,7 +1786,7 @@ def log_files(session, task):
log.info(" {0}", displayable_path(item["path"]))
def group_albums(session):
def group_albums(session: ImportSession):
"""A pipeline stage that groups the items of each task into albums
using their metadata.
@ -1731,10 +1803,10 @@ def group_albums(session):
if task.skip:
continue
tasks = []
sorted_items = sorted(task.items, key=group)
sorted_items: list[library.Item] = sorted(task.items, key=group)
for _, items in itertools.groupby(sorted_items, group):
items = list(items)
task = ImportTask(task.toppath, [i.path for i in items], items)
l_items = list(items)
task = ImportTask(task.toppath, [i.path for i in l_items], l_items)
tasks += task.handle_created(session)
tasks.append(SentinelImportTask(task.toppath, task.paths))
@ -1753,15 +1825,15 @@ def is_subdir_of_any_in_list(path, dirs):
return any(d in ancestors for d in dirs)
def albums_in_dir(path):
def albums_in_dir(path: PathBytes):
"""Recursively searches the given directory and returns an iterable
of (paths, items) where paths is a list of directories and items is
a list of Items that is probably an album. Specifically, any folder
containing any media files is an album.
"""
collapse_pat = collapse_paths = collapse_items = None
ignore = config["ignore"].as_str_seq()
ignore_hidden = config["ignore_hidden"].get(bool)
ignore: list[str] = config["ignore"].as_str_seq()
ignore_hidden: bool = config["ignore_hidden"].get(bool)
for root, dirs, files in sorted_walk(
path, ignore=ignore, ignore_hidden=ignore_hidden, logger=log

View file

@ -675,7 +675,7 @@ class ImportSessionFixture(ImportSession):
>>> importer.run()
This imports ``/path/to/import`` into `lib`. It skips the first
album and imports thesecond one with metadata from the tags. For the
album and imports the second one with metadata from the tags. For the
remaining albums, the metadata from the autotagger will be applied.
"""

View file

@ -198,8 +198,8 @@ def ancestry(path: AnyStr) -> list[AnyStr]:
def sorted_walk(
path: AnyStr,
ignore: Sequence[bytes] = (),
path: PathLike,
ignore: Sequence[PathLike] = (),
ignore_hidden: bool = False,
logger: Logger | None = None,
) -> Iterator[tuple[bytes, Sequence[bytes], Sequence[bytes]]]:
@ -210,7 +210,9 @@ def sorted_walk(
"""
# Make sure the paths aren't Unicode strings.
bytes_path = bytestring_path(path)
ignore = [bytestring_path(i) for i in ignore]
ignore_bytes = [ # rename prevents mypy variable shadowing issue
bytestring_path(i) for i in ignore
]
# Get all the directories and files at this level.
try:
@ -230,7 +232,7 @@ def sorted_walk(
# Skip ignored filenames.
skip = False
for pat in ignore:
for pat in ignore_bytes:
if fnmatch.fnmatch(base, pat):
if logger:
logger.debug(
@ -257,7 +259,7 @@ def sorted_walk(
# Recurse into directories.
for base in dirs:
cur = os.path.join(bytes_path, base)
yield from sorted_walk(cur, ignore, ignore_hidden, logger)
yield from sorted_walk(cur, ignore_bytes, ignore_hidden, logger)
def path_as_posix(path: bytes) -> bytes:
@ -297,8 +299,8 @@ def fnmatch_all(names: Sequence[bytes], patterns: Sequence[bytes]) -> bool:
def prune_dirs(
path: bytes,
root: bytes | None = None,
path: PathLike,
root: PathLike | None = None,
clutter: Sequence[str] = (".DS_Store", "Thumbs.db"),
):
"""If path is an empty directory, then remove it. Recursively remove
@ -419,12 +421,13 @@ PATH_SEP: bytes = bytestring_path(os.sep)
def displayable_path(
path: BytesOrStr | tuple[BytesOrStr, ...], separator: str = "; "
path: PathLike | Iterable[PathLike], separator: str = "; "
) -> str:
"""Attempts to decode a bytestring path to a unicode object for the
purpose of displaying it to the user. If the `path` argument is a
list or a tuple, the elements are joined with `separator`.
"""
if isinstance(path, (list, tuple)):
return separator.join(displayable_path(p) for p in path)
elif isinstance(path, str):
@ -472,7 +475,7 @@ def samefile(p1: bytes, p2: bytes) -> bool:
return False
def remove(path: bytes, soft: bool = True):
def remove(path: PathLike, soft: bool = True):
"""Remove the file. If `soft`, then no error will be raised if the
file does not exist.
"""
@ -1130,7 +1133,10 @@ def get_temp_filename(
tempdir = get_module_tempdir(module)
tempdir.mkdir(parents=True, exist_ok=True)
_, filename = tempfile.mkstemp(dir=tempdir, prefix=prefix, suffix=suffix)
descriptor, filename = tempfile.mkstemp(
dir=tempdir, prefix=prefix, suffix=suffix
)
os.close(descriptor)
return bytestring_path(filename)

View file

@ -314,6 +314,11 @@ class IMBackend(LocalBackend):
else:
out_str = stdout
# ImageMagick 7.1.1-44 outputs in a different format.
if b"(" in out_str and out_str.endswith(b")"):
# Extract diff from "... (diff)".
out_str = out_str[out_str.index(b"(") + 1 : -1]
try:
phash_diff = float(out_str)
except ValueError:
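Editorial note: a small demonstration of the parenthesis handling added above, using a hypothetical `compare` output string in the newer "value (diff)" style:

```
out_str = b"6.71089e+06 (0.0465122)"  # hypothetical ImageMagick 7.1.1-44 output

if b"(" in out_str and out_str.endswith(b")"):
    # Extract the normalized diff from "... (diff)".
    out_str = out_str[out_str.index(b"(") + 1 : -1]

print(float(out_str))  # 0.0465122
```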

View file

@ -31,9 +31,14 @@ To do so, pass an iterable of coroutines to the Pipeline constructor
in place of any single coroutine.
"""
from __future__ import annotations
import queue
import sys
from threading import Lock, Thread
from typing import Callable, Generator
from typing_extensions import TypeVar, TypeVarTuple, Unpack
BUBBLE = "__PIPELINE_BUBBLE__"
POISON = "__PIPELINE_POISON__"
@ -149,7 +154,22 @@ def multiple(messages):
return MultiMessage(messages)
def stage(func):
A = TypeVarTuple("A") # Arguments of a function (omitting the task)
T = TypeVar("T") # Type of the task
# Normally these are concatenated i.e. (*args, task)
# Return type of the function (should normally be task but sadly
# we can't enforce this with the current stage functions without
# a refactor)
R = TypeVar("R")
def stage(
func: Callable[
[Unpack[A], T],
R | None,
],
):
"""Decorate a function to become a simple stage.
>>> @stage
@ -163,8 +183,8 @@ def stage(func):
[3, 4, 5]
"""
def coro(*args):
task = None
def coro(*args: Unpack[A]) -> Generator[R | T | None, T, None]:
task: R | T | None = None
while True:
task = yield task
task = func(*(args + (task,)))
@ -172,7 +192,7 @@ def stage(func):
return coro
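Editorial note: the TypeVar/TypeVarTuple annotations above keep the existing decorator behaviour; a stage written in the module's doctest style still works the same way, just with typed arguments. A sketch following the `add`/`iter([...])` pattern of the doctest shown in this hunk, assuming the usual beets.util.pipeline module path:

```
from beets.util import pipeline

@pipeline.stage
def add(n: int, task: int) -> int:
    # Extra arguments come first; the task is always the final parameter.
    return task + n

pipe = pipeline.Pipeline([iter([1, 2, 3]), add(2)])
print(list(pipe.pull()))  # [3, 4, 5]
```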
def mutator_stage(func):
def mutator_stage(func: Callable[[Unpack[A], T], R]):
"""Decorate a function that manipulates items in a coroutine to
become a simple stage.
@ -187,7 +207,7 @@ def mutator_stage(func):
[{'x': True}, {'a': False, 'x': True}]
"""
def coro(*args):
def coro(*args: Unpack[A]) -> Generator[T | None, T, None]:
task = None
while True:
task = yield task

View file

@ -113,3 +113,23 @@ class GoogleCustomSearchAPI:
"""Pagemap data with a single meta tags dict in a list."""
metatags: list[JSONDict]
class TranslatorAPI:
class Language(TypedDict):
"""Language data returned by the translator API."""
language: str
score: float
class Translation(TypedDict):
"""Translation data returned by the translator API."""
text: str
to: str
class Response(TypedDict):
"""Response from the translator API."""
detectedLanguage: TranslatorAPI.Language
translations: list[TranslatorAPI.Translation]
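Editorial note: for illustration, a payload of the shape these TypedDicts describe (made-up values, assuming the module lives at beetsplug._typing as the lyrics plugin's imports suggest):

```
from beetsplug._typing import TranslatorAPI

response: list[TranslatorAPI.Response] = [
    {
        "detectedLanguage": {"language": "de", "score": 1.0},
        "translations": [{"text": "Hello world", "to": "en"}],
    }
]
print(response[0]["translations"][0]["text"])
```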

View file

@ -22,6 +22,7 @@ import tempfile
import threading
from string import Template
import mediafile
from confuse import ConfigTypeError, Optional
from beets import art, config, plugins, ui, util
@ -351,6 +352,15 @@ class ConvertPlugin(BeetsPlugin):
item = yield (item, original, converted)
dest = item.destination(basedir=dest_dir, path_formats=path_formats)
# Ensure that the desired item is readable before processing it. Needed
# to avoid any side-effect of the conversion (linking, keep_new,
# refresh) if we already know that it will fail.
try:
mediafile.MediaFile(util.syspath(item.path))
except mediafile.UnreadableFileError as exc:
self._log.error("Could not open file to convert: {0}", exc)
continue
# When keeping the new file in the library, we first move the
# current (pristine) file to the destination. We'll then copy it
# back to its old path or transcode it to a new path.

View file

@ -110,7 +110,7 @@ class ListenBrainzPlugin(BeetsPlugin):
if track["track_metadata"].get("release_name") is None:
continue
mbid_mapping = track["track_metadata"].get("mbid_mapping", {})
# print(json.dumps(track, indent=4, sort_keys=True))
mbid = None
if mbid_mapping.get("recording_mbid") is None:
# search for the track using title and release
mbid = self.get_mb_recording_id(track)

View file

@ -17,16 +17,17 @@
from __future__ import annotations
import atexit
import errno
import itertools
import math
import os.path
import re
import textwrap
from contextlib import contextmanager, suppress
from dataclasses import dataclass
from functools import cached_property, partial, total_ordering
from html import unescape
from http import HTTPStatus
from itertools import groupby
from pathlib import Path
from typing import TYPE_CHECKING, Iterable, Iterator, NamedTuple
from urllib.parse import quote, quote_plus, urlencode, urlparse
@ -40,49 +41,22 @@ from beets import plugins, ui
from beets.autotag.hooks import string_dist
if TYPE_CHECKING:
from beets.importer import ImportTask
from beets.library import Item
from logging import Logger
from ._typing import GeniusAPI, GoogleCustomSearchAPI, JSONDict, LRCLibAPI
from beets.importer import ImportTask
from beets.library import Item, Library
from ._typing import (
GeniusAPI,
GoogleCustomSearchAPI,
JSONDict,
LRCLibAPI,
TranslatorAPI,
)
USER_AGENT = f"beets/{beets.__version__}"
INSTRUMENTAL_LYRICS = "[Instrumental]"
# The content for the base index.rst generated in ReST mode.
REST_INDEX_TEMPLATE = """Lyrics
======
* :ref:`Song index <genindex>`
* :ref:`search`
Artist index:
.. toctree::
:maxdepth: 1
:glob:
artists/*
"""
# The content for the base conf.py generated.
REST_CONF_TEMPLATE = """# -*- coding: utf-8 -*-
master_doc = 'index'
project = 'Lyrics'
copyright = 'none'
author = 'Various Authors'
latex_documents = [
(master_doc, 'Lyrics.tex', project,
author, 'manual'),
]
epub_title = project
epub_author = author
epub_publisher = author
epub_copyright = copyright
epub_exclude_files = ['search.html']
epub_tocdepth = 1
epub_tocdup = False
"""
class NotFoundError(requests.exceptions.HTTPError):
pass
@ -252,6 +226,12 @@ class RequestHandler:
self.debug("Fetching JSON from {}", url)
return r_session.get(url, **kwargs).json()
def post_json(self, url: str, params: JSONDict | None = None, **kwargs):
"""Send POST request and return JSON response."""
url = self.format_url(url, params)
self.debug("Posting JSON to {}", url)
return r_session.post(url, **kwargs).json()
@contextmanager
def handle_request(self) -> Iterator[None]:
try:
@ -760,6 +740,214 @@ class Google(SearchBackend):
return None
@dataclass
class Translator(RequestHandler):
TRANSLATE_URL = "https://api.cognitive.microsofttranslator.com/translate"
LINE_PARTS_RE = re.compile(r"^(\[\d\d:\d\d.\d\d\]|) *(.*)$")
SEPARATOR = " | "
remove_translations = partial(re.compile(r" / [^\n]+").sub, "")
_log: Logger
api_key: str
to_language: str
from_languages: list[str]
@classmethod
def from_config(
cls,
log: Logger,
api_key: str,
to_language: str,
from_languages: list[str] | None = None,
) -> Translator:
return cls(
log,
api_key,
to_language.upper(),
[x.upper() for x in from_languages or []],
)
def get_translations(self, texts: Iterable[str]) -> list[tuple[str, str]]:
"""Return translations for the given texts.
To reduce the translation 'cost', we translate unique texts, and then
map the translations back to the original texts.
"""
unique_texts = list(dict.fromkeys(texts))
text = self.SEPARATOR.join(unique_texts)
data: list[TranslatorAPI.Response] = self.post_json(
self.TRANSLATE_URL,
headers={"Ocp-Apim-Subscription-Key": self.api_key},
json=[{"text": text}],
params={"api-version": "3.0", "to": self.to_language},
)
translated_text = data[0]["translations"][0]["text"]
translations = translated_text.split(self.SEPARATOR)
trans_by_text = dict(zip(unique_texts, translations))
return list(zip(texts, (trans_by_text.get(t, "") for t in texts)))
@classmethod
def split_line(cls, line: str) -> tuple[str, str]:
"""Split line to (timestamp, text)."""
if m := cls.LINE_PARTS_RE.match(line):
return m[1], m[2]
return "", ""
def append_translations(self, lines: Iterable[str]) -> list[str]:
"""Append translations to the given lyrics texts.
Lines may contain timestamps from LRCLib which need to be temporarily
removed for the translation. They can take any of these forms:
- empty
Text - text only
[00:00:00] - timestamp only
[00:00:00] Text - timestamp with text
"""
# split into [(timestamp, text), ...]
ts_and_text = list(map(self.split_line, lines))
timestamps = [ts for ts, _ in ts_and_text]
text_pairs = self.get_translations([ln for _, ln in ts_and_text])
# only add the separator for non-empty translations
texts = [" / ".join(filter(None, p)) for p in text_pairs]
# only add the space between non-empty timestamps and texts
return [" ".join(filter(None, p)) for p in zip(timestamps, texts)]
def translate(self, new_lyrics: str, old_lyrics: str) -> str:
"""Translate the given lyrics to the target language.
Check old lyrics for existing translations and return them if their
original text matches the new lyrics. This is to avoid translating
the same lyrics multiple times.
If the lyrics are already in the target language or not in any
of the source languages (if configured), they are returned as is.
The footer with the source URL is preserved, if present.
"""
if (
" / " in old_lyrics
and self.remove_translations(old_lyrics) == new_lyrics
):
self.info("🔵 Translations already exist")
return old_lyrics
lyrics_language = langdetect.detect(new_lyrics).upper()
if lyrics_language == self.to_language:
self.info(
"🔵 Lyrics are already in the target language {}",
self.to_language,
)
return new_lyrics
if self.from_languages and lyrics_language not in self.from_languages:
self.info(
"🔵 Configuration {} does not permit translating from {}",
self.from_languages,
lyrics_language,
)
return new_lyrics
lyrics, *url = new_lyrics.split("\n\nSource: ")
with self.handle_request():
translated_lines = self.append_translations(lyrics.splitlines())
self.info("🟢 Translated lyrics to {}", self.to_language)
return "\n\nSource: ".join(["\n".join(translated_lines), *url])
@dataclass
class RestFiles:
# The content for the base index.rst generated in ReST mode.
REST_INDEX_TEMPLATE = textwrap.dedent("""
Lyrics
======
* :ref:`Song index <genindex>`
* :ref:`search`
Artist index:
.. toctree::
:maxdepth: 1
:glob:
artists/*
""").strip()
# The content for the base conf.py generated.
REST_CONF_TEMPLATE = textwrap.dedent("""
master_doc = "index"
project = "Lyrics"
copyright = "none"
author = "Various Authors"
latex_documents = [
(master_doc, "Lyrics.tex", project, author, "manual"),
]
epub_exclude_files = ["search.html"]
epub_tocdepth = 1
epub_tocdup = False
""").strip()
directory: Path
@cached_property
def artists_dir(self) -> Path:
dir = self.directory / "artists"
dir.mkdir(parents=True, exist_ok=True)
return dir
def write_indexes(self) -> None:
"""Write conf.py and index.rst files necessary for Sphinx
We write minimal configurations that are necessary for Sphinx
to operate. We do not overwrite existing files so that
customizations are respected."""
index_file = self.directory / "index.rst"
if not index_file.exists():
index_file.write_text(self.REST_INDEX_TEMPLATE)
conf_file = self.directory / "conf.py"
if not conf_file.exists():
conf_file.write_text(self.REST_CONF_TEMPLATE)
def write_artist(self, artist: str, items: Iterable[Item]) -> None:
parts = [
f'{artist}\n{"=" * len(artist)}',
".. contents::\n :local:",
]
for album, items in groupby(items, key=lambda i: i.album):
parts.append(f'{album}\n{"-" * len(album)}')
parts.extend(
part
for i in items
if (title := f":index:`{i.title.strip()}`")
for part in (
f'{title}\n{"~" * len(title)}',
textwrap.indent(i.lyrics, "| "),
)
)
file = self.artists_dir / f"{slug(artist)}.rst"
file.write_text("\n\n".join(parts).strip())
def write(self, items: list[Item]) -> None:
self.directory.mkdir(exist_ok=True, parents=True)
self.write_indexes()
items.sort(key=lambda i: i.albumartist)
for artist, artist_items in groupby(items, key=lambda i: i.albumartist):
self.write_artist(artist.strip(), artist_items)
d = self.directory
text = f"""
ReST files generated. to build, use one of:
sphinx-build -b html {d} {d/"html"}
sphinx-build -b epub {d} {d/"epub"}
sphinx-build -b latex {d} {d/"latex"} && make -C {d/"latex"} all-pdf
"""
ui.print_(textwrap.dedent(text))
class LyricsPlugin(RequestHandler, plugins.BeetsPlugin):
BACKEND_BY_NAME = {
b.name: b for b in [LRCLib, Google, Genius, Tekstowo, MusiXmatch]
@ -776,15 +964,23 @@ class LyricsPlugin(RequestHandler, plugins.BeetsPlugin):
return [self.BACKEND_BY_NAME[c](self.config, self._log) for c in chosen]
@cached_property
def translator(self) -> Translator | None:
config = self.config["translate"]
if config["api_key"].get() and config["to_language"].get():
return Translator.from_config(self._log, **config.flatten())
return None
def __init__(self):
super().__init__()
self.import_stages = [self.imported]
self.config.add(
{
"auto": True,
"bing_client_secret": None,
"bing_lang_from": [],
"bing_lang_to": None,
"translate": {
"api_key": None,
"from_languages": [],
"to_language": None,
},
"dist_thresh": 0.11,
"google_API_key": None,
"google_engine_ID": "009217259823014548361:lndtuqkycfu",
@ -795,6 +991,7 @@ class LyricsPlugin(RequestHandler, plugins.BeetsPlugin):
"fallback": None,
"force": False,
"local": False,
"print": False,
"synced": False,
# Musixmatch is disabled by default as they are currently blocking
# requests with the beets user agent.
@ -803,52 +1000,27 @@ class LyricsPlugin(RequestHandler, plugins.BeetsPlugin):
],
}
)
self.config["bing_client_secret"].redact = True
self.config["translate"]["api_key"].redact = True
self.config["google_API_key"].redact = True
self.config["google_engine_ID"].redact = True
self.config["genius_api_key"].redact = True
# State information for the ReST writer.
# First, the current artist we're writing.
self.artist = "Unknown artist"
# The current album: False means no album yet.
self.album = False
# The current rest file content. None means the file is not
# open yet.
self.rest = None
self.config["bing_lang_from"] = [
x.lower() for x in self.config["bing_lang_from"].as_str_seq()
]
@cached_property
def bing_access_token(self) -> str | None:
params = {
"client_id": "beets",
"client_secret": self.config["bing_client_secret"],
"scope": "https://api.microsofttranslator.com",
"grant_type": "client_credentials",
}
oauth_url = "https://datamarket.accesscontrol.windows.net/v2/OAuth2-13"
with self.handle_request():
r = r_session.post(oauth_url, params=params)
return r.json()["access_token"]
if self.config["auto"]:
self.import_stages = [self.imported]
def commands(self):
cmd = ui.Subcommand("lyrics", help="fetch song lyrics")
cmd.parser.add_option(
"-p",
"--print",
dest="printlyr",
action="store_true",
default=False,
default=self.config["print"].get(),
help="print lyrics to console",
)
cmd.parser.add_option(
"-r",
"--write-rest",
dest="writerest",
dest="rest_directory",
action="store",
default=None,
metavar="dir",
@ -857,154 +1029,69 @@ class LyricsPlugin(RequestHandler, plugins.BeetsPlugin):
cmd.parser.add_option(
"-f",
"--force",
dest="force_refetch",
action="store_true",
default=False,
default=self.config["force"].get(),
help="always re-download lyrics",
)
cmd.parser.add_option(
"-l",
"--local",
dest="local_only",
action="store_true",
default=False,
default=self.config["local"].get(),
help="do not fetch missing lyrics",
)
def func(lib, opts, args):
def func(lib: Library, opts, args) -> None:
# The "write to files" option corresponds to the
# import_write config value.
write = ui.should_write()
if opts.writerest:
self.writerest_indexes(opts.writerest)
items = lib.items(ui.decargs(args))
self.config.set(vars(opts))
items = list(lib.items(args))
for item in items:
if not opts.local_only and not self.config["local"]:
self.fetch_item_lyrics(
item, write, opts.force_refetch or self.config["force"]
)
if item.lyrics:
if opts.printlyr:
ui.print_(item.lyrics)
if opts.writerest:
self.appendrest(opts.writerest, item)
if opts.writerest and items:
# flush last artist & write to ReST
self.writerest(opts.writerest)
ui.print_("ReST files generated. to build, use one of:")
ui.print_(
" sphinx-build -b html %s _build/html" % opts.writerest
)
ui.print_(
" sphinx-build -b epub %s _build/epub" % opts.writerest
)
ui.print_(
(
" sphinx-build -b latex %s _build/latex "
"&& make -C _build/latex all-pdf"
)
% opts.writerest
)
self.add_item_lyrics(item, ui.should_write())
if item.lyrics and opts.print:
ui.print_(item.lyrics)
if opts.rest_directory and (
items := [i for i in items if i.lyrics]
):
RestFiles(Path(opts.rest_directory)).write(items)
cmd.func = func
return [cmd]
def appendrest(self, directory, item):
"""Append the item to an ReST file
This will keep state (in the `rest` variable) in order to avoid
writing continuously to the same files.
"""
if slug(self.artist) != slug(item.albumartist):
# Write current file and start a new one ~ item.albumartist
self.writerest(directory)
self.artist = item.albumartist.strip()
self.rest = "%s\n%s\n\n.. contents::\n :local:\n\n" % (
self.artist,
"=" * len(self.artist),
)
if self.album != item.album:
tmpalbum = self.album = item.album.strip()
if self.album == "":
tmpalbum = "Unknown album"
self.rest += "{}\n{}\n\n".format(tmpalbum, "-" * len(tmpalbum))
title_str = ":index:`%s`" % item.title.strip()
block = "| " + item.lyrics.replace("\n", "\n| ")
self.rest += "{}\n{}\n\n{}\n\n".format(
title_str, "~" * len(title_str), block
)
def writerest(self, directory):
"""Write self.rest to a ReST file"""
if self.rest is not None and self.artist is not None:
path = os.path.join(
directory, "artists", slug(self.artist) + ".rst"
)
with open(path, "wb") as output:
output.write(self.rest.encode("utf-8"))
def writerest_indexes(self, directory):
"""Write conf.py and index.rst files necessary for Sphinx
We write minimal configurations that are necessary for Sphinx
to operate. We do not overwrite existing files so that
customizations are respected."""
try:
os.makedirs(os.path.join(directory, "artists"))
except OSError as e:
if e.errno == errno.EEXIST:
pass
else:
raise
indexfile = os.path.join(directory, "index.rst")
if not os.path.exists(indexfile):
with open(indexfile, "w") as output:
output.write(REST_INDEX_TEMPLATE)
conffile = os.path.join(directory, "conf.py")
if not os.path.exists(conffile):
with open(conffile, "w") as output:
output.write(REST_CONF_TEMPLATE)
def imported(self, _, task: ImportTask) -> None:
"""Import hook for fetching lyrics automatically."""
if self.config["auto"]:
for item in task.imported_items():
self.fetch_item_lyrics(item, False, self.config["force"])
for item in task.imported_items():
self.add_item_lyrics(item, False)
def fetch_item_lyrics(self, item: Item, write: bool, force: bool) -> None:
def find_lyrics(self, item: Item) -> str:
album, length = item.album, round(item.length)
matches = (
[
lyrics
for t in titles
if (lyrics := self.get_lyrics(a, t, album, length))
]
for a, titles in search_pairs(item)
)
return "\n\n---\n\n".join(next(filter(None, matches), []))
def add_item_lyrics(self, item: Item, write: bool) -> None:
"""Fetch and store lyrics for a single item. If ``write``, then the
lyrics will also be written to the file itself.
"""
# Skip if the item already has lyrics.
if not force and item.lyrics:
if self.config["local"]:
return
if not self.config["force"] and item.lyrics:
self.info("🔵 Lyrics already present: {}", item)
return
lyrics_matches = []
album, length = item.album, round(item.length)
for artist, titles in search_pairs(item):
lyrics_matches = [
self.get_lyrics(artist, title, album, length)
for title in titles
]
if any(lyrics_matches):
break
lyrics = "\n\n---\n\n".join(filter(None, lyrics_matches))
if lyrics:
if lyrics := self.find_lyrics(item):
self.info("🟢 Found lyrics: {0}", item)
if self.config["bing_client_secret"].get():
lang_from = langdetect.detect(lyrics)
if self.config["bing_lang_to"].get() != lang_from and (
not self.config["bing_lang_from"]
or (lang_from in self.config["bing_lang_from"].as_str_seq())
):
lyrics = self.append_translation(
lyrics, self.config["bing_lang_to"]
)
if translator := self.translator:
lyrics = translator.translate(lyrics, item.lyrics)
else:
self.info("🔴 Lyrics not found: {}", item)
lyrics = self.config["fallback"].get()
@ -1027,30 +1114,3 @@ class LyricsPlugin(RequestHandler, plugins.BeetsPlugin):
return f"{lyrics}\n\nSource: {url}"
return None
def append_translation(self, text, to_lang):
from xml.etree import ElementTree
if not (token := self.bing_access_token):
self.warn(
"Could not get Bing Translate API access token. "
"Check your 'bing_client_secret' password."
)
return text
# Extract unique lines to limit API request size per song
lines = text.split("\n")
unique_lines = set(lines)
url = "https://api.microsofttranslator.com/v2/Http.svc/Translate"
with self.handle_request():
text = self.fetch_text(
url,
headers={"Authorization": f"Bearer {token}"},
params={"text": "|".join(unique_lines), "to": to_lang},
)
if translated := ElementTree.fromstring(text.encode("utf-8")).text:
# Use a translation mapping dict to build resulting lyrics
translations = dict(zip(unique_lines, translated.split("|")))
return "".join(f"{ln} / {translations[ln]}\n" for ln in lines)
return text

View file

@ -14,8 +14,8 @@
"""Generates smart playlists based on beets queries."""
import json
import os
from urllib.parse import quote
from urllib.request import pathname2url
from beets import ui
@ -327,7 +327,8 @@ class SmartPlaylistPlugin(BeetsPlugin):
if extm3u:
attr = [(k, entry.item[k]) for k in keys]
al = [
f" {a[0]}={json.dumps(str(a[1]))}" for a in attr
f" {key}=\"{quote(str(value), safe='/:')}\""
for key, value in attr
]
attrs = "".join(al)
comment = "#EXTINF:{}{},{} - {}\n".format(

View file

@ -19,9 +19,16 @@ New features:
control the maximum allowed distance between the lyrics search result and the
tagged item's artist and title. This is useful for preventing false positives
when fetching lyrics.
* :doc:`plugins/lyrics`: Rewrite lyrics translation functionality to use Azure
AI Translator API and add relevant instructions to the documentation.
Bug fixes:
* :doc:`plugins/listenbrainz`: Fix rST formatting for URLs of Listenbrainz API Key documentation and config.yaml.
* :doc:`plugins/listenbrainz`: Fix ``UnboundLocalError`` in cases where 'mbid' is not defined.
* :doc:`plugins/fetchart`: Fix fetchart bug where a tempfile could not be deleted due to never being
properly closed.
:bug:`5521`
* :doc:`plugins/lyrics`: LRCLib will fall back to plain lyrics if synced lyrics
  are not found and the ``synced`` flag is set to ``yes``.
* Synchronise files included in the source distribution with what we used to
@ -72,6 +79,7 @@ Bug fixes:
:bug:`5583`
* Handle potential OSError when unlinking temporary files in ArtResizer.
:bug:`5615`
* ImageMagick 7.1.1-44 is now supported.
For packagers:
@ -85,6 +93,10 @@ Other changes:
wrong (outdated) commit. Now the tag is created in the same workflow step
right after committing the version update.
:bug:`5539`
* Added some type hints: ImportSession and Pipeline are now annotated. This
  should improve usability for new developers.
* :doc:`/plugins/smartplaylist`: URL-encode additional item `fields` within generated
EXTM3U playlists instead of JSON-encoding them.
2.2.0 (December 02, 2024)
-------------------------
@ -512,6 +524,8 @@ Bug fixes:
`requests` timeout.
* Fix cover art resizing logic to support multiple steps of resizing
:bug:`5151`
* :doc:`/plugins/convert`: Fix attempt to convert and perform side-effects if
library file is not readable.
For plugin developers:

View file

@ -174,9 +174,8 @@ pages.
…report a bug in beets?
-----------------------
We use the `issue tracker <https://github.com/beetbox/beets/issues>`__
on GitHub. `Enter a new issue <https://github.com/beetbox/beets/issues/new>`__
there to report a bug. Please follow these guidelines when reporting an issue:
We use the `issue tracker`_ on GitHub where you can `open a new ticket`_.
Please follow these guidelines when reporting an issue:
- Most importantly: if beets is crashing, please `include the
traceback <https://imgur.com/jacoj>`__. Tracebacks can be more
@ -206,6 +205,7 @@ If you've never reported a bug before, Mozilla has some well-written
`general guidelines for good bug
reports`_.
.. _issue tracker: https://github.com/beetbox/beets/issues
.. _general guidelines for good bug reports: https://developer.mozilla.org/en-US/docs/Mozilla/QA/Bug_writing_guidelines
@ -343,11 +343,11 @@ read the file. You can also use specialized programs for checking file
integrity---for example, type ``metaflac --list music.flac`` to check
FLAC files.
If beets still complains about a file that seems to be valid, `file a
bug <https://github.com/beetbox/beets/issues/new>`__ and we'll look into
it. There's always a possibility that there's a bug "upstream" in the
`Mutagen <https://github.com/quodlibet/mutagen>`__ library used by beets,
in which case we'll forward the bug to that project's tracker.
If beets still complains about a file that seems to be valid, `open a new
ticket`_ and we'll look into it. There's always a possibility that there's
a bug "upstream" in the `Mutagen <https://github.com/quodlibet/mutagen>`__
library used by beets, in which case we'll forward the bug to that project's
tracker.
.. _importhang:
@ -398,3 +398,5 @@ try `this Super User answer`_.
.. _this Super User answer: https://superuser.com/a/284361/4569
.. _pip: https://pip.pypa.io/en/stable/
.. _open a new ticket:
https://github.com/beetbox/beets/issues/new?template=bug-report.md

View file

@ -76,7 +76,8 @@ all of these limitations.
Musepack, Windows Media, Opus, and AIFF files are supported. (Do you use
some other format? Please `file a feature request`_!)
.. _file a feature request: https://github.com/beetbox/beets/issues/new
.. _file a feature request:
https://github.com/beetbox/beets/issues/new?template=feature-request.md
Now that that's out of the way, let's tag some music.

View file

@ -478,6 +478,9 @@ Here are a few of the plugins written by the beets community:
`beets-ibroadcast`_
Uploads tracks to the `iBroadcast`_ cloud service.
`beets-id3extract`_
Maps arbitrary ID3 tags to beets custom fields.
`beets-importreplace`_
Lets you perform regex replacements on incoming
metadata.
@ -560,6 +563,7 @@ Here are a few of the plugins written by the beets community:
.. _beets-follow: https://github.com/nolsto/beets-follow
.. _beets-ibroadcast: https://github.com/ctrueden/beets-ibroadcast
.. _iBroadcast: https://ibroadcast.com/
.. _beets-id3extract: https://github.com/bcotton/beets-id3extract
.. _beets-importreplace: https://github.com/edgars-supe/beets-importreplace
.. _beets-setlister: https://github.com/tomjaspers/beets-setlister
.. _beets-noimport: https://gitlab.com/tiago.dias/beets-noimport

View file

@ -8,14 +8,14 @@ The ListenBrainz plugin for beets allows you to interact with the ListenBrainz s
Installation
------------
To enable the ListenBrainz plugin, add the following to your beets configuration file (`config.yaml`):
To enable the ListenBrainz plugin, add the following to your beets configuration file (`config.yaml`_):
.. code-block:: yaml
plugins:
- listenbrainz
You can then configure the plugin by providing your Listenbrainz token (see intructions `here`_`)and username::
You can then configure the plugin by providing your ListenBrainz token (see instructions `here`_) and username::
listenbrainz:
token: TOKEN
@ -28,4 +28,5 @@ Usage
Once the plugin is enabled, you can import the listening history using the `lbimport` command in beets.
.. _here: https://listenbrainz.readthedocs.io/en/latest/users/api/index.html#get-the-user-token
.. _here: https://listenbrainz.readthedocs.io/en/latest/users/api/index.html#get-the-user-token
.. _config.yaml: ../reference/config.rst

View file

@ -38,26 +38,30 @@ Default configuration:
lyrics:
auto: yes
bing_client_secret: null
bing_lang_from: []
bing_lang_to: null
translate:
api_key:
from_languages: []
to_language:
dist_thresh: 0.11
fallback: null
force: no
google_API_key: null
google_engine_ID: 009217259823014548361:lndtuqkycfu
print: no
sources: [lrclib, google, genius, tekstowo]
synced: no
The available options are:
- **auto**: Fetch lyrics automatically during import.
- **bing_client_secret**: Your Bing Translation application password
(see :ref:`lyrics-translation`)
- **bing_lang_from**: By default all lyrics with a language other than
``bing_lang_to`` are translated. Use a list of lang codes to restrict the set
of source languages to translate.
- **bing_lang_to**: Language to translate lyrics into.
- **translate**:
- **api_key**: API key to access your Azure Translator resource (see
  :ref:`lyrics-translation`).
- **from_languages**: By default, all lyrics with a language other than
  ``to_language`` are translated. Use a list of language codes to restrict
  them.
- **to_language**: Language code to translate lyrics to.
- **dist_thresh**: The maximum distance between the artist and title
  combination of the music file and lyrics candidate to consider them a match
  (a short sketch follows this option list).
Lower values will make the plugin more strict, higher values will make it
@ -72,6 +76,7 @@ The available options are:
- **google_engine_ID**: The custom search engine to use.
Default: The `beets custom search engine`_, which gathers an updated list of
sources known to be scrapeable.
- **print**: Print lyrics to the console.
- **sources**: List of sources to search for lyrics. An asterisk ``*`` expands
to all available sources. The ``google`` source will be automatically
  deactivated if no ``google_API_key`` is set up.
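To get a feel for what ``dist_thresh`` does, here is a rough sketch. It is not
the plugin's exact code: the ``acceptable_candidate`` helper is made up for
illustration, but it uses the normalized string distance that beets ships with:

.. code-block:: python

    from beets.autotag.hooks import string_dist

    def acceptable_candidate(item, cand_artist, cand_title, dist_thresh=0.11):
        # Made-up helper: keep a search result only if both its artist and its
        # title are close enough to the tagged item (0.0 means identical).
        dist = max(
            string_dist(item.artist, cand_artist),
            string_dist(item.title, cand_title),
        )
        return dist <= dist_thresh

With the default threshold of ``0.11``, only near-exact artist and title
matches are accepted; raising the value lets fuzzier results through.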
@ -104,9 +109,8 @@ Rendering Lyrics into Other Formats
-----------------------------------
The ``-r directory, --write-rest directory`` option renders all lyrics as
`reStructuredText`_ (ReST) documents in ``directory`` (by default, the current
directory). That directory, in turn, can be parsed by tools like `Sphinx`_ to
generate HTML, ePUB, or PDF documents.
`reStructuredText`_ (ReST) documents in ``directory``. That directory, in turn,
can be parsed by tools like `Sphinx`_ to generate HTML, ePUB, or PDF documents.
Minimal ``conf.py`` and ``index.rst`` files are created the first time the
command is run. They are not overwritten on subsequent runs, so you can safely
@ -119,19 +123,19 @@ Sphinx supports various `builders`_, see a few suggestions:
::
sphinx-build -b html . _build/html
sphinx-build -b html <dir> <dir>/html
.. admonition:: Build an ePUB3 formatted file, usable on ebook readers
::
sphinx-build -b epub3 . _build/epub
sphinx-build -b epub3 <dir> <dir>/epub
.. admonition:: Build a PDF file, which incidentally also builds a LaTeX file
::
sphinx-build -b latex %s _build/latex && make -C _build/latex all-pdf
sphinx-build -b latex <dir> <dir>/latex && make -C <dir>/latex all-pdf
.. _Sphinx: https://www.sphinx-doc.org/
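The ReST output is handled by a small ``RestFiles`` helper in the plugin. The
following sketch (the output path and item values are made up) mirrors what the
plugin's test suite exercises and shows which files are produced:

.. code-block:: python

    from pathlib import Path

    from beets.library import Item
    from beetsplug import lyrics

    out = Path("lyrics-rest")
    items = [
        Item(albumartist="Artist One", album="Album One",
             title="Song One", lyrics="Lyrics One"),
    ]
    lyrics.RestFiles(out).write(items)
    # Creates out/index.rst, out/conf.py and one page per artist under
    # out/artists/, e.g. out/artists/artist-one.rst.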
@ -165,10 +169,28 @@ After that, the lyrics plugin will fall back on other declared data sources.
Activate On-the-Fly Translation
-------------------------------
You need to register for a Microsoft Azure Marketplace free account and
to the `Microsoft Translator API`_. Follow the four steps process, specifically
at step 3 enter ``beets`` as *Client ID* and copy/paste the generated
*Client secret* into your ``bing_client_secret`` configuration, alongside
``bing_lang_to`` target ``language code``.
We use Azure to optionally translate your lyrics. To set up the integration,
follow these steps:
.. _Microsoft Translator API: https://docs.microsoft.com/en-us/azure/cognitive-services/translator/translator-how-to-signup
1. `Create a Translator resource`_ on Azure.
2. `Obtain its API key`_.
3. Add the API key to your configuration as ``translate.api_key``.
4. Configure your target language using the ``translate.to_language`` option.
For example, with the following configuration
.. code-block:: yaml
lyrics:
translate:
api_key: YOUR_TRANSLATOR_API_KEY
to_language: de
You should expect lyrics like this::
Original verse / Ursprünglicher Vers
Some other verse / Ein anderer Vers
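If you are curious how this is wired up internally, here is a minimal sketch
based on the ``Translator`` constructor and ``translate()`` call used in the
plugin's test suite. The API key is a placeholder, a plain logger stands in for
the plugin's own logger, and the interface is internal, so it may change:

.. code-block:: python

    import logging

    from beetsplug import lyrics

    log = logging.getLogger("beets")
    translator = lyrics.Translator(log, "YOUR_TRANSLATOR_API_KEY", "DE", [])

    # The second argument carries any previously stored lyrics, so text that
    # already contains a translation is left untouched.
    print(translator.translate("Original verse\nSome other verse", ""))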
.. _create a Translator resource: https://learn.microsoft.com/en-us/azure/ai-services/translator/create-translator-resource
.. _obtain its API key: https://learn.microsoft.com/en-us/python/api/overview/azure/ai-translation-text-readme?view=azure-python&preserve-view=true#get-an-api-key

View file

@ -97,27 +97,25 @@ In case you want to export additional fields from the beets database into the
generated playlists, you can do so by specifying them within the ``fields``
configuration option and setting the ``output`` option to ``extm3u``.
For instance the following configuration exports the ``id`` and ``genre``
fields:
fields::
smartplaylist:
playlist_dir: /data/playlists
relative_to: /data/playlists
output: extm3u
fields:
- id
- genre
playlists:
- name: all.m3u
query: ''
A resulting ``all.m3u`` file could look as follows:
Values of additional fields are URL-encoded.
A resulting ``all.m3u`` file could look as follows::
#EXTM3U
#EXTINF:805 id="1931" genre="Jazz",Miles Davis - Autumn Leaves
../music/Albums/Miles Davis/Autumn Leaves/02 Autumn Leaves.mp3
#EXTINF:805 id="1931" genre="Progressive%20Rock",Led Zeppelin - Stairway to Heaven
../music/singles/Led Zeppelin/Stairway to Heaven.mp3
To give a usage example, the `webm3u`_ and `Beetstream`_ plugins read the
exported ``id`` field, allowing you to serve your local m3u playlists via HTTP.
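The values are encoded with Python's ``urllib.parse.quote`` (keeping ``/`` and
``:`` unescaped), so a consumer can reverse the encoding with
``urllib.parse.unquote``; a quick sketch:

.. code-block:: python

    from urllib.parse import quote, unquote

    encoded = quote("Progressive Rock", safe="/:")
    print(encoded)           # Progressive%20Rock
    print(unquote(encoded))  # Progressive Rock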

View file

@ -15,7 +15,9 @@ markers =
data_file = .reports/coverage/data
branch = true
relative_files = true
omit = beets/test/*
omit =
beets/test/*
beetsplug/_typing.py
[coverage:report]
precision = 2

View file

@ -456,16 +456,6 @@ lyrics_pages = [
LyricsPage.make(
"https://www.musica.com/letras.asp?letra=59862",
"""
Lady Madonna, children at your feet
Wonder how you manage to make ends meet
Who finds the money when you pay the rent?
Did you think that money was heaven sent?
Friday night arrives without a suitcase
Sunday morning creeping like a nun
Monday's child has learned to tie his bootlace
See how they run
Lady Madonna, baby at your breast
Wonders how you manage to feed the rest

View file

@ -285,8 +285,8 @@ class ArtSimilarityTest(unittest.TestCase):
mock_extract,
mock_subprocess,
compare_status=0,
compare_stdout="",
compare_stderr="",
compare_stdout=b"",
compare_stderr=b"",
convert_status=0,
):
mock_extract.return_value = b"extracted_path"
@ -298,33 +298,33 @@ class ArtSimilarityTest(unittest.TestCase):
]
def test_compare_success_similar(self, mock_extract, mock_subprocess):
self._mock_popens(mock_extract, mock_subprocess, 0, "10", "err")
self._mock_popens(mock_extract, mock_subprocess, 0, b"10", b"err")
assert self._similarity(20)
def test_compare_success_different(self, mock_extract, mock_subprocess):
self._mock_popens(mock_extract, mock_subprocess, 0, "10", "err")
self._mock_popens(mock_extract, mock_subprocess, 0, b"10", b"err")
assert not self._similarity(5)
def test_compare_status1_similar(self, mock_extract, mock_subprocess):
self._mock_popens(mock_extract, mock_subprocess, 1, "out", "10")
self._mock_popens(mock_extract, mock_subprocess, 1, b"out", b"10")
assert self._similarity(20)
def test_compare_status1_different(self, mock_extract, mock_subprocess):
self._mock_popens(mock_extract, mock_subprocess, 1, "out", "10")
self._mock_popens(mock_extract, mock_subprocess, 1, b"out", b"10")
assert not self._similarity(5)
def test_compare_failed(self, mock_extract, mock_subprocess):
self._mock_popens(mock_extract, mock_subprocess, 2, "out", "10")
self._mock_popens(mock_extract, mock_subprocess, 2, b"out", b"10")
assert self._similarity(20) is None
def test_compare_parsing_error(self, mock_extract, mock_subprocess):
self._mock_popens(mock_extract, mock_subprocess, 0, "foo", "bar")
self._mock_popens(mock_extract, mock_subprocess, 0, b"foo", b"bar")
assert self._similarity(20) is None
def test_compare_parsing_error_and_failure(
self, mock_extract, mock_subprocess
):
self._mock_popens(mock_extract, mock_subprocess, 1, "foo", "bar")
self._mock_popens(mock_extract, mock_subprocess, 1, b"foo", b"bar")
assert self._similarity(20) is None
def test_convert_failure(self, mock_extract, mock_subprocess):

View file

@ -57,7 +57,7 @@ class ImportAddedTest(PluginMixin, ImportTestCase):
os.path.getmtime(mfile.path) for mfile in self.import_media
)
self.matcher = AutotagStub().install()
self.matcher.macthin = AutotagStub.GOOD
self.matcher.matching = AutotagStub.IDENT
self.importer = self.setup_importer()
self.importer.add_choice(importer.action.APPLY)

View file

@ -14,18 +14,27 @@
"""Tests for the 'lyrics' plugin."""
import importlib.util
import os
import re
import textwrap
from functools import partial
from http import HTTPStatus
from pathlib import Path
import pytest
from beets.library import Item
from beets.test.helper import PluginMixin
from beets.test.helper import PluginMixin, TestHelper
from beetsplug import lyrics
from .lyrics_pages import LyricsPage, lyrics_pages
github_ci = os.environ.get("GITHUB_ACTIONS") == "true"
if not github_ci and not importlib.util.find_spec("langdetect"):
pytest.skip("langdetect isn't available", allow_module_level=True)
PHRASE_BY_TITLE = {
"Lady Madonna": "friday night arrives without a suitcase",
"Jazz'n'blues": "as i check my balance i kiss the screen",
@ -33,6 +42,14 @@ PHRASE_BY_TITLE = {
}
@pytest.fixture(scope="module")
def helper():
helper = TestHelper()
helper.setup_beets()
yield helper
helper.teardown_beets()
class TestLyricsUtils:
@pytest.mark.parametrize(
"artist, title",
@ -231,6 +248,27 @@ class TestLyricsPlugin(LyricsPluginMixin):
assert last_log
assert re.search(expected_log_match, last_log, re.I)
@pytest.mark.parametrize(
"plugin_config, found, expected",
[
({}, "new", "old"),
({"force": True}, "new", "new"),
({"force": True, "local": True}, "new", "old"),
({"force": True, "fallback": None}, "", "old"),
({"force": True, "fallback": ""}, "", ""),
({"force": True, "fallback": "default"}, "", "default"),
],
)
def test_overwrite_config(
self, monkeypatch, helper, lyrics_plugin, found, expected
):
monkeypatch.setattr(lyrics_plugin, "find_lyrics", lambda _: found)
item = helper.create_item(id=1, lyrics="old")
lyrics_plugin.add_item_lyrics(item, False)
assert item.lyrics == expected
class LyricsBackendTest(LyricsPluginMixin):
@pytest.fixture
@ -280,8 +318,13 @@ class TestLyricsSources(LyricsBackendTest):
def test_backend_source(self, lyrics_plugin, lyrics_page: LyricsPage):
"""Test parsed lyrics from each of the configured lyrics pages."""
lyrics_info = lyrics_plugin.get_lyrics(
lyrics_page.artist, lyrics_page.track_title, "", 186
lyrics_info = lyrics_plugin.find_lyrics(
Item(
artist=lyrics_page.artist,
title=lyrics_page.track_title,
album="",
length=186.0,
)
)
assert lyrics_info
@ -502,3 +545,144 @@ class TestLRCLibLyrics(LyricsBackendTest):
lyrics, _ = fetch_lyrics()
assert lyrics == expected_lyrics
class TestTranslation:
@pytest.fixture(autouse=True)
def _patch_bing(self, requests_mock):
def callback(request, _):
if b"Refrain" in request.body:
translations = (
""
" | [Refrain : Doja Cat]"
" | Difficile pour moi de te laisser partir (Te laisser partir, te laisser partir)" # noqa: E501
" | Mon corps ne me laissait pas le cacher (Cachez-le)"
" | Quoi quil arrive, je ne plierais pas (Ne plierait pas, ne plierais pas)" # noqa: E501
" | Chevauchant à travers le tonnerre, la foudre"
)
elif b"00:00.00" in request.body:
translations = (
""
" | [00:00.00] Quelques paroles synchronisées"
" | [00:01.00] Quelques paroles plus synchronisées"
)
else:
translations = (
""
" | Quelques paroles synchronisées"
" | Quelques paroles plus synchronisées"
)
return [
{
"detectedLanguage": {"language": "en", "score": 1.0},
"translations": [{"text": translations, "to": "fr"}],
}
]
requests_mock.post(lyrics.Translator.TRANSLATE_URL, json=callback)
@pytest.mark.parametrize(
"new_lyrics, old_lyrics, expected",
[
pytest.param(
"""
[Refrain: Doja Cat]
Hard for me to let you go (Let you go, let you go)
My body wouldn't let me hide it (Hide it)
No matter what, I wouldn't fold (Wouldn't fold, wouldn't fold)
Ridin' through the thunder, lightnin'""",
"",
"""
[Refrain: Doja Cat] / [Refrain : Doja Cat]
Hard for me to let you go (Let you go, let you go) / Difficile pour moi de te laisser partir (Te laisser partir, te laisser partir)
My body wouldn't let me hide it (Hide it) / Mon corps ne me laissait pas le cacher (Cachez-le)
No matter what, I wouldn't fold (Wouldn't fold, wouldn't fold) / Quoi quil arrive, je ne plierais pas (Ne plierait pas, ne plierais pas)
Ridin' through the thunder, lightnin' / Chevauchant à travers le tonnerre, la foudre""", # noqa: E501
id="plain",
),
pytest.param(
"""
[00:00.00] Some synced lyrics
[00:00:50]
[00:01.00] Some more synced lyrics
Source: https://lrclib.net/api/123""",
"",
"""
[00:00.00] Some synced lyrics / Quelques paroles synchronisées
[00:00:50]
[00:01.00] Some more synced lyrics / Quelques paroles plus synchronisées
Source: https://lrclib.net/api/123""", # noqa: E501
id="synced",
),
pytest.param(
"Quelques paroles",
"",
"Quelques paroles",
id="already in the target language",
),
pytest.param(
"Some lyrics",
"Some lyrics / Some translation",
"Some lyrics / Some translation",
id="already translated",
),
],
)
def test_translate(self, new_lyrics, old_lyrics, expected):
plugin = lyrics.LyricsPlugin()
bing = lyrics.Translator(plugin._log, "123", "FR", ["EN"])
assert bing.translate(
textwrap.dedent(new_lyrics), old_lyrics
) == textwrap.dedent(expected)
class TestRestFiles:
@pytest.fixture
def rest_dir(self, tmp_path):
return tmp_path
@pytest.fixture
def rest_files(self, rest_dir):
return lyrics.RestFiles(rest_dir)
def test_write(self, rest_dir: Path, rest_files):
items = [
Item(albumartist=aa, album=a, title=t, lyrics=lyr)
for aa, a, t, lyr in [
("Artist One", "Album One", "Song One", "Lyrics One"),
("Artist One", "Album One", "Song Two", "Lyrics Two"),
("Artist Two", "Album Two", "Song Three", "Lyrics Three"),
]
]
rest_files.write(items)
assert (rest_dir / "index.rst").exists()
assert (rest_dir / "conf.py").exists()
artist_one_file = rest_dir / "artists" / "artist-one.rst"
artist_two_file = rest_dir / "artists" / "artist-two.rst"
assert artist_one_file.exists()
assert artist_two_file.exists()
c = artist_one_file.read_text()
assert (
c.index("Artist One")
< c.index("Album One")
< c.index("Song One")
< c.index("Lyrics One")
< c.index("Song Two")
< c.index("Lyrics Two")
)
c = artist_two_file.read_text()
assert (
c.index("Artist Two")
< c.index("Album Two")
< c.index("Song Three")
< c.index("Lyrics Three")
)

View file

@ -283,7 +283,7 @@ class SmartPlaylistTest(BeetsTestCase):
assert (
content
== b"#EXTM3U\n"
+ b'#EXTINF:300 id="456" genre="Fake Genre",Fake Artist - fake Title\n'
+ b'#EXTINF:300 id="456" genre="Fake%20Genre",Fake Artist - fake Title\n'
+ b"/tagada.mp3\n"
)

View file

@ -440,7 +440,7 @@ class ImportTest(ImportTestCase):
self.prepare_album_for_import(1)
self.setup_importer()
self.matcher = AutotagStub().install()
self.matcher.macthin = AutotagStub.GOOD
self.matcher.matching = AutotagStub.IDENT
def tearDown(self):
super().tearDown()