mirror of
https://github.com/beetbox/beets.git
synced 2025-12-30 12:32:33 +01:00
Removed unnecessary cast even tho it now produces issues locally.
This commit is contained in:
parent
bbd92d97ab
commit
716720d2a5
1 changed files with 30 additions and 67 deletions
|
|
@ -29,7 +29,7 @@ from collections import defaultdict
|
|||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from tempfile import mkdtemp
|
||||
from typing import Callable, Iterable, Optional, Sequence, Union, cast
|
||||
from typing import Callable, Iterable, Sequence, Union, cast
|
||||
|
||||
import mediafile
|
||||
|
||||
|
|
@ -116,9 +116,7 @@ class ImportState:
|
|||
path: PathBytes
|
||||
|
||||
def __init__(self, readonly=False, path: PathBytes | None = None):
|
||||
self.path = path or os.fsencode(
|
||||
cast(str, config["statefile"].as_filename())
|
||||
)
|
||||
self.path = path or os.fsencode(config["statefile"].as_filename())
|
||||
self.tagprogress = {}
|
||||
self.taghistory = set()
|
||||
self._open()
|
||||
|
|
@ -271,9 +269,7 @@ class ImportSession:
|
|||
iconfig["incremental"] = False
|
||||
|
||||
if iconfig["reflink"]:
|
||||
iconfig["reflink"] = iconfig["reflink"].as_choice(
|
||||
["auto", True, False]
|
||||
)
|
||||
iconfig["reflink"] = iconfig["reflink"].as_choice(["auto", True, False])
|
||||
|
||||
# Copy, move, reflink, link, and hardlink are mutually exclusive.
|
||||
if iconfig["move"]:
|
||||
|
|
@ -331,24 +327,16 @@ class ImportSession:
|
|||
self.tag_log("skip", paths)
|
||||
|
||||
def should_resume(self, path: PathBytes):
|
||||
raise NotImplementedError(
|
||||
"Inheriting class must implement `should_resume`"
|
||||
)
|
||||
raise NotImplementedError("Inheriting class must implement `should_resume`")
|
||||
|
||||
def choose_match(self, task: ImportTask):
|
||||
raise NotImplementedError(
|
||||
"Inheriting class must implement `choose_match`"
|
||||
)
|
||||
raise NotImplementedError("Inheriting class must implement `choose_match`")
|
||||
|
||||
def resolve_duplicate(self, task: ImportTask, found_duplicates):
|
||||
raise NotImplementedError(
|
||||
"Inheriting class must implement `resolve_duplicate`"
|
||||
)
|
||||
raise NotImplementedError("Inheriting class must implement `resolve_duplicate`")
|
||||
|
||||
def choose_item(self, task: ImportTask):
|
||||
raise NotImplementedError(
|
||||
"Inheriting class must implement `choose_item`"
|
||||
)
|
||||
raise NotImplementedError("Inheriting class must implement `choose_item`")
|
||||
|
||||
def run(self):
|
||||
"""Run the import task."""
|
||||
|
|
@ -555,9 +543,7 @@ class ImportTask(BaseImportTask):
|
|||
self.is_album = True
|
||||
self.search_ids = [] # user-supplied candidate IDs.
|
||||
|
||||
def set_choice(
|
||||
self, choice: action | autotag.AlbumMatch | autotag.TrackMatch
|
||||
):
|
||||
def set_choice(self, choice: action | autotag.AlbumMatch | autotag.TrackMatch):
|
||||
"""Given an AlbumMatch or TrackMatch object or an action constant,
|
||||
indicates that an action has been selected for this task.
|
||||
|
||||
|
|
@ -574,14 +560,14 @@ class ImportTask(BaseImportTask):
|
|||
action.ALBUMS,
|
||||
action.RETAG,
|
||||
):
|
||||
# Cast needed as mypy can't infer the type
|
||||
self.choice_flag = cast(action, choice)
|
||||
self.match = None
|
||||
else:
|
||||
self.choice_flag = action.APPLY # Implicit choice.
|
||||
# Union is needed here for python 3.9 compatibility!
|
||||
self.match = cast(
|
||||
Union[autotag.AlbumMatch, autotag.TrackMatch], choice
|
||||
)
|
||||
# Cast needed as mypy can't infer the type
|
||||
self.match = cast(Union[autotag.AlbumMatch, autotag.TrackMatch], choice)
|
||||
|
||||
def save_progress(self):
|
||||
"""Updates the progress state to indicate that this album has
|
||||
|
|
@ -659,9 +645,7 @@ class ImportTask(BaseImportTask):
|
|||
for item in duplicate_items:
|
||||
item.remove()
|
||||
if lib.directory in util.ancestry(item.path):
|
||||
log.debug(
|
||||
"deleting duplicate {0}", util.displayable_path(item.path)
|
||||
)
|
||||
log.debug("deleting duplicate {0}", util.displayable_path(item.path))
|
||||
util.remove(item.path)
|
||||
util.prune_dirs(os.path.dirname(item.path), lib.directory)
|
||||
|
||||
|
|
@ -693,7 +677,8 @@ class ImportTask(BaseImportTask):
|
|||
self.save_progress()
|
||||
if session.config["incremental"] and not (
|
||||
# Should we skip recording to incremental list?
|
||||
self.skip and session.config["incremental_skip_later"]
|
||||
self.skip
|
||||
and session.config["incremental_skip_later"]
|
||||
):
|
||||
self.save_history()
|
||||
|
||||
|
|
@ -750,9 +735,7 @@ class ImportTask(BaseImportTask):
|
|||
candidate IDs are stored in self.search_ids: if present, the
|
||||
initial lookup is restricted to only those IDs.
|
||||
"""
|
||||
artist, album, prop = autotag.tag_album(
|
||||
self.items, search_ids=self.search_ids
|
||||
)
|
||||
artist, album, prop = autotag.tag_album(self.items, search_ids=self.search_ids)
|
||||
self.cur_artist = artist
|
||||
self.cur_album = album
|
||||
self.candidates = prop.candidates
|
||||
|
|
@ -772,9 +755,7 @@ class ImportTask(BaseImportTask):
|
|||
# Construct a query to find duplicates with this metadata. We
|
||||
# use a temporary Album object to generate any computed fields.
|
||||
tmp_album = library.Album(lib, **info)
|
||||
keys = cast(
|
||||
list[str], config["import"]["duplicate_keys"]["album"].as_str_seq()
|
||||
)
|
||||
keys: list[str] = config["import"]["duplicate_keys"]["album"].as_str_seq()
|
||||
dup_query = tmp_album.duplicates_query(keys)
|
||||
|
||||
# Don't count albums with the same files as duplicates.
|
||||
|
|
@ -805,8 +786,7 @@ class ImportTask(BaseImportTask):
|
|||
[i.albumartist or i.artist for i in self.items]
|
||||
)
|
||||
if freq == len(self.items) or (
|
||||
freq > 1
|
||||
and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH
|
||||
freq > 1 and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH
|
||||
):
|
||||
# Single-artist album.
|
||||
changes["albumartist"] = plur_albumartist
|
||||
|
|
@ -834,7 +814,7 @@ class ImportTask(BaseImportTask):
|
|||
|
||||
def manipulate_files(
|
||||
self,
|
||||
operation: Optional[MoveOperation] = None,
|
||||
operation: MoveOperation | None = None,
|
||||
write=False,
|
||||
session: ImportSession | None = None,
|
||||
):
|
||||
|
|
@ -908,15 +888,10 @@ class ImportTask(BaseImportTask):
|
|||
self.replaced_albums: dict[PathBytes, library.Album] = defaultdict()
|
||||
replaced_album_ids = set()
|
||||
for item in self.imported_items():
|
||||
dup_items = list(
|
||||
lib.items(dbcore.query.BytesQuery("path", item.path))
|
||||
)
|
||||
dup_items = list(lib.items(dbcore.query.BytesQuery("path", item.path)))
|
||||
self.replaced_items[item] = dup_items
|
||||
for dup_item in dup_items:
|
||||
if (
|
||||
not dup_item.album_id
|
||||
or dup_item.album_id in replaced_album_ids
|
||||
):
|
||||
if not dup_item.album_id or dup_item.album_id in replaced_album_ids:
|
||||
continue
|
||||
replaced_album = dup_item._cached_album
|
||||
if replaced_album:
|
||||
|
|
@ -969,8 +944,7 @@ class ImportTask(BaseImportTask):
|
|||
self.album.artpath = replaced_album.artpath
|
||||
self.album.store()
|
||||
log.debug(
|
||||
"Reimported album {}. Preserving attribute ['added']. "
|
||||
"Path: {}",
|
||||
"Reimported album {}. Preserving attribute ['added']. " "Path: {}",
|
||||
self.album.id,
|
||||
displayable_path(self.album.path),
|
||||
)
|
||||
|
|
@ -1049,7 +1023,7 @@ class ImportTask(BaseImportTask):
|
|||
util.prune_dirs(
|
||||
os.path.dirname(filename),
|
||||
self.toppath,
|
||||
clutter=cast(list[str], config["clutter"].as_str_seq()),
|
||||
clutter=config["clutter"].as_str_seq(),
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -1102,9 +1076,7 @@ class SingletonImportTask(ImportTask):
|
|||
# Query for existing items using the same metadata. We use a
|
||||
# temporary `Item` object to generate any computed fields.
|
||||
tmp_item = library.Item(lib, **info)
|
||||
keys = cast(
|
||||
list[str], config["import"]["duplicate_keys"]["item"].as_str_seq()
|
||||
)
|
||||
keys: list[str] = config["import"]["duplicate_keys"]["item"].as_str_seq()
|
||||
dup_query = tmp_item.duplicates_query(keys)
|
||||
|
||||
found_items = []
|
||||
|
|
@ -1278,9 +1250,7 @@ class ArchiveImportTask(SentinelImportTask):
|
|||
break
|
||||
|
||||
if handler_class is None:
|
||||
raise ValueError(
|
||||
"No handler found for archive: {0}".format(self.toppath)
|
||||
)
|
||||
raise ValueError("No handler found for archive: {0}".format(self.toppath))
|
||||
|
||||
extract_to = mkdtemp()
|
||||
archive = handler_class(os.fsdecode(self.toppath), mode="r")
|
||||
|
|
@ -1400,9 +1370,7 @@ class ImportTaskFactory:
|
|||
def singleton(self, path: PathBytes):
|
||||
"""Return a `SingletonImportTask` for the music file."""
|
||||
if self.session.already_imported(self.toppath, [path]):
|
||||
log.debug(
|
||||
"Skipping previously-imported path: {0}", displayable_path(path)
|
||||
)
|
||||
log.debug("Skipping previously-imported path: {0}", displayable_path(path))
|
||||
self.skipped += 1
|
||||
return None
|
||||
|
||||
|
|
@ -1425,9 +1393,7 @@ class ImportTaskFactory:
|
|||
dirs = list({os.path.dirname(p) for p in paths})
|
||||
|
||||
if self.session.already_imported(self.toppath, dirs):
|
||||
log.debug(
|
||||
"Skipping previously-imported path: {0}", displayable_path(dirs)
|
||||
)
|
||||
log.debug("Skipping previously-imported path: {0}", displayable_path(dirs))
|
||||
self.skipped += 1
|
||||
return None
|
||||
|
||||
|
|
@ -1457,8 +1423,7 @@ class ImportTaskFactory:
|
|||
|
||||
if not (self.session.config["move"] or self.session.config["copy"]):
|
||||
log.warning(
|
||||
"Archive importing requires either "
|
||||
"'copy' or 'move' to be enabled."
|
||||
"Archive importing requires either " "'copy' or 'move' to be enabled."
|
||||
)
|
||||
return
|
||||
|
||||
|
|
@ -1677,9 +1642,7 @@ def resolve_duplicates(session: ImportSession, task: ImportTask):
|
|||
if task.choice_flag in (action.ASIS, action.APPLY, action.RETAG):
|
||||
found_duplicates = task.find_duplicates(session.lib)
|
||||
if found_duplicates:
|
||||
log.debug(
|
||||
"found duplicates: {}".format([o.id for o in found_duplicates])
|
||||
)
|
||||
log.debug("found duplicates: {}".format([o.id for o in found_duplicates]))
|
||||
|
||||
# Get the default action to follow from config.
|
||||
duplicate_action = config["import"]["duplicate_action"].as_choice(
|
||||
|
|
@ -1863,8 +1826,8 @@ def albums_in_dir(path: PathBytes):
|
|||
containing any media files is an album.
|
||||
"""
|
||||
collapse_pat = collapse_paths = collapse_items = None
|
||||
ignore = cast(list[str], config["ignore"].as_str_seq())
|
||||
ignore_hidden = cast(bool, config["ignore_hidden"].get(bool))
|
||||
ignore: list[str] = config["ignore"].as_str_seq()
|
||||
ignore_hidden: bool = config["ignore_hidden"].get(bool)
|
||||
|
||||
for root, dirs, files in sorted_walk(
|
||||
path, ignore=ignore, ignore_hidden=ignore_hidden, logger=log
|
||||
|
|
|
|||
Loading…
Reference in a new issue