From 04aa1b8ddbfbc980039a4d820f7e7d3a2011dbf9 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sat, 1 Feb 2025 15:52:11 +0100 Subject: [PATCH] Added last missing typehints in importer and fixed some typehint issues in util. Also run poe format --- beets/importer.py | 191 +++++++++++++++++++++++++----------- beets/util/__init__.py | 13 +-- beets/util/pipeline.py | 5 +- test/plugins/test_lyrics.py | 12 ++- 4 files changed, 153 insertions(+), 68 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index 3586445fd..a81c97df0 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -2,19 +2,20 @@ autotagging music files. """ +from __future__ import annotations + import itertools import os import pickle import re import shutil import time -from abc import ABC, abstractmethod from bisect import bisect_left, insort from collections import defaultdict from dataclasses import dataclass from enum import Enum from tempfile import mkdtemp -from typing import Dict, Iterable, Optional, Sequence, Union +from typing import Callable, Dict, Iterable, Optional, Sequence, Union, cast import mediafile @@ -98,7 +99,7 @@ class ImportState: path: PathLike def __init__(self, readonly=False, path: Union[PathLike, None] = None): - self.path = path or config["statefile"].as_filename() + self.path = path or cast(PathLike, config["statefile"].as_filename()) self.tagprogress = {} self.taghistory = set() self._open() @@ -140,7 +141,7 @@ class ImportState: # -------------------------------- Tagprogress ------------------------------- # - def progress_add(self, toppath: PathLike, *paths: list[PathLike]): + def progress_add(self, toppath: PathLike, *paths: PathLike): """Record that the files under all of the `paths` have been imported under `toppath`. """ @@ -253,7 +254,9 @@ class ImportSession: iconfig["incremental"] = False if iconfig["reflink"]: - iconfig["reflink"] = iconfig["reflink"].as_choice(["auto", True, False]) + iconfig["reflink"] = iconfig["reflink"].as_choice( + ["auto", True, False] + ) # Copy, move, reflink, link, and hardlink are mutually exclusive. if iconfig["move"]: @@ -311,16 +314,24 @@ class ImportSession: self.tag_log("skip", paths) def should_resume(self, path): - raise NotImplementedError("Inheriting class must implement `should_resume`") + raise NotImplementedError( + "Inheriting class must implement `should_resume`" + ) def choose_match(self, task): - raise NotImplementedError("Inheriting class must implement `choose_match`") + raise NotImplementedError( + "Inheriting class must implement `choose_match`" + ) def resolve_duplicate(self, task, found_duplicates): - raise NotImplementedError("Inheriting class must implement `resolve_duplicate`") + raise NotImplementedError( + "Inheriting class must implement `resolve_duplicate`" + ) def choose_item(self, task): - raise NotImplementedError("Inheriting class must implement `choose_item`") + raise NotImplementedError( + "Inheriting class must implement `choose_item`" + ) def run(self): """Run the import task.""" @@ -448,7 +459,16 @@ class BaseImportTask: Tasks flow through the importer pipeline. Each stage can update them.""" - def __init__(self, toppath, paths, items): + toppath: Optional[PathLike] + paths: list[PathLike] + items: list[library.Item] + + def __init__( + self, + toppath: Optional[PathLike], + paths: Optional[Iterable[PathLike]], + items: Optional[Iterable[library.Item]], + ): """Create a task. The primary fields that define a task are: * `toppath`: The user-specified base directory that contains the @@ -466,8 +486,8 @@ class BaseImportTask: These fields should not change after initialization. """ self.toppath = toppath - self.paths = paths - self.items = items + self.paths = list(paths) if paths is not None else [] + self.items = list(items) if items is not None else [] class ImportTask(BaseImportTask): @@ -514,9 +534,14 @@ class ImportTask(BaseImportTask): self.is_album = True self.search_ids = [] # user-supplied candidate IDs. - def set_choice(self, choice): + def set_choice( + self, choice: Union[action, autotag.AlbumMatch, autotag.TrackMatch] + ): """Given an AlbumMatch or TrackMatch object or an action constant, indicates that an action has been selected for this task. + + Album and trackmatch are implemented as tuples, so we can't + use isinstance to check for them. """ # Not part of the task structure: assert choice != action.APPLY # Only used internally. @@ -566,7 +591,7 @@ class ImportTask(BaseImportTask): if self.choice_flag in (action.ASIS, action.RETAG): likelies, consensus = autotag.current_metadata(self.items) return likelies - elif self.choice_flag is action.APPLY: + elif self.choice_flag is action.APPLY and self.match: return self.match.info.copy() assert False @@ -578,13 +603,19 @@ class ImportTask(BaseImportTask): """ if self.choice_flag in (action.ASIS, action.RETAG): return list(self.items) - elif self.choice_flag == action.APPLY: + elif self.choice_flag == action.APPLY and isinstance( + self.match, autotag.AlbumMatch + ): return list(self.match.mapping.keys()) else: assert False def apply_metadata(self): """Copy metadata from match info to the items.""" + assert isinstance( + self.match, autotag.AlbumMatch + ), "apply_metadata() only works for albums" + if config["import"]["from_scratch"]: for item in self.match.mapping: item.clear() @@ -603,7 +634,9 @@ class ImportTask(BaseImportTask): for item in duplicate_items: item.remove() if lib.directory in util.ancestry(item.path): - log.debug("deleting duplicate {0}", util.displayable_path(item.path)) + log.debug( + "deleting duplicate {0}", util.displayable_path(item.path) + ) util.remove(item.path) util.prune_dirs(os.path.dirname(item.path), lib.directory) @@ -635,8 +668,7 @@ class ImportTask(BaseImportTask): self.save_progress() if session.config["incremental"] and not ( # Should we skip recording to incremental list? - self.skip - and session.config["incremental_skip_later"] + self.skip and session.config["incremental_skip_later"] ): self.save_history() @@ -663,7 +695,7 @@ class ImportTask(BaseImportTask): for old_path in self.old_paths: # Only delete files that were actually copied. if old_path not in new_paths: - util.remove(syspath(old_path), False) + util.remove(old_path, False) self.prune(old_path) # When moving, prune empty directories containing the original files. @@ -671,7 +703,7 @@ class ImportTask(BaseImportTask): for old_path in self.old_paths: self.prune(old_path) - def _emit_imported(self, lib): + def _emit_imported(self, lib: library.Library): plugins.send("album_imported", lib=lib, album=self.album) def handle_created(self, session): @@ -693,7 +725,9 @@ class ImportTask(BaseImportTask): candidate IDs are stored in self.search_ids: if present, the initial lookup is restricted to only those IDs. """ - artist, album, prop = autotag.tag_album(self.items, search_ids=self.search_ids) + artist, album, prop = autotag.tag_album( + self.items, search_ids=self.search_ids + ) self.cur_artist = artist self.cur_album = album self.candidates = prop.candidates @@ -713,7 +747,9 @@ class ImportTask(BaseImportTask): # Construct a query to find duplicates with this metadata. We # use a temporary Album object to generate any computed fields. tmp_album = library.Album(lib, **info) - keys = config["import"]["duplicate_keys"]["album"].as_str_seq() + keys = cast( + list[str], config["import"]["duplicate_keys"]["album"].as_str_seq() + ) dup_query = tmp_album.duplicates_query(keys) # Don't count albums with the same files as duplicates. @@ -744,7 +780,8 @@ class ImportTask(BaseImportTask): [i.albumartist or i.artist for i in self.items] ) if freq == len(self.items) or ( - freq > 1 and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH + freq > 1 + and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH ): # Single-artist album. changes["albumartist"] = plur_albumartist @@ -770,7 +807,12 @@ class ImportTask(BaseImportTask): for item in self.items: item.update(changes) - def manipulate_files(self, operation=None, write=False, session=None): + def manipulate_files( + self, + operation: Optional[MoveOperation] = None, + write=False, + session: Optional[ImportSession] = None, + ): """Copy, move, link, hardlink or reflink (depending on `operation`) the files as well as write metadata. @@ -778,11 +820,12 @@ class ImportTask(BaseImportTask): If `write` is `True` metadata is written to the files. """ + assert session is not None, "session must be provided" items = self.imported_items() # Save the original paths of all items for deletion and pruning # in the next step (finalization). - self.old_paths = [item.path for item in items] + self.old_paths: list[PathLike] = [item.path for item in items] for item in items: if operation is not None: # In copy and link modes, treat re-imports specially: @@ -820,7 +863,9 @@ class ImportTask(BaseImportTask): self.remove_replaced(lib) self.album = lib.add_album(self.imported_items()) - if self.choice_flag == action.APPLY: + if self.choice_flag == action.APPLY and isinstance( + self.match, autotag.AlbumMatch + ): # Copy album flexible fields to the DB # TODO: change the flow so we create the `Album` object earlier, # and we can move this into `self.apply_metadata`, just like @@ -830,25 +875,30 @@ class ImportTask(BaseImportTask): self.reimport_metadata(lib) - def record_replaced(self, lib): + def record_replaced(self, lib: library.Library): """Records the replaced items and albums in the `replaced_items` and `replaced_albums` dictionaries. """ self.replaced_items = defaultdict(list) - self.replaced_albums = defaultdict(list) + self.replaced_albums: dict[PathLike, library.Album] = defaultdict() replaced_album_ids = set() for item in self.imported_items(): - dup_items = list(lib.items(dbcore.query.BytesQuery("path", item.path))) + dup_items = list( + lib.items(dbcore.query.BytesQuery("path", item.path)) + ) self.replaced_items[item] = dup_items for dup_item in dup_items: - if not dup_item.album_id or dup_item.album_id in replaced_album_ids: + if ( + not dup_item.album_id + or dup_item.album_id in replaced_album_ids + ): continue replaced_album = dup_item._cached_album if replaced_album: replaced_album_ids.add(dup_item.album_id) self.replaced_albums[replaced_album.path] = replaced_album - def reimport_metadata(self, lib): + def reimport_metadata(self, lib: library.Library): """For reimports, preserves metadata for reimported items and albums. """ @@ -894,7 +944,8 @@ class ImportTask(BaseImportTask): self.album.artpath = replaced_album.artpath self.album.store() log.debug( - "Reimported album {}. Preserving attribute ['added']. " "Path: {}", + "Reimported album {}. Preserving attribute ['added']. " + "Path: {}", self.album.id, displayable_path(self.album.path), ) @@ -973,14 +1024,14 @@ class ImportTask(BaseImportTask): util.prune_dirs( os.path.dirname(filename), self.toppath, - clutter=config["clutter"].as_str_seq(), + clutter=cast(list[str], config["clutter"].as_str_seq()), ) class SingletonImportTask(ImportTask): """ImportTask for a single track that is not associated to an album.""" - def __init__(self, toppath, item): + def __init__(self, toppath: Optional[PathLike], item: library.Item): super().__init__(toppath, [item.path], [item]) self.item = item self.is_album = False @@ -995,13 +1046,17 @@ class SingletonImportTask(ImportTask): assert self.choice_flag in (action.ASIS, action.RETAG, action.APPLY) if self.choice_flag in (action.ASIS, action.RETAG): return dict(self.item) - elif self.choice_flag is action.APPLY: + elif self.choice_flag is action.APPLY and self.match: return self.match.info.copy() + assert False def imported_items(self): return [self.item] def apply_metadata(self): + assert isinstance( + self.match, autotag.TrackMatch + ), "apply_metadata() only works for tracks" autotag.apply_item_metadata(self.item, self.match.info) def _emit_imported(self, lib): @@ -1022,7 +1077,9 @@ class SingletonImportTask(ImportTask): # Query for existing items using the same metadata. We use a # temporary `Item` object to generate any computed fields. tmp_item = library.Item(lib, **info) - keys = config["import"]["duplicate_keys"]["item"].as_str_seq() + keys = cast( + list[str], config["import"]["duplicate_keys"]["item"].as_str_seq() + ) dup_query = tmp_item.duplicates_query(keys) found_items = [] @@ -1092,23 +1149,24 @@ class SentinelImportTask(ImportTask): pass def save_progress(self): - if self.paths is None: + if self.paths is None and self.toppath: # "Done" sentinel. ImportState().progress_reset(self.toppath) - else: + elif self.toppath: # "Directory progress" sentinel for singletons ImportState().progress_add(self.toppath, *self.paths) - def skip(self): + @property + def skip(self) -> bool: return True def set_choice(self, choice): raise NotImplementedError - def cleanup(self, **kwargs): + def cleanup(self, copy=False, delete=False, move=False): pass - def _emit_imported(self, session): + def _emit_imported(self, lib): pass @@ -1152,7 +1210,7 @@ class ArchiveImportTask(SentinelImportTask): implements the same interface as `tarfile.TarFile`. """ if not hasattr(cls, "_handlers"): - cls._handlers = [] + cls._handlers: list[tuple[Callable, ...]] = [] from zipfile import ZipFile, is_zipfile cls._handlers.append((is_zipfile, ZipFile)) @@ -1174,9 +1232,9 @@ class ArchiveImportTask(SentinelImportTask): return cls._handlers - def cleanup(self, **kwargs): + def cleanup(self, copy=False, delete=False, move=False): """Removes the temporary directory the archive was extracted to.""" - if self.extracted: + if self.extracted and self.toppath: log.debug( "Removing extracted directory: {0}", displayable_path(self.toppath), @@ -1187,10 +1245,18 @@ class ArchiveImportTask(SentinelImportTask): """Extracts the archive to a temporary directory and sets `toppath` to that directory. """ + assert self.toppath is not None, "toppath must be set" + + handler_class = None for path_test, handler_class in self.handlers(): if path_test(os.fsdecode(self.toppath)): break + if handler_class is None: + raise ValueError( + "No handler found for archive: {0}".format(self.toppath) + ) + extract_to = mkdtemp() archive = handler_class(os.fsdecode(self.toppath), mode="r") try: @@ -1268,7 +1334,7 @@ class ImportTaskFactory: # for archive imports, send the archive task instead (to remove # the extracted directory). if self.is_archive: - yield archive_task + yield archive_task # type: ignore else: yield self.sentinel() @@ -1308,7 +1374,9 @@ class ImportTaskFactory: def singleton(self, path): """Return a `SingletonImportTask` for the music file.""" if self.session.already_imported(self.toppath, [path]): - log.debug("Skipping previously-imported path: {0}", displayable_path(path)) + log.debug( + "Skipping previously-imported path: {0}", displayable_path(path) + ) self.skipped += 1 return None @@ -1331,7 +1399,9 @@ class ImportTaskFactory: dirs = list({os.path.dirname(p) for p in paths}) if self.session.already_imported(self.toppath, dirs): - log.debug("Skipping previously-imported path: {0}", displayable_path(dirs)) + log.debug( + "Skipping previously-imported path: {0}", displayable_path(dirs) + ) self.skipped += 1 return None @@ -1360,7 +1430,8 @@ class ImportTaskFactory: if not (self.session.config["move"] or self.session.config["copy"]): log.warning( - "Archive importing requires either " "'copy' or 'move' to be enabled." + "Archive importing requires either " + "'copy' or 'move' to be enabled." ) return @@ -1473,7 +1544,7 @@ def query_tasks(session): @pipeline.mutator_stage -def lookup_candidates(session, task): +def lookup_candidates(session: ImportSession, task: ImportTask): """A coroutine for performing the initial MusicBrainz lookup for an album. It accepts lists of Items and yields (items, cur_artist, cur_album, candidates, rec) tuples. If no match @@ -1495,7 +1566,7 @@ def lookup_candidates(session, task): @pipeline.stage -def user_query(session, task): +def user_query(session: ImportSession, task: ImportTask): """A coroutine for interfacing with the user about the tagging process. @@ -1528,7 +1599,9 @@ def user_query(session, task): yield SentinelImportTask(task.toppath, task.paths) return _extend_pipeline( - emitter(task), lookup_candidates(session), user_query(session) + emitter(task), + lookup_candidates(session), + user_query(session), # type: ignore # Recursion in decorators ) # As albums: group items by albums and create task for each album @@ -1537,7 +1610,7 @@ def user_query(session, task): [task], group_albums(session), lookup_candidates(session), - user_query(session), + user_query(session), # type: ignore # Recursion in decorators ) resolve_duplicates(session, task) @@ -1559,7 +1632,9 @@ def user_query(session, task): ) return _extend_pipeline( - [merged_task], lookup_candidates(session), user_query(session) + [merged_task], + lookup_candidates(session), + user_query(session), # type: ignore # Recursion in decorators ) apply_choice(session, task) @@ -1573,7 +1648,9 @@ def resolve_duplicates(session, task): if task.choice_flag in (action.ASIS, action.APPLY, action.RETAG): found_duplicates = task.find_duplicates(session.lib) if found_duplicates: - log.debug("found duplicates: {}".format([o.id for o in found_duplicates])) + log.debug( + "found duplicates: {}".format([o.id for o in found_duplicates]) + ) # Get the default action to follow from config. duplicate_action = config["import"]["duplicate_action"].as_choice( @@ -1697,7 +1774,7 @@ def manipulate_files(session, task): @pipeline.stage -def log_files(session, task): +def log_files(session: ImportSession, task: ImportTask): """A coroutine (pipeline stage) to log each file to be imported.""" if isinstance(task, SingletonImportTask): log.info("Singleton: {0}", displayable_path(task.item["path"])) @@ -1753,8 +1830,8 @@ def albums_in_dir(path): containing any media files is an album. """ collapse_pat = collapse_paths = collapse_items = None - ignore = config["ignore"].as_str_seq() - ignore_hidden = config["ignore_hidden"].get(bool) + ignore = cast(list[str], config["ignore"].as_str_seq()) + ignore_hidden = cast(bool, config["ignore_hidden"].get(bool)) for root, dirs, files in sorted_walk( path, ignore=ignore, ignore_hidden=ignore_hidden, logger=log diff --git a/beets/util/__init__.py b/beets/util/__init__.py index 32a63b216..e13b67604 100644 --- a/beets/util/__init__.py +++ b/beets/util/__init__.py @@ -198,8 +198,8 @@ def ancestry(path: AnyStr) -> list[AnyStr]: def sorted_walk( - path: AnyStr, - ignore: Sequence[bytes] = (), + path: PathLike, + ignore: Sequence[PathLike] = (), ignore_hidden: bool = False, logger: Logger | None = None, ) -> Iterator[tuple[bytes, Sequence[bytes], Sequence[bytes]]]: @@ -297,8 +297,8 @@ def fnmatch_all(names: Sequence[bytes], patterns: Sequence[bytes]) -> bool: def prune_dirs( - path: bytes, - root: bytes | None = None, + path: PathLike, + root: PathLike | None = None, clutter: Sequence[str] = (".DS_Store", "Thumbs.db"), ): """If path is an empty directory, then remove it. Recursively remove @@ -419,12 +419,13 @@ PATH_SEP: bytes = bytestring_path(os.sep) def displayable_path( - path: BytesOrStr | tuple[BytesOrStr, ...], separator: str = "; " + path: PathLike | Iterable[PathLike], separator: str = "; " ) -> str: """Attempts to decode a bytestring path to a unicode object for the purpose of displaying it to the user. If the `path` argument is a list or a tuple, the elements are joined with `separator`. """ + if isinstance(path, (list, tuple)): return separator.join(displayable_path(p) for p in path) elif isinstance(path, str): @@ -472,7 +473,7 @@ def samefile(p1: bytes, p2: bytes) -> bool: return False -def remove(path: bytes, soft: bool = True): +def remove(path: PathLike, soft: bool = True): """Remove the file. If `soft`, then no error will be raised if the file does not exist. """ diff --git a/beets/util/pipeline.py b/beets/util/pipeline.py index e79325fe2..5387186b4 100644 --- a/beets/util/pipeline.py +++ b/beets/util/pipeline.py @@ -31,7 +31,6 @@ To do so, pass an iterable of coroutines to the Pipeline constructor in place of any single coroutine. """ - import queue import sys from threading import Lock, Thread @@ -420,7 +419,9 @@ class Pipeline: for i in range(1, queue_count): for coro in self.stages[i]: threads.append( - MiddlePipelineThread(coro, queues[i - 1], queues[i], threads) + MiddlePipelineThread( + coro, queues[i - 1], queues[i], threads + ) ) # Last stage. diff --git a/test/plugins/test_lyrics.py b/test/plugins/test_lyrics.py index 1305a21d3..fd6112d7c 100644 --- a/test/plugins/test_lyrics.py +++ b/test/plugins/test_lyrics.py @@ -74,7 +74,9 @@ class TestLyricsUtils: ("横山克", "Masaru Yokoyama", ["Masaru Yokoyama"]), ], ) - def test_search_pairs_artists(self, artist, artist_sort, expected_extra_artists): + def test_search_pairs_artists( + self, artist, artist_sort, expected_extra_artists + ): item = Item(artist=artist, artist_sort=artist_sort, title="song") actual_artists = [a for a, _ in lyrics.search_pairs(item)] @@ -99,7 +101,9 @@ class TestLyricsUtils: def test_search_pairs_titles(self, title, expected_extra_titles): item = Item(title=title, artist="A") - actual_titles = {t: None for _, tit in lyrics.search_pairs(item) for t in tit} + actual_titles = { + t: None for _, tit in lyrics.search_pairs(item) for t in tit + } assert list(actual_titles) == [title, *expected_extra_titles] @@ -241,7 +245,9 @@ class LyricsBackendTest(LyricsPluginMixin): @pytest.fixture def lyrics_html(self, lyrics_root_dir, file_name): - return (lyrics_root_dir / f"{file_name}.txt").read_text(encoding="utf-8") + return (lyrics_root_dir / f"{file_name}.txt").read_text( + encoding="utf-8" + ) @pytest.mark.on_lyrics_update