Use os.fsencode to encode bytes (#5662)

# Remove Python 2 compatibility code for string encoding/decoding

This PR simplifies the codebase by removing Python 2 compatibility code
related to string encoding and decoding operations. Key changes:

- Remove custom `_convert_args`, `arg_encoding()`, `_fsencoding()` and
`convert_command_args()` functions
- Replace with standard library's `os.fsencode()` and `os.fsdecode()`
for file system path handling
- Simplify bytestring/string conversions throughout the codebase
- Update test code to use modern string handling

These changes reduce code complexity and maintenance burden since Python
2 support is no longer needed.
This commit is contained in:
Benedikt 2025-04-22 11:56:53 +02:00 committed by GitHub
commit 5d010e95ec
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 44 additions and 143 deletions

View file

@ -117,20 +117,9 @@ def capture_stdout():
print(capture.getvalue())
def _convert_args(args):
"""Convert args to bytestrings for Python 2 and convert them to strings
on Python 3.
"""
for i, elem in enumerate(args):
if isinstance(elem, bytes):
args[i] = elem.decode(util.arg_encoding())
return args
def has_program(cmd, args=["--version"]):
"""Returns `True` if `cmd` can be executed."""
full_cmd = _convert_args([cmd] + args)
full_cmd = [cmd] + args
try:
with open(os.devnull, "wb") as devnull:
subprocess.check_call(
@ -385,7 +374,7 @@ class TestHelper(_common.Assertions, ConfigMixin):
if hasattr(self, "lib"):
lib = self.lib
lib = kwargs.get("lib", lib)
beets.ui._raw_main(_convert_args(list(args)), lib)
beets.ui._raw_main(list(args), lib)
def run_with_output(self, *args):
with capture_stdout() as out:

View file

@ -1359,13 +1359,8 @@ def import_func(lib, opts, args):
# what we need. On Python 3, we need to undo the "helpful"
# conversion to Unicode strings to get the real bytestring
# filename.
paths = [
p.encode(util.arg_encoding(), "surrogateescape") for p in paths
]
paths_from_logfiles = [
p.encode(util.arg_encoding(), "surrogateescape")
for p in paths_from_logfiles
]
paths = [os.fsencode(p) for p in paths]
paths_from_logfiles = [os.fsencode(p) for p in paths_from_logfiles]
# Check the user-specified directories.
for path in paths:

View file

@ -369,28 +369,6 @@ def components(path: AnyStr) -> list[AnyStr]:
return comps
def arg_encoding() -> str:
"""Get the encoding for command-line arguments (and other OS
locale-sensitive strings).
"""
return sys.getfilesystemencoding()
def _fsencoding() -> str:
"""Get the system's filesystem encoding. On Windows, this is always
UTF-8 (not MBCS).
"""
encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
if encoding == "mbcs":
# On Windows, a broken encoding known to Python as "MBCS" is
# used for the filesystem. However, we only use the Unicode API
# for Windows paths, so the encoding is actually immaterial so
# we can avoid dealing with this nastiness. We arbitrarily
# choose UTF-8.
encoding = "utf-8"
return encoding
def bytestring_path(path: PathLike) -> bytes:
"""Given a path, which is either a bytes or a unicode, returns a str
path (ensuring that we never deal with Unicode pathnames). Path should be
@ -410,11 +388,7 @@ def bytestring_path(path: PathLike) -> bytes:
):
str_path = str_path[len(WINDOWS_MAGIC_PREFIX) :]
# Try to encode with default encodings, but fall back to utf-8.
try:
return str_path.encode(_fsencoding())
except (UnicodeError, LookupError):
return str_path.encode("utf-8")
return os.fsencode(str_path)
PATH_SEP: bytes = bytestring_path(os.sep)
@ -436,10 +410,7 @@ def displayable_path(
# A non-string object: just get its unicode representation.
return str(path)
try:
return path.decode(_fsencoding(), "ignore")
except (UnicodeError, LookupError):
return path.decode("utf-8", "ignore")
return os.fsdecode(path)
def syspath(path: PathLike, prefix: bool = True) -> str:
@ -854,19 +825,6 @@ def plurality(objs: Sequence[T]) -> tuple[T, int]:
return c.most_common(1)[0]
def convert_command_args(args: list[BytesOrStr]) -> list[str]:
"""Convert command arguments, which may either be `bytes` or `str`
objects, to uniformly surrogate-escaped strings."""
assert isinstance(args, list)
def convert(arg) -> str:
if isinstance(arg, bytes):
return os.fsdecode(arg)
return arg
return [convert(a) for a in args]
# stdout and stderr as bytes
class CommandOutput(NamedTuple):
stdout: bytes
@ -891,7 +849,7 @@ def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput:
This replaces `subprocess.check_output` which can have problems if lots of
output is sent to stderr.
"""
converted_cmd = convert_command_args(cmd)
converted_cmd = [os.fsdecode(a) for a in cmd]
devnull = subprocess.DEVNULL

View file

@ -28,7 +28,7 @@ from confuse import ConfigTypeError, Optional
from beets import art, config, plugins, ui, util
from beets.library import Item, parse_query_string
from beets.plugins import BeetsPlugin
from beets.util import arg_encoding, par_map
from beets.util import par_map
from beets.util.artresizer import ArtResizer
from beets.util.m3u import M3UFile
@ -284,7 +284,7 @@ class ConvertPlugin(BeetsPlugin):
if not quiet and not pretend:
self._log.info("Encoding {0}", util.displayable_path(source))
command = command.decode(arg_encoding(), "surrogateescape")
command = os.fsdecode(command)
source = os.fsdecode(source)
dest = os.fsdecode(dest)
@ -298,7 +298,7 @@ class ConvertPlugin(BeetsPlugin):
"dest": dest,
}
)
encode_cmd.append(args[i].encode(util.arg_encoding()))
encode_cmd.append(os.fsdecode(args[i]))
if pretend:
self._log.info("{0}", " ".join(ui.decargs(args)))

