mirror of
https://github.com/beetbox/beets.git
synced 2025-12-24 17:43:52 +01:00
Added some more typehints
This commit is contained in:
parent
ed92f9b997
commit
23f4f8261c
1 changed files with 46 additions and 36 deletions
|
|
@ -185,7 +185,7 @@ class ImportSession:
|
|||
"""
|
||||
|
||||
logger: logging.Logger
|
||||
paths: Union[list[bytes], None]
|
||||
paths: Union[list[bytes], None] = None
|
||||
lib: library.Library
|
||||
|
||||
_is_resuming: Dict[bytes, bool]
|
||||
|
|
@ -223,8 +223,6 @@ class ImportSession:
|
|||
# Normalize the paths.
|
||||
if paths is not None:
|
||||
self.paths = list(map(normpath, paths))
|
||||
else:
|
||||
self.paths = None
|
||||
|
||||
def _setup_logging(self, loghandler: Optional[logging.Handler]):
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -286,13 +284,13 @@ class ImportSession:
|
|||
|
||||
self.want_resume = config["resume"].as_choice([True, False, "ask"])
|
||||
|
||||
def tag_log(self, status, paths):
|
||||
def tag_log(self, status, paths: Sequence[PathLike]):
|
||||
"""Log a message about a given album to the importer log. The status
|
||||
should reflect the reason the album couldn't be tagged.
|
||||
"""
|
||||
self.logger.info("{0} {1}", status, displayable_path(paths))
|
||||
|
||||
def log_choice(self, task, duplicate=False):
|
||||
def log_choice(self, task: ImportTask, duplicate=False):
|
||||
"""Logs the task's current choice if it should be logged. If
|
||||
``duplicate``, then this is a secondary choice after a duplicate was
|
||||
detected and a decision was made.
|
||||
|
|
@ -313,22 +311,22 @@ class ImportSession:
|
|||
elif task.choice_flag is action.SKIP:
|
||||
self.tag_log("skip", paths)
|
||||
|
||||
def should_resume(self, path):
|
||||
def should_resume(self, path: PathLike):
|
||||
raise NotImplementedError(
|
||||
"Inheriting class must implement `should_resume`"
|
||||
)
|
||||
|
||||
def choose_match(self, task):
|
||||
def choose_match(self, task: ImportTask):
|
||||
raise NotImplementedError(
|
||||
"Inheriting class must implement `choose_match`"
|
||||
)
|
||||
|
||||
def resolve_duplicate(self, task, found_duplicates):
|
||||
def resolve_duplicate(self, task: ImportTask, found_duplicates):
|
||||
raise NotImplementedError(
|
||||
"Inheriting class must implement `resolve_duplicate`"
|
||||
)
|
||||
|
||||
def choose_item(self, task):
|
||||
def choose_item(self, task: ImportTask):
|
||||
raise NotImplementedError(
|
||||
"Inheriting class must implement `choose_item`"
|
||||
)
|
||||
|
|
@ -522,12 +520,16 @@ class ImportTask(BaseImportTask):
|
|||
system.
|
||||
"""
|
||||
|
||||
choice_flag: Optional[action] = None
|
||||
match: Union[autotag.AlbumMatch, autotag.TrackMatch, None] = None
|
||||
|
||||
# Keep track of the current task item
|
||||
cur_album: Optional[str] = None
|
||||
cur_artist: Optional[str] = None
|
||||
candidates: Sequence[autotag.AlbumMatch | autotag.TrackMatch] = []
|
||||
|
||||
def __init__(self, toppath, paths, items):
|
||||
super().__init__(toppath, paths, items)
|
||||
self.choice_flag = None
|
||||
self.cur_album = None
|
||||
self.cur_artist = None
|
||||
self.candidates = []
|
||||
self.rec = None
|
||||
self.should_remove_duplicates = False
|
||||
self.should_merge_duplicates = False
|
||||
|
|
@ -622,13 +624,13 @@ class ImportTask(BaseImportTask):
|
|||
|
||||
autotag.apply_metadata(self.match.info, self.match.mapping)
|
||||
|
||||
def duplicate_items(self, lib):
|
||||
def duplicate_items(self, lib: library.Library):
|
||||
duplicate_items = []
|
||||
for album in self.find_duplicates(lib):
|
||||
duplicate_items += album.items()
|
||||
return duplicate_items
|
||||
|
||||
def remove_duplicates(self, lib):
|
||||
def remove_duplicates(self, lib: library.Library):
|
||||
duplicate_items = self.duplicate_items(lib)
|
||||
log.debug("removing {0} old duplicated items", len(duplicate_items))
|
||||
for item in duplicate_items:
|
||||
|
|
@ -640,7 +642,7 @@ class ImportTask(BaseImportTask):
|
|||
util.remove(item.path)
|
||||
util.prune_dirs(os.path.dirname(item.path), lib.directory)
|
||||
|
||||
def set_fields(self, lib):
|
||||
def set_fields(self, lib: library.Library):
|
||||
"""Sets the fields given at CLI or configuration to the specified
|
||||
values, for both the album and all its items.
|
||||
"""
|
||||
|
|
@ -661,7 +663,7 @@ class ImportTask(BaseImportTask):
|
|||
item.store()
|
||||
self.album.store()
|
||||
|
||||
def finalize(self, session):
|
||||
def finalize(self, session: ImportSession):
|
||||
"""Save progress, clean up files, and emit plugin event."""
|
||||
# Update progress.
|
||||
if session.want_resume:
|
||||
|
|
@ -706,7 +708,7 @@ class ImportTask(BaseImportTask):
|
|||
def _emit_imported(self, lib: library.Library):
|
||||
plugins.send("album_imported", lib=lib, album=self.album)
|
||||
|
||||
def handle_created(self, session):
|
||||
def handle_created(self, session: ImportSession):
|
||||
"""Send the `import_task_created` event for this task. Return a list of
|
||||
tasks that should continue through the pipeline. By default, this is a
|
||||
list containing only the task itself, but plugins can replace the task
|
||||
|
|
@ -733,7 +735,7 @@ class ImportTask(BaseImportTask):
|
|||
self.candidates = prop.candidates
|
||||
self.rec = prop.recommendation
|
||||
|
||||
def find_duplicates(self, lib):
|
||||
def find_duplicates(self, lib: library.Library):
|
||||
"""Return a list of albums from `lib` with the same artist and
|
||||
album name as the task.
|
||||
"""
|
||||
|
|
@ -855,7 +857,7 @@ class ImportTask(BaseImportTask):
|
|||
|
||||
plugins.send("import_task_files", session=session, task=self)
|
||||
|
||||
def add(self, lib):
|
||||
def add(self, lib: library.Library):
|
||||
"""Add the items as an album to the library and remove replaced items."""
|
||||
self.align_album_level_fields()
|
||||
with lib.transaction():
|
||||
|
|
@ -1101,7 +1103,7 @@ class SingletonImportTask(ImportTask):
|
|||
def infer_album_fields(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def choose_match(self, session):
|
||||
def choose_match(self, session: ImportSession):
|
||||
"""Ask the session which match should apply and apply it."""
|
||||
choice = session.choose_item(self)
|
||||
self.set_choice(choice)
|
||||
|
|
@ -1285,7 +1287,7 @@ class ImportTaskFactory:
|
|||
indicated by a path.
|
||||
"""
|
||||
|
||||
def __init__(self, toppath, session):
|
||||
def __init__(self, toppath: PathLike, session: ImportSession):
|
||||
"""Create a new task factory.
|
||||
|
||||
`toppath` is the user-specified path to search for music to
|
||||
|
|
@ -1338,7 +1340,7 @@ class ImportTaskFactory:
|
|||
else:
|
||||
yield self.sentinel()
|
||||
|
||||
def _create(self, task):
|
||||
def _create(self, task: Optional[ImportTask]):
|
||||
"""Handle a new task to be emitted by the factory.
|
||||
|
||||
Emit the `import_task_created` event and increment the
|
||||
|
|
@ -1371,7 +1373,7 @@ class ImportTaskFactory:
|
|||
for dirs, paths in albums_in_dir(self.toppath):
|
||||
yield dirs, paths
|
||||
|
||||
def singleton(self, path):
|
||||
def singleton(self, path: PathLike):
|
||||
"""Return a `SingletonImportTask` for the music file."""
|
||||
if self.session.already_imported(self.toppath, [path]):
|
||||
log.debug(
|
||||
|
|
@ -1386,7 +1388,7 @@ class ImportTaskFactory:
|
|||
else:
|
||||
return None
|
||||
|
||||
def album(self, paths, dirs=None):
|
||||
def album(self, paths: Iterable[PathLike], dirs=None):
|
||||
"""Return a `ImportTask` with all media files from paths.
|
||||
|
||||
`dirs` is a list of parent directories used to record already
|
||||
|
|
@ -1413,7 +1415,7 @@ class ImportTaskFactory:
|
|||
else:
|
||||
return None
|
||||
|
||||
def sentinel(self, paths=None):
|
||||
def sentinel(self, paths: Optional[Iterable[PathLike]] = None):
|
||||
"""Return a `SentinelImportTask` indicating the end of a
|
||||
top-level directory import.
|
||||
"""
|
||||
|
|
@ -1448,7 +1450,7 @@ class ImportTaskFactory:
|
|||
log.debug("Archive extracted to: {0}", self.toppath)
|
||||
return archive_task
|
||||
|
||||
def read_item(self, path):
|
||||
def read_item(self, path: PathLike):
|
||||
"""Return an `Item` read from the path.
|
||||
|
||||
If an item cannot be read, return `None` instead and log an
|
||||
|
|
@ -1491,12 +1493,16 @@ def _extend_pipeline(tasks, *stages):
|
|||
# Full-album pipeline stages.
|
||||
|
||||
|
||||
def read_tasks(session):
|
||||
def read_tasks(session: ImportSession):
|
||||
"""A generator yielding all the albums (as ImportTask objects) found
|
||||
in the user-specified list of paths. In the case of a singleton
|
||||
import, yields single-item tasks instead.
|
||||
"""
|
||||
skipped = 0
|
||||
if session.paths is None:
|
||||
log.warning("No path specified in session.")
|
||||
return
|
||||
|
||||
for toppath in session.paths:
|
||||
# Check whether we need to resume the import.
|
||||
session.ask_resume(toppath)
|
||||
|
|
@ -1514,7 +1520,7 @@ def read_tasks(session):
|
|||
log.info("Skipped {0} paths.", skipped)
|
||||
|
||||
|
||||
def query_tasks(session):
|
||||
def query_tasks(session: ImportSession):
|
||||
"""A generator that works as a drop-in-replacement for read_tasks.
|
||||
Instead of finding files from the filesystem, a query is used to
|
||||
match items from the library.
|
||||
|
|
@ -1641,7 +1647,7 @@ def user_query(session: ImportSession, task: ImportTask):
|
|||
return task
|
||||
|
||||
|
||||
def resolve_duplicates(session, task):
|
||||
def resolve_duplicates(session: ImportSession, task: ImportTask):
|
||||
"""Check if a task conflicts with items or albums already imported
|
||||
and ask the session to resolve this.
|
||||
"""
|
||||
|
|
@ -1684,7 +1690,7 @@ def resolve_duplicates(session, task):
|
|||
|
||||
|
||||
@pipeline.mutator_stage
|
||||
def import_asis(session, task):
|
||||
def import_asis(session: ImportSession, task: ImportTask):
|
||||
"""Select the `action.ASIS` choice for all tasks.
|
||||
|
||||
This stage replaces the initial_lookup and user_query stages
|
||||
|
|
@ -1698,7 +1704,7 @@ def import_asis(session, task):
|
|||
apply_choice(session, task)
|
||||
|
||||
|
||||
def apply_choice(session, task):
|
||||
def apply_choice(session: ImportSession, task: ImportTask):
|
||||
"""Apply the task's choice to the Album or Item it contains and add
|
||||
it to the library.
|
||||
"""
|
||||
|
|
@ -1722,7 +1728,11 @@ def apply_choice(session, task):
|
|||
|
||||
|
||||
@pipeline.mutator_stage
|
||||
def plugin_stage(session, func, task):
|
||||
def plugin_stage(
|
||||
session: ImportSession,
|
||||
func: Callable[[ImportSession, ImportTask]],
|
||||
task: ImportTask,
|
||||
):
|
||||
"""A coroutine (pipeline stage) that calls the given function with
|
||||
each non-skipped import task. These stages occur between applying
|
||||
metadata changes and moving/copying/writing files.
|
||||
|
|
@ -1739,7 +1749,7 @@ def plugin_stage(session, func, task):
|
|||
|
||||
|
||||
@pipeline.stage
|
||||
def manipulate_files(session, task):
|
||||
def manipulate_files(session: ImportSession, task: ImportTask):
|
||||
"""A coroutine (pipeline stage) that performs necessary file
|
||||
manipulations *after* items have been added to the library and
|
||||
finalizes each task.
|
||||
|
|
@ -1784,7 +1794,7 @@ def log_files(session: ImportSession, task: ImportTask):
|
|||
log.info(" {0}", displayable_path(item["path"]))
|
||||
|
||||
|
||||
def group_albums(session):
|
||||
def group_albums(session: ImportSession):
|
||||
"""A pipeline stage that groups the items of each task into albums
|
||||
using their metadata.
|
||||
|
||||
|
|
@ -1823,7 +1833,7 @@ def is_subdir_of_any_in_list(path, dirs):
|
|||
return any(d in ancestors for d in dirs)
|
||||
|
||||
|
||||
def albums_in_dir(path):
|
||||
def albums_in_dir(path: PathLike):
|
||||
"""Recursively searches the given directory and returns an iterable
|
||||
of (paths, items) where paths is a list of directories and items is
|
||||
a list of Items that is probably an album. Specifically, any folder
|
||||
|
|
|
|||
Loading…
Reference in a new issue