Merge pull request #1218 from brunal/no-global-no-print

Clean plugins: no global, no print
This commit is contained in:
Adrian Sampson 2015-01-13 10:37:56 -08:00
commit ccd5e71519
6 changed files with 137 additions and 148 deletions

View file

@ -185,7 +185,8 @@ class BeetsPlugin(object):
"""
if cls.listeners is None:
cls.listeners = defaultdict(list)
cls.listeners[event].append(func)
if func not in cls.listeners[event]:
cls.listeners[event].append(func)
@classmethod
def listen(cls, event):
@ -202,9 +203,7 @@ class BeetsPlugin(object):
... pass
"""
def helper(func):
if cls.listeners is None:
cls.listeners = defaultdict(list)
cls.listeners[event].append(func)
cls.register_listener(event, func)
return func
return helper

View file

@ -20,17 +20,11 @@ Put something like the following in your config.yaml to configure:
port: 6600
password: seekrit
"""
from __future__ import print_function
from beets.plugins import BeetsPlugin
import os
import socket
from beets import config
# Global variable so that mpdupdate can detect database changes and run only
# once before beets exits.
database_changed = False
# No need to introduce a dependency on an MPD library for such a
# simple use case. Here's a simple socket abstraction to make things
@ -66,37 +60,6 @@ class BufferedSocket(object):
self.sock.close()
def update_mpd(host='localhost', port=6600, password=None):
"""Sends the "update" command to the MPD server indicated,
possibly authenticating with a password first.
"""
print('Updating MPD database...')
s = BufferedSocket(host, port)
resp = s.readline()
if 'OK MPD' not in resp:
print('MPD connection failed:', repr(resp))
return
if password:
s.send('password "%s"\n' % password)
resp = s.readline()
if 'OK' not in resp:
print('Authentication failed:', repr(resp))
s.send('close\n')
s.close()
return
s.send('update\n')
resp = s.readline()
if 'updating_db' not in resp:
print('Update failed:', repr(resp))
s.send('close\n')
s.close()
print('... updated.')
class MPDUpdatePlugin(BeetsPlugin):
def __init__(self):
super(MPDUpdatePlugin, self).__init__()
@ -112,18 +75,44 @@ class MPDUpdatePlugin(BeetsPlugin):
if self.config[key].exists():
config['mpd'][key] = self.config[key].get()
self.register_listener('database_change', self.db_change)
@MPDUpdatePlugin.listen('database_change')
def handle_change(lib=None):
global database_changed
database_changed = True
def db_change(self, lib):
self.register_listener('cli_exit', self.update)
@MPDUpdatePlugin.listen('cli_exit')
def update(lib=None):
if database_changed:
update_mpd(
def update(self, lib):
self.update_mpd(
config['mpd']['host'].get(unicode),
config['mpd']['port'].get(int),
config['mpd']['password'].get(unicode),
)
def update_mpd(self, host='localhost', port=6600, password=None):
"""Sends the "update" command to the MPD server indicated,
possibly authenticating with a password first.
"""
self._log.info('Updating MPD database...')
s = BufferedSocket(host, port)
resp = s.readline()
if 'OK MPD' not in resp:
self._log.warning(u'MPD connection failed: {0!r}', resp)
return
if password:
s.send('password "%s"\n' % password)
resp = s.readline()
if 'OK' not in resp:
self._log.warning(u'Authentication failed: {0!r}', resp)
s.send('close\n')
s.close()
return
s.send('update\n')
resp = s.readline()
if 'updating_db' not in resp:
self._log.warning(u'Update failed: {0!r}', resp)
s.send('close\n')
s.close()
self._log.info('Database updated.')

View file

@ -12,11 +12,6 @@ from beets import config
from beets.plugins import BeetsPlugin
# Global variable to detect if database is changed that the update
# is only run once before beets exists.
database_changed = False
def get_music_section(host, port):
"""Getting the section key for the music library in Plex.
"""
@ -55,30 +50,23 @@ class PlexUpdate(BeetsPlugin):
u'host': u'localhost',
u'port': 32400})
self.register_listener('database_change', self.listen_for_db_change)
@PlexUpdate.listen('database_change')
def listen_for_db_change(lib=None):
"""Listens for beets db change and set global database_changed
variable to True.
"""
global database_changed
database_changed = True
def listen_for_db_change(self, lib):
"""Listens for beets db change and register the update for the end"""
self.register_listener('cli_exit', self.update)
@PlexUpdate.listen('cli_exit')
def update(lib=None):
"""When the client exists and the database_changed variable is True
trying to send refresh request to Plex server.
"""
if database_changed:
print('Updating Plex library...')
def update(self, lib):
"""When the client exists try to send refresh request to Plex server.
"""
self._log.info('Updating Plex library...')
# Try to send update request.
try:
update_plex(
config['plex']['host'].get(),
config['plex']['port'].get())
print('... started.')
self._log.info('... started.')
except requests.exceptions.RequestException:
print('Update failed.')
self._log.warning('Update failed.')

View file

@ -15,76 +15,23 @@
"""Generates smart playlists based on beets queries.
"""
from __future__ import print_function
from itertools import chain
from beets.plugins import BeetsPlugin
from beets import config, ui, library
from beets import ui
from beets.util import normpath, syspath
import os
# Global variable so that smartplaylist can detect database changes and run
# only once before beets exits.
database_changed = False
def _items_for_query(lib, playlist, album=False):
"""Get the matching items for a playlist's configured queries.
`album` indicates whether to process the item-level query or the
album-level query (if any).
def _items_for_query(lib, queries, album):
"""Get the matching items for a query.
`album` indicates whether the queries are item-level or album-level.
"""
key = 'album_query' if album else 'query'
if key not in playlist:
return []
# Parse quer(ies). If it's a list, perform the queries and manually
# concatenate the results
query_strings = playlist[key]
if not isinstance(query_strings, (list, tuple)):
query_strings = [query_strings]
model = library.Album if album else library.Item
results = []
for q in query_strings:
query, sort = library.parse_query_string(q, model)
if album:
new = lib.albums(query, sort)
else:
new = lib.items(query, sort)
results.extend(new)
return results
def update_playlists(lib):
ui.print_("Updating smart playlists...")
playlists = config['smartplaylist']['playlists'].get(list)
playlist_dir = config['smartplaylist']['playlist_dir'].as_filename()
relative_to = config['smartplaylist']['relative_to'].get()
if relative_to:
relative_to = normpath(relative_to)
for playlist in playlists:
items = []
items.extend(_items_for_query(lib, playlist, True))
items.extend(_items_for_query(lib, playlist, False))
m3us = {}
basename = playlist['name'].encode('utf8')
# As we allow tags in the m3u names, we'll need to iterate through
# the items and generate the correct m3u file names.
for item in items:
m3u_name = item.evaluate_template(basename, True)
if not (m3u_name in m3us):
m3us[m3u_name] = []
item_path = item.path
if relative_to:
item_path = os.path.relpath(item.path, relative_to)
if item_path not in m3us[m3u_name]:
m3us[m3u_name].append(item_path)
# Now iterate through the m3us that we need to generate
for m3u in m3us:
m3u_path = normpath(os.path.join(playlist_dir, m3u))
with open(syspath(m3u_path), 'w') as f:
for path in m3us[m3u]:
f.write(path + '\n')
ui.print_("... Done")
request = lib.albums if album else lib.items
if isinstance(queries, basestring):
return request(queries)
else:
return chain.from_iterable(map(request, queries))
class SmartPlaylistPlugin(BeetsPlugin):
@ -97,23 +44,54 @@ class SmartPlaylistPlugin(BeetsPlugin):
'playlists': []
})
if self.config['auto']:
self.register_listener('database_change', self.db_change)
def commands(self):
def update(lib, opts, args):
update_playlists(lib)
self.update_playlists(lib)
spl_update = ui.Subcommand('splupdate',
help='update the smart playlists')
spl_update.func = update
return [spl_update]
def db_change(self, lib):
self.register_listener('cli_exit', self.update_playlists)
@SmartPlaylistPlugin.listen('database_change')
def handle_change(lib):
global database_changed
database_changed = True
def update_playlists(self, lib):
self._log.info("Updating smart playlists...")
playlists = self.config['playlists'].get(list)
playlist_dir = self.config['playlist_dir'].as_filename()
relative_to = self.config['relative_to'].get()
if relative_to:
relative_to = normpath(relative_to)
for playlist in playlists:
self._log.debug(u"Creating playlist {0.name}", playlist)
items = []
if 'album_query' in playlist:
items.extend(_items_for_query(lib, playlist['album_query'],
True))
if 'query' in playlist:
items.extend(_items_for_query(lib, playlist['query'], False))
@SmartPlaylistPlugin.listen('cli_exit')
def update(lib):
auto = config['smartplaylist']['auto']
if database_changed and auto:
update_playlists(lib)
m3us = {}
basename = playlist['name'].encode('utf8')
# As we allow tags in the m3u names, we'll need to iterate through
# the items and generate the correct m3u file names.
for item in items:
m3u_name = item.evaluate_template(basename, True)
if m3u_name not in m3us:
m3us[m3u_name] = []
item_path = item.path
if relative_to:
item_path = os.path.relpath(item.path, relative_to)
if item_path not in m3us[m3u_name]:
m3us[m3u_name].append(item_path)
# Now iterate through the m3us that we need to generate
for m3u in m3us:
m3u_path = normpath(os.path.join(playlist_dir, m3u))
with open(syspath(m3u_path), 'w') as f:
for path in m3us[m3u]:
f.write(path + '\n')
self._log.info("{0} playlists updated", len(playlists))