View file

@ -17,9 +17,9 @@
import shlex
import string
import subprocess
import sys
from beets.plugins import BeetsPlugin
from beets.util import arg_encoding
class CodingFormatter(string.Formatter):
@ -73,7 +73,7 @@ class HookPlugin(BeetsPlugin):
# For backwards compatibility, use a string formatter that decodes
# bytes (in particular, paths) to unicode strings.
formatter = CodingFormatter(arg_encoding())
formatter = CodingFormatter(sys.getfilesystemencoding())
command_pieces = [
formatter.format(piece, event=event, **kwargs)
for piece in shlex.split(command)

View file

@ -20,7 +20,6 @@ import tempfile
from beets import config, library, ui, util
from beets.plugins import BeetsPlugin
from beets.util import syspath
class IPFSPlugin(BeetsPlugin):
@ -193,7 +192,7 @@ class IPFSPlugin(BeetsPlugin):
# This uses a relative path, hence we cannot use util.syspath(_hash,
# prefix=True). However, that should be fine since the hash will not
# exceed MAX_PATH.
shutil.rmtree(syspath(_hash, prefix=False))
shutil.rmtree(util.syspath(_hash, prefix=False))
def ipfs_publish(self, lib):
with tempfile.NamedTemporaryFile() as tmp:
@ -299,9 +298,7 @@ class IPFSPlugin(BeetsPlugin):
break
except AttributeError:
pass
item_path = os.path.basename(item.path).decode(
util._fsencoding(), "ignore"
)
item_path = os.fsdecode(os.path.basename(item.path))
# Clear current path from item
item.path = f"/ipfs/{album.ipfs}/{item_path}"

View file

@ -27,7 +27,6 @@ from pathlib import PurePosixPath
from xdg import BaseDirectory
from beets import util
from beets.plugins import BeetsPlugin
from beets.ui import Subcommand, decargs
from beets.util import bytestring_path, displayable_path, syspath
@ -288,6 +287,6 @@ class GioURI(URIGetter):
self.libgio.g_free(uri_ptr)
try:
return uri.decode(util._fsencoding())
return os.fsdecode(uri)
except UnicodeDecodeError:
raise RuntimeError(f"Could not decode filename from GIO: {uri!r}")

View file

@ -315,13 +315,8 @@ def item_file(item_id):
item_path = os.fsdecode(item.path)
base_filename = os.path.basename(item_path)
# FIXME: Arguably, this should just use `displayable_path`: The latter
# tries `_fsencoding()` first, but then falls back to `utf-8`, too.
if isinstance(base_filename, bytes):
try:
unicode_base_filename = base_filename.decode("utf-8")
except UnicodeError:
unicode_base_filename = util.displayable_path(base_filename)
unicode_base_filename = util.displayable_path(base_filename)
else:
unicode_base_filename = base_filename

View file

@ -143,18 +143,13 @@ class ConvertCommand:
in tests.
"""
def run_convert_path(self, path, *args):
def run_convert_path(self, item, *args):
"""Run the `convert` command on a given path."""
# The path is currently a filesystem bytestring. Convert it to
# an argument bytestring.
path = path.decode(util._fsencoding()).encode(util.arg_encoding())
args = args + (b"path:" + path,)
return self.run_command("convert", *args)
return self.run_command("convert", *args, f"path:{item.filepath}")
def run_convert(self, *args):
"""Run the `convert` command on `self.item`."""
return self.run_convert_path(self.item.path, *args)
return self.run_convert_path(self.item, *args)
@_common.slow_test()
@ -320,7 +315,7 @@ class NeverConvertLossyFilesTest(ConvertTestCase, ConvertCommand):
def test_transcode_from_lossless(self):
[item] = self.add_item_fixtures(ext="flac")
with control_stdin("y"):
self.run_convert_path(item.path)
self.run_convert_path(item)
converted = os.path.join(self.convert_dest, b"converted.mp3")
self.assertFileTag(converted, "mp3")
@ -328,14 +323,14 @@ class NeverConvertLossyFilesTest(ConvertTestCase, ConvertCommand):
self.config["convert"]["never_convert_lossy_files"] = False
[item] = self.add_item_fixtures(ext="ogg")
with control_stdin("y"):
self.run_convert_path(item.path)
self.run_convert_path(item)
converted = os.path.join(self.convert_dest, b"converted.mp3")
self.assertFileTag(converted, "mp3")
def test_transcode_from_lossy_prevented(self):
[item] = self.add_item_fixtures(ext="ogg")
with control_stdin("y"):
self.run_convert_path(item.path)
self.run_convert_path(item)
converted = os.path.join(self.convert_dest, b"converted.ogg")
self.assertNoFileTag(converted, "mp3")

View file

@ -17,7 +17,7 @@ from unittest.mock import Mock, patch
from beets.test import _common
from beets.test.helper import PluginTestCase
from beets.util import _fsencoding, bytestring_path
from beets.util import bytestring_path
from beetsplug.ipfs import IPFSPlugin
@ -36,9 +36,7 @@ class IPFSPluginTest(PluginTestCase):
for check_item in added_album.items():
try:
if check_item.get("ipfs", with_album=False):
ipfs_item = os.path.basename(want_item.path).decode(
_fsencoding(),
)
ipfs_item = os.fsdecode(os.path.basename(want_item.path))
want_path = "/ipfs/{}/{}".format(test_album.ipfs, ipfs_item)
want_path = bytestring_path(want_path)
assert check_item.path == want_path

View file

@ -100,12 +100,10 @@ class ThumbnailsTest(BeetsTestCase):
@patch("beetsplug.thumbnails.ThumbnailsPlugin._check_local_ok", Mock())
@patch("beetsplug.thumbnails.ArtResizer")
@patch("beetsplug.thumbnails.util")
@patch("beets.util.syspath", Mock(side_effect=lambda x: x))
@patch("beetsplug.thumbnails.os")
@patch("beetsplug.thumbnails.shutil")
def test_make_cover_thumbnail(
self, mock_shutils, mock_os, mock_util, mock_artresizer
):
def test_make_cover_thumbnail(self, mock_shutils, mock_os, mock_artresizer):
thumbnail_dir = os.path.normpath(b"/thumbnail/dir")
md5_file = os.path.join(thumbnail_dir, b"md5")
path_to_art = os.path.normpath(b"/path/to/art")
@ -116,7 +114,6 @@ class ThumbnailsTest(BeetsTestCase):
plugin.add_tags = Mock()
album = Mock(artpath=path_to_art)
mock_util.syspath.side_effect = lambda x: x
plugin.thumbnail_file_name = Mock(return_value=b"md5")
mock_os.path.exists.return_value = False

View file

@ -4,18 +4,9 @@
a specified text tag.
"""
import locale
import sys
# From `beets.util`.
def arg_encoding():
try:
return locale.getdefaultlocale()[1] or "utf-8"
except ValueError:
return "utf-8"
def convert(in_file, out_file, tag):
"""Copy `in_file` to `out_file` and append the string `tag`."""
if not isinstance(tag, bytes):

View file

@ -865,6 +865,9 @@ class ConfigTest(TestPluginTestCase):
# Custom BEETSDIR
self.beetsdir = os.path.join(self.temp_dir, b"beetsdir")
self.cli_config_path = os.path.join(
os.fsdecode(self.temp_dir), "config.yaml"
)
os.makedirs(syspath(self.beetsdir))
self.env_patcher = patch(
"os.environ",
@ -952,20 +955,18 @@ class ConfigTest(TestPluginTestCase):
assert repls == [("[xy]", "z"), ("foo", "bar")]
def test_cli_config_option(self):
config_path = os.path.join(self.temp_dir, b"config.yaml")
with open(config_path, "w") as file:
with open(self.cli_config_path, "w") as file:
file.write("anoption: value")
self.run_command("--config", config_path, "test", lib=None)
self.run_command("--config", self.cli_config_path, "test", lib=None)
assert config["anoption"].get() == "value"
def test_cli_config_file_overwrites_user_defaults(self):
with open(self.user_config_path, "w") as file:
file.write("anoption: value")
cli_config_path = os.path.join(self.temp_dir, b"config.yaml")
with open(cli_config_path, "w") as file:
with open(self.cli_config_path, "w") as file:
file.write("anoption: cli overwrite")
self.run_command("--config", cli_config_path, "test", lib=None)
self.run_command("--config", self.cli_config_path, "test", lib=None)
assert config["anoption"].get() == "cli overwrite"
def test_cli_config_file_overwrites_beetsdir_defaults(self):
@ -974,10 +975,9 @@ class ConfigTest(TestPluginTestCase):
with open(env_config_path, "w") as file:
file.write("anoption: value")
cli_config_path = os.path.join(self.temp_dir, b"config.yaml")
with open(cli_config_path, "w") as file:
with open(self.cli_config_path, "w") as file:
file.write("anoption: cli overwrite")
self.run_command("--config", cli_config_path, "test", lib=None)
self.run_command("--config", self.cli_config_path, "test", lib=None)
assert config["anoption"].get() == "cli overwrite"
# @unittest.skip('Difficult to implement with optparse')
@ -998,29 +998,27 @@ class ConfigTest(TestPluginTestCase):
#
# @unittest.skip('Difficult to implement with optparse')
# def test_multiple_cli_config_overwrite(self):
# cli_config_path = os.path.join(self.temp_dir, b'config.yaml')
# cli_overwrite_config_path = os.path.join(self.temp_dir,
# b'overwrite_config.yaml')
#
# with open(cli_config_path, 'w') as file:
# with open(self.cli_config_path, 'w') as file:
# file.write('anoption: value')
#
# with open(cli_overwrite_config_path, 'w') as file:
# file.write('anoption: overwrite')
#
# self.run_command('--config', cli_config_path,
# self.run_command('--config', self.cli_config_path,
# '--config', cli_overwrite_config_path, 'test')
# assert config['anoption'].get() == 'cli overwrite'
# FIXME: fails on windows
@unittest.skipIf(sys.platform == "win32", "win32")
def test_cli_config_paths_resolve_relative_to_user_dir(self):
cli_config_path = os.path.join(self.temp_dir, b"config.yaml")
with open(cli_config_path, "w") as file:
with open(self.cli_config_path, "w") as file:
file.write("library: beets.db\n")
file.write("statefile: state")
self.run_command("--config", cli_config_path, "test", lib=None)
self.run_command("--config", self.cli_config_path, "test", lib=None)
self.assert_equal_path(
util.bytestring_path(config["library"].as_filename()),
os.path.join(self.user_config_dir, b"beets.db"),
@ -1033,12 +1031,11 @@ class ConfigTest(TestPluginTestCase):
def test_cli_config_paths_resolve_relative_to_beetsdir(self):
os.environ["BEETSDIR"] = os.fsdecode(self.beetsdir)
cli_config_path = os.path.join(self.temp_dir, b"config.yaml")
with open(cli_config_path, "w") as file:
with open(self.cli_config_path, "w") as file:
file.write("library: beets.db\n")
file.write("statefile: state")
self.run_command("--config", cli_config_path, "test", lib=None)
self.run_command("--config", self.cli_config_path, "test", lib=None)
self.assert_equal_path(
util.bytestring_path(config["library"].as_filename()),
os.path.join(self.beetsdir, b"beets.db"),
@ -1057,12 +1054,11 @@ class ConfigTest(TestPluginTestCase):
)
def test_cli_config_file_loads_plugin_commands(self):
cli_config_path = os.path.join(self.temp_dir, b"config.yaml")
with open(cli_config_path, "w") as file:
with open(self.cli_config_path, "w") as file:
file.write("pluginpath: %s\n" % _common.PLUGINPATH)
file.write("plugins: test")
self.run_command("--config", cli_config_path, "plugin", lib=None)
self.run_command("--config", self.cli_config_path, "plugin", lib=None)
assert plugins.find_plugins()[0].is_test_plugin
self.unload_plugins()
@ -1483,9 +1479,7 @@ class CommonOptionsParserCliTest(BeetsTestCase):
assert output == "the album artist\n"
def test_format_option_unicode(self):
output = self.run_with_output(
b"ls", b"-f", "caf\xe9".encode(util.arg_encoding())
)
output = self.run_with_output("ls", "-f", "caf\xe9")
assert output == "caf\xe9\n"
def test_root_format_option(self):

View file

@ -97,13 +97,6 @@ class UtilTest(unittest.TestCase):
p = util.sanitize_path("foo//bar", [(re.compile(r"^$"), "_")])
assert p == "foo/_/bar"
@unittest.skipIf(sys.platform == "win32", "win32")
def test_convert_command_args_keeps_undecodeable_bytes(self):
arg = b"\x82" # non-ascii bytes
cmd_args = util.convert_command_args([arg])
assert cmd_args[0] == arg.decode(util.arg_encoding(), "surrogateescape")
@patch("beets.util.subprocess.Popen")
def test_command_output(self, mock_popen):
def popen_fail(*args, **kwargs):