Remove assert_file_in_lib

This commit is contained in:
Šarūnas Nejus 2025-06-01 10:06:24 +01:00
parent 452644bbf3
commit e36e8f1f51
No known key found for this signature in database
GPG key ID: DD28F6704DBE3435
2 changed files with 114 additions and 226 deletions

View file

@ -645,18 +645,6 @@ class ImportHelper(TestHelper):
def setup_singleton_importer(self, **kwargs) -> ImportSession: def setup_singleton_importer(self, **kwargs) -> ImportSession:
return self.setup_importer(singletons=True, **kwargs) return self.setup_importer(singletons=True, **kwargs)
def assert_file_in_lib(self, *segments):
"""Join the ``segments`` and assert that this path exists in the
library directory.
"""
assert self.lib_path.joinpath(*segments).exists()
def assert_file_not_in_lib(self, *segments):
"""Join the ``segments`` and assert that this path does not
exist in the library directory.
"""
assert not self.lib_path.joinpath(*segments).exists()
class AsIsImporterMixin: class AsIsImporterMixin:
def setUp(self): def setUp(self):

View file

@ -15,6 +15,8 @@
"""Tests for the general importer functionality.""" """Tests for the general importer functionality."""
from __future__ import annotations
import os import os
import re import re
import shutil import shutil
@ -22,6 +24,7 @@ import stat
import sys import sys
import unicodedata import unicodedata
import unittest import unittest
from functools import cached_property
from io import StringIO from io import StringIO
from pathlib import Path from pathlib import Path
from tarfile import TarFile from tarfile import TarFile
@ -51,87 +54,71 @@ from beets.test.helper import (
from beets.util import bytestring_path, displayable_path, syspath from beets.util import bytestring_path, displayable_path, syspath
class PathsMixin:
import_media: list[MediaFile]
@cached_property
def track_import_path(self) -> Path:
return Path(self.import_media[0].path)
@cached_property
def album_path(self) -> Path:
return self.track_import_path.parent
@cached_property
def track_lib_path(self):
return self.lib_path / "Tag Artist" / "Tag Album" / "Tag Track 1.mp3"
@_common.slow_test() @_common.slow_test()
class NonAutotaggedImportTest(AsIsImporterMixin, ImportTestCase): class NonAutotaggedImportTest(PathsMixin, AsIsImporterMixin, ImportTestCase):
db_on_disk = True db_on_disk = True
def test_album_created_with_track_artist(self): def test_album_created_with_track_artist(self):
self.run_asis_importer() self.run_asis_importer()
albums = self.lib.albums() albums = self.lib.albums()
assert len(albums) == 1 assert len(albums) == 1
assert albums[0].albumartist == "Tag Artist" assert albums[0].albumartist == "Tag Artist"
def test_import_copy_arrives(self): def test_import_copy_arrives(self):
self.run_asis_importer() self.run_asis_importer()
for mediafile in self.import_media:
self.assert_file_in_lib( assert self.track_lib_path.exists()
b"Tag Artist",
b"Tag Album",
util.bytestring_path(f"{mediafile.title}.mp3"),
)
def test_threaded_import_copy_arrives(self): def test_threaded_import_copy_arrives(self):
config["threaded"] = True config["threaded"] = True
self.run_asis_importer() self.run_asis_importer()
for mediafile in self.import_media: assert self.track_lib_path.exists()
self.assert_file_in_lib(
b"Tag Artist",
b"Tag Album",
util.bytestring_path(f"{mediafile.title}.mp3"),
)
def test_import_with_move_deletes_import_files(self): def test_import_with_move_deletes_import_files(self):
for mediafile in self.import_media: assert self.album_path.exists()
assert Path(mediafile.path).exists() assert self.track_import_path.exists()
self.run_asis_importer(move=True) (self.album_path / "alog.log").touch()
for mediafile in self.import_media:
assert not Path(mediafile.path).exists()
def test_import_with_move_prunes_directory_empty(self):
album_path = self.import_path / "album"
assert album_path.exists()
self.run_asis_importer(move=True)
assert not album_path.exists()
def test_import_with_move_prunes_with_extra_clutter(self):
self.touch(os.path.join(self.import_dir, b"album", b"alog.log"))
config["clutter"] = ["*.log"] config["clutter"] = ["*.log"]
album_path = self.import_path / "album"
assert album_path.exists()
self.run_asis_importer(move=True) self.run_asis_importer(move=True)
assert not album_path.exists()
assert not self.track_import_path.exists()
assert not self.album_path.exists()
def test_threaded_import_move_arrives(self): def test_threaded_import_move_arrives(self):
self.run_asis_importer(move=True, threaded=True) self.run_asis_importer(move=True, threaded=True)
for mediafile in self.import_media:
self.assert_file_in_lib(
b"Tag Artist",
b"Tag Album",
util.bytestring_path(f"{mediafile.title}.mp3"),
)
def test_threaded_import_move_deletes_import(self): assert self.track_lib_path.exists()
self.run_asis_importer(move=True, threaded=True) assert not self.track_import_path.exists()
for mediafile in self.import_media:
assert not Path(mediafile.path).exists()
def test_import_without_delete_retains_files(self): def test_import_without_delete_retains_files(self):
self.run_asis_importer(delete=False) self.run_asis_importer(delete=False)
for mediafile in self.import_media:
assert Path(mediafile.path).exists() assert self.track_import_path.exists()
def test_import_with_delete_removes_files(self): def test_import_with_delete_removes_files(self):
self.run_asis_importer(delete=True) self.run_asis_importer(delete=True)
for mediafile in self.import_media:
assert not Path(mediafile.path).exists()
def test_import_with_delete_prunes_directory_empty(self): assert not self.album_path.exists()
album_path = self.import_path / "album" assert not self.track_import_path.exists()
assert album_path.exists()
self.run_asis_importer(delete=True)
assert not album_path.exists()
def test_album_mb_albumartistids(self): def test_album_mb_albumartistids(self):
self.run_asis_importer() self.run_asis_importer()
@ -141,51 +128,36 @@ class NonAutotaggedImportTest(AsIsImporterMixin, ImportTestCase):
@unittest.skipUnless(_common.HAVE_SYMLINK, "need symlinks") @unittest.skipUnless(_common.HAVE_SYMLINK, "need symlinks")
def test_import_link_arrives(self): def test_import_link_arrives(self):
self.run_asis_importer(link=True) self.run_asis_importer(link=True)
for mediafile in self.import_media:
path = ( assert self.track_lib_path.exists()
self.lib_path / "Tag Artist" / "Tag Album" / "Tag Track 1.mp3" assert self.track_lib_path.is_symlink()
self.assert_equal_path(
self.track_lib_path.resolve(), self.track_import_path
) )
assert path.exists()
assert path.is_symlink()
self.assert_equal_path(path.resolve(), mediafile.path)
@unittest.skipUnless(_common.HAVE_HARDLINK, "need hardlinks") @unittest.skipUnless(_common.HAVE_HARDLINK, "need hardlinks")
def test_import_hardlink_arrives(self): def test_import_hardlink_arrives(self):
self.run_asis_importer(hardlink=True) self.run_asis_importer(hardlink=True)
for mediafile in self.import_media:
path = (
self.lib_path / "Tag Artist" / "Tag Album" / "Tag Track 1.mp3"
)
assert path.exists()
s1 = os.stat(syspath(mediafile.path)) assert self.track_lib_path.exists()
s2 = path.stat() media_stat = self.track_import_path.stat()
assert (s1[stat.ST_INO], s1[stat.ST_DEV]) == ( lib_media_stat = self.track_lib_path.stat()
s2[stat.ST_INO], assert media_stat[stat.ST_INO] == lib_media_stat[stat.ST_INO]
s2[stat.ST_DEV], assert media_stat[stat.ST_DEV] == lib_media_stat[stat.ST_DEV]
)
@NEEDS_REFLINK @NEEDS_REFLINK
def test_import_reflink_arrives(self): def test_import_reflink_arrives(self):
# Detecting reflinks is currently tricky due to various fs # Detecting reflinks is currently tricky due to various fs
# implementations, we'll just check the file exists. # implementations, we'll just check the file exists.
self.run_asis_importer(reflink=True) self.run_asis_importer(reflink=True)
for mediafile in self.import_media:
self.assert_file_in_lib( assert self.track_lib_path.exists()
b"Tag Artist",
b"Tag Album",
util.bytestring_path(f"{mediafile.title}.mp3"),
)
def test_import_reflink_auto_arrives(self): def test_import_reflink_auto_arrives(self):
# Should pass regardless of reflink support due to fallback. # Should pass regardless of reflink support due to fallback.
self.run_asis_importer(reflink="auto") self.run_asis_importer(reflink="auto")
for mediafile in self.import_media:
self.assert_file_in_lib( assert self.track_lib_path.exists()
b"Tag Artist",
b"Tag Album",
util.bytestring_path(f"{mediafile.title}.mp3"),
)
def create_archive(session): def create_archive(session):
@ -271,52 +243,36 @@ class ImportSingletonTest(AutotagImportTestCase):
self.prepare_album_for_import(1) self.prepare_album_for_import(1)
self.importer = self.setup_singleton_importer() self.importer = self.setup_singleton_importer()
def test_apply_asis_adds_track(self): def test_apply_asis_adds_only_singleton_track(self):
assert self.lib.items().get() is None
self.importer.add_choice(importer.Action.ASIS) self.importer.add_choice(importer.Action.ASIS)
self.importer.run() self.importer.run()
# album not added
assert not self.lib.albums()
assert self.lib.items().get().title == "Tag Track 1" assert self.lib.items().get().title == "Tag Track 1"
assert (self.lib_path / "singletons" / "Tag Track 1.mp3").exists()
def test_apply_asis_does_not_add_album(self):
assert self.lib.albums().get() is None
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert self.lib.albums().get() is None
def test_apply_asis_adds_singleton_path(self):
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
self.assert_file_in_lib(b"singletons", b"Tag Track 1.mp3")
def test_apply_candidate_adds_track(self): def test_apply_candidate_adds_track(self):
assert self.lib.items().get() is None
self.importer.add_choice(importer.Action.APPLY) self.importer.add_choice(importer.Action.APPLY)
self.importer.run() self.importer.run()
assert not self.lib.albums()
assert self.lib.items().get().title == "Applied Track 1" assert self.lib.items().get().title == "Applied Track 1"
assert (self.lib_path / "singletons" / "Applied Track 1.mp3").exists()
def test_apply_candidate_does_not_add_album(self): def test_skip_does_not_add_track(self):
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.albums().get() is None
def test_apply_candidate_adds_singleton_path(self):
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
self.assert_file_in_lib(b"singletons", b"Applied Track 1.mp3")
def test_skip_does_not_add_first_track(self):
self.importer.add_choice(importer.Action.SKIP) self.importer.add_choice(importer.Action.SKIP)
self.importer.run() self.importer.run()
assert self.lib.items().get() is None
def test_skip_adds_other_tracks(self): assert not self.lib.items()
def test_skip_first_add_second_asis(self):
self.prepare_album_for_import(2) self.prepare_album_for_import(2)
self.importer.add_choice(importer.Action.SKIP) self.importer.add_choice(importer.Action.SKIP)
self.importer.add_choice(importer.Action.ASIS) self.importer.add_choice(importer.Action.ASIS)
self.importer.run() self.importer.run()
assert len(self.lib.items()) == 1 assert len(self.lib.items()) == 1
def test_import_single_files(self): def test_import_single_files(self):
@ -365,7 +321,7 @@ class ImportSingletonTest(AutotagImportTestCase):
item.remove() item.remove()
# Autotagged. # Autotagged.
assert self.lib.albums().get() is None assert not self.lib.albums()
self.importer.clear_choices() self.importer.clear_choices()
self.importer.add_choice(importer.Action.APPLY) self.importer.add_choice(importer.Action.APPLY)
self.importer.run() self.importer.run()
@ -378,7 +334,7 @@ class ImportSingletonTest(AutotagImportTestCase):
assert item.disc == disc assert item.disc == disc
class ImportTest(AutotagImportTestCase): class ImportTest(PathsMixin, AutotagImportTestCase):
"""Test APPLY, ASIS and SKIP choices.""" """Test APPLY, ASIS and SKIP choices."""
def setUp(self): def setUp(self):
@ -386,44 +342,23 @@ class ImportTest(AutotagImportTestCase):
self.prepare_album_for_import(1) self.prepare_album_for_import(1)
self.setup_importer() self.setup_importer()
def test_apply_asis_adds_album(self): def test_asis_moves_album_and_track(self):
assert self.lib.albums().get() is None
self.importer.add_choice(importer.Action.ASIS) self.importer.add_choice(importer.Action.ASIS)
self.importer.run() self.importer.run()
assert self.lib.albums().get().album == "Tag Album" assert self.lib.albums().get().album == "Tag Album"
item = self.lib.items().get()
assert item.title == "Tag Track 1"
assert item.filepath.exists()
def test_apply_asis_adds_tracks(self): def test_apply_moves_album_and_track(self):
assert self.lib.items().get() is None
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
assert self.lib.items().get().title == "Tag Track 1"
def test_apply_asis_adds_album_path(self):
self.importer.add_choice(importer.Action.ASIS)
self.importer.run()
self.assert_file_in_lib(b"Tag Artist", b"Tag Album", b"Tag Track 1.mp3")
def test_apply_candidate_adds_album(self):
assert self.lib.albums().get() is None
self.importer.add_choice(importer.Action.APPLY) self.importer.add_choice(importer.Action.APPLY)
self.importer.run() self.importer.run()
assert self.lib.albums().get().album == "Applied Album" assert self.lib.albums().get().album == "Applied Album"
item = self.lib.items().get()
def test_apply_candidate_adds_tracks(self): assert item.title == "Applied Track 1"
assert self.lib.items().get() is None assert item.filepath.exists()
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
assert self.lib.items().get().title == "Applied Track 1"
def test_apply_candidate_adds_album_path(self):
self.importer.add_choice(importer.Action.APPLY)
self.importer.run()
self.assert_file_in_lib(
b"Applied Artist", b"Applied Album", b"Applied Track 1.mp3"
)
def test_apply_from_scratch_removes_other_metadata(self): def test_apply_from_scratch_removes_other_metadata(self):
config["import"]["from_scratch"] = True config["import"]["from_scratch"] = True
@ -452,35 +387,35 @@ class ImportTest(AutotagImportTestCase):
assert self.lib.items().get().bitrate == bitrate assert self.lib.items().get().bitrate == bitrate
def test_apply_with_move_deletes_import(self): def test_apply_with_move_deletes_import(self):
assert self.track_import_path.exists()
config["import"]["move"] = True config["import"]["move"] = True
track_path = Path(self.import_media[0].path)
assert track_path.exists()
self.importer.add_choice(importer.Action.APPLY) self.importer.add_choice(importer.Action.APPLY)
self.importer.run() self.importer.run()
assert not track_path.exists()
assert not self.track_import_path.exists()
def test_apply_with_delete_deletes_import(self): def test_apply_with_delete_deletes_import(self):
assert self.track_import_path.exists()
config["import"]["delete"] = True config["import"]["delete"] = True
track_path = Path(self.import_media[0].path)
assert track_path.exists()
self.importer.add_choice(importer.Action.APPLY) self.importer.add_choice(importer.Action.APPLY)
self.importer.run() self.importer.run()
assert not track_path.exists()
assert not self.track_import_path.exists()
def test_skip_does_not_add_track(self): def test_skip_does_not_add_track(self):
self.importer.add_choice(importer.Action.SKIP) self.importer.add_choice(importer.Action.SKIP)
self.importer.run() self.importer.run()
assert self.lib.items().get() is None
assert not self.lib.items()
def test_skip_non_album_dirs(self): def test_skip_non_album_dirs(self):
self.assertIsDir(os.path.join(self.import_dir, b"album")) self.assertIsDir(os.path.join(self.import_dir, b"album"))
self.touch(b"cruft", dir=self.import_dir) self.touch(b"cruft", dir=self.import_dir)
self.importer.add_choice(importer.Action.APPLY) self.importer.add_choice(importer.Action.APPLY)
self.importer.run() self.importer.run()
assert len(self.lib.albums()) == 1 assert len(self.lib.albums()) == 1
def test_unmatched_tracks_not_added(self): def test_unmatched_tracks_not_added(self):
@ -584,22 +519,21 @@ class ImportTracksTest(AutotagImportTestCase):
self.setup_importer() self.setup_importer()
def test_apply_tracks_adds_singleton_track(self): def test_apply_tracks_adds_singleton_track(self):
assert self.lib.items().get() is None
assert self.lib.albums().get() is None
self.importer.add_choice(importer.Action.TRACKS) self.importer.add_choice(importer.Action.TRACKS)
self.importer.add_choice(importer.Action.APPLY) self.importer.add_choice(importer.Action.APPLY)
self.importer.add_choice(importer.Action.APPLY) self.importer.add_choice(importer.Action.APPLY)
self.importer.run() self.importer.run()
assert self.lib.items().get().title == "Applied Track 1" assert self.lib.items().get().title == "Applied Track 1"
assert self.lib.albums().get() is None assert not self.lib.albums()
def test_apply_tracks_adds_singleton_path(self): def test_apply_tracks_adds_singleton_path(self):
self.importer.add_choice(importer.Action.TRACKS) self.importer.add_choice(importer.Action.TRACKS)
self.importer.add_choice(importer.Action.APPLY) self.importer.add_choice(importer.Action.APPLY)
self.importer.add_choice(importer.Action.APPLY) self.importer.add_choice(importer.Action.APPLY)
self.importer.run() self.importer.run()
self.assert_file_in_lib(b"singletons", b"Applied Track 1.mp3")
assert (self.lib_path / "singletons" / "Applied Track 1.mp3").exists()
class ImportCompilationTest(AutotagImportTestCase): class ImportCompilationTest(AutotagImportTestCase):
@ -707,7 +641,7 @@ class ImportCompilationTest(AutotagImportTestCase):
assert asserted_multi_artists_1 assert asserted_multi_artists_1
class ImportExistingTest(AutotagImportTestCase): class ImportExistingTest(PathsMixin, AutotagImportTestCase):
"""Test importing files that are already in the library directory.""" """Test importing files that are already in the library directory."""
def setUp(self): def setUp(self):
@ -717,20 +651,23 @@ class ImportExistingTest(AutotagImportTestCase):
self.reimporter = self.setup_importer(import_dir=self.libdir) self.reimporter = self.setup_importer(import_dir=self.libdir)
self.importer = self.setup_importer() self.importer = self.setup_importer()
def test_does_not_duplicate_item(self): def tearDown(self):
super().tearDown()
self.matcher.restore()
@cached_property
def applied_track_path(self) -> Path:
return Path(str(self.track_lib_path).replace("Tag", "Applied"))
def test_does_not_duplicate_item_nor_album(self):
self.importer.run() self.importer.run()
assert len(self.lib.items()) == 1 assert len(self.lib.items()) == 1
self.reimporter.add_choice(importer.Action.APPLY)
self.reimporter.run()
assert len(self.lib.items()) == 1
def test_does_not_duplicate_album(self):
self.importer.run()
assert len(self.lib.albums()) == 1 assert len(self.lib.albums()) == 1
self.reimporter.add_choice(importer.Action.APPLY) self.reimporter.add_choice(importer.Action.APPLY)
self.reimporter.run() self.reimporter.run()
assert len(self.lib.items()) == 1
assert len(self.lib.albums()) == 1 assert len(self.lib.albums()) == 1
def test_does_not_duplicate_singleton_track(self): def test_does_not_duplicate_singleton_track(self):
@ -744,33 +681,19 @@ class ImportExistingTest(AutotagImportTestCase):
self.reimporter.run() self.reimporter.run()
assert len(self.lib.items()) == 1 assert len(self.lib.items()) == 1
def test_asis_updates_metadata(self): def test_asis_updates_metadata_and_moves_file(self):
self.importer.run() self.importer.run()
medium = MediaFile(self.lib.items().get().path) medium = MediaFile(self.lib.items().get().path)
medium.title = "New Title" medium.title = "New Title"
medium.save() medium.save()
self.reimporter.add_choice(importer.Action.ASIS) self.reimporter.add_choice(importer.Action.ASIS)
self.reimporter.run() self.reimporter.run()
assert self.lib.items().get().title == "New Title" assert self.lib.items().get().title == "New Title"
assert not self.applied_track_path.exists()
def test_asis_updated_moves_file(self): assert self.applied_track_path.with_name("New Title.mp3").exists()
self.importer.run()
medium = MediaFile(self.lib.items().get().path)
medium.title = "New Title"
medium.save()
old_path = os.path.join(
b"Applied Artist", b"Applied Album", b"Applied Track 1.mp3"
)
self.assert_file_in_lib(old_path)
self.reimporter.add_choice(importer.Action.ASIS)
self.reimporter.run()
self.assert_file_in_lib(
b"Applied Artist", b"Applied Album", b"New Title.mp3"
)
self.assert_file_not_in_lib(old_path)
def test_asis_updated_without_copy_does_not_move_file(self): def test_asis_updated_without_copy_does_not_move_file(self):
self.importer.run() self.importer.run()
@ -778,18 +701,12 @@ class ImportExistingTest(AutotagImportTestCase):
medium.title = "New Title" medium.title = "New Title"
medium.save() medium.save()
old_path = os.path.join(
b"Applied Artist", b"Applied Album", b"Applied Track 1.mp3"
)
self.assert_file_in_lib(old_path)
config["import"]["copy"] = False config["import"]["copy"] = False
self.reimporter.add_choice(importer.Action.ASIS) self.reimporter.add_choice(importer.Action.ASIS)
self.reimporter.run() self.reimporter.run()
self.assert_file_not_in_lib(
b"Applied Artist", b"Applied Album", b"New Title.mp3" assert self.applied_track_path.exists()
) assert not self.applied_track_path.with_name("New Title.mp3").exists()
self.assert_file_in_lib(old_path)
def test_outside_file_is_copied(self): def test_outside_file_is_copied(self):
config["import"]["copy"] = False config["import"]["copy"] = False
@ -801,26 +718,9 @@ class ImportExistingTest(AutotagImportTestCase):
self.reimporter = self.setup_importer() self.reimporter = self.setup_importer()
self.reimporter.add_choice(importer.Action.APPLY) self.reimporter.add_choice(importer.Action.APPLY)
self.reimporter.run() self.reimporter.run()
new_path = os.path.join(
b"Applied Artist", b"Applied Album", b"Applied Track 1.mp3"
)
self.assert_file_in_lib(new_path) assert self.applied_track_path.exists()
self.assert_equal_path( assert self.lib.items().get().filepath == self.applied_track_path
self.lib.items().get().path, os.path.join(self.libdir, new_path)
)
def test_outside_file_is_moved(self):
config["import"]["copy"] = False
self.importer.run()
self.assert_equal_path(
self.lib.items().get().path, self.import_media[0].path
)
self.reimporter = self.setup_importer(move=True)
self.reimporter.add_choice(importer.Action.APPLY)
self.reimporter.run()
assert not Path(self.import_media[0].path).exists()
class GroupAlbumsImportTest(AutotagImportTestCase): class GroupAlbumsImportTest(AutotagImportTestCase):