diff --git a/beets/autotag/mb.py b/beets/autotag/mb.py
index 90c2013d8..6c2b604cd 100644
--- a/beets/autotag/mb.py
+++ b/beets/autotag/mb.py
@@ -74,22 +74,38 @@ class MusicBrainzAPIError(util.HumanReadableError):
 log = logging.getLogger("beets")
 
-RELEASE_INCLUDES = [
-    "artists",
-    "media",
-    "recordings",
-    "release-groups",
-    "labels",
-    "artist-credits",
-    "aliases",
-    "recording-level-rels",
-    "work-rels",
-    "work-level-rels",
-    "artist-rels",
-    "isrcs",
-    "url-rels",
-    "release-rels",
-]
+RELEASE_INCLUDES = list(
+    {
+        "artists",
+        "media",
+        "recordings",
+        "release-groups",
+        "labels",
+        "artist-credits",
+        "aliases",
+        "recording-level-rels",
+        "work-rels",
+        "work-level-rels",
+        "artist-rels",
+        "isrcs",
+        "url-rels",
+        "release-rels",
+        "tags",
+    }
+    & set(musicbrainzngs.VALID_INCLUDES["release"])
+)
+
+TRACK_INCLUDES = list(
+    {
+        "artists",
+        "aliases",
+        "isrcs",
+        "work-level-rels",
+        "artist-rels",
+    }
+    & set(musicbrainzngs.VALID_INCLUDES["recording"])
+)
+
 BROWSE_INCLUDES = [
     "artist-credits",
     "work-rels",
@@ -101,11 +117,6 @@ if "work-level-rels" in musicbrainzngs.VALID_BROWSE_INCLUDES["recording"]:
     BROWSE_INCLUDES.append("work-level-rels")
 BROWSE_CHUNKSIZE = 100
 BROWSE_MAXTRACKS = 500
-TRACK_INCLUDES = ["artists", "aliases", "isrcs"]
-if "work-level-rels" in musicbrainzngs.VALID_INCLUDES["recording"]:
-    TRACK_INCLUDES += ["work-level-rels", "artist-rels"]
-if "genres" in musicbrainzngs.VALID_INCLUDES["recording"]:
-    RELEASE_INCLUDES += ["genres"]
 
 
 def track_url(trackid: str) -> str:
@@ -607,8 +618,8 @@ def album_info(release: dict) -> beets.autotag.hooks.AlbumInfo:
 
     if config["musicbrainz"]["genres"]:
         sources = [
-            release["release-group"].get("genre-list", []),
-            release.get("genre-list", []),
+            release["release-group"].get("tag-list", []),
+            release.get("tag-list", []),
         ]
         genres: Counter[str] = Counter()
         for source in sources:
diff --git a/beets/importer.py b/beets/importer.py
index b30e6399b..2bdb16669 100644
--- a/beets/importer.py
+++ b/beets/importer.py
@@ -12,11 +12,12 @@
 # The above copyright notice and this permission notice shall be
 # included in all copies or substantial portions of the Software.
 
-
 """Provides the basic, interface-agnostic workflow for importing and
 autotagging music files.
 """
 
+from __future__ import annotations
+
 import itertools
 import os
 import pickle
@@ -25,9 +26,10 @@ import shutil
 import time
 from bisect import bisect_left, insort
 from collections import defaultdict
-from contextlib import contextmanager
+from dataclasses import dataclass
 from enum import Enum
 from tempfile import mkdtemp
+from typing import Callable, Iterable, Sequence
 
 import mediafile
 
@@ -49,8 +51,7 @@ action = Enum("action", ["SKIP", "ASIS", "TRACKS", "APPLY", "ALBUMS", "RETAG"])
 
 QUEUE_SIZE = 128
 SINGLE_ARTIST_THRESH = 0.25
-PROGRESS_KEY = "tagprogress"
-HISTORY_KEY = "taghistory"
+
 # Usually flexible attributes are preserved (i.e., not updated) during
 # reimports. The following two lists (globally) change this behaviour for
 # certain fields. To alter these lists only when a specific plugin is in use,
@@ -73,6 +74,10 @@ REIMPORT_FRESH_FIELDS_ITEM = list(REIMPORT_FRESH_FIELDS_ALBUM)
 # Global logger.
 log = logging.getLogger("beets")
 
+# Here for now to allow for an easy replace later on
+# once we can move to a PathLike
+PathBytes = bytes
+
 
 class ImportAbortError(Exception):
     """Raised when the user aborts the tagging operation."""
@@ -80,117 +85,115 @@ class ImportAbortError(Exception):
 
     pass
 
 
-# Utilities. 
+@dataclass +class ImportState: + """Representing the progress of an import task. + Opens the state file on creation of the class. If you want + to ensure the state is written to disk, you should use the + context manager protocol. -def _open_state(): - """Reads the state file, returning a dictionary.""" - try: - with open(config["statefile"].as_filename(), "rb") as f: - return pickle.load(f) - except Exception as exc: - # The `pickle` module can emit all sorts of exceptions during - # unpickling, including ImportError. We use a catch-all - # exception to avoid enumerating them all (the docs don't even have a - # full list!). - log.debug("state file could not be read: {0}", exc) - return {} + Tagprogress allows long tagging tasks to be resumed when they pause. + Taghistory is a utility for manipulating the "incremental" import log. + This keeps track of all directories that were ever imported, which + allows the importer to only import new stuff. -def _save_state(state): - """Writes the state dictionary out to disk.""" - try: - with open(config["statefile"].as_filename(), "wb") as f: - pickle.dump(state, f) - except OSError as exc: - log.error("state file could not be written: {0}", exc) + Usage + ----- + ``` + # Readonly + progress = ImportState().tagprogress - -# Utilities for reading and writing the beets progress file, which -# allows long tagging tasks to be resumed when they pause (or crash). - - -def progress_read(): - state = _open_state() - return state.setdefault(PROGRESS_KEY, {}) - - -@contextmanager -def progress_write(): - state = _open_state() - progress = state.setdefault(PROGRESS_KEY, {}) - yield progress - _save_state(state) - - -def progress_add(toppath, *paths): - """Record that the files under all of the `paths` have been imported - under `toppath`. + # Read and write + with ImportState() as state: + state["key"] = "value" + ``` """ - with progress_write() as state: - imported = state.setdefault(toppath, []) - for path in paths: - # Normally `progress_add` will be called with the path - # argument increasing. This is because of the ordering in - # `albums_in_dir`. We take advantage of that to make the - # code faster - if imported and imported[len(imported) - 1] <= path: - imported.append(path) - else: - insort(imported, path) + tagprogress: dict[PathBytes, list[PathBytes]] + taghistory: set[tuple[PathBytes, ...]] + path: PathBytes -def progress_element(toppath, path): - """Return whether `path` has been imported in `toppath`.""" - state = progress_read() - if toppath not in state: - return False - imported = state[toppath] - i = bisect_left(imported, path) - return i != len(imported) and imported[i] == path + def __init__(self, readonly=False, path: PathBytes | None = None): + self.path = path or os.fsencode(config["statefile"].as_filename()) + self.tagprogress = {} + self.taghistory = set() + self._open() + def __enter__(self): + return self -def has_progress(toppath): - """Return `True` if there exist paths that have already been - imported under `toppath`. - """ - state = progress_read() - return toppath in state + def __exit__(self, exc_type, exc_val, exc_tb): + self._save() + def _open( + self, + ): + try: + with open(self.path, "rb") as f: + state = pickle.load(f) + # Read the states + self.tagprogress = state.get("tagprogress", {}) + self.taghistory = state.get("taghistory", set()) + except Exception as exc: + # The `pickle` module can emit all sorts of exceptions during + # unpickling, including ImportError. 
We use a catch-all
+            # exception to avoid enumerating them all (the docs don't even have a
+            # full list!).
+            log.debug("state file could not be read: {0}", exc)
 
-def progress_reset(toppath):
-    with progress_write() as state:
-        if toppath in state:
-            del state[toppath]
+    def _save(self):
+        try:
+            with open(self.path, "wb") as f:
+                pickle.dump(
+                    {
+                        "tagprogress": self.tagprogress,
+                        "taghistory": self.taghistory,
+                    },
+                    f,
+                )
+        except OSError as exc:
+            log.error("state file could not be written: {0}", exc)
 
+    # -------------------------------- Tagprogress ------------------------------- #
 
-# Similarly, utilities for manipulating the "incremental" import log.
-# This keeps track of all directories that were ever imported, which
-# allows the importer to only import new stuff.
+    def progress_add(self, toppath: PathBytes, *paths: PathBytes):
+        """Record that the files under all of the `paths` have been imported
+        under `toppath`.
+        """
+        with self as state:
+            imported = state.tagprogress.setdefault(toppath, [])
+            for path in paths:
+                if imported and imported[-1] <= path:
+                    imported.append(path)
+                else:
+                    insort(imported, path)
 
+    def progress_has_element(self, toppath: PathBytes, path: PathBytes) -> bool:
+        """Return whether `path` has been imported in `toppath`."""
+        imported = self.tagprogress.get(toppath, [])
+        i = bisect_left(imported, path)
+        return i != len(imported) and imported[i] == path
 
-def history_add(paths):
-    """Indicate that the import of the album in `paths` is completed and
-    should not be repeated in incremental imports.
-    """
-    state = _open_state()
-    if HISTORY_KEY not in state:
-        state[HISTORY_KEY] = set()
+    def progress_has(self, toppath: PathBytes) -> bool:
+        """Return `True` if there exist paths that have already been
+        imported under `toppath`.
+        """
+        return toppath in self.tagprogress
 
-    state[HISTORY_KEY].add(tuple(paths))
+    def progress_reset(self, toppath: PathBytes | None):
+        """Reset the progress for `toppath`."""
+        with self as state:
+            if toppath in state.tagprogress:
+                del state.tagprogress[toppath]
 
-    _save_state(state)
+    # -------------------------------- Taghistory -------------------------------- #
 
-
-def history_get():
-    """Get the set of completed path tuples in incremental imports."""
-    state = _open_state()
-    if HISTORY_KEY not in state:
-        return set()
-    return state[HISTORY_KEY]
-
-
-# Abstract session class.
+    def history_add(self, paths: list[PathBytes]):
+        """Add the paths to the history."""
+        with self as state:
+            state.taghistory.add(tuple(paths))
 
 
 class ImportSession:
@@ -198,24 +201,46 @@ class ImportSession:
     communicate with the user or otherwise make decisions.
     """
 
-    def __init__(self, lib, loghandler, paths, query):
-        """Create a session. `lib` is a Library object. `loghandler` is a
-        logging.Handler. Either `paths` or `query` is non-null and indicates
-        the source of files to be imported.
+    logger: logging.Logger
+    paths: list[PathBytes]
+    lib: library.Library
+
+    _is_resuming: dict[bytes, bool]
+    _merged_items: set[PathBytes]
+    _merged_dirs: set[PathBytes]
+
+    def __init__(
+        self,
+        lib: library.Library,
+        loghandler: logging.Handler | None,
+        paths: Sequence[PathBytes] | None,
+        query: dbcore.Query | None,
+    ):
+        """Create a session.
+
+        Parameters
+        ----------
+        lib : library.Library
+            The library instance to which items will be imported.
+        loghandler : logging.Handler or None
+            A logging handler to use for the session's logger. If None, a
+            NullHandler will be used.
+        paths : Sequence[PathBytes] or None
+            The paths to be imported. 
+ query : dbcore.Query or None + A query to filter items for import. """ self.lib = lib self.logger = self._setup_logging(loghandler) - self.paths = paths self.query = query self._is_resuming = {} self._merged_items = set() self._merged_dirs = set() # Normalize the paths. - if self.paths: - self.paths = list(map(normpath, self.paths)) + self.paths = list(map(normpath, paths or [])) - def _setup_logging(self, loghandler): + def _setup_logging(self, loghandler: logging.Handler | None): logger = logging.getLogger(__name__) logger.propagate = False if not loghandler: @@ -275,13 +300,13 @@ class ImportSession: self.want_resume = config["resume"].as_choice([True, False, "ask"]) - def tag_log(self, status, paths): + def tag_log(self, status, paths: Sequence[PathBytes]): """Log a message about a given album to the importer log. The status should reflect the reason the album couldn't be tagged. """ self.logger.info("{0} {1}", status, displayable_path(paths)) - def log_choice(self, task, duplicate=False): + def log_choice(self, task: ImportTask, duplicate=False): """Logs the task's current choice if it should be logged. If ``duplicate``, then this is a secondary choice after a duplicate was detected and a decision was made. @@ -302,16 +327,16 @@ class ImportSession: elif task.choice_flag is action.SKIP: self.tag_log("skip", paths) - def should_resume(self, path): + def should_resume(self, path: PathBytes): raise NotImplementedError - def choose_match(self, task): + def choose_match(self, task: ImportTask): raise NotImplementedError - def resolve_duplicate(self, task, found_duplicates): + def resolve_duplicate(self, task: ImportTask, found_duplicates): raise NotImplementedError - def choose_item(self, task): + def choose_item(self, task: ImportTask): raise NotImplementedError def run(self): @@ -366,12 +391,12 @@ class ImportSession: # Incremental and resumed imports - def already_imported(self, toppath, paths): + def already_imported(self, toppath: PathBytes, paths: Sequence[PathBytes]): """Returns true if the files belonging to this task have already been imported in a previous session. """ if self.is_resuming(toppath) and all( - [progress_element(toppath, p) for p in paths] + [ImportState().progress_has_element(toppath, p) for p in paths] ): return True if self.config["incremental"] and tuple(paths) in self.history_dirs: @@ -379,13 +404,16 @@ class ImportSession: return False + _history_dirs = None + @property - def history_dirs(self): - if not hasattr(self, "_history_dirs"): - self._history_dirs = history_get() + def history_dirs(self) -> set[tuple[PathBytes, ...]]: + # FIXME: This could be simplified to a cached property + if self._history_dirs is None: + self._history_dirs = ImportState().taghistory return self._history_dirs - def already_merged(self, paths): + def already_merged(self, paths: Sequence[PathBytes]): """Returns true if all the paths being imported were part of a merge during previous tasks. """ @@ -394,7 +422,7 @@ class ImportSession: return False return True - def mark_merged(self, paths): + def mark_merged(self, paths: Sequence[PathBytes]): """Mark paths and directories as merged for future reimport tasks.""" self._merged_items.update(paths) dirs = { @@ -403,20 +431,20 @@ class ImportSession: } self._merged_dirs.update(dirs) - def is_resuming(self, toppath): + def is_resuming(self, toppath: PathBytes): """Return `True` if user wants to resume import of this path. You have to call `ask_resume` first to determine the return value. 
""" return self._is_resuming.get(toppath, False) - def ask_resume(self, toppath): + def ask_resume(self, toppath: PathBytes): """If import of `toppath` was aborted in an earlier session, ask user if they want to resume the import. Determines the return value of `is_resuming(toppath)`. """ - if self.want_resume and has_progress(toppath): + if self.want_resume and ImportState().progress_has(toppath): # Either accept immediately or prompt for input to decide. if self.want_resume is True or self.should_resume(toppath): log.warning( @@ -426,7 +454,7 @@ class ImportSession: self._is_resuming[toppath] = True else: # Clear progress; we're starting from the top. - progress_reset(toppath) + ImportState().progress_reset(toppath) # The importer task class. @@ -438,7 +466,16 @@ class BaseImportTask: Tasks flow through the importer pipeline. Each stage can update them.""" - def __init__(self, toppath, paths, items): + toppath: PathBytes | None + paths: list[PathBytes] + items: list[library.Item] + + def __init__( + self, + toppath: PathBytes | None, + paths: Iterable[PathBytes] | None, + items: Iterable[library.Item] | None, + ): """Create a task. The primary fields that define a task are: * `toppath`: The user-specified base directory that contains the @@ -456,8 +493,8 @@ class BaseImportTask: These fields should not change after initialization. """ self.toppath = toppath - self.paths = paths - self.items = items + self.paths = list(paths) if paths is not None else [] + self.items = list(items) if items is not None else [] class ImportTask(BaseImportTask): @@ -492,24 +529,39 @@ class ImportTask(BaseImportTask): system. """ - def __init__(self, toppath, paths, items): + choice_flag: action | None = None + match: autotag.AlbumMatch | autotag.TrackMatch | None = None + + # Keep track of the current task item + cur_album: str | None = None + cur_artist: str | None = None + candidates: Sequence[autotag.AlbumMatch | autotag.TrackMatch] = [] + + def __init__( + self, + toppath: PathBytes | None, + paths: Iterable[PathBytes] | None, + items: Iterable[library.Item] | None, + ): super().__init__(toppath, paths, items) - self.choice_flag = None - self.cur_album = None - self.cur_artist = None - self.candidates = [] self.rec = None self.should_remove_duplicates = False self.should_merge_duplicates = False self.is_album = True self.search_ids = [] # user-supplied candidate IDs. - def set_choice(self, choice): + def set_choice( + self, choice: action | autotag.AlbumMatch | autotag.TrackMatch + ): """Given an AlbumMatch or TrackMatch object or an action constant, indicates that an action has been selected for this task. + + Album and trackmatch are implemented as tuples, so we can't + use isinstance to check for them. """ # Not part of the task structure: assert choice != action.APPLY # Only used internally. + if choice in ( action.SKIP, action.ASIS, @@ -517,23 +569,23 @@ class ImportTask(BaseImportTask): action.ALBUMS, action.RETAG, ): - self.choice_flag = choice + # TODO: redesign to stricten the type + self.choice_flag = choice # type: ignore[assignment] self.match = None else: self.choice_flag = action.APPLY # Implicit choice. - self.match = choice + self.match = choice # type: ignore[assignment] def save_progress(self): """Updates the progress state to indicate that this album has finished. 
""" if self.toppath: - progress_add(self.toppath, *self.paths) + ImportState().progress_add(self.toppath, *self.paths) def save_history(self): """Save the directory in the history for incremental imports.""" - if self.paths: - history_add(self.paths) + ImportState().history_add(self.paths) # Logical decisions. @@ -556,7 +608,7 @@ class ImportTask(BaseImportTask): if self.choice_flag in (action.ASIS, action.RETAG): likelies, consensus = autotag.current_metadata(self.items) return likelies - elif self.choice_flag is action.APPLY: + elif self.choice_flag is action.APPLY and self.match: return self.match.info.copy() assert False @@ -568,7 +620,9 @@ class ImportTask(BaseImportTask): """ if self.choice_flag in (action.ASIS, action.RETAG): return list(self.items) - elif self.choice_flag == action.APPLY: + elif self.choice_flag == action.APPLY and isinstance( + self.match, autotag.AlbumMatch + ): return list(self.match.mapping.keys()) else: assert False @@ -581,13 +635,13 @@ class ImportTask(BaseImportTask): autotag.apply_metadata(self.match.info, self.match.mapping) - def duplicate_items(self, lib): + def duplicate_items(self, lib: library.Library): duplicate_items = [] for album in self.find_duplicates(lib): duplicate_items += album.items() return duplicate_items - def remove_duplicates(self, lib): + def remove_duplicates(self, lib: library.Library): duplicate_items = self.duplicate_items(lib) log.debug("removing {0} old duplicated items", len(duplicate_items)) for item in duplicate_items: @@ -599,7 +653,7 @@ class ImportTask(BaseImportTask): util.remove(item.path) util.prune_dirs(os.path.dirname(item.path), lib.directory) - def set_fields(self, lib): + def set_fields(self, lib: library.Library): """Sets the fields given at CLI or configuration to the specified values, for both the album and all its items. """ @@ -620,7 +674,7 @@ class ImportTask(BaseImportTask): item.store() self.album.store() - def finalize(self, session): + def finalize(self, session: ImportSession): """Save progress, clean up files, and emit plugin event.""" # Update progress. if session.want_resume: @@ -654,7 +708,7 @@ class ImportTask(BaseImportTask): for old_path in self.old_paths: # Only delete files that were actually copied. if old_path not in new_paths: - util.remove(syspath(old_path), False) + util.remove(old_path, False) self.prune(old_path) # When moving, prune empty directories containing the original files. @@ -662,10 +716,10 @@ class ImportTask(BaseImportTask): for old_path in self.old_paths: self.prune(old_path) - def _emit_imported(self, lib): + def _emit_imported(self, lib: library.Library): plugins.send("album_imported", lib=lib, album=self.album) - def handle_created(self, session): + def handle_created(self, session: ImportSession): """Send the `import_task_created` event for this task. Return a list of tasks that should continue through the pipeline. By default, this is a list containing only the task itself, but plugins can replace the task @@ -692,7 +746,7 @@ class ImportTask(BaseImportTask): self.candidates = prop.candidates self.rec = prop.recommendation - def find_duplicates(self, lib): + def find_duplicates(self, lib: library.Library): """Return a list of albums from `lib` with the same artist and album name as the task. """ @@ -706,7 +760,9 @@ class ImportTask(BaseImportTask): # Construct a query to find duplicates with this metadata. We # use a temporary Album object to generate any computed fields. 
tmp_album = library.Album(lib, **info) - keys = config["import"]["duplicate_keys"]["album"].as_str_seq() + keys: list[str] = config["import"]["duplicate_keys"][ + "album" + ].as_str_seq() dup_query = tmp_album.duplicates_query(keys) # Don't count albums with the same files as duplicates. @@ -764,19 +820,25 @@ class ImportTask(BaseImportTask): for item in self.items: item.update(changes) - def manipulate_files(self, operation=None, write=False, session=None): + def manipulate_files( + self, + session: ImportSession, + operation: MoveOperation | None = None, + write=False, + ): """Copy, move, link, hardlink or reflink (depending on `operation`) the files as well as write metadata. `operation` should be an instance of `util.MoveOperation`. If `write` is `True` metadata is written to the files. + # TODO: Introduce a MoveOperation.NONE or SKIP """ items = self.imported_items() # Save the original paths of all items for deletion and pruning # in the next step (finalization). - self.old_paths = [item.path for item in items] + self.old_paths: list[PathBytes] = [item.path for item in items] for item in items: if operation is not None: # In copy and link modes, treat re-imports specially: @@ -806,7 +868,7 @@ class ImportTask(BaseImportTask): plugins.send("import_task_files", session=session, task=self) - def add(self, lib): + def add(self, lib: library.Library): """Add the items as an album to the library and remove replaced items.""" self.align_album_level_fields() with lib.transaction(): @@ -814,7 +876,9 @@ class ImportTask(BaseImportTask): self.remove_replaced(lib) self.album = lib.add_album(self.imported_items()) - if self.choice_flag == action.APPLY: + if self.choice_flag == action.APPLY and isinstance( + self.match, autotag.AlbumMatch + ): # Copy album flexible fields to the DB # TODO: change the flow so we create the `Album` object earlier, # and we can move this into `self.apply_metadata`, just like @@ -824,12 +888,12 @@ class ImportTask(BaseImportTask): self.reimport_metadata(lib) - def record_replaced(self, lib): + def record_replaced(self, lib: library.Library): """Records the replaced items and albums in the `replaced_items` and `replaced_albums` dictionaries. """ self.replaced_items = defaultdict(list) - self.replaced_albums = defaultdict(list) + self.replaced_albums: dict[PathBytes, library.Album] = defaultdict() replaced_album_ids = set() for item in self.imported_items(): dup_items = list( @@ -847,7 +911,7 @@ class ImportTask(BaseImportTask): replaced_album_ids.add(dup_item.album_id) self.replaced_albums[replaced_album.path] = replaced_album - def reimport_metadata(self, lib): + def reimport_metadata(self, lib: library.Library): """For reimports, preserves metadata for reimported items and albums. """ @@ -980,7 +1044,7 @@ class ImportTask(BaseImportTask): class SingletonImportTask(ImportTask): """ImportTask for a single track that is not associated to an album.""" - def __init__(self, toppath, item): + def __init__(self, toppath: PathBytes | None, item: library.Item): super().__init__(toppath, [item.path], [item]) self.item = item self.is_album = False @@ -1022,7 +1086,9 @@ class SingletonImportTask(ImportTask): # Query for existing items using the same metadata. We use a # temporary `Item` object to generate any computed fields. 
tmp_item = library.Item(lib, **info) - keys = config["import"]["duplicate_keys"]["item"].as_str_seq() + keys: list[str] = config["import"]["duplicate_keys"][ + "item" + ].as_str_seq() dup_query = tmp_item.duplicates_query(keys) found_items = [] @@ -1044,7 +1110,7 @@ class SingletonImportTask(ImportTask): def infer_album_fields(self): raise NotImplementedError - def choose_match(self, session): + def choose_match(self, session: ImportSession): """Ask the session which match should apply and apply it.""" choice = session.choose_item(self) self.set_choice(choice) @@ -1092,23 +1158,24 @@ class SentinelImportTask(ImportTask): pass def save_progress(self): - if self.paths is None: + if not self.paths: # "Done" sentinel. - progress_reset(self.toppath) - else: + ImportState().progress_reset(self.toppath) + elif self.toppath: # "Directory progress" sentinel for singletons - progress_add(self.toppath, *self.paths) + super().save_progress() - def skip(self): + @property + def skip(self) -> bool: return True def set_choice(self, choice): raise NotImplementedError - def cleanup(self, **kwargs): + def cleanup(self, copy=False, delete=False, move=False): pass - def _emit_imported(self, session): + def _emit_imported(self, lib): pass @@ -1152,7 +1219,7 @@ class ArchiveImportTask(SentinelImportTask): implements the same interface as `tarfile.TarFile`. """ if not hasattr(cls, "_handlers"): - cls._handlers = [] + cls._handlers: list[tuple[Callable, ...]] = [] from zipfile import ZipFile, is_zipfile cls._handlers.append((is_zipfile, ZipFile)) @@ -1174,9 +1241,9 @@ class ArchiveImportTask(SentinelImportTask): return cls._handlers - def cleanup(self, **kwargs): + def cleanup(self, copy=False, delete=False, move=False): """Removes the temporary directory the archive was extracted to.""" - if self.extracted: + if self.extracted and self.toppath: log.debug( "Removing extracted directory: {0}", displayable_path(self.toppath), @@ -1187,10 +1254,13 @@ class ArchiveImportTask(SentinelImportTask): """Extracts the archive to a temporary directory and sets `toppath` to that directory. """ + assert self.toppath is not None, "toppath must be set" + for path_test, handler_class in self.handlers(): if path_test(os.fsdecode(self.toppath)): break - + else: + raise ValueError(f"No handler found for archive: {self.toppath}") extract_to = mkdtemp() archive = handler_class(os.fsdecode(self.toppath), mode="r") try: @@ -1219,7 +1289,7 @@ class ImportTaskFactory: indicated by a path. """ - def __init__(self, toppath, session): + def __init__(self, toppath: PathBytes, session: ImportSession): """Create a new task factory. `toppath` is the user-specified path to search for music to @@ -1246,6 +1316,7 @@ class ImportTaskFactory: extracted data. """ # Check whether this is an archive. + archive_task: ArchiveImportTask | None = None if self.is_archive: archive_task = self.unarchive() if not archive_task: @@ -1267,12 +1338,9 @@ class ImportTaskFactory: # it is finished. This is usually just a SentinelImportTask, but # for archive imports, send the archive task instead (to remove # the extracted directory). - if self.is_archive: - yield archive_task - else: - yield self.sentinel() + yield archive_task or self.sentinel() - def _create(self, task): + def _create(self, task: ImportTask | None): """Handle a new task to be emitted by the factory. 
Emit the `import_task_created` event and increment the @@ -1305,7 +1373,7 @@ class ImportTaskFactory: for dirs, paths in albums_in_dir(self.toppath): yield dirs, paths - def singleton(self, path): + def singleton(self, path: PathBytes): """Return a `SingletonImportTask` for the music file.""" if self.session.already_imported(self.toppath, [path]): log.debug( @@ -1320,14 +1388,12 @@ class ImportTaskFactory: else: return None - def album(self, paths, dirs=None): + def album(self, paths: Iterable[PathBytes], dirs=None): """Return a `ImportTask` with all media files from paths. `dirs` is a list of parent directories used to record already imported albums. """ - if not paths: - return None if dirs is None: dirs = list({os.path.dirname(p) for p in paths}) @@ -1339,15 +1405,16 @@ class ImportTaskFactory: self.skipped += 1 return None - items = map(self.read_item, paths) - items = [item for item in items if item] + items: list[library.Item] = [ + item for item in map(self.read_item, paths) if item + ] - if items: + if len(items) > 0: return ImportTask(self.toppath, dirs, items) else: return None - def sentinel(self, paths=None): + def sentinel(self, paths: Iterable[PathBytes] | None = None): """Return a `SentinelImportTask` indicating the end of a top-level directory import. """ @@ -1382,7 +1449,7 @@ class ImportTaskFactory: log.debug("Archive extracted to: {0}", self.toppath) return archive_task - def read_item(self, path): + def read_item(self, path: PathBytes): """Return an `Item` read from the path. If an item cannot be read, return `None` instead and log an @@ -1425,12 +1492,13 @@ def _extend_pipeline(tasks, *stages): # Full-album pipeline stages. -def read_tasks(session): +def read_tasks(session: ImportSession): """A generator yielding all the albums (as ImportTask objects) found in the user-specified list of paths. In the case of a singleton import, yields single-item tasks instead. """ skipped = 0 + for toppath in session.paths: # Check whether we need to resume the import. session.ask_resume(toppath) @@ -1448,7 +1516,7 @@ def read_tasks(session): log.info("Skipped {0} paths.", skipped) -def query_tasks(session): +def query_tasks(session: ImportSession): """A generator that works as a drop-in-replacement for read_tasks. Instead of finding files from the filesystem, a query is used to match items from the library. @@ -1478,7 +1546,7 @@ def query_tasks(session): @pipeline.mutator_stage -def lookup_candidates(session, task): +def lookup_candidates(session: ImportSession, task: ImportTask): """A coroutine for performing the initial MusicBrainz lookup for an album. It accepts lists of Items and yields (items, cur_artist, cur_album, candidates, rec) tuples. If no match @@ -1500,7 +1568,7 @@ def lookup_candidates(session, task): @pipeline.stage -def user_query(session, task): +def user_query(session: ImportSession, task: ImportTask): """A coroutine for interfacing with the user about the tagging process. @@ -1571,7 +1639,7 @@ def user_query(session, task): return task -def resolve_duplicates(session, task): +def resolve_duplicates(session: ImportSession, task: ImportTask): """Check if a task conflicts with items or albums already imported and ask the session to resolve this. """ @@ -1614,7 +1682,7 @@ def resolve_duplicates(session, task): @pipeline.mutator_stage -def import_asis(session, task): +def import_asis(session: ImportSession, task: ImportTask): """Select the `action.ASIS` choice for all tasks. 
This stage replaces the initial_lookup and user_query stages @@ -1628,7 +1696,7 @@ def import_asis(session, task): apply_choice(session, task) -def apply_choice(session, task): +def apply_choice(session: ImportSession, task: ImportTask): """Apply the task's choice to the Album or Item it contains and add it to the library. """ @@ -1652,7 +1720,11 @@ def apply_choice(session, task): @pipeline.mutator_stage -def plugin_stage(session, func, task): +def plugin_stage( + session: ImportSession, + func: Callable[[ImportSession, ImportTask], None], + task: ImportTask, +): """A coroutine (pipeline stage) that calls the given function with each non-skipped import task. These stages occur between applying metadata changes and moving/copying/writing files. @@ -1669,7 +1741,7 @@ def plugin_stage(session, func, task): @pipeline.stage -def manipulate_files(session, task): +def manipulate_files(session: ImportSession, task: ImportTask): """A coroutine (pipeline stage) that performs necessary file manipulations *after* items have been added to the library and finalizes each task. @@ -1694,9 +1766,9 @@ def manipulate_files(session, task): operation = None task.manipulate_files( - operation, - write=session.config["write"], session=session, + operation=operation, + write=session.config["write"], ) # Progress, cleanup, and event. @@ -1704,7 +1776,7 @@ def manipulate_files(session, task): @pipeline.stage -def log_files(session, task): +def log_files(session: ImportSession, task: ImportTask): """A coroutine (pipeline stage) to log each file to be imported.""" if isinstance(task, SingletonImportTask): log.info("Singleton: {0}", displayable_path(task.item["path"])) @@ -1714,7 +1786,7 @@ def log_files(session, task): log.info(" {0}", displayable_path(item["path"])) -def group_albums(session): +def group_albums(session: ImportSession): """A pipeline stage that groups the items of each task into albums using their metadata. @@ -1731,10 +1803,10 @@ def group_albums(session): if task.skip: continue tasks = [] - sorted_items = sorted(task.items, key=group) + sorted_items: list[library.Item] = sorted(task.items, key=group) for _, items in itertools.groupby(sorted_items, group): - items = list(items) - task = ImportTask(task.toppath, [i.path for i in items], items) + l_items = list(items) + task = ImportTask(task.toppath, [i.path for i in l_items], l_items) tasks += task.handle_created(session) tasks.append(SentinelImportTask(task.toppath, task.paths)) @@ -1753,15 +1825,15 @@ def is_subdir_of_any_in_list(path, dirs): return any(d in ancestors for d in dirs) -def albums_in_dir(path): +def albums_in_dir(path: PathBytes): """Recursively searches the given directory and returns an iterable of (paths, items) where paths is a list of directories and items is a list of Items that is probably an album. Specifically, any folder containing any media files is an album. """ collapse_pat = collapse_paths = collapse_items = None - ignore = config["ignore"].as_str_seq() - ignore_hidden = config["ignore_hidden"].get(bool) + ignore: list[str] = config["ignore"].as_str_seq() + ignore_hidden: bool = config["ignore_hidden"].get(bool) for root, dirs, files in sorted_walk( path, ignore=ignore, ignore_hidden=ignore_hidden, logger=log diff --git a/beets/test/helper.py b/beets/test/helper.py index 4effa47f8..c0d785d6e 100644 --- a/beets/test/helper.py +++ b/beets/test/helper.py @@ -675,7 +675,7 @@ class ImportSessionFixture(ImportSession): >>> importer.run() This imports ``/path/to/import`` into `lib`. 
It skips the first - album and imports thesecond one with metadata from the tags. For the + album and imports the second one with metadata from the tags. For the remaining albums, the metadata from the autotagger will be applied. """ diff --git a/beets/util/__init__.py b/beets/util/__init__.py index 32a63b216..b882ed626 100644 --- a/beets/util/__init__.py +++ b/beets/util/__init__.py @@ -198,8 +198,8 @@ def ancestry(path: AnyStr) -> list[AnyStr]: def sorted_walk( - path: AnyStr, - ignore: Sequence[bytes] = (), + path: PathLike, + ignore: Sequence[PathLike] = (), ignore_hidden: bool = False, logger: Logger | None = None, ) -> Iterator[tuple[bytes, Sequence[bytes], Sequence[bytes]]]: @@ -210,7 +210,9 @@ def sorted_walk( """ # Make sure the paths aren't Unicode strings. bytes_path = bytestring_path(path) - ignore = [bytestring_path(i) for i in ignore] + ignore_bytes = [ # rename prevents mypy variable shadowing issue + bytestring_path(i) for i in ignore + ] # Get all the directories and files at this level. try: @@ -230,7 +232,7 @@ def sorted_walk( # Skip ignored filenames. skip = False - for pat in ignore: + for pat in ignore_bytes: if fnmatch.fnmatch(base, pat): if logger: logger.debug( @@ -257,7 +259,7 @@ def sorted_walk( # Recurse into directories. for base in dirs: cur = os.path.join(bytes_path, base) - yield from sorted_walk(cur, ignore, ignore_hidden, logger) + yield from sorted_walk(cur, ignore_bytes, ignore_hidden, logger) def path_as_posix(path: bytes) -> bytes: @@ -297,8 +299,8 @@ def fnmatch_all(names: Sequence[bytes], patterns: Sequence[bytes]) -> bool: def prune_dirs( - path: bytes, - root: bytes | None = None, + path: PathLike, + root: PathLike | None = None, clutter: Sequence[str] = (".DS_Store", "Thumbs.db"), ): """If path is an empty directory, then remove it. Recursively remove @@ -419,12 +421,13 @@ PATH_SEP: bytes = bytestring_path(os.sep) def displayable_path( - path: BytesOrStr | tuple[BytesOrStr, ...], separator: str = "; " + path: PathLike | Iterable[PathLike], separator: str = "; " ) -> str: """Attempts to decode a bytestring path to a unicode object for the purpose of displaying it to the user. If the `path` argument is a list or a tuple, the elements are joined with `separator`. """ + if isinstance(path, (list, tuple)): return separator.join(displayable_path(p) for p in path) elif isinstance(path, str): @@ -472,7 +475,7 @@ def samefile(p1: bytes, p2: bytes) -> bool: return False -def remove(path: bytes, soft: bool = True): +def remove(path: PathLike, soft: bool = True): """Remove the file. If `soft`, then no error will be raised if the file does not exist. """ @@ -1130,7 +1133,10 @@ def get_temp_filename( tempdir = get_module_tempdir(module) tempdir.mkdir(parents=True, exist_ok=True) - _, filename = tempfile.mkstemp(dir=tempdir, prefix=prefix, suffix=suffix) + descriptor, filename = tempfile.mkstemp( + dir=tempdir, prefix=prefix, suffix=suffix + ) + os.close(descriptor) return bytestring_path(filename) diff --git a/beets/util/artresizer.py b/beets/util/artresizer.py index 898ffeab8..23dce3c9f 100644 --- a/beets/util/artresizer.py +++ b/beets/util/artresizer.py @@ -314,6 +314,11 @@ class IMBackend(LocalBackend): else: out_str = stdout + # ImageMagick 7.1.1-44 outputs in a different format. + if b"(" in out_str and out_str.endswith(b")"): + # Extract diff from "... (diff)". 
+            out_str = out_str[out_str.index(b"(") + 1 : -1]
+
         try:
             phash_diff = float(out_str)
         except ValueError:
diff --git a/beets/util/pipeline.py b/beets/util/pipeline.py
index d23b1bd10..8f7be8194 100644
--- a/beets/util/pipeline.py
+++ b/beets/util/pipeline.py
@@ -31,9 +31,14 @@ To do so, pass an iterable of coroutines to the Pipeline constructor
 in place of any single coroutine.
 """
 
+from __future__ import annotations
+
 import queue
 import sys
 from threading import Lock, Thread
+from typing import Callable, Generator
+
+from typing_extensions import TypeVar, TypeVarTuple, Unpack
 
 BUBBLE = "__PIPELINE_BUBBLE__"
 POISON = "__PIPELINE_POISON__"
@@ -149,7 +154,22 @@ def multiple(messages):
     return MultiMessage(messages)
 
 
-def stage(func):
+A = TypeVarTuple("A")  # Arguments of a function (omitting the task)
+T = TypeVar("T")  # Type of the task
+# Normally these are concatenated i.e. (*args, task)
+
+# Return type of the function (should normally be task but sadly
+# we can't enforce this with the current stage functions without
+# a refactor)
+R = TypeVar("R")
+
+
+def stage(
+    func: Callable[
+        [Unpack[A], T],
+        R | None,
+    ],
+):
     """Decorate a function to become a simple stage.
 
     >>> @stage
@@ -163,8 +183,8 @@ def stage(func):
     [3, 4, 5]
     """
 
-    def coro(*args):
-        task = None
+    def coro(*args: Unpack[A]) -> Generator[R | T | None, T, None]:
+        task: R | T | None = None
         while True:
             task = yield task
             task = func(*(args + (task,)))
@@ -172,7 +192,7 @@ def stage(func):
     return coro
 
 
-def mutator_stage(func):
+def mutator_stage(func: Callable[[Unpack[A], T], R]):
     """Decorate a function that manipulates items in a coroutine to
     become a simple stage.
 
     >>> @mutator_stage
@@ -187,7 +207,7 @@ def mutator_stage(func):
     [{'x': True}, {'a': False, 'x': True}]
     """
 
-    def coro(*args):
+    def coro(*args: Unpack[A]) -> Generator[T | None, T, None]:
         task = None
         while True:
             task = yield task
diff --git a/beetsplug/_typing.py b/beetsplug/_typing.py
index 915ea77e8..b772ffdd5 100644
--- a/beetsplug/_typing.py
+++ b/beetsplug/_typing.py
@@ -113,3 +113,23 @@ class GoogleCustomSearchAPI:
             """Pagemap data with a single meta tags dict in a list."""
 
             metatags: list[JSONDict]
+
+
+class TranslatorAPI:
+    class Language(TypedDict):
+        """Language data returned by the translator API."""
+
+        language: str
+        score: float
+
+    class Translation(TypedDict):
+        """Translation data returned by the translator API."""
+
+        text: str
+        to: str
+
+    class Response(TypedDict):
+        """Response from the translator API."""
+
+        detectedLanguage: TranslatorAPI.Language
+        translations: list[TranslatorAPI.Translation]
diff --git a/beetsplug/convert.py b/beetsplug/convert.py
index 536acf16e..0c50fc73f 100644
--- a/beetsplug/convert.py
+++ b/beetsplug/convert.py
@@ -22,6 +22,7 @@ import tempfile
 import threading
 from string import Template
 
+import mediafile
 from confuse import ConfigTypeError, Optional
 
 from beets import art, config, plugins, ui, util
@@ -351,6 +352,15 @@ class ConvertPlugin(BeetsPlugin):
             item = yield (item, original, converted)
             dest = item.destination(basedir=dest_dir, path_formats=path_formats)
 
+            # Ensure that the desired item is readable before processing it.
+            # Needed to avoid any side effects of the conversion (linking,
+            # keep_new, refresh) if we already know that it will fail.
+            try:
+                mediafile.MediaFile(util.syspath(item.path))
+            except mediafile.UnreadableFileError as exc:
+                self._log.error("Could not open file to convert: {0}", exc)
+                continue
+
             # When keeping the new file in the library, we first move the
             # current (pristine) file to the destination. 
We'll then copy it # back to its old path or transcode it to a new path. diff --git a/beetsplug/listenbrainz.py b/beetsplug/listenbrainz.py index 1e2912793..37a7920b9 100644 --- a/beetsplug/listenbrainz.py +++ b/beetsplug/listenbrainz.py @@ -110,7 +110,7 @@ class ListenBrainzPlugin(BeetsPlugin): if track["track_metadata"].get("release_name") is None: continue mbid_mapping = track["track_metadata"].get("mbid_mapping", {}) - # print(json.dumps(track, indent=4, sort_keys=True)) + mbid = None if mbid_mapping.get("recording_mbid") is None: # search for the track using title and release mbid = self.get_mb_recording_id(track) diff --git a/beetsplug/lyrics.py b/beetsplug/lyrics.py index 1732edbf7..cb48e2424 100644 --- a/beetsplug/lyrics.py +++ b/beetsplug/lyrics.py @@ -17,16 +17,17 @@ from __future__ import annotations import atexit -import errno import itertools import math -import os.path import re +import textwrap from contextlib import contextmanager, suppress from dataclasses import dataclass from functools import cached_property, partial, total_ordering from html import unescape from http import HTTPStatus +from itertools import groupby +from pathlib import Path from typing import TYPE_CHECKING, Iterable, Iterator, NamedTuple from urllib.parse import quote, quote_plus, urlencode, urlparse @@ -40,49 +41,22 @@ from beets import plugins, ui from beets.autotag.hooks import string_dist if TYPE_CHECKING: - from beets.importer import ImportTask - from beets.library import Item + from logging import Logger - from ._typing import GeniusAPI, GoogleCustomSearchAPI, JSONDict, LRCLibAPI + from beets.importer import ImportTask + from beets.library import Item, Library + + from ._typing import ( + GeniusAPI, + GoogleCustomSearchAPI, + JSONDict, + LRCLibAPI, + TranslatorAPI, + ) USER_AGENT = f"beets/{beets.__version__}" INSTRUMENTAL_LYRICS = "[Instrumental]" -# The content for the base index.rst generated in ReST mode. -REST_INDEX_TEMPLATE = """Lyrics -====== - -* :ref:`Song index ` -* :ref:`search` - -Artist index: - -.. toctree:: - :maxdepth: 1 - :glob: - - artists/* -""" - -# The content for the base conf.py generated. 
-REST_CONF_TEMPLATE = """# -*- coding: utf-8 -*- -master_doc = 'index' -project = 'Lyrics' -copyright = 'none' -author = 'Various Authors' -latex_documents = [ - (master_doc, 'Lyrics.tex', project, - author, 'manual'), -] -epub_title = project -epub_author = author -epub_publisher = author -epub_copyright = copyright -epub_exclude_files = ['search.html'] -epub_tocdepth = 1 -epub_tocdup = False -""" - class NotFoundError(requests.exceptions.HTTPError): pass @@ -252,6 +226,12 @@ class RequestHandler: self.debug("Fetching JSON from {}", url) return r_session.get(url, **kwargs).json() + def post_json(self, url: str, params: JSONDict | None = None, **kwargs): + """Send POST request and return JSON response.""" + url = self.format_url(url, params) + self.debug("Posting JSON to {}", url) + return r_session.post(url, **kwargs).json() + @contextmanager def handle_request(self) -> Iterator[None]: try: @@ -760,6 +740,214 @@ class Google(SearchBackend): return None +@dataclass +class Translator(RequestHandler): + TRANSLATE_URL = "https://api.cognitive.microsofttranslator.com/translate" + LINE_PARTS_RE = re.compile(r"^(\[\d\d:\d\d.\d\d\]|) *(.*)$") + SEPARATOR = " | " + remove_translations = partial(re.compile(r" / [^\n]+").sub, "") + + _log: Logger + api_key: str + to_language: str + from_languages: list[str] + + @classmethod + def from_config( + cls, + log: Logger, + api_key: str, + to_language: str, + from_languages: list[str] | None = None, + ) -> Translator: + return cls( + log, + api_key, + to_language.upper(), + [x.upper() for x in from_languages or []], + ) + + def get_translations(self, texts: Iterable[str]) -> list[tuple[str, str]]: + """Return translations for the given texts. + + To reduce the translation 'cost', we translate unique texts, and then + map the translations back to the original texts. + """ + unique_texts = list(dict.fromkeys(texts)) + text = self.SEPARATOR.join(unique_texts) + data: list[TranslatorAPI.Response] = self.post_json( + self.TRANSLATE_URL, + headers={"Ocp-Apim-Subscription-Key": self.api_key}, + json=[{"text": text}], + params={"api-version": "3.0", "to": self.to_language}, + ) + + translated_text = data[0]["translations"][0]["text"] + translations = translated_text.split(self.SEPARATOR) + trans_by_text = dict(zip(unique_texts, translations)) + return list(zip(texts, (trans_by_text.get(t, "") for t in texts))) + + @classmethod + def split_line(cls, line: str) -> tuple[str, str]: + """Split line to (timestamp, text).""" + if m := cls.LINE_PARTS_RE.match(line): + return m[1], m[2] + + return "", "" + + def append_translations(self, lines: Iterable[str]) -> list[str]: + """Append translations to the given lyrics texts. + + Lines may contain timestamps from LRCLib which need to be temporarily + removed for the translation. They can take any of these forms: + - empty + Text - text only + [00:00:00] - timestamp only + [00:00:00] Text - timestamp with text + """ + # split into [(timestamp, text), ...]] + ts_and_text = list(map(self.split_line, lines)) + timestamps = [ts for ts, _ in ts_and_text] + text_pairs = self.get_translations([ln for _, ln in ts_and_text]) + + # only add the separator for non-empty translations + texts = [" / ".join(filter(None, p)) for p in text_pairs] + # only add the space between non-empty timestamps and texts + return [" ".join(filter(None, p)) for p in zip(timestamps, texts)] + + def translate(self, new_lyrics: str, old_lyrics: str) -> str: + """Translate the given lyrics to the target language. 
+
+        Check old lyrics for existing translations and return them if their
+        original text matches the new lyrics. This is to avoid translating
+        the same lyrics multiple times.
+
+        If the lyrics are already in the target language or not in any of
+        the source languages (if configured), they are returned as is.
+
+        The footer with the source URL is preserved, if present.
+        """
+        if (
+            " / " in old_lyrics
+            and self.remove_translations(old_lyrics) == new_lyrics
+        ):
+            self.info("🔵 Translations already exist")
+            return old_lyrics
+
+        lyrics_language = langdetect.detect(new_lyrics).upper()
+        if lyrics_language == self.to_language:
+            self.info(
+                "🔵 Lyrics are already in the target language {}",
+                self.to_language,
+            )
+            return new_lyrics
+
+        if self.from_languages and lyrics_language not in self.from_languages:
+            self.info(
+                "🔵 Configuration {} does not permit translating from {}",
+                self.from_languages,
+                lyrics_language,
+            )
+            return new_lyrics
+
+        lyrics, *url = new_lyrics.split("\n\nSource: ")
+        with self.handle_request():
+            translated_lines = self.append_translations(lyrics.splitlines())
+            self.info("🟢 Translated lyrics to {}", self.to_language)
+            return "\n\nSource: ".join(["\n".join(translated_lines), *url])
+
+
+@dataclass
+class RestFiles:
+    # The content for the base index.rst generated in ReST mode.
+    REST_INDEX_TEMPLATE = textwrap.dedent("""
+        Lyrics
+        ======
+
+        * :ref:`Song index <genindex>`
+        * :ref:`search`
+
+        Artist index:
+
+        .. toctree::
+            :maxdepth: 1
+            :glob:
+
+            artists/*
+    """).strip()
+
+    # The content for the base conf.py generated.
+    REST_CONF_TEMPLATE = textwrap.dedent("""
+        master_doc = "index"
+        project = "Lyrics"
+        copyright = "none"
+        author = "Various Authors"
+        latex_documents = [
+            (master_doc, "Lyrics.tex", project, author, "manual"),
+        ]
+        epub_exclude_files = ["search.html"]
+        epub_tocdepth = 1
+        epub_tocdup = False
+    """).strip()
+
+    directory: Path
+
+    @cached_property
+    def artists_dir(self) -> Path:
+        dir = self.directory / "artists"
+        dir.mkdir(parents=True, exist_ok=True)
+        return dir
+
+    def write_indexes(self) -> None:
+        """Write conf.py and index.rst files necessary for Sphinx
+
+        We write minimal configurations that are necessary for Sphinx
+        to operate. We do not overwrite existing files so that
+        customizations are respected."""
+        index_file = self.directory / "index.rst"
+        if not index_file.exists():
+            index_file.write_text(self.REST_INDEX_TEMPLATE)
+        conf_file = self.directory / "conf.py"
+        if not conf_file.exists():
+            conf_file.write_text(self.REST_CONF_TEMPLATE)
+
+    def write_artist(self, artist: str, items: Iterable[Item]) -> None:
+        parts = [
+            f'{artist}\n{"=" * len(artist)}',
+            ".. contents::\n    :local:",
+        ]
+        for album, items in groupby(items, key=lambda i: i.album):
+            parts.append(f'{album}\n{"-" * len(album)}')
+            parts.extend(
+                part
+                for i in items
+                if (title := f":index:`{i.title.strip()}`")
+                for part in (
+                    f'{title}\n{"~" * len(title)}',
+                    textwrap.indent(i.lyrics, "| "),
+                )
+            )
+        file = self.artists_dir / f"{slug(artist)}.rst"
+        file.write_text("\n\n".join(parts).strip())
+
+    def write(self, items: list[Item]) -> None:
+        self.directory.mkdir(exist_ok=True, parents=True)
+        self.write_indexes()
+
+        items.sort(key=lambda i: i.albumartist)
+        for artist, artist_items in groupby(items, key=lambda i: i.albumartist):
+            self.write_artist(artist.strip(), artist_items)
+
+        d = self.directory
+        text = f"""
+        ReST files generated. 
to build, use one of: + sphinx-build -b html {d} {d/"html"} + sphinx-build -b epub {d} {d/"epub"} + sphinx-build -b latex {d} {d/"latex"} && make -C {d/"latex"} all-pdf + """ + ui.print_(textwrap.dedent(text)) + + class LyricsPlugin(RequestHandler, plugins.BeetsPlugin): BACKEND_BY_NAME = { b.name: b for b in [LRCLib, Google, Genius, Tekstowo, MusiXmatch] @@ -776,15 +964,23 @@ class LyricsPlugin(RequestHandler, plugins.BeetsPlugin): return [self.BACKEND_BY_NAME[c](self.config, self._log) for c in chosen] + @cached_property + def translator(self) -> Translator | None: + config = self.config["translate"] + if config["api_key"].get() and config["to_language"].get(): + return Translator.from_config(self._log, **config.flatten()) + return None + def __init__(self): super().__init__() - self.import_stages = [self.imported] self.config.add( { "auto": True, - "bing_client_secret": None, - "bing_lang_from": [], - "bing_lang_to": None, + "translate": { + "api_key": None, + "from_languages": [], + "to_language": None, + }, "dist_thresh": 0.11, "google_API_key": None, "google_engine_ID": "009217259823014548361:lndtuqkycfu", @@ -795,6 +991,7 @@ class LyricsPlugin(RequestHandler, plugins.BeetsPlugin): "fallback": None, "force": False, "local": False, + "print": False, "synced": False, # Musixmatch is disabled by default as they are currently blocking # requests with the beets user agent. @@ -803,52 +1000,27 @@ class LyricsPlugin(RequestHandler, plugins.BeetsPlugin): ], } ) - self.config["bing_client_secret"].redact = True + self.config["translate"]["api_key"].redact = True self.config["google_API_key"].redact = True self.config["google_engine_ID"].redact = True self.config["genius_api_key"].redact = True - # State information for the ReST writer. - # First, the current artist we're writing. - self.artist = "Unknown artist" - # The current album: False means no album yet. - self.album = False - # The current rest file content. None means the file is not - # open yet. 
- self.rest = None - - self.config["bing_lang_from"] = [ - x.lower() for x in self.config["bing_lang_from"].as_str_seq() - ] - - @cached_property - def bing_access_token(self) -> str | None: - params = { - "client_id": "beets", - "client_secret": self.config["bing_client_secret"], - "scope": "https://api.microsofttranslator.com", - "grant_type": "client_credentials", - } - - oauth_url = "https://datamarket.accesscontrol.windows.net/v2/OAuth2-13" - with self.handle_request(): - r = r_session.post(oauth_url, params=params) - return r.json()["access_token"] + if self.config["auto"]: + self.import_stages = [self.imported] def commands(self): cmd = ui.Subcommand("lyrics", help="fetch song lyrics") cmd.parser.add_option( "-p", "--print", - dest="printlyr", action="store_true", - default=False, + default=self.config["print"].get(), help="print lyrics to console", ) cmd.parser.add_option( "-r", "--write-rest", - dest="writerest", + dest="rest_directory", action="store", default=None, metavar="dir", @@ -857,154 +1029,69 @@ class LyricsPlugin(RequestHandler, plugins.BeetsPlugin): cmd.parser.add_option( "-f", "--force", - dest="force_refetch", action="store_true", - default=False, + default=self.config["force"].get(), help="always re-download lyrics", ) cmd.parser.add_option( "-l", "--local", - dest="local_only", action="store_true", - default=False, + default=self.config["local"].get(), help="do not fetch missing lyrics", ) - def func(lib, opts, args): + def func(lib: Library, opts, args) -> None: # The "write to files" option corresponds to the # import_write config value. - write = ui.should_write() - if opts.writerest: - self.writerest_indexes(opts.writerest) - items = lib.items(ui.decargs(args)) + self.config.set(vars(opts)) + items = list(lib.items(args)) for item in items: - if not opts.local_only and not self.config["local"]: - self.fetch_item_lyrics( - item, write, opts.force_refetch or self.config["force"] - ) - if item.lyrics: - if opts.printlyr: - ui.print_(item.lyrics) - if opts.writerest: - self.appendrest(opts.writerest, item) - if opts.writerest and items: - # flush last artist & write to ReST - self.writerest(opts.writerest) - ui.print_("ReST files generated. to build, use one of:") - ui.print_( - " sphinx-build -b html %s _build/html" % opts.writerest - ) - ui.print_( - " sphinx-build -b epub %s _build/epub" % opts.writerest - ) - ui.print_( - ( - " sphinx-build -b latex %s _build/latex " - "&& make -C _build/latex all-pdf" - ) - % opts.writerest - ) + self.add_item_lyrics(item, ui.should_write()) + if item.lyrics and opts.print: + ui.print_(item.lyrics) + + if opts.rest_directory and ( + items := [i for i in items if i.lyrics] + ): + RestFiles(Path(opts.rest_directory)).write(items) cmd.func = func return [cmd] - def appendrest(self, directory, item): - """Append the item to an ReST file - - This will keep state (in the `rest` variable) in order to avoid - writing continuously to the same files. - """ - - if slug(self.artist) != slug(item.albumartist): - # Write current file and start a new one ~ item.albumartist - self.writerest(directory) - self.artist = item.albumartist.strip() - self.rest = "%s\n%s\n\n.. 
contents::\n :local:\n\n" % ( - self.artist, - "=" * len(self.artist), - ) - - if self.album != item.album: - tmpalbum = self.album = item.album.strip() - if self.album == "": - tmpalbum = "Unknown album" - self.rest += "{}\n{}\n\n".format(tmpalbum, "-" * len(tmpalbum)) - title_str = ":index:`%s`" % item.title.strip() - block = "| " + item.lyrics.replace("\n", "\n| ") - self.rest += "{}\n{}\n\n{}\n\n".format( - title_str, "~" * len(title_str), block - ) - - def writerest(self, directory): - """Write self.rest to a ReST file""" - if self.rest is not None and self.artist is not None: - path = os.path.join( - directory, "artists", slug(self.artist) + ".rst" - ) - with open(path, "wb") as output: - output.write(self.rest.encode("utf-8")) - - def writerest_indexes(self, directory): - """Write conf.py and index.rst files necessary for Sphinx - - We write minimal configurations that are necessary for Sphinx - to operate. We do not overwrite existing files so that - customizations are respected.""" - try: - os.makedirs(os.path.join(directory, "artists")) - except OSError as e: - if e.errno == errno.EEXIST: - pass - else: - raise - indexfile = os.path.join(directory, "index.rst") - if not os.path.exists(indexfile): - with open(indexfile, "w") as output: - output.write(REST_INDEX_TEMPLATE) - conffile = os.path.join(directory, "conf.py") - if not os.path.exists(conffile): - with open(conffile, "w") as output: - output.write(REST_CONF_TEMPLATE) - def imported(self, _, task: ImportTask) -> None: """Import hook for fetching lyrics automatically.""" - if self.config["auto"]: - for item in task.imported_items(): - self.fetch_item_lyrics(item, False, self.config["force"]) + for item in task.imported_items(): + self.add_item_lyrics(item, False) - def fetch_item_lyrics(self, item: Item, write: bool, force: bool) -> None: + def find_lyrics(self, item: Item) -> str: + album, length = item.album, round(item.length) + matches = ( + [ + lyrics + for t in titles + if (lyrics := self.get_lyrics(a, t, album, length)) + ] + for a, titles in search_pairs(item) + ) + + return "\n\n---\n\n".join(next(filter(None, matches), [])) + + def add_item_lyrics(self, item: Item, write: bool) -> None: """Fetch and store lyrics for a single item. If ``write``, then the lyrics will also be written to the file itself. """ - # Skip if the item already has lyrics. 
-        if not force and item.lyrics:
+        if self.config["local"]:
+            return
+
+        if not self.config["force"] and item.lyrics:
             self.info("🔵 Lyrics already present: {}", item)
             return
 
-        lyrics_matches = []
-        album, length = item.album, round(item.length)
-        for artist, titles in search_pairs(item):
-            lyrics_matches = [
-                self.get_lyrics(artist, title, album, length)
-                for title in titles
-            ]
-            if any(lyrics_matches):
-                break
-
-        lyrics = "\n\n---\n\n".join(filter(None, lyrics_matches))
-
-        if lyrics:
+        if lyrics := self.find_lyrics(item):
             self.info("🟢 Found lyrics: {0}", item)
-            if self.config["bing_client_secret"].get():
-                lang_from = langdetect.detect(lyrics)
-                if self.config["bing_lang_to"].get() != lang_from and (
-                    not self.config["bing_lang_from"]
-                    or (lang_from in self.config["bing_lang_from"].as_str_seq())
-                ):
-                    lyrics = self.append_translation(
-                        lyrics, self.config["bing_lang_to"]
-                    )
+            if translator := self.translator:
+                lyrics = translator.translate(lyrics, item.lyrics)
         else:
             self.info("🔴 Lyrics not found: {}", item)
             lyrics = self.config["fallback"].get()
@@ -1027,30 +1114,3 @@ class LyricsPlugin(RequestHandler, plugins.BeetsPlugin):
                 return f"{lyrics}\n\nSource: {url}"
 
         return None
-
-    def append_translation(self, text, to_lang):
-        from xml.etree import ElementTree
-
-        if not (token := self.bing_access_token):
-            self.warn(
-                "Could not get Bing Translate API access token. "
-                "Check your 'bing_client_secret' password."
-            )
-            return text
-
-        # Extract unique lines to limit API request size per song
-        lines = text.split("\n")
-        unique_lines = set(lines)
-        url = "https://api.microsofttranslator.com/v2/Http.svc/Translate"
-        with self.handle_request():
-            text = self.fetch_text(
-                url,
-                headers={"Authorization": f"Bearer {token}"},
-                params={"text": "|".join(unique_lines), "to": to_lang},
-            )
-        if translated := ElementTree.fromstring(text.encode("utf-8")).text:
-            # Use a translation mapping dict to build resulting lyrics
-            translations = dict(zip(unique_lines, translated.split("|")))
-            return "".join(f"{ln} / {translations[ln]}\n" for ln in lines)
-
-        return text
diff --git a/beetsplug/smartplaylist.py b/beetsplug/smartplaylist.py
index a3b24b569..d758c0255 100644
--- a/beetsplug/smartplaylist.py
+++ b/beetsplug/smartplaylist.py
@@ -14,8 +14,8 @@
 
 """Generates smart playlists based on beets queries."""
 
-import json
 import os
+from urllib.parse import quote
 from urllib.request import pathname2url
 
 from beets import ui
@@ -327,7 +327,8 @@ class SmartPlaylistPlugin(BeetsPlugin):
                     if extm3u:
                         attr = [(k, entry.item[k]) for k in keys]
                         al = [
-                            f" {a[0]}={json.dumps(str(a[1]))}" for a in attr
+                            f" {key}=\"{quote(str(value), safe='/:')}\""
+                            for key, value in attr
                         ]
                         attrs = "".join(al)
                         comment = "#EXTINF:{}{},{} - {}\n".format(
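
The new ``quote``-based encoding above is easy to sanity-check in isolation. Here is a small standalone sketch (not part of the patch; the field values are illustrative, taken from the smartplaylist docs example later in this diff) of what an ``#EXTINF`` comment looks like once attribute values are URL-encoded with ``/`` and ``:`` kept readable:

```python
from urllib.parse import quote

# Attribute pairs as the plugin collects them from the item.
attr = [("id", 1931), ("genre", "Progressive Rock")]

# Mirror the patch: URL-encode each value, leaving "/" and ":" unescaped.
attrs = "".join(f" {key}=\"{quote(str(value), safe='/:')}\"" for key, value in attr)

print(f"#EXTINF:805{attrs},Led Zeppelin - Stairway to Heaven")
# #EXTINF:805 id="1931" genre="Progressive%20Rock",Led Zeppelin - Stairway to Heaven
```

Unlike the old ``json.dumps`` approach, this keeps the quoting of attribute values unambiguous for EXTM3U consumers while never emitting characters that need escaping inside the quotes.
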
diff --git a/docs/changelog.rst b/docs/changelog.rst
index ec7861f98..67eb5891e 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -19,9 +19,16 @@ New features:
   control the maximum allowed distance between the lyrics search result and the
   tagged item's artist and title. This is useful for preventing false positives
   when fetching lyrics.
+* :doc:`plugins/lyrics`: Rewrite the lyrics translation functionality to use
+  the Azure AI Translator API and add setup instructions to the documentation.
 
 Bug fixes:
 
+* :doc:`plugins/listenbrainz`: Fix reST formatting of the URLs in the
+  ListenBrainz API key documentation and ``config.yaml``.
+* :doc:`plugins/listenbrainz`: Fix ``UnboundLocalError`` when ``mbid`` is not
+  defined.
+* :doc:`plugins/fetchart`: Fix a bug where a temporary file could not be
+  deleted because it was never properly closed.
+  :bug:`5521`
 * :doc:`plugins/lyrics`: LRCLib will fall back to plain lyrics if synced lyrics
   are not found and the ``synced`` flag is set to ``yes``.
 * Synchronise files included in the source distribution with what we used to
@@ -72,6 +79,7 @@ Bug fixes:
   :bug:`5583`
 * Handle potential OSError when unlinking temporary files in ArtResizer.
   :bug:`5615`
+* ImageMagick 7.1.1-44 is now supported.
 
 For packagers:
 
@@ -85,6 +93,10 @@ Other changes:
   wrong (outdated) commit. Now the tag is created in the same workflow step
   right after committing the version update.
   :bug:`5539`
+* Added type hints to ``ImportSession`` and ``Pipeline``, which should make
+  the code easier to work with for new developers.
+* :doc:`/plugins/smartplaylist`: URL-encode additional item ``fields`` within
+  generated EXTM3U playlists instead of JSON-encoding them.
 
 2.2.0 (December 02, 2024)
 -------------------------
@@ -512,6 +524,8 @@ Bug fixes:
   `requests` timeout.
 * Fix cover art resizing logic to support multiple steps of resizing
   :bug:`5151`
+* :doc:`/plugins/convert`: Fix an attempt to convert and perform side effects
+  when the library file is not readable.
 
 For plugin developers:
diff --git a/docs/faq.rst b/docs/faq.rst
index b740c6503..ac7818ab2 100644
--- a/docs/faq.rst
+++ b/docs/faq.rst
@@ -174,9 +174,8 @@ pages.
 …report a bug in beets?
 -----------------------
 
-We use the `issue tracker <https://github.com/beetbox/beets/issues>`__
-on GitHub. `Enter a new issue <https://github.com/beetbox/beets/issues/new>`__
-there to report a bug. Please follow these guidelines when reporting an issue:
+We use the `issue tracker`_ on GitHub where you can `open a new ticket`_.
+Please follow these guidelines when reporting an issue:
 
 - Most importantly: if beets is crashing, please `include the traceback
   `__. Tracebacks can be more
@@ -206,6 +205,7 @@ If you've never reported a bug before, Mozilla has some well-written `general
 guidelines for good bug reports`_.
 
+.. _issue tracker: https://github.com/beetbox/beets/issues
 .. _general guidelines for good bug reports:
    https://developer.mozilla.org/en-US/docs/Mozilla/QA/Bug_writing_guidelines
 
@@ -343,11 +343,11 @@ read the file. You can also use specialized programs for checking file
 integrity---for example, type ``metaflac --list music.flac`` to check FLAC
 files.
 
-If beets still complains about a file that seems to be valid, `file a
-bug <https://github.com/beetbox/beets/issues/new>`__ and we'll look into
-it. There's always a possibility that there's a bug "upstream" in the
-`Mutagen <https://github.com/quodlibet/mutagen>`__ library used by beets,
-in which case we'll forward the bug to that project's tracker.
+If beets still complains about a file that seems to be valid, `open a new
+ticket`_ and we'll look into it. There's always a possibility that there's
+a bug "upstream" in the `Mutagen <https://github.com/quodlibet/mutagen>`__
+library used by beets, in which case we'll forward the bug to that project's
+tracker.
 
 .. _importhang:
 
@@ -398,3 +398,5 @@ try `this Super User answer`_.
 .. _this Super User answer: https://superuser.com/a/284361/4569
 .. _pip: https://pip.pypa.io/en/stable/
 
+.. _open a new ticket:
+   https://github.com/beetbox/beets/issues/new?template=bug-report.md
diff --git a/docs/guides/tagger.rst b/docs/guides/tagger.rst
index 68ad908e8..bf1ecbd8a 100644
--- a/docs/guides/tagger.rst
+++ b/docs/guides/tagger.rst
@@ -76,7 +76,8 @@ all of these limitations.
 Musepack, Windows Media, Opus, and AIFF files are supported. (Do you use some
 other format? Please `file a feature request`_!)
 
-.. _file a feature request: https://github.com/beetbox/beets/issues/new
+.. 
_file a feature request:
+   https://github.com/beetbox/beets/issues/new?template=feature-request.md
 
 Now that that's out of the way, let's tag some music.
diff --git a/docs/plugins/index.rst b/docs/plugins/index.rst
index 6705344c9..b7998ef19 100644
--- a/docs/plugins/index.rst
+++ b/docs/plugins/index.rst
@@ -478,6 +478,9 @@ Here are a few of the plugins written by the beets community:
 `beets-ibroadcast`_
     Uploads tracks to the `iBroadcast`_ cloud service.
 
+`beets-id3extract`_
+    Maps arbitrary ID3 tags to beets custom fields.
+
 `beets-importreplace`_
     Lets you perform regex replacements on incoming metadata.
 
@@ -560,6 +563,7 @@ Here are a few of the plugins written by the beets community:
 .. _beets-follow: https://github.com/nolsto/beets-follow
 .. _beets-ibroadcast: https://github.com/ctrueden/beets-ibroadcast
 .. _iBroadcast: https://ibroadcast.com/
+.. _beets-id3extract: https://github.com/bcotton/beets-id3extract
 .. _beets-importreplace: https://github.com/edgars-supe/beets-importreplace
 .. _beets-setlister: https://github.com/tomjaspers/beets-setlister
 .. _beets-noimport: https://gitlab.com/tiago.dias/beets-noimport
diff --git a/docs/plugins/listenbrainz.rst b/docs/plugins/listenbrainz.rst
index 1be15ae67..037ccd685 100644
--- a/docs/plugins/listenbrainz.rst
+++ b/docs/plugins/listenbrainz.rst
@@ -8,14 +8,14 @@ The ListenBrainz plugin for beets allows you to interact with the ListenBrainz s
 
 Installation
 ------------
 
-To enable the ListenBrainz plugin, add the following to your beets configuration file (`config.yaml`):
+To enable the ListenBrainz plugin, add the following to your beets configuration file (`config.yaml`_):
 
 .. code-block:: yaml
 
     plugins:
       - listenbrainz
 
-You can then configure the plugin by providing your Listenbrainz token (see intructions `here`_`)and username::
+You can then configure the plugin by providing your ListenBrainz token (see instructions `here`_) and username::
 
     listenbrainz:
         token: TOKEN
@@ -28,4 +28,5 @@ Usage
 
 Once the plugin is enabled, you can import the listening history using the `lbimport` command in beets.
 
-.. _here: https://listenbrainz.readthedocs.io/en/latest/users/api/index.html#get-the-user-token
\ No newline at end of file
+.. _here: https://listenbrainz.readthedocs.io/en/latest/users/api/index.html#get-the-user-token
+.. _config.yaml: ../reference/config.rst
diff --git a/docs/plugins/lyrics.rst b/docs/plugins/lyrics.rst
index f034cf47a..a20f97faf 100644
--- a/docs/plugins/lyrics.rst
+++ b/docs/plugins/lyrics.rst
@@ -38,26 +38,30 @@ Default configuration:
 
     lyrics:
         auto: yes
-        bing_client_secret: null
-        bing_lang_from: []
-        bing_lang_to: null
+        translate:
+          api_key:
+          from_languages: []
+          to_language:
         dist_thresh: 0.11
         fallback: null
         force: no
         google_API_key: null
         google_engine_ID: 009217259823014548361:lndtuqkycfu
+        print: no
         sources: [lrclib, google, genius, tekstowo]
        synced: no
 
 The available options are:
 
 - **auto**: Fetch lyrics automatically during import.
-- **bing_client_secret**: Your Bing Translation application password
-  (see :ref:`lyrics-translation`)
-- **bing_lang_from**: By default all lyrics with a language other than
-  ``bing_lang_to`` are translated. Use a list of lang codes to restrict the set
-  of source languages to translate.
-- **bing_lang_to**: Language to translate lyrics into.
+- **translate**:
+
+  - **api_key**: API key to access your Azure Translator resource (see
+    :ref:`lyrics-translation`).
+  - **from_languages**: By default all lyrics with a language other than
+    ``to_language`` are translated. Use a list of language codes to restrict
+    them.
+  - **to_language**: Language code to translate lyrics into.
 - **dist_thresh**: The maximum distance between the artist and title
   combination of the music file and lyrics candidate to consider them a match.
   Lower values will make the plugin more strict, higher values will make it
@@ -72,6 +76,7 @@ The available options are:
 - **google_engine_ID**: The custom search engine to use. Default:
   The `beets custom search engine`_, which gathers an updated list of sources
   known to be scrapeable.
+- **print**: Print lyrics to the console.
 - **sources**: List of sources to search for lyrics. An asterisk ``*`` expands
   to all available sources. The ``google`` source will be automatically
   deactivated if no ``google_API_key`` is setup.
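
To make the new ``translate.*`` options above concrete, here is a minimal sketch of the kind of request they configure. It is illustrative only: the endpoint, query parameters, and payload shape follow Azure's public v3 text-translation API, and ``translate_lines`` is a hypothetical helper, not the plugin's actual ``Translator`` class (whose tests appear later in this diff):

```python
import requests

# Azure AI Translator v3 text-translation endpoint (public API).
AZURE_TRANSLATE_URL = "https://api.cognitive.microsofttranslator.com/translate"


def translate_lines(lines: list[str], api_key: str, to_language: str) -> list[str]:
    """Translate each line of lyrics, returning the translations in order."""
    response = requests.post(
        AZURE_TRANSLATE_URL,
        params={"api-version": "3.0", "to": to_language},
        headers={
            "Ocp-Apim-Subscription-Key": api_key,  # the translate.api_key value
            "Content-Type": "application/json",
        },
        json=[{"Text": line} for line in lines],
        timeout=10,
    )
    response.raise_for_status()
    # Each result carries a "translations" list, one entry per target language.
    return [r["translations"][0]["text"] for r in response.json()]
```

The plugin then pairs each original line with its translation as ``original / translation``, the format shown in the docs example later in this diff.
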
@@ -104,9 +109,8 @@ Rendering Lyrics into Other Formats
 -----------------------------------
 
 The ``-r directory, --write-rest directory`` option renders all lyrics as
-`reStructuredText`_ (ReST) documents in ``directory`` (by default, the current
-directory). That directory, in turn, can be parsed by tools like `Sphinx`_ to
-generate HTML, ePUB, or PDF documents.
+`reStructuredText`_ (ReST) documents in ``directory``. That directory, in turn,
+can be parsed by tools like `Sphinx`_ to generate HTML, ePUB, or PDF documents.
 
 Minimal ``conf.py`` and ``index.rst`` files are created the first time the
 command is run. They are not overwritten on subsequent runs, so you can safely
@@ -119,19 +123,19 @@ Sphinx supports various `builders`_, see a few suggestions:
 
    ::
 
-       sphinx-build -b html . _build/html
+       sphinx-build -b html <dir> <dir>/html
 
 .. admonition:: Build an ePUB3 formatted file, usable on ebook readers
 
    ::
 
-       sphinx-build -b epub3 . _build/epub
+       sphinx-build -b epub3 <dir> <dir>/epub
 
 .. admonition:: Build a PDF file, which incidentally also builds a LaTeX file
 
    ::
 
-       sphinx-build -b latex %s _build/latex && make -C _build/latex all-pdf
+       sphinx-build -b latex <dir> <dir>/latex && make -C <dir>/latex all-pdf
 
 .. _Sphinx: https://www.sphinx-doc.org/
 
@@ -165,10 +169,28 @@ After that, the lyrics plugin will fall back on other declared data sources.
 Activate On-the-Fly Translation
 -------------------------------
 
-You need to register for a Microsoft Azure Marketplace free account and
-to the `Microsoft Translator API`_. Follow the four steps process, specifically
-at step 3 enter ``beets`` as *Client ID* and copy/paste the generated
-*Client secret* into your ``bing_client_secret`` configuration, alongside
-``bing_lang_to`` target ``language code``.
+We use Azure to optionally translate your lyrics. To set up the integration,
+follow these steps:
 
-.. _Microsoft Translator API: https://docs.microsoft.com/en-us/azure/cognitive-services/translator/translator-how-to-signup
+1. `Create a Translator resource`_ on Azure.
+2. `Obtain its API key`_.
+3. Add the API key to your configuration as ``translate.api_key``.
+4. Configure your target language using the ``translate.to_language`` option.
+
+For example, with the following configuration:
+
+.. code-block:: yaml
+
+    lyrics:
+        translate:
+          api_key: YOUR_TRANSLATOR_API_KEY
+          to_language: de
+
+You should expect lyrics like this::
+
+    Original verse / Ursprünglicher Vers
+    Some other verse / Ein anderer Vers
+
+.. _create a Translator resource: https://learn.microsoft.com/en-us/azure/ai-services/translator/create-translator-resource
+.. 
_obtain its API key: https://learn.microsoft.com/en-us/python/api/overview/azure/ai-translation-text-readme?view=azure-python&preserve-view=true#get-an-api-key diff --git a/docs/plugins/smartplaylist.rst b/docs/plugins/smartplaylist.rst index af7a09f03..cb697f762 100644 --- a/docs/plugins/smartplaylist.rst +++ b/docs/plugins/smartplaylist.rst @@ -97,27 +97,25 @@ In case you want to export additional fields from the beets database into the generated playlists, you can do so by specifying them within the ``fields`` configuration option and setting the ``output`` option to ``extm3u``. For instance the following configuration exports the ``id`` and ``genre`` -fields: +fields:: smartplaylist: playlist_dir: /data/playlists relative_to: /data/playlists output: extm3u fields: - - id - genre - playlists: - - name: all.m3u query: '' -A resulting ``all.m3u`` file could look as follows: +Values of additional fields are URL-encoded. +A resulting ``all.m3u`` file could look as follows:: #EXTM3U - #EXTINF:805 id="1931" genre="Jazz",Miles Davis - Autumn Leaves - ../music/Albums/Miles Davis/Autumn Leaves/02 Autumn Leaves.mp3 + #EXTINF:805 id="1931" genre="Progressive%20Rock",Led Zeppelin - Stairway to Heaven + ../music/singles/Led Zeppelin/Stairway to Heaven.mp3 To give a usage example, the `webm3u`_ and `Beetstream`_ plugins read the exported ``id`` field, allowing you to serve your local m3u playlists via HTTP. diff --git a/setup.cfg b/setup.cfg index ba580f2c9..3e2fee46a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -15,7 +15,9 @@ markers = data_file = .reports/coverage/data branch = true relative_files = true -omit = beets/test/* +omit = + beets/test/* + beetsplug/_typing.py [coverage:report] precision = 2 diff --git a/test/plugins/lyrics_pages.py b/test/plugins/lyrics_pages.py index 84c9e2441..2d681e111 100644 --- a/test/plugins/lyrics_pages.py +++ b/test/plugins/lyrics_pages.py @@ -456,16 +456,6 @@ lyrics_pages = [ LyricsPage.make( "https://www.musica.com/letras.asp?letra=59862", """ - Lady Madonna, children at your feet - Wonder how you manage to make ends meet - Who finds the money when you pay the rent? - Did you think that money was heaven sent? 
- - Friday night arrives without a suitcase - Sunday morning creeping like a nun - Monday's child has learned to tie his bootlace - See how they run - Lady Madonna, baby at your breast Wonders how you manage to feed the rest diff --git a/test/plugins/test_embedart.py b/test/plugins/test_embedart.py index 14bfdf522..2d2d68153 100644 --- a/test/plugins/test_embedart.py +++ b/test/plugins/test_embedart.py @@ -285,8 +285,8 @@ class ArtSimilarityTest(unittest.TestCase): mock_extract, mock_subprocess, compare_status=0, - compare_stdout="", - compare_stderr="", + compare_stdout=b"", + compare_stderr=b"", convert_status=0, ): mock_extract.return_value = b"extracted_path" @@ -298,33 +298,33 @@ class ArtSimilarityTest(unittest.TestCase): ] def test_compare_success_similar(self, mock_extract, mock_subprocess): - self._mock_popens(mock_extract, mock_subprocess, 0, "10", "err") + self._mock_popens(mock_extract, mock_subprocess, 0, b"10", b"err") assert self._similarity(20) def test_compare_success_different(self, mock_extract, mock_subprocess): - self._mock_popens(mock_extract, mock_subprocess, 0, "10", "err") + self._mock_popens(mock_extract, mock_subprocess, 0, b"10", b"err") assert not self._similarity(5) def test_compare_status1_similar(self, mock_extract, mock_subprocess): - self._mock_popens(mock_extract, mock_subprocess, 1, "out", "10") + self._mock_popens(mock_extract, mock_subprocess, 1, b"out", b"10") assert self._similarity(20) def test_compare_status1_different(self, mock_extract, mock_subprocess): - self._mock_popens(mock_extract, mock_subprocess, 1, "out", "10") + self._mock_popens(mock_extract, mock_subprocess, 1, b"out", b"10") assert not self._similarity(5) def test_compare_failed(self, mock_extract, mock_subprocess): - self._mock_popens(mock_extract, mock_subprocess, 2, "out", "10") + self._mock_popens(mock_extract, mock_subprocess, 2, b"out", b"10") assert self._similarity(20) is None def test_compare_parsing_error(self, mock_extract, mock_subprocess): - self._mock_popens(mock_extract, mock_subprocess, 0, "foo", "bar") + self._mock_popens(mock_extract, mock_subprocess, 0, b"foo", b"bar") assert self._similarity(20) is None def test_compare_parsing_error_and_failure( self, mock_extract, mock_subprocess ): - self._mock_popens(mock_extract, mock_subprocess, 1, "foo", "bar") + self._mock_popens(mock_extract, mock_subprocess, 1, b"foo", b"bar") assert self._similarity(20) is None def test_convert_failure(self, mock_extract, mock_subprocess): diff --git a/test/plugins/test_importadded.py b/test/plugins/test_importadded.py index d48ec6c46..608afb399 100644 --- a/test/plugins/test_importadded.py +++ b/test/plugins/test_importadded.py @@ -57,7 +57,7 @@ class ImportAddedTest(PluginMixin, ImportTestCase): os.path.getmtime(mfile.path) for mfile in self.import_media ) self.matcher = AutotagStub().install() - self.matcher.macthin = AutotagStub.GOOD + self.matcher.matching = AutotagStub.IDENT self.importer = self.setup_importer() self.importer.add_choice(importer.action.APPLY) diff --git a/test/plugins/test_lyrics.py b/test/plugins/test_lyrics.py index c6d48c3bd..74e727099 100644 --- a/test/plugins/test_lyrics.py +++ b/test/plugins/test_lyrics.py @@ -14,18 +14,27 @@ """Tests for the 'lyrics' plugin.""" +import importlib.util +import os import re +import textwrap from functools import partial from http import HTTPStatus +from pathlib import Path import pytest from beets.library import Item -from beets.test.helper import PluginMixin +from beets.test.helper import PluginMixin, TestHelper from 
beetsplug import lyrics from .lyrics_pages import LyricsPage, lyrics_pages +github_ci = os.environ.get("GITHUB_ACTIONS") == "true" +if not github_ci and not importlib.util.find_spec("langdetect"): + pytest.skip("langdetect isn't available", allow_module_level=True) + + PHRASE_BY_TITLE = { "Lady Madonna": "friday night arrives without a suitcase", "Jazz'n'blues": "as i check my balance i kiss the screen", @@ -33,6 +42,14 @@ PHRASE_BY_TITLE = { } +@pytest.fixture(scope="module") +def helper(): + helper = TestHelper() + helper.setup_beets() + yield helper + helper.teardown_beets() + + class TestLyricsUtils: @pytest.mark.parametrize( "artist, title", @@ -231,6 +248,27 @@ class TestLyricsPlugin(LyricsPluginMixin): assert last_log assert re.search(expected_log_match, last_log, re.I) + @pytest.mark.parametrize( + "plugin_config, found, expected", + [ + ({}, "new", "old"), + ({"force": True}, "new", "new"), + ({"force": True, "local": True}, "new", "old"), + ({"force": True, "fallback": None}, "", "old"), + ({"force": True, "fallback": ""}, "", ""), + ({"force": True, "fallback": "default"}, "", "default"), + ], + ) + def test_overwrite_config( + self, monkeypatch, helper, lyrics_plugin, found, expected + ): + monkeypatch.setattr(lyrics_plugin, "find_lyrics", lambda _: found) + item = helper.create_item(id=1, lyrics="old") + + lyrics_plugin.add_item_lyrics(item, False) + + assert item.lyrics == expected + class LyricsBackendTest(LyricsPluginMixin): @pytest.fixture @@ -280,8 +318,13 @@ class TestLyricsSources(LyricsBackendTest): def test_backend_source(self, lyrics_plugin, lyrics_page: LyricsPage): """Test parsed lyrics from each of the configured lyrics pages.""" - lyrics_info = lyrics_plugin.get_lyrics( - lyrics_page.artist, lyrics_page.track_title, "", 186 + lyrics_info = lyrics_plugin.find_lyrics( + Item( + artist=lyrics_page.artist, + title=lyrics_page.track_title, + album="", + length=186.0, + ) ) assert lyrics_info @@ -502,3 +545,144 @@ class TestLRCLibLyrics(LyricsBackendTest): lyrics, _ = fetch_lyrics() assert lyrics == expected_lyrics + + +class TestTranslation: + @pytest.fixture(autouse=True) + def _patch_bing(self, requests_mock): + def callback(request, _): + if b"Refrain" in request.body: + translations = ( + "" + " | [Refrain : Doja Cat]" + " | Difficile pour moi de te laisser partir (Te laisser partir, te laisser partir)" # noqa: E501 + " | Mon corps ne me laissait pas le cacher (Cachez-le)" + " | Quoi qu’il arrive, je ne plierais pas (Ne plierait pas, ne plierais pas)" # noqa: E501 + " | Chevauchant à travers le tonnerre, la foudre" + ) + elif b"00:00.00" in request.body: + translations = ( + "" + " | [00:00.00] Quelques paroles synchronisées" + " | [00:01.00] Quelques paroles plus synchronisées" + ) + else: + translations = ( + "" + " | Quelques paroles synchronisées" + " | Quelques paroles plus synchronisées" + ) + + return [ + { + "detectedLanguage": {"language": "en", "score": 1.0}, + "translations": [{"text": translations, "to": "fr"}], + } + ] + + requests_mock.post(lyrics.Translator.TRANSLATE_URL, json=callback) + + @pytest.mark.parametrize( + "new_lyrics, old_lyrics, expected", + [ + pytest.param( + """ + [Refrain: Doja Cat] + Hard for me to let you go (Let you go, let you go) + My body wouldn't let me hide it (Hide it) + No matter what, I wouldn't fold (Wouldn't fold, wouldn't fold) + Ridin' through the thunder, lightnin'""", + "", + """ + [Refrain: Doja Cat] / [Refrain : Doja Cat] + Hard for me to let you go (Let you go, let you go) / Difficile pour moi de te laisser 
partir (Te laisser partir, te laisser partir) + My body wouldn't let me hide it (Hide it) / Mon corps ne me laissait pas le cacher (Cachez-le) + No matter what, I wouldn't fold (Wouldn't fold, wouldn't fold) / Quoi qu’il arrive, je ne plierais pas (Ne plierait pas, ne plierais pas) + Ridin' through the thunder, lightnin' / Chevauchant à travers le tonnerre, la foudre""", # noqa: E501 + id="plain", + ), + pytest.param( + """ + [00:00.00] Some synced lyrics + [00:00:50] + [00:01.00] Some more synced lyrics + + Source: https://lrclib.net/api/123""", + "", + """ + [00:00.00] Some synced lyrics / Quelques paroles synchronisées + [00:00:50] + [00:01.00] Some more synced lyrics / Quelques paroles plus synchronisées + + Source: https://lrclib.net/api/123""", # noqa: E501 + id="synced", + ), + pytest.param( + "Quelques paroles", + "", + "Quelques paroles", + id="already in the target language", + ), + pytest.param( + "Some lyrics", + "Some lyrics / Some translation", + "Some lyrics / Some translation", + id="already translated", + ), + ], + ) + def test_translate(self, new_lyrics, old_lyrics, expected): + plugin = lyrics.LyricsPlugin() + bing = lyrics.Translator(plugin._log, "123", "FR", ["EN"]) + + assert bing.translate( + textwrap.dedent(new_lyrics), old_lyrics + ) == textwrap.dedent(expected) + + +class TestRestFiles: + @pytest.fixture + def rest_dir(self, tmp_path): + return tmp_path + + @pytest.fixture + def rest_files(self, rest_dir): + return lyrics.RestFiles(rest_dir) + + def test_write(self, rest_dir: Path, rest_files): + items = [ + Item(albumartist=aa, album=a, title=t, lyrics=lyr) + for aa, a, t, lyr in [ + ("Artist One", "Album One", "Song One", "Lyrics One"), + ("Artist One", "Album One", "Song Two", "Lyrics Two"), + ("Artist Two", "Album Two", "Song Three", "Lyrics Three"), + ] + ] + + rest_files.write(items) + + assert (rest_dir / "index.rst").exists() + assert (rest_dir / "conf.py").exists() + + artist_one_file = rest_dir / "artists" / "artist-one.rst" + artist_two_file = rest_dir / "artists" / "artist-two.rst" + assert artist_one_file.exists() + assert artist_two_file.exists() + + c = artist_one_file.read_text() + assert ( + c.index("Artist One") + < c.index("Album One") + < c.index("Song One") + < c.index("Lyrics One") + < c.index("Song Two") + < c.index("Lyrics Two") + ) + + c = artist_two_file.read_text() + assert ( + c.index("Artist Two") + < c.index("Album Two") + < c.index("Song Three") + < c.index("Lyrics Three") + ) diff --git a/test/plugins/test_smartplaylist.py b/test/plugins/test_smartplaylist.py index a50f3e622..ade745c17 100644 --- a/test/plugins/test_smartplaylist.py +++ b/test/plugins/test_smartplaylist.py @@ -283,7 +283,7 @@ class SmartPlaylistTest(BeetsTestCase): assert ( content == b"#EXTM3U\n" - + b'#EXTINF:300 id="456" genre="Fake Genre",Fake Artist - fake Title\n' + + b'#EXTINF:300 id="456" genre="Fake%20Genre",Fake Artist - fake Title\n' + b"/tagada.mp3\n" ) diff --git a/test/test_importer.py b/test/test_importer.py index ad6b837f5..a28b646cf 100644 --- a/test/test_importer.py +++ b/test/test_importer.py @@ -440,7 +440,7 @@ class ImportTest(ImportTestCase): self.prepare_album_for_import(1) self.setup_importer() self.matcher = AutotagStub().install() - self.matcher.macthin = AutotagStub.GOOD + self.matcher.matching = AutotagStub.IDENT def tearDown(self): super().tearDown()
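
A closing editorial note: the lyrics tests above exercise the rewritten ``find_lyrics`` shown earlier in this diff, whose subtle part is picking the first artist variant that yields any lyrics at all. Here is a tiny self-contained illustration of that pattern, with ``fake_sources`` and ``pairs`` standing in for the real ``get_lyrics`` and ``search_pairs`` calls:

```python
# Stub lookup table: the first artist variant finds nothing,
# the second one scores a hit.
fake_sources = {
    ("Beatles", "Lady Madonna"): "",
    ("The Beatles", "Lady Madonna"): "some lyrics",
}

pairs = [("Beatles", ["Lady Madonna"]), ("The Beatles", ["Lady Madonna"])]

# One lazily-evaluated list of hits per artist variant; falsy (empty)
# results are filtered out by the walrus-in-condition.
matches = (
    [lyrics for t in titles if (lyrics := fake_sources.get((a, t), ""))]
    for a, titles in pairs
)

# next(filter(None, matches), []) yields the first non-empty group,
# so no further artist variants are queried once lyrics are found.
print("\n\n---\n\n".join(next(filter(None, matches), [])))  # -> some lyrics
```

Because ``matches`` is a generator, later artist variants are never evaluated once an earlier one produces lyrics, preserving the early-exit behaviour of the old explicit ``for``/``break`` loop.
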