diff --git a/beets/dbcore/query.py b/beets/dbcore/query.py index 7d9f0cee7..ae8e0ddf6 100644 --- a/beets/dbcore/query.py +++ b/beets/dbcore/query.py @@ -411,39 +411,6 @@ class BooleanQuery(MatchQuery[int]): super().__init__(field_name, pattern_int, fast) -class BytesQuery(FieldQuery[bytes]): - """Match a raw bytes field (i.e., a path). This is a necessary hack - to work around the `sqlite3` module's desire to treat `bytes` and - `unicode` equivalently in Python 2. Always use this query instead of - `MatchQuery` when matching on BLOB values. - """ - - def __init__(self, field_name: str, pattern: bytes | str | memoryview): - # Use a buffer/memoryview representation of the pattern for SQLite - # matching. This instructs SQLite to treat the blob as binary - # rather than encoded Unicode. - if isinstance(pattern, (str, bytes)): - if isinstance(pattern, str): - bytes_pattern = pattern.encode("utf-8") - else: - bytes_pattern = pattern - self.buf_pattern = memoryview(bytes_pattern) - elif isinstance(pattern, memoryview): - self.buf_pattern = pattern - bytes_pattern = bytes(pattern) - else: - raise ValueError("pattern must be bytes, str, or memoryview") - - super().__init__(field_name, bytes_pattern) - - def col_clause(self) -> tuple[str, Sequence[SQLiteType]]: - return self.field + " = ?", [self.buf_pattern] - - @classmethod - def value_match(cls, pattern: bytes, value: Any) -> bool: - return pattern == value - - class NumericQuery(FieldQuery[str]): """Matches numeric fields. A syntax using Ruby-style range ellipses (``..``) lets users specify one- or two-sided ranges. For example, diff --git a/beets/importer/tasks.py b/beets/importer/tasks.py index 75f04cf5a..441224b6b 100644 --- a/beets/importer/tasks.py +++ b/beets/importer/tasks.py @@ -26,7 +26,8 @@ from typing import TYPE_CHECKING, Callable, Iterable, Sequence import mediafile -from beets import autotag, config, dbcore, library, plugins, util +from beets import autotag, config, library, plugins, util +from beets.dbcore.query import PathQuery from .state import ImportState @@ -520,9 +521,7 @@ class ImportTask(BaseImportTask): ) replaced_album_ids = set() for item in self.imported_items(): - dup_items = list( - lib.items(query=dbcore.query.BytesQuery("path", item.path)) - ) + dup_items = list(lib.items(query=PathQuery("path", item.path))) self.replaced_items[item] = dup_items for dup_item in dup_items: if ( diff --git a/beets/ui/__init__.py b/beets/ui/__init__.py index 109e39f4d..4f4236ff9 100644 --- a/beets/ui/__init__.py +++ b/beets/ui/__init__.py @@ -104,23 +104,15 @@ def _stream_encoding(stream, default="utf-8"): return stream.encoding or default -def print_(*strings, **kwargs): +def print_(*strings: str, end: str = "\n") -> None: """Like print, but rather than raising an error when a character is not in the terminal's encoding's character set, just silently replaces it. - The arguments must be Unicode strings: `unicode` on Python 2; `str` on - Python 3. - The `end` keyword argument behaves similarly to the built-in `print` (it defaults to a newline). """ - if not strings: - strings = [""] - assert isinstance(strings[0], str) - - txt = " ".join(strings) - txt += kwargs.get("end", "\n") + txt = " ".join(strings or ("",)) + end # Encode the string and write it to stdout. # On Python 3, sys.stdout expects text strings and uses the diff --git a/beets/ui/commands.py b/beets/ui/commands.py index 7153f30be..25af95646 100755 --- a/beets/ui/commands.py +++ b/beets/ui/commands.py @@ -1302,7 +1302,7 @@ class TerminalImportSession(importer.ImportSession): # The import command. -def import_files(lib, paths, query): +def import_files(lib, paths: list[bytes], query): """Import the files in the given list of paths or matching the query. """ @@ -1333,7 +1333,7 @@ def import_files(lib, paths, query): plugins.send("import", lib=lib, paths=paths) -def import_func(lib, opts, args): +def import_func(lib, opts, args: list[str]): config["import"].set_args(opts) # Special case: --copy flag suppresses import_move (which would @@ -1355,15 +1355,11 @@ def import_func(lib, opts, args): if not paths and not paths_from_logfiles: raise ui.UserError("no path specified") - # On Python 2, we used to get filenames as raw bytes, which is - # what we need. On Python 3, we need to undo the "helpful" - # conversion to Unicode strings to get the real bytestring - # filename. - paths = [os.fsencode(p) for p in paths] + byte_paths = [os.fsencode(p) for p in paths] paths_from_logfiles = [os.fsencode(p) for p in paths_from_logfiles] # Check the user-specified directories. - for path in paths: + for path in byte_paths: if not os.path.exists(syspath(normpath(path))): raise ui.UserError( "no such file or directory: {}".format( @@ -1384,14 +1380,14 @@ def import_func(lib, opts, args): ) continue - paths.append(path) + byte_paths.append(path) # If all paths were read from a logfile, and none of them exist, throw # an error if not paths: raise ui.UserError("none of the paths are importable") - import_files(lib, paths, query) + import_files(lib, byte_paths, query) import_cmd = ui.Subcommand( diff --git a/beets/util/__init__.py b/beets/util/__init__.py index c1c76c860..00c9ce05d 100644 --- a/beets/util/__init__.py +++ b/beets/util/__init__.py @@ -28,6 +28,7 @@ import sys import tempfile import traceback from collections import Counter +from collections.abc import Sequence from contextlib import suppress from enum import Enum from functools import cache @@ -41,7 +42,6 @@ from typing import ( AnyStr, Callable, Generic, - Iterable, NamedTuple, TypeVar, Union, @@ -53,23 +53,17 @@ import beets from beets.util import hidden if TYPE_CHECKING: - from collections.abc import Iterator, Sequence + from collections.abc import Iterable, Iterator from logging import Logger from beets.library import Item -if sys.version_info >= (3, 10): - from typing import TypeAlias -else: - from typing_extensions import TypeAlias - MAX_FILENAME_LENGTH = 200 WINDOWS_MAGIC_PREFIX = "\\\\?\\" T = TypeVar("T") -BytesOrStr = Union[str, bytes] -PathLike = Union[BytesOrStr, Path] -Replacements: TypeAlias = "Sequence[tuple[Pattern[str], str]]" +PathLike = Union[str, bytes, Path] +Replacements = Sequence[tuple[Pattern[str], str]] # Here for now to allow for a easy replace later on # once we can move to a PathLike (mainly used in importer) @@ -860,7 +854,9 @@ class CommandOutput(NamedTuple): stderr: bytes -def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput: +def command_output( + cmd: list[str] | list[bytes], shell: bool = False +) -> CommandOutput: """Runs the command and returns its output after it has exited. Returns a CommandOutput. The attributes ``stdout`` and ``stderr`` contain @@ -878,8 +874,6 @@ def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput: This replaces `subprocess.check_output` which can have problems if lots of output is sent to stderr. """ - converted_cmd = [os.fsdecode(a) for a in cmd] - devnull = subprocess.DEVNULL proc = subprocess.Popen( @@ -894,7 +888,7 @@ def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput: if proc.returncode: raise subprocess.CalledProcessError( returncode=proc.returncode, - cmd=" ".join(converted_cmd), + cmd=" ".join(map(os.fsdecode, cmd)), output=stdout + stderr, ) return CommandOutput(stdout, stderr) diff --git a/beets/util/artresizer.py b/beets/util/artresizer.py index 33b98c413..fe67c506e 100644 --- a/beets/util/artresizer.py +++ b/beets/util/artresizer.py @@ -214,9 +214,9 @@ class IMBackend(LocalBackend): else: return cls._version - convert_cmd: list[str | bytes] - identify_cmd: list[str | bytes] - compare_cmd: list[str | bytes] + convert_cmd: list[str] + identify_cmd: list[str] + compare_cmd: list[str] def __init__(self) -> None: """Initialize a wrapper around ImageMagick for local image operations. @@ -265,7 +265,7 @@ class IMBackend(LocalBackend): # with regards to the height. # ImageMagick already seems to default to no interlace, but we include # it here for the sake of explicitness. - cmd: list[str | bytes] = self.convert_cmd + [ + cmd: list[str] = self.convert_cmd + [ syspath(path_in, prefix=False), "-resize", f"{maxwidth}x>", @@ -295,7 +295,7 @@ class IMBackend(LocalBackend): return path_out def get_size(self, path_in: bytes) -> tuple[int, int] | None: - cmd: list[str | bytes] = self.identify_cmd + [ + cmd: list[str] = self.identify_cmd + [ "-format", "%w %h", syspath(path_in, prefix=False), @@ -480,10 +480,11 @@ class IMBackend(LocalBackend): return True def write_metadata(self, file: bytes, metadata: Mapping[str, str]) -> None: - assignments = list( - chain.from_iterable(("-set", k, v) for k, v in metadata.items()) + assignments = chain.from_iterable( + ("-set", k, v) for k, v in metadata.items() ) - command = self.convert_cmd + [file, *assignments, file] + str_file = os.fsdecode(file) + command = self.convert_cmd + [str_file, *assignments, str_file] util.command_output(command) diff --git a/beetsplug/hook.py b/beetsplug/hook.py index 5ce5ef828..90d66553a 100644 --- a/beetsplug/hook.py +++ b/beetsplug/hook.py @@ -14,27 +14,21 @@ """Allows custom commands to be run when an event is emitted by beets""" +from __future__ import annotations + +import os import shlex import string import subprocess -import sys +from typing import Any from beets.plugins import BeetsPlugin -class CodingFormatter(string.Formatter): - """A variant of `string.Formatter` that converts everything to `unicode` - strings. +class BytesToStrFormatter(string.Formatter): + """A variant of `string.Formatter` that converts `bytes` to `str`.""" - This was necessary on Python 2, in needs to be kept for backwards - compatibility. - """ - - def __init__(self, coding): - """Creates a new coding formatter with the provided coding.""" - self._coding = coding - - def convert_field(self, value, conversion): + def convert_field(self, value: Any, conversion: str | None) -> Any: """Converts the provided value given a conversion type. This method decodes the converted value using the formatter's coding. @@ -42,7 +36,7 @@ class CodingFormatter(string.Formatter): converted = super().convert_field(value, conversion) if isinstance(converted, bytes): - return converted.decode(self._coding) + return os.fsdecode(converted) return converted @@ -72,8 +66,8 @@ class HookPlugin(BeetsPlugin): return # For backwards compatibility, use a string formatter that decodes - # bytes (in particular, paths) to unicode strings. - formatter = CodingFormatter(sys.getfilesystemencoding()) + # bytes (in particular, paths) to strings. + formatter = BytesToStrFormatter() command_pieces = [ formatter.format(piece, event=event, **kwargs) for piece in shlex.split(command) diff --git a/beetsplug/replaygain.py b/beetsplug/replaygain.py index df37717b9..00b651d99 100644 --- a/beetsplug/replaygain.py +++ b/beetsplug/replaygain.py @@ -62,7 +62,7 @@ class FatalGstreamerPluginReplayGainError(FatalReplayGainError): loading the required plugins.""" -def call(args: list[Any], log: Logger, **kwargs: Any): +def call(args: list[str], log: Logger, **kwargs: Any): """Execute the command and return its output or raise a ReplayGainError on failure. """ @@ -73,11 +73,6 @@ def call(args: list[Any], log: Logger, **kwargs: Any): raise ReplayGainError( "{} exited with status {}".format(args[0], e.returncode) ) - except UnicodeEncodeError: - # Due to a bug in Python 2's subprocess on Windows, Unicode - # filenames can fail to encode on that platform. See: - # https://github.com/google-code-export/beets/issues/499 - raise ReplayGainError("argument encoding failed") def db_to_lufs(db: float) -> float: @@ -403,20 +398,18 @@ class FfmpegBackend(Backend): def _construct_cmd( self, item: Item, peak_method: PeakMethod | None - ) -> list[str | bytes]: + ) -> list[str]: """Construct the shell command to analyse items.""" return [ self._ffmpeg_path, "-nostats", "-hide_banner", "-i", - item.path, + str(item.filepath), "-map", "a:0", "-filter", - "ebur128=peak={}".format( - "none" if peak_method is None else peak_method.name - ), + f"ebur128=peak={'none' if peak_method is None else peak_method.name}", "-f", "null", "-", @@ -660,7 +653,7 @@ class CommandBackend(Backend): # tag-writing; this turns the mp3gain/aacgain tool into a gain # calculator rather than a tag manipulator because we take care # of changing tags ourselves. - cmd: list[bytes | str] = [self.command, "-o", "-s", "s"] + cmd: list[str] = [self.command, "-o", "-s", "s"] if self.noclip: # Adjust to avoid clipping. cmd = cmd + ["-k"] @@ -1039,7 +1032,7 @@ class AudioToolsBackend(Backend): os.fsdecode(syspath(item.path)) ) except OSError: - raise ReplayGainError(f"File {item.path} was not found") + raise ReplayGainError(f"File {item.filepath} was not found") except self._mod_audiotools.UnsupportedFile: raise ReplayGainError(f"Unsupported file type {item.format}") diff --git a/beetsplug/web/__init__.py b/beetsplug/web/__init__.py index f05d1903e..559f0622c 100644 --- a/beetsplug/web/__init__.py +++ b/beetsplug/web/__init__.py @@ -308,18 +308,8 @@ def all_items(): def item_file(item_id): item = g.lib.get_item(item_id) - # On Windows under Python 2, Flask wants a Unicode path. On Python 3, it - # *always* wants a Unicode path. - if os.name == "nt": - item_path = util.syspath(item.path) - else: - item_path = os.fsdecode(item.path) - + item_path = util.syspath(item.path) base_filename = os.path.basename(item_path) - if isinstance(base_filename, bytes): - unicode_base_filename = util.displayable_path(base_filename) - else: - unicode_base_filename = base_filename try: # Imitate http.server behaviour @@ -327,7 +317,7 @@ def item_file(item_id): except UnicodeError: safe_filename = unidecode(base_filename) else: - safe_filename = unicode_base_filename + safe_filename = base_filename response = flask.send_file( item_path, as_attachment=True, download_name=safe_filename