Formatting

This commit is contained in:
Sebastian Mohr 2025-02-08 22:20:40 +01:00
parent 44074d7464
commit 1a24e3f1d0

View file

@ -269,7 +269,9 @@ class ImportSession:
iconfig["incremental"] = False
if iconfig["reflink"]:
iconfig["reflink"] = iconfig["reflink"].as_choice(["auto", True, False])
iconfig["reflink"] = iconfig["reflink"].as_choice(
["auto", True, False]
)
# Copy, move, reflink, link, and hardlink are mutually exclusive.
if iconfig["move"]:
@ -327,16 +329,24 @@ class ImportSession:
self.tag_log("skip", paths)
def should_resume(self, path: PathBytes):
raise NotImplementedError("Inheriting class must implement `should_resume`")
raise NotImplementedError(
"Inheriting class must implement `should_resume`"
)
def choose_match(self, task: ImportTask):
raise NotImplementedError("Inheriting class must implement `choose_match`")
raise NotImplementedError(
"Inheriting class must implement `choose_match`"
)
def resolve_duplicate(self, task: ImportTask, found_duplicates):
raise NotImplementedError("Inheriting class must implement `resolve_duplicate`")
raise NotImplementedError(
"Inheriting class must implement `resolve_duplicate`"
)
def choose_item(self, task: ImportTask):
raise NotImplementedError("Inheriting class must implement `choose_item`")
raise NotImplementedError(
"Inheriting class must implement `choose_item`"
)
def run(self):
"""Run the import task."""
@ -543,7 +553,9 @@ class ImportTask(BaseImportTask):
self.is_album = True
self.search_ids = [] # user-supplied candidate IDs.
def set_choice(self, choice: action | autotag.AlbumMatch | autotag.TrackMatch):
def set_choice(
self, choice: action | autotag.AlbumMatch | autotag.TrackMatch
):
"""Given an AlbumMatch or TrackMatch object or an action constant,
indicates that an action has been selected for this task.
@ -565,7 +577,9 @@ class ImportTask(BaseImportTask):
else:
self.choice_flag = action.APPLY # Implicit choice.
# Union is needed here for python 3.9 compatibility!
self.match = cast(Union[autotag.AlbumMatch, autotag.TrackMatch], choice)
self.match = cast(
Union[autotag.AlbumMatch, autotag.TrackMatch], choice
)
def save_progress(self):
"""Updates the progress state to indicate that this album has
@ -643,7 +657,9 @@ class ImportTask(BaseImportTask):
for item in duplicate_items:
item.remove()
if lib.directory in util.ancestry(item.path):
log.debug("deleting duplicate {0}", util.displayable_path(item.path))
log.debug(
"deleting duplicate {0}", util.displayable_path(item.path)
)
util.remove(item.path)
util.prune_dirs(os.path.dirname(item.path), lib.directory)
@ -675,8 +691,7 @@ class ImportTask(BaseImportTask):
self.save_progress()
if session.config["incremental"] and not (
# Should we skip recording to incremental list?
self.skip
and session.config["incremental_skip_later"]
self.skip and session.config["incremental_skip_later"]
):
self.save_history()
@ -733,7 +748,9 @@ class ImportTask(BaseImportTask):
candidate IDs are stored in self.search_ids: if present, the
initial lookup is restricted to only those IDs.
"""
artist, album, prop = autotag.tag_album(self.items, search_ids=self.search_ids)
artist, album, prop = autotag.tag_album(
self.items, search_ids=self.search_ids
)
self.cur_artist = artist
self.cur_album = album
self.candidates = prop.candidates
@ -753,7 +770,9 @@ class ImportTask(BaseImportTask):
# Construct a query to find duplicates with this metadata. We
# use a temporary Album object to generate any computed fields.
tmp_album = library.Album(lib, **info)
keys = cast(list[str], config["import"]["duplicate_keys"]["album"].as_str_seq())
keys = cast(
list[str], config["import"]["duplicate_keys"]["album"].as_str_seq()
)
dup_query = tmp_album.duplicates_query(keys)
# Don't count albums with the same files as duplicates.
@ -784,7 +803,8 @@ class ImportTask(BaseImportTask):
[i.albumartist or i.artist for i in self.items]
)
if freq == len(self.items) or (
freq > 1 and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH
freq > 1
and float(freq) / len(self.items) >= SINGLE_ARTIST_THRESH
):
# Single-artist album.
changes["albumartist"] = plur_albumartist
@ -886,10 +906,15 @@ class ImportTask(BaseImportTask):
self.replaced_albums: dict[PathBytes, library.Album] = defaultdict()
replaced_album_ids = set()
for item in self.imported_items():
dup_items = list(lib.items(dbcore.query.BytesQuery("path", item.path)))
dup_items = list(
lib.items(dbcore.query.BytesQuery("path", item.path))
)
self.replaced_items[item] = dup_items
for dup_item in dup_items:
if not dup_item.album_id or dup_item.album_id in replaced_album_ids:
if (
not dup_item.album_id
or dup_item.album_id in replaced_album_ids
):
continue
replaced_album = dup_item._cached_album
if replaced_album:
@ -942,7 +967,8 @@ class ImportTask(BaseImportTask):
self.album.artpath = replaced_album.artpath
self.album.store()
log.debug(
"Reimported album {}. Preserving attribute ['added']. " "Path: {}",
"Reimported album {}. Preserving attribute ['added']. "
"Path: {}",
self.album.id,
displayable_path(self.album.path),
)
@ -1074,7 +1100,9 @@ class SingletonImportTask(ImportTask):
# Query for existing items using the same metadata. We use a
# temporary `Item` object to generate any computed fields.
tmp_item = library.Item(lib, **info)
keys = cast(list[str], config["import"]["duplicate_keys"]["item"].as_str_seq())
keys = cast(
list[str], config["import"]["duplicate_keys"]["item"].as_str_seq()
)
dup_query = tmp_item.duplicates_query(keys)
found_items = []
@ -1248,7 +1276,9 @@ class ArchiveImportTask(SentinelImportTask):
break
if handler_class is None:
raise ValueError("No handler found for archive: {0}".format(self.toppath))
raise ValueError(
"No handler found for archive: {0}".format(self.toppath)
)
extract_to = mkdtemp()
archive = handler_class(os.fsdecode(self.toppath), mode="r")
@ -1368,7 +1398,9 @@ class ImportTaskFactory:
def singleton(self, path: PathBytes):
"""Return a `SingletonImportTask` for the music file."""
if self.session.already_imported(self.toppath, [path]):
log.debug("Skipping previously-imported path: {0}", displayable_path(path))
log.debug(
"Skipping previously-imported path: {0}", displayable_path(path)
)
self.skipped += 1
return None
@ -1391,7 +1423,9 @@ class ImportTaskFactory:
dirs = list({os.path.dirname(p) for p in paths})
if self.session.already_imported(self.toppath, dirs):
log.debug("Skipping previously-imported path: {0}", displayable_path(dirs))
log.debug(
"Skipping previously-imported path: {0}", displayable_path(dirs)
)
self.skipped += 1
return None
@ -1421,7 +1455,8 @@ class ImportTaskFactory:
if not (self.session.config["move"] or self.session.config["copy"]):
log.warning(
"Archive importing requires either " "'copy' or 'move' to be enabled."
"Archive importing requires either "
"'copy' or 'move' to be enabled."
)
return
@ -1640,7 +1675,9 @@ def resolve_duplicates(session: ImportSession, task: ImportTask):
if task.choice_flag in (action.ASIS, action.APPLY, action.RETAG):
found_duplicates = task.find_duplicates(session.lib)
if found_duplicates:
log.debug("found duplicates: {}".format([o.id for o in found_duplicates]))
log.debug(
"found duplicates: {}".format([o.id for o in found_duplicates])
)
# Get the default action to follow from config.
duplicate_action = config["import"]["duplicate_action"].as_choice(