mirror of
https://github.com/beetbox/beets.git
synced 2025-12-06 00:24:25 +01:00
Remove all Python 2 references
This commit is contained in:
parent
7cada1c9f8
commit
4260162d44
9 changed files with 46 additions and 120 deletions
|
|
@ -411,39 +411,6 @@ class BooleanQuery(MatchQuery[int]):
|
|||
super().__init__(field_name, pattern_int, fast)
|
||||
|
||||
|
||||
class BytesQuery(FieldQuery[bytes]):
|
||||
"""Match a raw bytes field (i.e., a path). This is a necessary hack
|
||||
to work around the `sqlite3` module's desire to treat `bytes` and
|
||||
`unicode` equivalently in Python 2. Always use this query instead of
|
||||
`MatchQuery` when matching on BLOB values.
|
||||
"""
|
||||
|
||||
def __init__(self, field_name: str, pattern: bytes | str | memoryview):
|
||||
# Use a buffer/memoryview representation of the pattern for SQLite
|
||||
# matching. This instructs SQLite to treat the blob as binary
|
||||
# rather than encoded Unicode.
|
||||
if isinstance(pattern, (str, bytes)):
|
||||
if isinstance(pattern, str):
|
||||
bytes_pattern = pattern.encode("utf-8")
|
||||
else:
|
||||
bytes_pattern = pattern
|
||||
self.buf_pattern = memoryview(bytes_pattern)
|
||||
elif isinstance(pattern, memoryview):
|
||||
self.buf_pattern = pattern
|
||||
bytes_pattern = bytes(pattern)
|
||||
else:
|
||||
raise ValueError("pattern must be bytes, str, or memoryview")
|
||||
|
||||
super().__init__(field_name, bytes_pattern)
|
||||
|
||||
def col_clause(self) -> tuple[str, Sequence[SQLiteType]]:
|
||||
return self.field + " = ?", [self.buf_pattern]
|
||||
|
||||
@classmethod
|
||||
def value_match(cls, pattern: bytes, value: Any) -> bool:
|
||||
return pattern == value
|
||||
|
||||
|
||||
class NumericQuery(FieldQuery[str]):
|
||||
"""Matches numeric fields. A syntax using Ruby-style range ellipses
|
||||
(``..``) lets users specify one- or two-sided ranges. For example,
|
||||
|
|
|
|||
|
|
@ -26,7 +26,8 @@ from typing import TYPE_CHECKING, Callable, Iterable, Sequence
|
|||
|
||||
import mediafile
|
||||
|
||||
from beets import autotag, config, dbcore, library, plugins, util
|
||||
from beets import autotag, config, library, plugins, util
|
||||
from beets.dbcore.query import PathQuery
|
||||
|
||||
from .state import ImportState
|
||||
|
||||
|
|
@ -520,9 +521,7 @@ class ImportTask(BaseImportTask):
|
|||
)
|
||||
replaced_album_ids = set()
|
||||
for item in self.imported_items():
|
||||
dup_items = list(
|
||||
lib.items(query=dbcore.query.BytesQuery("path", item.path))
|
||||
)
|
||||
dup_items = list(lib.items(query=PathQuery("path", item.path)))
|
||||
self.replaced_items[item] = dup_items
|
||||
for dup_item in dup_items:
|
||||
if (
|
||||
|
|
|
|||
|
|
@ -104,23 +104,15 @@ def _stream_encoding(stream, default="utf-8"):
|
|||
return stream.encoding or default
|
||||
|
||||
|
||||
def print_(*strings, **kwargs):
|
||||
def print_(*strings: str, end: str = "\n") -> None:
|
||||
"""Like print, but rather than raising an error when a character
|
||||
is not in the terminal's encoding's character set, just silently
|
||||
replaces it.
|
||||
|
||||
The arguments must be Unicode strings: `unicode` on Python 2; `str` on
|
||||
Python 3.
|
||||
|
||||
The `end` keyword argument behaves similarly to the built-in `print`
|
||||
(it defaults to a newline).
|
||||
"""
|
||||
if not strings:
|
||||
strings = [""]
|
||||
assert isinstance(strings[0], str)
|
||||
|
||||
txt = " ".join(strings)
|
||||
txt += kwargs.get("end", "\n")
|
||||
txt = " ".join(strings or ("",)) + end
|
||||
|
||||
# Encode the string and write it to stdout.
|
||||
# On Python 3, sys.stdout expects text strings and uses the
|
||||
|
|
|
|||
|
|
@ -1302,7 +1302,7 @@ class TerminalImportSession(importer.ImportSession):
|
|||
# The import command.
|
||||
|
||||
|
||||
def import_files(lib, paths, query):
|
||||
def import_files(lib, paths: list[bytes], query):
|
||||
"""Import the files in the given list of paths or matching the
|
||||
query.
|
||||
"""
|
||||
|
|
@ -1333,7 +1333,7 @@ def import_files(lib, paths, query):
|
|||
plugins.send("import", lib=lib, paths=paths)
|
||||
|
||||
|
||||
def import_func(lib, opts, args):
|
||||
def import_func(lib, opts, args: list[str]):
|
||||
config["import"].set_args(opts)
|
||||
|
||||
# Special case: --copy flag suppresses import_move (which would
|
||||
|
|
@ -1355,15 +1355,11 @@ def import_func(lib, opts, args):
|
|||
if not paths and not paths_from_logfiles:
|
||||
raise ui.UserError("no path specified")
|
||||
|
||||
# On Python 2, we used to get filenames as raw bytes, which is
|
||||
# what we need. On Python 3, we need to undo the "helpful"
|
||||
# conversion to Unicode strings to get the real bytestring
|
||||
# filename.
|
||||
paths = [os.fsencode(p) for p in paths]
|
||||
byte_paths = [os.fsencode(p) for p in paths]
|
||||
paths_from_logfiles = [os.fsencode(p) for p in paths_from_logfiles]
|
||||
|
||||
# Check the user-specified directories.
|
||||
for path in paths:
|
||||
for path in byte_paths:
|
||||
if not os.path.exists(syspath(normpath(path))):
|
||||
raise ui.UserError(
|
||||
"no such file or directory: {}".format(
|
||||
|
|
@ -1384,14 +1380,14 @@ def import_func(lib, opts, args):
|
|||
)
|
||||
continue
|
||||
|
||||
paths.append(path)
|
||||
byte_paths.append(path)
|
||||
|
||||
# If all paths were read from a logfile, and none of them exist, throw
|
||||
# an error
|
||||
if not paths:
|
||||
raise ui.UserError("none of the paths are importable")
|
||||
|
||||
import_files(lib, paths, query)
|
||||
import_files(lib, byte_paths, query)
|
||||
|
||||
|
||||
import_cmd = ui.Subcommand(
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import sys
|
|||
import tempfile
|
||||
import traceback
|
||||
from collections import Counter
|
||||
from collections.abc import Sequence
|
||||
from contextlib import suppress
|
||||
from enum import Enum
|
||||
from functools import cache
|
||||
|
|
@ -41,7 +42,6 @@ from typing import (
|
|||
AnyStr,
|
||||
Callable,
|
||||
Generic,
|
||||
Iterable,
|
||||
NamedTuple,
|
||||
TypeVar,
|
||||
Union,
|
||||
|
|
@ -53,23 +53,17 @@ import beets
|
|||
from beets.util import hidden
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterator, Sequence
|
||||
from collections.abc import Iterable, Iterator
|
||||
from logging import Logger
|
||||
|
||||
from beets.library import Item
|
||||
|
||||
if sys.version_info >= (3, 10):
|
||||
from typing import TypeAlias
|
||||
else:
|
||||
from typing_extensions import TypeAlias
|
||||
|
||||
|
||||
MAX_FILENAME_LENGTH = 200
|
||||
WINDOWS_MAGIC_PREFIX = "\\\\?\\"
|
||||
T = TypeVar("T")
|
||||
BytesOrStr = Union[str, bytes]
|
||||
PathLike = Union[BytesOrStr, Path]
|
||||
Replacements: TypeAlias = "Sequence[tuple[Pattern[str], str]]"
|
||||
PathLike = Union[str, bytes, Path]
|
||||
Replacements = Sequence[tuple[Pattern[str], str]]
|
||||
|
||||
# Here for now to allow for a easy replace later on
|
||||
# once we can move to a PathLike (mainly used in importer)
|
||||
|
|
@ -860,7 +854,9 @@ class CommandOutput(NamedTuple):
|
|||
stderr: bytes
|
||||
|
||||
|
||||
def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput:
|
||||
def command_output(
|
||||
cmd: list[str] | list[bytes], shell: bool = False
|
||||
) -> CommandOutput:
|
||||
"""Runs the command and returns its output after it has exited.
|
||||
|
||||
Returns a CommandOutput. The attributes ``stdout`` and ``stderr`` contain
|
||||
|
|
@ -878,8 +874,6 @@ def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput:
|
|||
This replaces `subprocess.check_output` which can have problems if lots of
|
||||
output is sent to stderr.
|
||||
"""
|
||||
converted_cmd = [os.fsdecode(a) for a in cmd]
|
||||
|
||||
devnull = subprocess.DEVNULL
|
||||
|
||||
proc = subprocess.Popen(
|
||||
|
|
@ -894,7 +888,7 @@ def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput:
|
|||
if proc.returncode:
|
||||
raise subprocess.CalledProcessError(
|
||||
returncode=proc.returncode,
|
||||
cmd=" ".join(converted_cmd),
|
||||
cmd=" ".join(map(os.fsdecode, cmd)),
|
||||
output=stdout + stderr,
|
||||
)
|
||||
return CommandOutput(stdout, stderr)
|
||||
|
|
|
|||
|
|
@ -214,9 +214,9 @@ class IMBackend(LocalBackend):
|
|||
else:
|
||||
return cls._version
|
||||
|
||||
convert_cmd: list[str | bytes]
|
||||
identify_cmd: list[str | bytes]
|
||||
compare_cmd: list[str | bytes]
|
||||
convert_cmd: list[str]
|
||||
identify_cmd: list[str]
|
||||
compare_cmd: list[str]
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize a wrapper around ImageMagick for local image operations.
|
||||
|
|
@ -265,7 +265,7 @@ class IMBackend(LocalBackend):
|
|||
# with regards to the height.
|
||||
# ImageMagick already seems to default to no interlace, but we include
|
||||
# it here for the sake of explicitness.
|
||||
cmd: list[str | bytes] = self.convert_cmd + [
|
||||
cmd: list[str] = self.convert_cmd + [
|
||||
syspath(path_in, prefix=False),
|
||||
"-resize",
|
||||
f"{maxwidth}x>",
|
||||
|
|
@ -295,7 +295,7 @@ class IMBackend(LocalBackend):
|
|||
return path_out
|
||||
|
||||
def get_size(self, path_in: bytes) -> tuple[int, int] | None:
|
||||
cmd: list[str | bytes] = self.identify_cmd + [
|
||||
cmd: list[str] = self.identify_cmd + [
|
||||
"-format",
|
||||
"%w %h",
|
||||
syspath(path_in, prefix=False),
|
||||
|
|
@ -480,10 +480,11 @@ class IMBackend(LocalBackend):
|
|||
return True
|
||||
|
||||
def write_metadata(self, file: bytes, metadata: Mapping[str, str]) -> None:
|
||||
assignments = list(
|
||||
chain.from_iterable(("-set", k, v) for k, v in metadata.items())
|
||||
assignments = chain.from_iterable(
|
||||
("-set", k, v) for k, v in metadata.items()
|
||||
)
|
||||
command = self.convert_cmd + [file, *assignments, file]
|
||||
str_file = os.fsdecode(file)
|
||||
command = self.convert_cmd + [str_file, *assignments, str_file]
|
||||
|
||||
util.command_output(command)
|
||||
|
||||
|
|
|
|||
|
|
@ -14,27 +14,21 @@
|
|||
|
||||
"""Allows custom commands to be run when an event is emitted by beets"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import shlex
|
||||
import string
|
||||
import subprocess
|
||||
import sys
|
||||
from typing import Any
|
||||
|
||||
from beets.plugins import BeetsPlugin
|
||||
|
||||
|
||||
class CodingFormatter(string.Formatter):
|
||||
"""A variant of `string.Formatter` that converts everything to `unicode`
|
||||
strings.
|
||||
class BytesToStrFormatter(string.Formatter):
|
||||
"""A variant of `string.Formatter` that converts `bytes` to `str`."""
|
||||
|
||||
This was necessary on Python 2, in needs to be kept for backwards
|
||||
compatibility.
|
||||
"""
|
||||
|
||||
def __init__(self, coding):
|
||||
"""Creates a new coding formatter with the provided coding."""
|
||||
self._coding = coding
|
||||
|
||||
def convert_field(self, value, conversion):
|
||||
def convert_field(self, value: Any, conversion: str | None) -> Any:
|
||||
"""Converts the provided value given a conversion type.
|
||||
|
||||
This method decodes the converted value using the formatter's coding.
|
||||
|
|
@ -42,7 +36,7 @@ class CodingFormatter(string.Formatter):
|
|||
converted = super().convert_field(value, conversion)
|
||||
|
||||
if isinstance(converted, bytes):
|
||||
return converted.decode(self._coding)
|
||||
return os.fsdecode(converted)
|
||||
|
||||
return converted
|
||||
|
||||
|
|
@ -72,8 +66,8 @@ class HookPlugin(BeetsPlugin):
|
|||
return
|
||||
|
||||
# For backwards compatibility, use a string formatter that decodes
|
||||
# bytes (in particular, paths) to unicode strings.
|
||||
formatter = CodingFormatter(sys.getfilesystemencoding())
|
||||
# bytes (in particular, paths) to strings.
|
||||
formatter = BytesToStrFormatter()
|
||||
command_pieces = [
|
||||
formatter.format(piece, event=event, **kwargs)
|
||||
for piece in shlex.split(command)
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ class FatalGstreamerPluginReplayGainError(FatalReplayGainError):
|
|||
loading the required plugins."""
|
||||
|
||||
|
||||
def call(args: list[Any], log: Logger, **kwargs: Any):
|
||||
def call(args: list[str], log: Logger, **kwargs: Any):
|
||||
"""Execute the command and return its output or raise a
|
||||
ReplayGainError on failure.
|
||||
"""
|
||||
|
|
@ -73,11 +73,6 @@ def call(args: list[Any], log: Logger, **kwargs: Any):
|
|||
raise ReplayGainError(
|
||||
"{} exited with status {}".format(args[0], e.returncode)
|
||||
)
|
||||
except UnicodeEncodeError:
|
||||
# Due to a bug in Python 2's subprocess on Windows, Unicode
|
||||
# filenames can fail to encode on that platform. See:
|
||||
# https://github.com/google-code-export/beets/issues/499
|
||||
raise ReplayGainError("argument encoding failed")
|
||||
|
||||
|
||||
def db_to_lufs(db: float) -> float:
|
||||
|
|
@ -403,20 +398,18 @@ class FfmpegBackend(Backend):
|
|||
|
||||
def _construct_cmd(
|
||||
self, item: Item, peak_method: PeakMethod | None
|
||||
) -> list[str | bytes]:
|
||||
) -> list[str]:
|
||||
"""Construct the shell command to analyse items."""
|
||||
return [
|
||||
self._ffmpeg_path,
|
||||
"-nostats",
|
||||
"-hide_banner",
|
||||
"-i",
|
||||
item.path,
|
||||
str(item.filepath),
|
||||
"-map",
|
||||
"a:0",
|
||||
"-filter",
|
||||
"ebur128=peak={}".format(
|
||||
"none" if peak_method is None else peak_method.name
|
||||
),
|
||||
f"ebur128=peak={'none' if peak_method is None else peak_method.name}",
|
||||
"-f",
|
||||
"null",
|
||||
"-",
|
||||
|
|
@ -660,7 +653,7 @@ class CommandBackend(Backend):
|
|||
# tag-writing; this turns the mp3gain/aacgain tool into a gain
|
||||
# calculator rather than a tag manipulator because we take care
|
||||
# of changing tags ourselves.
|
||||
cmd: list[bytes | str] = [self.command, "-o", "-s", "s"]
|
||||
cmd: list[str] = [self.command, "-o", "-s", "s"]
|
||||
if self.noclip:
|
||||
# Adjust to avoid clipping.
|
||||
cmd = cmd + ["-k"]
|
||||
|
|
@ -1039,7 +1032,7 @@ class AudioToolsBackend(Backend):
|
|||
os.fsdecode(syspath(item.path))
|
||||
)
|
||||
except OSError:
|
||||
raise ReplayGainError(f"File {item.path} was not found")
|
||||
raise ReplayGainError(f"File {item.filepath} was not found")
|
||||
except self._mod_audiotools.UnsupportedFile:
|
||||
raise ReplayGainError(f"Unsupported file type {item.format}")
|
||||
|
||||
|
|
|
|||
|
|
@ -308,18 +308,8 @@ def all_items():
|
|||
def item_file(item_id):
|
||||
item = g.lib.get_item(item_id)
|
||||
|
||||
# On Windows under Python 2, Flask wants a Unicode path. On Python 3, it
|
||||
# *always* wants a Unicode path.
|
||||
if os.name == "nt":
|
||||
item_path = util.syspath(item.path)
|
||||
else:
|
||||
item_path = os.fsdecode(item.path)
|
||||
|
||||
item_path = util.syspath(item.path)
|
||||
base_filename = os.path.basename(item_path)
|
||||
if isinstance(base_filename, bytes):
|
||||
unicode_base_filename = util.displayable_path(base_filename)
|
||||
else:
|
||||
unicode_base_filename = base_filename
|
||||
|
||||
try:
|
||||
# Imitate http.server behaviour
|
||||
|
|
@ -327,7 +317,7 @@ def item_file(item_id):
|
|||
except UnicodeError:
|
||||
safe_filename = unidecode(base_filename)
|
||||
else:
|
||||
safe_filename = unicode_base_filename
|
||||
safe_filename = base_filename
|
||||
|
||||
response = flask.send_file(
|
||||
item_path, as_attachment=True, download_name=safe_filename
|
||||
|
|
|
|||
Loading…
Reference in a new issue