make helper functions methods in plugin

This helps use the _log attribute that is now available
Pedro Silva 2015-03-28 10:08:07 +01:00
parent 8feb4bdf34
commit 82ce6f054d


@@ -26,89 +26,6 @@ from beets.util import command_output, displayable_path, subprocess
 PLUGIN = 'duplicates'
-def _process_item(item, lib, copy=False, move=False, delete=False,
-                  tag=False, fmt=''):
-    """Process Item `item` in `lib`.
-    """
-    if copy:
-        item.move(basedir=copy, copy=True)
-        item.store()
-    if move:
-        item.move(basedir=move, copy=False)
-        item.store()
-    if delete:
-        item.remove(delete=True)
-    if tag:
-        try:
-            k, v = tag.split('=')
-        except:
-            raise UserError('%s: can\'t parse k=v tag: %s' % (PLUGIN, tag))
-        setattr(k, v)
-        item.store()
-    print_(format(item, fmt))
-def _checksum(item, prog, log):
-    """Run external `prog` on file path associated with `item`, cache
-    output as flexattr on a key that is the name of the program, and
-    return the key, checksum tuple.
-    """
-    args = [p.format(file=item.path) for p in shlex.split(prog)]
-    key = args[0]
-    checksum = getattr(item, key, False)
-    if not checksum:
-        log.debug(u'key {0} on item {1} not cached: computing checksum',
-                  key, displayable_path(item.path))
-        try:
-            checksum = command_output(args)
-            setattr(item, key, checksum)
-            item.store()
-            log.debug(u'computed checksum for {0} using {1}',
-                      item.title, key)
-        except subprocess.CalledProcessError as e:
-            log.debug(u'failed to checksum {0}: {1}',
-                      displayable_path(item.path), e)
-    else:
-        log.debug(u'key {0} on item {1} cached: not computing checksum',
-                  key, displayable_path(item.path))
-    return key, checksum
-def _group_by(objs, keys, strict, log):
-    """Return a dictionary with keys arbitrary concatenations of attributes and
-    values lists of objects (Albums or Items) with those keys.
-    If strict, all attributes must be defined for a duplicate match.
-    """
-    import collections
-    counts = collections.defaultdict(list)
-    for obj in objs:
-        values = [getattr(obj, k, None) for k in keys]
-        values = [v for v in values if v not in (None, '')]
-        if strict and len(values) < len(keys):
-            log.debug(u'some keys {0} on item {1} are null or empty: '
-                      'skipping',
-                      keys, displayable_path(obj.path))
-        elif (not strict and not len(values)):
-            log.debug(u'all keys {0} on item {1} are null or empty: '
-                      'skipping',
-                      keys, displayable_path(obj.path))
-        else:
-            key = tuple(values)
-            counts[key].append(obj)
-    return counts
-def _duplicates(objs, keys, full, strict, log):
-    """Generate triples of keys, duplicate counts, and constituent objects.
-    """
-    offset = 0 if full else 1
-    for k, objs in _group_by(objs, keys, strict, log).iteritems():
-        if len(objs) > 1:
-            yield (k, len(objs) - offset, objs[offset:])
 class DuplicatesPlugin(BeetsPlugin):
     """List duplicate tracks or albums
     """
@@ -214,22 +131,102 @@ class DuplicatesPlugin(BeetsPlugin):
                         'duplicates: "checksum" option must be a command'
                     )
                 for i in items:
-                    k, _ = self._checksum(i, checksum, self._log)
+                    k, _ = self._checksum(i, checksum)
                 keys = [k]
-            for obj_id, obj_count, objs in _duplicates(items,
-                                                       keys=keys,
-                                                       full=full,
-                                                       strict=strict,
-                                                       log=self._log):
+            for obj_id, obj_count, objs in self._duplicates(items,
+                                                            keys=keys,
+                                                            full=full,
+                                                            strict=strict):
                 if obj_id:  # Skip empty IDs.
                     for o in objs:
-                        _process_item(o, lib,
-                                      copy=copy,
-                                      move=move,
-                                      delete=delete,
-                                      tag=tag,
-                                      fmt=fmt.format(obj_count))
+                        self._process_item(o, lib,
+                                           copy=copy,
+                                           move=move,
+                                           delete=delete,
+                                           tag=tag,
+                                           fmt=fmt.format(obj_count))
         self._command.func = _dup
         return [self._command]
+    def _process_item(self, item, lib, copy=False, move=False, delete=False,
+                      tag=False, fmt=''):
+        """Process Item `item` in `lib`.
+        """
+        if copy:
+            item.move(basedir=copy, copy=True)
+            item.store()
+        if move:
+            item.move(basedir=move, copy=False)
+            item.store()
+        if delete:
+            item.remove(delete=True)
+        if tag:
+            try:
+                k, v = tag.split('=')
+            except:
+                raise UserError('%s: can\'t parse k=v tag: %s' % (PLUGIN, tag))
+            setattr(k, v)
+            item.store()
+        print_(format(item, fmt))
+    def _checksum(self, item, prog):
+        """Run external `prog` on file path associated with `item`, cache
+        output as flexattr on a key that is the name of the program, and
+        return the key, checksum tuple.
+        """
+        args = [p.format(file=item.path) for p in shlex.split(prog)]
+        key = args[0]
+        checksum = getattr(item, key, False)
+        if not checksum:
+            self._log.debug(u'key {0} on item {1} not cached:'
+                            'computing checksum',
+                            key, displayable_path(item.path))
+            try:
+                checksum = command_output(args)
+                setattr(item, key, checksum)
+                item.store()
+                self._log.debug(u'computed checksum for {0} using {1}',
+                                item.title, key)
+            except subprocess.CalledProcessError as e:
+                self._log.debug(u'failed to checksum {0}: {1}',
+                                displayable_path(item.path), e)
+        else:
+            self._log.debug(u'key {0} on item {1} cached:'
+                            'not computing checksum',
+                            key, displayable_path(item.path))
+        return key, checksum
+    def _group_by(self, objs, keys, strict):
+        """Return a dictionary with keys arbitrary concatenations of attributes and
+        values lists of objects (Albums or Items) with those keys.
+        If strict, all attributes must be defined for a duplicate match.
+        """
+        import collections
+        counts = collections.defaultdict(list)
+        for obj in objs:
+            values = [getattr(obj, k, None) for k in keys]
+            values = [v for v in values if v not in (None, '')]
+            if strict and len(values) < len(keys):
+                self._log.debug(u'some keys {0} on item {1} are null or empty:'
+                                ' skipping',
+                                keys, displayable_path(obj.path))
+            elif (not strict and not len(values)):
+                self._log.debug(u'all keys {0} on item {1} are null or empty:'
+                                ' skipping',
+                                keys, displayable_path(obj.path))
+            else:
+                key = tuple(values)
+                counts[key].append(obj)
+        return counts
+    def _duplicates(self, objs, keys, full, strict):
+        """Generate triples of keys, duplicate counts, and constituent objects.
+        """
+        offset = 0 if full else 1
+        for k, objs in self._group_by(objs, keys, strict).iteritems():
+            if len(objs) > 1:
+                yield (k, len(objs) - offset, objs[offset:])
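
As a rough illustration of the pattern this commit applies (dropping an explicit log parameter and relying on the per-plugin self._log logger that BeetsPlugin now provides), a minimal sketch might look like this; the ExamplePlugin class and its _describe helper are hypothetical and not part of beets:

from beets.plugins import BeetsPlugin


class ExamplePlugin(BeetsPlugin):
    def _describe(self, item):
        # A former module-level helper turned into a method: instead of
        # receiving a `log` argument, it uses the plugin's own logger.
        # Plugin loggers accept str.format-style placeholders, as in the
        # debug calls in the diff above.
        self._log.debug(u'inspecting {0}', item.title)
        return item.title

Keeping such helpers as methods means callers inside the plugin no longer have to thread a logger argument through every call, which is what the second hunk above removes.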