From 10c0aa3f6ab442bf8144bec2dec85ca908617749 Mon Sep 17 00:00:00 2001 From: Sebastian Mohr Date: Sat, 8 Feb 2025 22:03:03 +0100 Subject: [PATCH] Replaced pathlike with pathbytes and remove unnecessary type ignores --- beets/importer.py | 118 ++++++++++++++++++++++++---------------------- 1 file changed, 61 insertions(+), 57 deletions(-) diff --git a/beets/importer.py b/beets/importer.py index 8f64e25df..202e8f1ef 100644 --- a/beets/importer.py +++ b/beets/importer.py @@ -15,14 +15,13 @@ from collections import defaultdict from dataclasses import dataclass from enum import Enum from tempfile import mkdtemp -from typing import Callable, Dict, Iterable, Optional, Sequence, Union, cast +from typing import Callable, Iterable, Optional, Sequence, cast import mediafile from beets import autotag, config, dbcore, library, logging, plugins, util from beets.util import ( MoveOperation, - PathLike, ancestry, displayable_path, normpath, @@ -61,6 +60,10 @@ REIMPORT_FRESH_FIELDS_ITEM = list(REIMPORT_FRESH_FIELDS_ALBUM) # Global logger. log = logging.getLogger("beets") +# Here for now to allow for a easy replace later on +# once we can move to a PathLike +PathBytes = bytes + class ImportAbortError(Exception): """Raised when the user aborts the tagging operation.""" @@ -94,12 +97,12 @@ class ImportState: ``` """ - tagprogress: dict - taghistory: set - path: PathLike + tagprogress: dict[PathBytes, list[PathBytes]] + taghistory: set[tuple[PathBytes, ...]] + path: PathBytes - def __init__(self, readonly=False, path: Union[PathLike, None] = None): - self.path = path or cast(PathLike, config["statefile"].as_filename()) + def __init__(self, readonly=False, path: Optional[PathBytes] = None): + self.path = path or cast(PathBytes, config["statefile"].as_filename()) self.tagprogress = {} self.taghistory = set() self._open() @@ -141,7 +144,7 @@ class ImportState: # -------------------------------- Tagprogress ------------------------------- # - def progress_add(self, toppath: PathLike, *paths: PathLike): + def progress_add(self, toppath: PathBytes, *paths: PathBytes): """Record that the files under all of the `paths` have been imported under `toppath`. """ @@ -153,19 +156,19 @@ class ImportState: else: insort(imported, path) - def progress_has_element(self, toppath: PathLike, path: PathLike) -> bool: + def progress_has_element(self, toppath: PathBytes, path: PathBytes) -> bool: """Return whether `path` has been imported in `toppath`.""" imported = self.tagprogress.get(toppath, []) i = bisect_left(imported, path) return i != len(imported) and imported[i] == path - def progress_has(self, toppath: PathLike) -> bool: + def progress_has(self, toppath: PathBytes) -> bool: """Return `True` if there exist paths that have already been imported under `toppath`. """ return toppath in self.tagprogress - def progress_reset(self, toppath: PathLike): + def progress_reset(self, toppath: PathBytes): """Reset the progress for `toppath`.""" with self as state: if toppath in state.tagprogress: @@ -173,7 +176,7 @@ class ImportState: # -------------------------------- Taghistory -------------------------------- # - def history_add(self, paths: list[PathLike]): + def history_add(self, paths: list[PathBytes]): """Add the paths to the history.""" with self as state: state.taghistory.add(tuple(paths)) @@ -185,18 +188,18 @@ class ImportSession: """ logger: logging.Logger - paths: Union[list[bytes], None] = None + paths: Optional[list[bytes]] = None lib: library.Library - _is_resuming: Dict[bytes, bool] - _merged_items: set - _merged_dirs: set + _is_resuming: dict[bytes, bool] + _merged_items: set[PathBytes] + _merged_dirs: set[PathBytes] def __init__( self, lib: library.Library, loghandler: Optional[logging.Handler], - paths: Optional[Sequence[PathLike]], + paths: Optional[Sequence[PathBytes]], query: Optional[dbcore.Query], ): """Create a session. @@ -284,7 +287,7 @@ class ImportSession: self.want_resume = config["resume"].as_choice([True, False, "ask"]) - def tag_log(self, status, paths: Sequence[PathLike]): + def tag_log(self, status, paths: Sequence[PathBytes]): """Log a message about a given album to the importer log. The status should reflect the reason the album couldn't be tagged. """ @@ -311,7 +314,7 @@ class ImportSession: elif task.choice_flag is action.SKIP: self.tag_log("skip", paths) - def should_resume(self, path: PathLike): + def should_resume(self, path: PathBytes): raise NotImplementedError( "Inheriting class must implement `should_resume`" ) @@ -383,7 +386,7 @@ class ImportSession: # Incremental and resumed imports - def already_imported(self, toppath: PathLike, paths: Sequence[PathLike]): + def already_imported(self, toppath: PathBytes, paths: Sequence[PathBytes]): """Returns true if the files belonging to this task have already been imported in a previous session. """ @@ -404,7 +407,7 @@ class ImportSession: self._history_dirs = ImportState().taghistory return self._history_dirs - def already_merged(self, paths: Sequence[PathLike]): + def already_merged(self, paths: Sequence[PathBytes]): """Returns true if all the paths being imported were part of a merge during previous tasks. """ @@ -413,7 +416,7 @@ class ImportSession: return False return True - def mark_merged(self, paths: Sequence[PathLike]): + def mark_merged(self, paths: Sequence[PathBytes]): """Mark paths and directories as merged for future reimport tasks.""" self._merged_items.update(paths) dirs = { @@ -422,14 +425,14 @@ class ImportSession: } self._merged_dirs.update(dirs) - def is_resuming(self, toppath: PathLike): + def is_resuming(self, toppath: PathBytes): """Return `True` if user wants to resume import of this path. You have to call `ask_resume` first to determine the return value. """ return self._is_resuming.get(normpath(toppath), False) - def ask_resume(self, toppath: PathLike): + def ask_resume(self, toppath: PathBytes): """If import of `toppath` was aborted in an earlier session, ask user if they want to resume the import. @@ -457,14 +460,14 @@ class BaseImportTask: Tasks flow through the importer pipeline. Each stage can update them.""" - toppath: Optional[PathLike] - paths: list[PathLike] + toppath: Optional[PathBytes] + paths: list[PathBytes] items: list[library.Item] def __init__( self, - toppath: Optional[PathLike], - paths: Optional[Iterable[PathLike]], + toppath: Optional[PathBytes], + paths: Optional[Iterable[PathBytes]], items: Optional[Iterable[library.Item]], ): """Create a task. The primary fields that define a task are: @@ -521,7 +524,7 @@ class ImportTask(BaseImportTask): """ choice_flag: Optional[action] = None - match: Union[autotag.AlbumMatch, autotag.TrackMatch, None] = None + match: autotag.AlbumMatch | autotag.TrackMatch | None = None # Keep track of the current task item cur_album: Optional[str] = None @@ -537,7 +540,7 @@ class ImportTask(BaseImportTask): self.search_ids = [] # user-supplied candidate IDs. def set_choice( - self, choice: Union[action, autotag.AlbumMatch, autotag.TrackMatch] + self, choice: action | autotag.AlbumMatch | autotag.TrackMatch ): """Given an AlbumMatch or TrackMatch object or an action constant, indicates that an action has been selected for this task. @@ -547,6 +550,7 @@ class ImportTask(BaseImportTask): """ # Not part of the task structure: assert choice != action.APPLY # Only used internally. + if choice in ( action.SKIP, action.ASIS, @@ -554,11 +558,11 @@ class ImportTask(BaseImportTask): action.ALBUMS, action.RETAG, ): - self.choice_flag = choice + self.choice_flag = cast(action, choice) self.match = None else: self.choice_flag = action.APPLY # Implicit choice. - self.match = choice + self.match = cast(autotag.AlbumMatch | autotag.TrackMatch, choice) def save_progress(self): """Updates the progress state to indicate that this album has @@ -827,7 +831,7 @@ class ImportTask(BaseImportTask): items = self.imported_items() # Save the original paths of all items for deletion and pruning # in the next step (finalization). - self.old_paths: list[PathLike] = [item.path for item in items] + self.old_paths: list[PathBytes] = [item.path for item in items] for item in items: if operation is not None: # In copy and link modes, treat re-imports specially: @@ -882,7 +886,7 @@ class ImportTask(BaseImportTask): and `replaced_albums` dictionaries. """ self.replaced_items = defaultdict(list) - self.replaced_albums: dict[PathLike, library.Album] = defaultdict() + self.replaced_albums: dict[PathBytes, library.Album] = defaultdict() replaced_album_ids = set() for item in self.imported_items(): dup_items = list( @@ -1033,7 +1037,7 @@ class ImportTask(BaseImportTask): class SingletonImportTask(ImportTask): """ImportTask for a single track that is not associated to an album.""" - def __init__(self, toppath: Optional[PathLike], item: library.Item): + def __init__(self, toppath: Optional[PathBytes], item: library.Item): super().__init__(toppath, [item.path], [item]) self.item = item self.is_album = False @@ -1287,7 +1291,7 @@ class ImportTaskFactory: indicated by a path. """ - def __init__(self, toppath: PathLike, session: ImportSession): + def __init__(self, toppath: PathBytes, session: ImportSession): """Create a new task factory. `toppath` is the user-specified path to search for music to @@ -1314,6 +1318,7 @@ class ImportTaskFactory: extracted data. """ # Check whether this is an archive. + archive_task: ArchiveImportTask | None = None if self.is_archive: archive_task = self.unarchive() if not archive_task: @@ -1335,8 +1340,8 @@ class ImportTaskFactory: # it is finished. This is usually just a SentinelImportTask, but # for archive imports, send the archive task instead (to remove # the extracted directory). - if self.is_archive: - yield archive_task # type: ignore + if archive_task: + yield archive_task else: yield self.sentinel() @@ -1373,7 +1378,7 @@ class ImportTaskFactory: for dirs, paths in albums_in_dir(self.toppath): yield dirs, paths - def singleton(self, path: PathLike): + def singleton(self, path: PathBytes): """Return a `SingletonImportTask` for the music file.""" if self.session.already_imported(self.toppath, [path]): log.debug( @@ -1388,7 +1393,7 @@ class ImportTaskFactory: else: return None - def album(self, paths: Iterable[PathLike], dirs=None): + def album(self, paths: Iterable[PathBytes], dirs=None): """Return a `ImportTask` with all media files from paths. `dirs` is a list of parent directories used to record already @@ -1407,15 +1412,16 @@ class ImportTaskFactory: self.skipped += 1 return None - items = map(self.read_item, paths) - items = [item for item in items if item] + items: list[library.Item] = [ + item for item in map(self.read_item, paths) if item + ] if items: return ImportTask(self.toppath, dirs, items) else: return None - def sentinel(self, paths: Optional[Iterable[PathLike]] = None): + def sentinel(self, paths: Optional[Iterable[PathBytes]] = None): """Return a `SentinelImportTask` indicating the end of a top-level directory import. """ @@ -1450,7 +1456,7 @@ class ImportTaskFactory: log.debug("Archive extracted to: {0}", self.toppath) return archive_task - def read_item(self, path: PathLike): + def read_item(self, path: PathBytes): """Return an `Item` read from the path. If an item cannot be read, return `None` instead and log an @@ -1528,9 +1534,9 @@ def query_tasks(session: ImportSession): if session.config["singletons"]: # Search for items. for item in session.lib.items(session.query): - task = SingletonImportTask(None, item) - for task in task.handle_created(session): - yield task + singleton_task = SingletonImportTask(None, item) + for task in singleton_task.handle_created(session): + yield singleton_task else: # Search for albums. @@ -1607,7 +1613,7 @@ def user_query(session: ImportSession, task: ImportTask): return _extend_pipeline( emitter(task), lookup_candidates(session), - user_query(session), # type: ignore # Recursion in decorators + user_query(session), ) # As albums: group items by albums and create task for each album @@ -1616,7 +1622,7 @@ def user_query(session: ImportSession, task: ImportTask): [task], group_albums(session), lookup_candidates(session), - user_query(session), # type: ignore # Recursion in decorators + user_query(session), ) resolve_duplicates(session, task) @@ -1638,9 +1644,7 @@ def user_query(session: ImportSession, task: ImportTask): ) return _extend_pipeline( - [merged_task], - lookup_candidates(session), - user_query(session), # type: ignore # Recursion in decorators + [merged_task], lookup_candidates(session), user_query(session) ) apply_choice(session, task) @@ -1730,7 +1734,7 @@ def apply_choice(session: ImportSession, task: ImportTask): @pipeline.mutator_stage def plugin_stage( session: ImportSession, - func: Callable[[ImportSession, ImportTask]], + func: Callable[[ImportSession, ImportTask], None], task: ImportTask, ): """A coroutine (pipeline stage) that calls the given function with @@ -1811,10 +1815,10 @@ def group_albums(session: ImportSession): if task.skip: continue tasks = [] - sorted_items = sorted(task.items, key=group) + sorted_items: list[library.Item] = sorted(task.items, key=group) for _, items in itertools.groupby(sorted_items, group): - items = list(items) - task = ImportTask(task.toppath, [i.path for i in items], items) + l_items = list(items) + task = ImportTask(task.toppath, [i.path for i in l_items], l_items) tasks += task.handle_created(session) tasks.append(SentinelImportTask(task.toppath, task.paths)) @@ -1833,7 +1837,7 @@ def is_subdir_of_any_in_list(path, dirs): return any(d in ancestors for d in dirs) -def albums_in_dir(path: PathLike): +def albums_in_dir(path: PathBytes): """Recursively searches the given directory and returns an iterable of (paths, items) where paths is a list of directories and items is a list of Items that is probably an album. Specifically, any folder