Handle incremental and resumed imports in session

This commit is contained in:
Thomas Scholtes 2014-07-31 11:48:35 +02:00
parent 0f2a9bdcdc
commit 1eb62bcd72

View file

@ -179,6 +179,7 @@ class ImportSession(object):
self.paths = paths
self.query = query
self.seen_idents = set()
self._is_resuming = dict()
# Normalize the paths.
if self.paths:
@ -294,6 +295,50 @@ class ImportSession(object):
# User aborted operation. Silently stop.
pass
# Incremental and resumed imports
def already_imported(self, toppath, paths):
    """Return `True` if the files belonging to this task have already
    been imported in a previous session.

    `paths` is the sequence of paths that make up the task. A single
    path given as a bare string is treated as a one-element sequence
    so that it is never split into its characters.

    A task counts as already imported when either

    * the import of `toppath` is being resumed (see `is_resuming`) and
      `progress_element` is truthy for every path, or
    * the `incremental` option is set and the tuple of paths is found
      in `history_dirs`.
    """
    # Iterating over (or calling `tuple()` on) a bare string yields its
    # characters, which can never match a recorded path. Wrap it first.
    if isinstance(paths, (bytes, type(u''))):
        paths = [paths]

    # The generator stops at the first path that has no recorded
    # progress instead of querying every path up front.
    if self.is_resuming(toppath) and \
            all(progress_element(toppath, p) for p in paths):
        return True
    if self.config['incremental'] and tuple(paths) in self.history_dirs:
        return True
    return False
@property
def history_dirs(self):
    """The value returned by `history_get()`, fetched lazily.

    `history_get()` is called on first access only; its result is
    stored on the instance as `_history_dirs` and returned unchanged
    on every later access.
    """
    try:
        return self._history_dirs
    except AttributeError:
        # First access: nothing cached yet, so load and remember it.
        self._history_dirs = history_get()
        return self._history_dirs
def is_resuming(self, toppath):
    """Tell whether the import of `toppath` is being resumed.

    The answer is whatever `ask_resume(toppath)` recorded, so that
    method has to be called first. A path with no recorded decision
    yields `False`.
    """
    try:
        return self._is_resuming[toppath]
    except KeyError:
        # No decision has been recorded for this path.
        return False
def ask_resume(self, toppath):
    """Decide whether an earlier, aborted import of `toppath` should
    be resumed, and record the decision for `is_resuming(toppath)`.

    Nothing happens unless `want_resume` is truthy and
    `has_progress(toppath)` reports progress. In that case the import
    is resumed when `want_resume` is exactly `True` or when
    `should_resume(toppath)` agrees; otherwise the progress for
    `toppath` is reset.
    """
    if not (self.want_resume and has_progress(toppath)):
        return

    # Either accept immediately or let `should_resume` decide.
    resume = self.want_resume is True or self.should_resume(toppath)
    if not resume:
        # Clear progress; the import starts over from the top.
        progress_reset(toppath)
        return

    log.warn('Resuming interrupted import of %s' % toppath)
    self._is_resuming[toppath] = True
# The importer task class.
@ -815,24 +860,11 @@ def read_tasks(session):
in the user-specified list of paths. In the case of a singleton
import, yields single-item tasks instead.
"""
# Look for saved incremental directories.
if session.config['incremental']:
incremental_skipped = 0
history_dirs = history_get()
skipped = 0
for toppath in session.paths:
# Determine if we want to resume import of the toppath
resuming = False
if session.want_resume and has_progress(toppath):
# Either accept immediately or prompt for input to decide.
if session.want_resume is True or \
session.should_resume(toppath):
log.warn('Resuming interrupted import of %s' % toppath)
resuming = True
else:
# Clear progress; we're starting from the top.
progress_reset(toppath)
session.ask_resume(toppath)
# Extract archives.
archive_task = None
@ -856,7 +888,13 @@ def read_tasks(session):
# Check whether the path is to a file.
if not os.path.isdir(syspath(toppath)):
if resuming and progress_element(toppath, toppath):
# FIXME remove duplicate code. We could put the debug
# statement into `session.already_imported` but I don't feel
# comfortable triggering an action in a query.
if session.already_imported(toppath, toppath):
log.debug(u'Skipping previously-imported path: {0}'
.format(displayable_path(toppath)))
skipped += 1
continue
try:
@ -878,40 +916,33 @@ def read_tasks(session):
for _, items in autotag.albums_in_dir(toppath):
all_items += items
if all_items:
if session.already_imported(toppath, [toppath]):
log.debug(u'Skipping previously-imported path: {0}'
.format(displayable_path(toppath)))
skipped += 1
continue
yield ImportTask(toppath, [toppath], all_items)
yield SentinelImportTask(toppath)
continue
# Produce paths under this directory.
for paths, items in autotag.albums_in_dir(toppath):
# Skip according to progress.
if resuming \
and all(map(lambda p: progress_element(toppath, p), paths)):
continue
# When incremental, skip paths in the history.
if session.config['incremental'] and tuple(paths) in history_dirs:
log.debug(u'Skipping previously-imported path: {0}'
.format(displayable_path(paths)))
incremental_skipped += 1
continue
# Yield all the necessary tasks.
if session.config['singletons']:
for item in items:
# TODO Abstract all the progress and incremental
# stuff away!
if resuming and progress_element(toppath, item.path):
continue
if session.config['incremental'] \
and (item.path,) in history_dirs:
if session.already_imported(toppath, [item.path]):
log.debug(u'Skipping previously-imported path: {0}'
.format(displayable_path(paths)))
incremental_skipped += 1
skipped += 1
continue
yield SingletonImportTask(toppath, item)
yield SentinelImportTask(toppath, paths)
else:
if session.already_imported(toppath, paths):
log.debug(u'Skipping previously-imported path: {0}'
.format(displayable_path(paths)))
skipped += 1
continue
yield ImportTask(toppath, paths, items)
# Indicate the directory is finished.
@ -922,9 +953,8 @@ def read_tasks(session):
yield archive_task
# Show skipped directories.
if session.config['incremental'] and incremental_skipped:
log.info(u'Incremental import: skipped %i directories.' %
incremental_skipped)
if skipped:
log.info(u'Skipped {0} directories.'.format(skipped))
def query_tasks(session):