Import zip archives

`beet import archive.zip` extracts the archive to a temporary directory and
imports the content.

The code is very hacky. To make it cleaner the `importer` module needs some
refactoring. One thing the code hints at is extending the `ImportTask` class.
This commit is contained in:
Thomas Scholtes 2014-04-15 15:40:51 +02:00
parent 770bee3583
commit b783097329
2 changed files with 75 additions and 1 deletions

View file

@ -22,6 +22,9 @@ import logging
import pickle
import itertools
from collections import defaultdict
from zipfile import is_zipfile, ZipFile
from tempfile import mkdtemp
import shutil
from beets import autotag
from beets import library
@ -525,6 +528,11 @@ class ImportTask(object):
else:
return [self.item]
def cleanup(self):
"""Perform clean up during `finalize` stage.
"""
pass
# Utilities.
def prune(self, filename):
@ -540,6 +548,16 @@ class ImportTask(object):
clutter=config['clutter'].as_str_seq())
class ArchiveImportTask(ImportTask):
def __init__(self, toppath):
super(ArchiveImportTask, self).__init__(toppath)
self.sentinel = True
def cleanup(self):
shutil.rmtree(self.toppath)
# Full-album pipeline stages.
def read_tasks(session):
@ -573,6 +591,26 @@ def read_tasks(session):
history_dirs = history_get()
for toppath in session.paths:
extracted = None
if is_zipfile(syspath(toppath)):
if not (config['import']['move'] or config['import']['copy']):
log.warn("Cannot import archive. Please set "
"the 'move' or 'copy' option.")
continue
log.debug('extracting archive {0}'
.format(displayable_path(toppath)))
try:
extracted = mkdtemp()
zip_file = ZipFile(toppath, mode='r')
zip_file.extractall(extracted)
except IOError as exc:
log.error('extraction failed: {0}'.format(exc))
continue
finally:
zip_file.close()
toppath = extracted
# Check whether the path is to a file.
if not os.path.isdir(syspath(toppath)):
try:
@ -627,7 +665,11 @@ def read_tasks(session):
yield ImportTask(toppath, paths, items)
# Indicate the directory is finished.
yield ImportTask.done_sentinel(toppath)
# FIXME hack to delete extraced archives
if extracted is None:
yield ImportTask.done_sentinel(toppath)
else:
yield ArchiveImportTask(extracted)
# Show skipped directories.
if config['import']['incremental'] and incremental_skipped:
@ -937,6 +979,7 @@ def finalize(session):
task.save_progress()
if config['import']['incremental']:
task.save_history()
task.cleanup()
continue
items = task.imported_items()
@ -971,6 +1014,7 @@ def finalize(session):
task.save_progress()
if config['import']['incremental']:
task.save_history()
task.cleanup()
# Singleton pipeline stages.

View file

@ -17,6 +17,8 @@
import os
import shutil
import StringIO
from tempfile import mkstemp
from zipfile import ZipFile
import _common
from _common import unittest
@ -299,6 +301,34 @@ class NonAutotaggedImportTest(_common.TestCase, ImportHelper):
self.assertNotExists(os.path.join(self.import_dir, 'the_album'))
class ImportArchiveTest(unittest.TestCase, ImportHelper):
def setUp(self):
self.setup_beets()
def tearDown(self):
self.teardown_beets()
def test_import_zip(self):
zip_path = self.create_zip_archive()
self.assertEqual(len(self.lib.items()), 0)
self.assertEqual(len(self.lib.albums()), 0)
self._setup_import_session(autotag=False, import_dir=zip_path)
self.importer.run()
self.assertEqual(len(self.lib.items()), 1)
self.assertEqual(len(self.lib.albums()), 1)
def create_zip_archive(self):
(handle, zip_path) = mkstemp('.zip', dir=self.temp_dir)
os.close(handle)
zip_file = ZipFile(zip_path, mode='w')
zip_file.write(os.path.join(_common.RSRC, 'full.mp3'),
'full.mp3')
zip_file.close()
return zip_path
class ImportSingletonTest(_common.TestCase, ImportHelper):
"""Test ``APPLY`` and ``ASIS`` choices for an import session with singletons
config set to True.