View file

@ -53,9 +53,9 @@ to albums that have a ``for_travel`` extensible field set to 1::
album_query: 'for_travel:1'
query: 'for_travel:1'
By default, all playlists are automatically regenerated after every beets
command that changes the library database. To force regeneration, you can invoke it manually from the
command line::
By default, all playlists are automatically regenerated at the end of the
session if the library database was changed. To force regeneration, you can
invoke it manually from the command line::
$ beet splupdate

View file

@ -161,6 +161,41 @@ class HelpersTest(unittest.TestCase):
('A', 'B', 'C', 'D')), ['D', 'B', 'C', 'A'])
class ListenersTest(unittest.TestCase, TestHelper):
def setUp(self):
self.setup_plugin_loader()
def tearDown(self):
self.teardown_plugin_loader()
self.teardown_beets()
def test_register(self):
class DummyPlugin(plugins.BeetsPlugin):
def __init__(self):
super(DummyPlugin, self).__init__()
self.register_listener('cli_exit', self.dummy)
self.register_listener('cli_exit', self.dummy)
def dummy(self):
pass
d = DummyPlugin()
self.assertEqual(DummyPlugin.listeners['cli_exit'], [d.dummy])
d2 = DummyPlugin()
DummyPlugin.register_listener('cli_exit', d.dummy)
self.assertEqual(DummyPlugin.listeners['cli_exit'],
[d.dummy, d2.dummy])
@DummyPlugin.listen('cli_exit')
def dummy(lib):
pass
self.assertEqual(DummyPlugin.listeners['cli_exit'],
[d.dummy, d2.dummy, dummy])
def suite():
return unittest.TestLoader().loadTestsFromName(__name__)