Replace PathLike with PathBytes and remove unnecessary type ignores
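The change swaps the `PathLike` annotations in `beets/importer.py` for a module-local `PathBytes = bytes` alias (importer paths are raw bytes today) and drops `# type: ignore` comments by narrowing optionals explicitly instead. A rough sketch of the two patterns, with simplified names for illustration only (not the exact beets code):

```
from typing import Iterator, Optional

# Temporary alias: importer paths are raw bytes. Keeping the alias in one
# place makes a later move to a richer PathLike type a one-line change.
PathBytes = bytes


def progress_has(tagprogress: dict[PathBytes, list[PathBytes]], toppath: PathBytes) -> bool:
    # Annotations now state exactly what is stored: byte paths.
    return toppath in tagprogress


def produce_tasks(is_archive: bool) -> Iterator[str]:
    # Declare the optional up front and branch on the value itself; the type
    # checker narrows it, so no "# type: ignore" is needed at the yield site.
    archive_task: Optional[str] = None
    if is_archive:
        archive_task = "archive task"
    if archive_task:
        yield archive_task
    else:
        yield "sentinel task"
```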

This commit is contained in:
Sebastian Mohr 2025-02-08 22:03:03 +01:00
parent b46dbb93f7
commit 10c0aa3f6a


@ -15,14 +15,13 @@ from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from tempfile import mkdtemp
from typing import Callable, Dict, Iterable, Optional, Sequence, Union, cast
from typing import Callable, Iterable, Optional, Sequence, cast
import mediafile
from beets import autotag, config, dbcore, library, logging, plugins, util
from beets.util import (
MoveOperation,
PathLike,
ancestry,
displayable_path,
normpath,
@ -61,6 +60,10 @@ REIMPORT_FRESH_FIELDS_ITEM = list(REIMPORT_FRESH_FIELDS_ALBUM)
# Global logger.
log = logging.getLogger("beets")
# Here for now to allow for an easy replacement later on
# once we can move to a PathLike
PathBytes = bytes
class ImportAbortError(Exception):
"""Raised when the user aborts the tagging operation."""
@ -94,12 +97,12 @@ class ImportState:
```
"""
tagprogress: dict
taghistory: set
path: PathLike
tagprogress: dict[PathBytes, list[PathBytes]]
taghistory: set[tuple[PathBytes, ...]]
path: PathBytes
def __init__(self, readonly=False, path: Union[PathLike, None] = None):
self.path = path or cast(PathLike, config["statefile"].as_filename())
def __init__(self, readonly=False, path: Optional[PathBytes] = None):
self.path = path or cast(PathBytes, config["statefile"].as_filename())
self.tagprogress = {}
self.taghistory = set()
self._open()
@ -141,7 +144,7 @@ class ImportState:
# -------------------------------- Tagprogress ------------------------------- #
def progress_add(self, toppath: PathLike, *paths: PathLike):
def progress_add(self, toppath: PathBytes, *paths: PathBytes):
"""Record that the files under all of the `paths` have been imported
under `toppath`.
"""
@ -153,19 +156,19 @@ class ImportState:
else:
insort(imported, path)
def progress_has_element(self, toppath: PathLike, path: PathLike) -> bool:
def progress_has_element(self, toppath: PathBytes, path: PathBytes) -> bool:
"""Return whether `path` has been imported in `toppath`."""
imported = self.tagprogress.get(toppath, [])
i = bisect_left(imported, path)
return i != len(imported) and imported[i] == path
def progress_has(self, toppath: PathLike) -> bool:
def progress_has(self, toppath: PathBytes) -> bool:
"""Return `True` if there exist paths that have already been
imported under `toppath`.
"""
return toppath in self.tagprogress
def progress_reset(self, toppath: PathLike):
def progress_reset(self, toppath: PathBytes):
"""Reset the progress for `toppath`."""
with self as state:
if toppath in state.tagprogress:
@ -173,7 +176,7 @@ class ImportState:
# -------------------------------- Taghistory -------------------------------- #
def history_add(self, paths: list[PathLike]):
def history_add(self, paths: list[PathBytes]):
"""Add the paths to the history."""
with self as state:
state.taghistory.add(tuple(paths))
@ -185,18 +188,18 @@ class ImportSession:
"""
logger: logging.Logger
paths: Union[list[bytes], None] = None
paths: Optional[list[bytes]] = None
lib: library.Library
_is_resuming: Dict[bytes, bool]
_merged_items: set
_merged_dirs: set
_is_resuming: dict[bytes, bool]
_merged_items: set[PathBytes]
_merged_dirs: set[PathBytes]
def __init__(
self,
lib: library.Library,
loghandler: Optional[logging.Handler],
paths: Optional[Sequence[PathLike]],
paths: Optional[Sequence[PathBytes]],
query: Optional[dbcore.Query],
):
"""Create a session.
@ -284,7 +287,7 @@ class ImportSession:
self.want_resume = config["resume"].as_choice([True, False, "ask"])
def tag_log(self, status, paths: Sequence[PathLike]):
def tag_log(self, status, paths: Sequence[PathBytes]):
"""Log a message about a given album to the importer log. The status
should reflect the reason the album couldn't be tagged.
"""
@ -311,7 +314,7 @@ class ImportSession:
elif task.choice_flag is action.SKIP:
self.tag_log("skip", paths)
def should_resume(self, path: PathLike):
def should_resume(self, path: PathBytes):
raise NotImplementedError(
"Inheriting class must implement `should_resume`"
)
@ -383,7 +386,7 @@ class ImportSession:
# Incremental and resumed imports
def already_imported(self, toppath: PathLike, paths: Sequence[PathLike]):
def already_imported(self, toppath: PathBytes, paths: Sequence[PathBytes]):
"""Returns true if the files belonging to this task have already
been imported in a previous session.
"""
@ -404,7 +407,7 @@ class ImportSession:
self._history_dirs = ImportState().taghistory
return self._history_dirs
def already_merged(self, paths: Sequence[PathLike]):
def already_merged(self, paths: Sequence[PathBytes]):
"""Returns true if all the paths being imported were part of a merge
during previous tasks.
"""
@ -413,7 +416,7 @@ class ImportSession:
return False
return True
def mark_merged(self, paths: Sequence[PathLike]):
def mark_merged(self, paths: Sequence[PathBytes]):
"""Mark paths and directories as merged for future reimport tasks."""
self._merged_items.update(paths)
dirs = {
@ -422,14 +425,14 @@ class ImportSession:
}
self._merged_dirs.update(dirs)
def is_resuming(self, toppath: PathLike):
def is_resuming(self, toppath: PathBytes):
"""Return `True` if user wants to resume import of this path.
You have to call `ask_resume` first to determine the return value.
"""
return self._is_resuming.get(normpath(toppath), False)
def ask_resume(self, toppath: PathLike):
def ask_resume(self, toppath: PathBytes):
"""If import of `toppath` was aborted in an earlier session, ask
user if they want to resume the import.
@ -457,14 +460,14 @@ class BaseImportTask:
Tasks flow through the importer pipeline. Each stage can update
them."""
toppath: Optional[PathLike]
paths: list[PathLike]
toppath: Optional[PathBytes]
paths: list[PathBytes]
items: list[library.Item]
def __init__(
self,
toppath: Optional[PathLike],
paths: Optional[Iterable[PathLike]],
toppath: Optional[PathBytes],
paths: Optional[Iterable[PathBytes]],
items: Optional[Iterable[library.Item]],
):
"""Create a task. The primary fields that define a task are:
@ -521,7 +524,7 @@ class ImportTask(BaseImportTask):
"""
choice_flag: Optional[action] = None
match: Union[autotag.AlbumMatch, autotag.TrackMatch, None] = None
match: autotag.AlbumMatch | autotag.TrackMatch | None = None
# Keep track of the current task item
cur_album: Optional[str] = None
@ -537,7 +540,7 @@ class ImportTask(BaseImportTask):
self.search_ids = [] # user-supplied candidate IDs.
def set_choice(
self, choice: Union[action, autotag.AlbumMatch, autotag.TrackMatch]
self, choice: action | autotag.AlbumMatch | autotag.TrackMatch
):
"""Given an AlbumMatch or TrackMatch object or an action constant,
indicates that an action has been selected for this task.
@ -547,6 +550,7 @@ class ImportTask(BaseImportTask):
"""
# Not part of the task structure:
assert choice != action.APPLY # Only used internally.
if choice in (
action.SKIP,
action.ASIS,
@ -554,11 +558,11 @@ class ImportTask(BaseImportTask):
action.ALBUMS,
action.RETAG,
):
self.choice_flag = choice
self.choice_flag = cast(action, choice)
self.match = None
else:
self.choice_flag = action.APPLY # Implicit choice.
self.match = choice
self.match = cast(autotag.AlbumMatch | autotag.TrackMatch, choice)
def save_progress(self):
"""Updates the progress state to indicate that this album has
@ -827,7 +831,7 @@ class ImportTask(BaseImportTask):
items = self.imported_items()
# Save the original paths of all items for deletion and pruning
# in the next step (finalization).
self.old_paths: list[PathLike] = [item.path for item in items]
self.old_paths: list[PathBytes] = [item.path for item in items]
for item in items:
if operation is not None:
# In copy and link modes, treat re-imports specially:
@ -882,7 +886,7 @@ class ImportTask(BaseImportTask):
and `replaced_albums` dictionaries.
"""
self.replaced_items = defaultdict(list)
self.replaced_albums: dict[PathLike, library.Album] = defaultdict()
self.replaced_albums: dict[PathBytes, library.Album] = defaultdict()
replaced_album_ids = set()
for item in self.imported_items():
dup_items = list(
@ -1033,7 +1037,7 @@ class ImportTask(BaseImportTask):
class SingletonImportTask(ImportTask):
"""ImportTask for a single track that is not associated to an album."""
def __init__(self, toppath: Optional[PathLike], item: library.Item):
def __init__(self, toppath: Optional[PathBytes], item: library.Item):
super().__init__(toppath, [item.path], [item])
self.item = item
self.is_album = False
@ -1287,7 +1291,7 @@ class ImportTaskFactory:
indicated by a path.
"""
def __init__(self, toppath: PathLike, session: ImportSession):
def __init__(self, toppath: PathBytes, session: ImportSession):
"""Create a new task factory.
`toppath` is the user-specified path to search for music to
@ -1314,6 +1318,7 @@ class ImportTaskFactory:
extracted data.
"""
# Check whether this is an archive.
archive_task: ArchiveImportTask | None = None
if self.is_archive:
archive_task = self.unarchive()
if not archive_task:
@ -1335,8 +1340,8 @@ class ImportTaskFactory:
# it is finished. This is usually just a SentinelImportTask, but
# for archive imports, send the archive task instead (to remove
# the extracted directory).
if self.is_archive:
yield archive_task # type: ignore
if archive_task:
yield archive_task
else:
yield self.sentinel()
@ -1373,7 +1378,7 @@ class ImportTaskFactory:
for dirs, paths in albums_in_dir(self.toppath):
yield dirs, paths
def singleton(self, path: PathLike):
def singleton(self, path: PathBytes):
"""Return a `SingletonImportTask` for the music file."""
if self.session.already_imported(self.toppath, [path]):
log.debug(
@ -1388,7 +1393,7 @@ class ImportTaskFactory:
else:
return None
def album(self, paths: Iterable[PathLike], dirs=None):
def album(self, paths: Iterable[PathBytes], dirs=None):
"""Return a `ImportTask` with all media files from paths.
`dirs` is a list of parent directories used to record already
@ -1407,15 +1412,16 @@ class ImportTaskFactory:
self.skipped += 1
return None
items = map(self.read_item, paths)
items = [item for item in items if item]
items: list[library.Item] = [
item for item in map(self.read_item, paths) if item
]
if items:
return ImportTask(self.toppath, dirs, items)
else:
return None
def sentinel(self, paths: Optional[Iterable[PathLike]] = None):
def sentinel(self, paths: Optional[Iterable[PathBytes]] = None):
"""Return a `SentinelImportTask` indicating the end of a
top-level directory import.
"""
@ -1450,7 +1456,7 @@ class ImportTaskFactory:
log.debug("Archive extracted to: {0}", self.toppath)
return archive_task
def read_item(self, path: PathLike):
def read_item(self, path: PathBytes):
"""Return an `Item` read from the path.
If an item cannot be read, return `None` instead and log an
@ -1528,9 +1534,9 @@ def query_tasks(session: ImportSession):
if session.config["singletons"]:
# Search for items.
for item in session.lib.items(session.query):
task = SingletonImportTask(None, item)
for task in task.handle_created(session):
yield task
singleton_task = SingletonImportTask(None, item)
for task in singleton_task.handle_created(session):
yield task
else:
# Search for albums.
@ -1607,7 +1613,7 @@ def user_query(session: ImportSession, task: ImportTask):
return _extend_pipeline(
emitter(task),
lookup_candidates(session),
user_query(session), # type: ignore # Recursion in decorators
user_query(session),
)
# As albums: group items by albums and create task for each album
@ -1616,7 +1622,7 @@ def user_query(session: ImportSession, task: ImportTask):
[task],
group_albums(session),
lookup_candidates(session),
user_query(session), # type: ignore # Recursion in decorators
user_query(session),
)
resolve_duplicates(session, task)
@ -1638,9 +1644,7 @@ def user_query(session: ImportSession, task: ImportTask):
)
return _extend_pipeline(
[merged_task],
lookup_candidates(session),
user_query(session), # type: ignore # Recursion in decorators
[merged_task], lookup_candidates(session), user_query(session)
)
apply_choice(session, task)
@ -1730,7 +1734,7 @@ def apply_choice(session: ImportSession, task: ImportTask):
@pipeline.mutator_stage
def plugin_stage(
session: ImportSession,
func: Callable[[ImportSession, ImportTask]],
func: Callable[[ImportSession, ImportTask], None],
task: ImportTask,
):
"""A coroutine (pipeline stage) that calls the given function with
@ -1811,10 +1815,10 @@ def group_albums(session: ImportSession):
if task.skip:
continue
tasks = []
sorted_items = sorted(task.items, key=group)
sorted_items: list[library.Item] = sorted(task.items, key=group)
for _, items in itertools.groupby(sorted_items, group):
items = list(items)
task = ImportTask(task.toppath, [i.path for i in items], items)
l_items = list(items)
task = ImportTask(task.toppath, [i.path for i in l_items], l_items)
tasks += task.handle_created(session)
tasks.append(SentinelImportTask(task.toppath, task.paths))
@ -1833,7 +1837,7 @@ def is_subdir_of_any_in_list(path, dirs):
return any(d in ancestors for d in dirs)
def albums_in_dir(path: PathLike):
def albums_in_dir(path: PathBytes):
"""Recursively searches the given directory and returns an iterable
of (paths, items) where paths is a list of directories and items is
a list of Items that is probably an album. Specifically, any folder