diff --git a/beets/importer/tasks.py b/beets/importer/tasks.py index e56157ed0..586ef455a 100644 --- a/beets/importer/tasks.py +++ b/beets/importer/tasks.py @@ -18,10 +18,12 @@ import logging import os import re import shutil +import subprocess import time from collections import defaultdict from collections.abc import Callable from enum import Enum +from pathlib import Path from tempfile import mkdtemp from typing import TYPE_CHECKING, Any @@ -1077,6 +1079,12 @@ class ImportTaskFactory: If an item cannot be read, return `None` instead and log an error. """ + + # Check if the file has an extention, + # Add an extention if there isn't one. + if os.path.isfile(path): + path = self.check_extension(path) + try: return library.Item.from_path(path) except library.ReadError as exc: @@ -1090,6 +1098,102 @@ class ImportTaskFactory: "error reading {}: {}", util.displayable_path(path), exc ) + def check_extension(self, path: util.PathBytes): + path = Path(path.decode("utf-8")) + # if there is an extension, ignore + if path.suffix != "": + return path + + # no extension detexted + # use ffprobe to find the format + formats = [] + output = subprocess.run( + [ + "ffprobe", + "-hide_banner", + "-loglevel", + "fatal", + "-show_format", + path, + ], + capture_output=True, + ) + out = output.stdout.decode("utf-8") + err = output.stderr.decode("utf-8") + if err != "": + log.error("ffprobe error\n", err) + for line in out.split("\n"): + if line.startswith("format_name="): + formats = line.split("=")[1].split(",") + # a list of audio formats I got from wikipedia https://en.wikipedia.org/wiki/Audio_file_format + wiki_formats = [ + "3gp", + "aa", + "aac", + "aax", + "act", + "aiff", + "alac", + "amr", + "ape", + "au", + "awb", + "dss", + "dvf", + "flac", + "gsm", + "iklax", + "ivs", + "m4a", + "m4b", + "m4p", + "mmf", + "movpkg", + "mp1", + "mp2", + "mp3", + "mpc", + "msv", + "nmf", + "ogg", + "oga", + "mogg", + "opus", + "ra", + "rm", + "raw", + "rf64", + "sln", + "tta", + "voc", + "vox", + "wav", + "wma", + "wv", + "webm", + "8svx", + "cda", + ] + format = "" + # The first format from ffprobe that is on this list is taken + for f in formats: + if f in wiki_formats: + format = f + break + + # if ffprobe can't find a format, the file is prob not music + if format == "": + return path + + # cp and add ext. If already exist, use that file + # assume, for example, the only diff between 'asdf.mp3' and 'asdf' is format + new_path = path.with_suffix("." + format) + if not new_path.exists(): + util.copy(path, new_path) + else: + log.info("Import file with matching format to original target") + return new_path + MULTIDISC_MARKERS = (rb"dis[ck]", rb"cd") MULTIDISC_PAT_FMT = rb"^(.*%s[\W_]*)\d" diff --git a/docs/changelog.rst b/docs/changelog.rst index 2354e6539..8168b33c7 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -26,6 +26,9 @@ New features :bug:`2661` - :doc:`plugins/play`: Added ``-R``/``--randomize`` flag to shuffle the playlist order before passing it to the player. +- Use ffprobe to recognize format of any import music file that has no + extension. If the file cannot be recognized as a music file, leave it alone. + :bug:`4881` Bug fixes ~~~~~~~~~ diff --git a/test/rsrc/no_ext b/test/rsrc/no_ext new file mode 100644 index 000000000..fed8dc601 Binary files /dev/null and b/test/rsrc/no_ext differ diff --git a/test/rsrc/no_ext_not_music b/test/rsrc/no_ext_not_music new file mode 100644 index 000000000..e69de29bb diff --git a/test/test_importer.py b/test/test_importer.py index 1a8983c11..7de432266 100644 --- a/test/test_importer.py +++ b/test/test_importer.py @@ -351,6 +351,34 @@ class ImportTest(PathsMixin, AutotagImportTestCase): self.prepare_album_for_import(1) self.setup_importer() + def test_recognize_format(self): + resource_path = os.path.join(_common.RSRC, b"no_ext") + self.setup_importer() + self.importer.paths = [resource_path] + self.importer.run() + assert self.lib.items().get().path.endswith(b".mp3") + util.remove(os.path.join(_common.RSRC, b"no_ext.mp3")) + + def test_recognize_format_already_exist(self): + resource_path = os.path.join(_common.RSRC, b"no_ext") + new_path = os.path.join(_common.RSRC, b"no_ext.mp3") + util.copy(resource_path, new_path) + self.setup_importer() + self.importer.paths = [resource_path] + self.importer.run() + assert self.lib.items().get().path.endswith(b".mp3") + with capture_log() as logs: + self.importer.run() + assert "Import file with matching format to original target" in logs + util.remove(new_path) + + def test_recognize_format_not_music(self): + resource_path = os.path.join(_common.RSRC, b"no_ext_not_music") + self.setup_importer() + self.importer.paths = [resource_path] + self.importer.run() + assert len(self.lib.items()) == 0 + def test_asis_moves_album_and_track(self): self.importer.add_choice(importer.Action.ASIS) self.importer.run()