Move libprs500 from a multi-threaded to a multi-process architecture. This should greatly improve the stability and responsiveness of the GUI.

This commit is contained in:
Kovid Goyal 2008-01-27 03:09:45 +00:00
parent 449bc10862
commit c03508cf37
8 changed files with 188 additions and 84 deletions

View file

@ -49,6 +49,9 @@
],
}
if 'win32' in sys.platform.lower() or 'win64' in sys.platform.lower():
entry_points['console_scripts'].append('parallel = libprs500.parallel:main')
def _ep_to_script(ep, base='src'):
return (base+os.path.sep+re.search(r'.*=\s*(.*?):', ep).group(1).replace('.', '/')+'.py').strip()

View file

@ -27,7 +27,7 @@
from libprs500.translations.msgfmt import make
iswindows = 'win32' in sys.platform.lower()
iswindows = 'win32' in sys.platform.lower() or 'win64' in sys.platform.lower()
isosx = 'darwin' in sys.platform.lower()
islinux = not(iswindows or isosx)

View file

@ -14,7 +14,7 @@
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
'''Convert known websites into LRF files.'''
import sys, time, tempfile, shutil, os, logging, imp, inspect
import sys, time, tempfile, shutil, os, logging, imp, inspect, re
from urlparse import urlsplit
from libprs500 import __appname__, setup_cli_handlers, CommandLineError
@ -23,7 +23,7 @@
from libprs500.web.fetch.simple import create_fetcher
from libprs500.ebooks.lrf.web.profiles import DefaultProfile
from libprs500.ebooks.lrf.web.profiles import DefaultProfile, FullContentProfile, create_class
from libprs500.ebooks.lrf.web import builtin_profiles, available_profiles
@ -89,35 +89,39 @@ def process_profile(args, options, logger=None):
logger = logging.getLogger('web2lrf')
setup_cli_handlers(logger, level)
index = -1
if options.user_profile is not None:
path = os.path.abspath(options.user_profile)
name = os.path.splitext(os.path.basename(path))[0]
res = imp.find_module(name, [os.path.dirname(path)])
module = imp.load_module(name, *res)
classes = inspect.getmembers(module,
lambda x : inspect.isclass(x) and issubclass(x, DefaultProfile)\
and x is not DefaultProfile)
if not classes:
raise CommandLineError('Invalid user profile '+path)
builtin_profiles.append(classes[0][1])
available_profiles.append(name)
if len(args) < 2:
args.append(name)
args[1] = name
index = -1
if len(args) == 2:
try:
if isinstance(args[1], basestring):
if args[1] != 'default':
index = available_profiles.index(args[1])
except ValueError:
raise CommandLineError('Unknown profile: %s\nValid profiles: %s'%(args[1], available_profiles))
else:
raise CommandLineError('Only one profile at a time is allowed.')
if isinstance(args[1], basestring):
if len(args) == 2 and re.search(r'class\s+\S+\(\S+\)\s*\:', args[1]):
profile = create_class(args[1])
else:
if options.user_profile is not None:
path = os.path.abspath(options.user_profile)
name = os.path.splitext(os.path.basename(path))[0]
res = imp.find_module(name, [os.path.dirname(path)])
module = imp.load_module(name, *res)
classes = inspect.getmembers(module,
lambda x : inspect.isclass(x) and issubclass(x, DefaultProfile)\
and x is not DefaultProfile and x is not FullContentProfile)
if not classes:
raise CommandLineError('Invalid user profile '+path)
builtin_profiles.append(classes[0][1])
available_profiles.append(name)
if len(args) < 2:
args.append(name)
args[1] = name
index = -1
if len(args) == 2:
try:
if isinstance(args[1], basestring):
if args[1] != 'default':
index = available_profiles.index(args[1])
except ValueError:
raise CommandLineError('Unknown profile: %s\nValid profiles: %s'%(args[1], available_profiles))
else:
raise CommandLineError('Only one profile at a time is allowed.')
profile = DefaultProfile if index == -1 else builtin_profiles[index]
else:
profile = args[1]
profile = profile(logger, options.verbose, options.username, options.password)
if profile.browser is not None:
options.browser = profile.browser
@ -174,11 +178,7 @@ def process_profile(args, options, logger=None):
def main(args=sys.argv, logger=None):
parser = option_parser()
if not isinstance(args[-1], basestring): # Called from GUI
options, args2 = parser.parse_args(args[:-1])
args = args2 + [args[-1]]
else:
options, args = parser.parse_args(args)
options, args = parser.parse_args(args)
if len(args) > 2 or (len(args) == 1 and not options.user_profile):
parser.print_help()
return 1

View file

@ -17,7 +17,7 @@
from PyQt4.QtCore import SIGNAL
from PyQt4.QtGui import QDialog, QMessageBox
from libprs500.ebooks.lrf.web.profiles import FullContentProfile, DefaultProfile
from libprs500.ebooks.lrf.web.profiles import FullContentProfile, create_class
from libprs500.gui2.dialogs.user_profiles_ui import Ui_Dialog
from libprs500.gui2 import qstring_to_unicode, error_dialog, question_dialog
@ -52,7 +52,7 @@ def edit_profile(self, current, previous):
current = previous
src = current.user_data[1]
if 'class BasicUserProfile' in src:
profile = self.create_class(src)
profile = create_class(src)
self.populate_options(profile)
self.stacks.setCurrentIndex(0)
self.toggle_mode_button.setText('Switch to Advanced mode')
@ -122,22 +122,12 @@ def populate_source_code(self):
src = self.options_to_profile().replace('BasicUserProfile', 'AdvancedUserProfile')
self.source_code.setPlainText(src)
@classmethod
def create_class(cls, src):
environment = {'FullContentProfile':FullContentProfile, 'DefaultProfile':DefaultProfile}
exec src in environment
for item in environment.values():
if hasattr(item, 'build_index'):
if item.__name__ not in ['DefaultProfile', 'FullContentProfile']:
return item
def add_profile(self, clicked):
if self.stacks.currentIndex() == 0:
src, title = self.options_to_profile()
try:
self.create_class(src)
create_class(src)
except Exception, err:
error_dialog(self, 'Invalid input',
'<p>Could not create profile. Error:<br>%s'%str(err)).exec_()
@ -146,7 +136,7 @@ def add_profile(self, clicked):
else:
src = qstring_to_unicode(self.source_code.toPlainText())
try:
title = self.create_class(src).title
title = create_class(src).title
except Exception, err:
error_dialog(self, 'Invalid input',
'<p>Could not create profile. Error:<br>%s'%str(err)).exec_()

View file

@ -12,13 +12,14 @@
## You should have received a copy of the GNU General Public License along
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import traceback, logging, cStringIO
import traceback, logging
from PyQt4.QtCore import QAbstractTableModel, QMutex, QObject, SIGNAL, Qt, \
QVariant, QThread, QModelIndex, QSettings
from PyQt4.QtGui import QIcon
from libprs500.gui2 import NONE
from libprs500.parallel import Server
class JobException(Exception):
pass
@ -44,11 +45,7 @@ def __init__(self, id, description, mutex, func, *args, **kwargs):
self.logger = logging.getLogger('Job #'+str(id))
self.logger.setLevel(logging.DEBUG)
self.is_locked = False
self.log_dest = cStringIO.StringIO()
handler = logging.StreamHandler(self.log_dest)
handler.setLevel(logging.DEBUG)
handler.setFormatter(logging.Formatter('[%(levelname)s] %(filename)s:%(lineno)s: %(message)s'))
self.logger.addHandler(handler)
self.log = None
def lock(self):
@ -85,23 +82,23 @@ def run(self):
self.emit(SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'),
self.id, self.description, self.result, exception, last_traceback)
MPServer = None
class ConversionJob(Job):
''' Jobs that invlove conversion of content. Synchronous. '''
''' Jobs that involve conversion of content. Synchronous. '''
def run(self):
self.lock()
last_traceback, exception = None, None
try:
try:
self.kwargs['logger'] = self.logger
self.result = self.func(*self.args, **self.kwargs)
self.result, exception, last_traceback, self.log = \
MPServer.run(self.id, self.func, self.args, self.kwargs)
except Exception, err:
exception = err
last_traceback = traceback.format_exc()
last_traceback = traceback.format_exc()
exception = (exception.__class__.__name__, unicode(str(err), 'utf8', 'replace'))
finally:
self.unlock()
self.emit(SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'),
self.id, self.description, self.result, exception, last_traceback, self.log_dest.getvalue())
self.id, self.description, self.result, exception, last_traceback, self.log)
@ -125,6 +122,8 @@ def __init__(self):
self.cleanup = {}
self.device_job_icon = QVariant(QIcon(':/images/reader.svg'))
self.job_icon = QVariant(QIcon(':/images/jobs.svg'))
global MPServer
MPServer = Server()
def terminate_device_jobs(self):
changed = False
@ -143,8 +142,8 @@ def create_job(self, job_class, description, lock, *args, **kwargs):
try:
self.next_id += 1
job = job_class(self.next_id, description, lock, *args, **kwargs)
QObject.connect(job, SIGNAL('finished()'), self.cleanup_jobs)
QObject.connect(job, SIGNAL('status_update(int, int)'), self.status_update)
QObject.connect(job, SIGNAL('finished()'), self.cleanup_jobs, Qt.QueuedConnection)
QObject.connect(job, SIGNAL('status_update(int, int)'), self.status_update, Qt.QueuedConnection)
self.beginInsertRows(QModelIndex(), len(self.jobs), len(self.jobs))
self.jobs[self.next_id] = job
self.endInsertRows()
@ -162,7 +161,7 @@ def has_device_jobs(self):
def has_jobs(self):
return len(self.jobs.values()) > 0
def run_conversion_job(self, slot, callable, *args, **kwargs):
def run_conversion_job(self, slot, callable, args=[], **kwargs):
'''
Run a conversion job.
@param slot: The function to call with the job result.
@ -174,10 +173,10 @@ def run_conversion_job(self, slot, callable, *args, **kwargs):
job = self.create_job(ConversionJob, desc, self.conversion_lock,
callable, *args, **kwargs)
QObject.connect(job, SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'),
self.job_done)
self.job_done, Qt.QueuedConnection)
if slot:
QObject.connect(job, SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'),
slot)
slot, Qt.QueuedConnection)
priority = self.PRIORITY[str(QSettings().value('conversion job priority',
QVariant('Normal')).toString())]
job.start(priority)
@ -195,10 +194,10 @@ def run_device_job(self, slot, callable, *args, **kwargs):
desc += kwargs.pop('job_extra_description', '')
job = self.create_job(DeviceJob, desc, self.device_lock, callable, *args, **kwargs)
QObject.connect(job, SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'),
self.job_done)
self.job_done, Qt.QueuedConnection)
if slot:
QObject.connect(job, SIGNAL('jobdone(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)'),
slot)
slot, Qt.QueuedConnection)
job.start()
return job.id

View file

@ -13,7 +13,6 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.Warning
import os, sys, textwrap, cStringIO, collections, traceback, shutil
from functools import partial
from PyQt4.QtCore import Qt, SIGNAL, QObject, QCoreApplication, \
QSettings, QVariant, QSize, QThread
@ -24,8 +23,6 @@
from libprs500 import __version__, __appname__, islinux, sanitize_file_name
from libprs500.ptempfile import PersistentTemporaryFile
from libprs500.ebooks.metadata.meta import get_metadata
from libprs500.ebooks.lrf.web.convert_from import main as web2lrf
from libprs500.ebooks.lrf.any.convert_from import main as _any2lrf
from libprs500.devices.errors import FreeSpaceError
from libprs500.devices.interface import Device
from libprs500.gui2 import APP_UID, warning_dialog, choose_files, error_dialog, \
@ -56,7 +53,7 @@
from libprs500.ebooks.metadata import MetaInformation
from libprs500.ebooks import BOOK_EXTENSIONS
any2lrf = partial(_any2lrf, gui_mode=True)
class Main(MainWindow, Ui_MainWindow):
@ -557,7 +554,7 @@ def fetch_news(self, data):
if data['password']:
args.extend(['--password', data['password']])
args.append(data['profile'])
id = self.job_manager.run_conversion_job(self.news_fetched, web2lrf, args=args,
id = self.job_manager.run_conversion_job(self.news_fetched, 'web2lrf', args=[args],
job_description='Fetch news from '+data['title'])
self.conversion_jobs[id] = (pt, 'lrf')
self.status_bar.showMessage('Fetching news from '+data['title'], 2000)
@ -606,9 +603,8 @@ def convert_single(self, checked):
of.close()
cmdline.extend(['-o', of.name])
cmdline.append(pt.name)
id = self.job_manager.run_conversion_job(self.book_converted,
any2lrf, args=cmdline,
'any2lrf', args=[cmdline],
job_description='Convert book:'+d.title())
@ -763,7 +759,7 @@ def device_job_exception(self, id, description, exception, formatted_traceback):
_('There was a temporary error talking to the device. Please unplug and reconnect the device and or reboot.')).show()
return
print >>sys.stderr, 'Error in job:', description.encode('utf8')
print >>sys.stderr, exception
print >>sys.stderr, exception[0], exception[1]
print >>sys.stderr, formatted_traceback.encode('utf8')
if not self.device_error_dialog.isVisible():
msg = u'<p><b>%s</b>: '%(exception.__class__.__name__,) + unicode(str(exception), 'utf8', 'replace') + u'</p>'
@ -776,15 +772,17 @@ def device_job_exception(self, id, description, exception, formatted_traceback):
def conversion_job_exception(self, id, description, exception, formatted_traceback, log):
print >>sys.stderr, 'Error in job:', description.encode('utf8')
print >>sys.stderr, log.encode('utf8')
if log:
print >>sys.stderr, log.encode('utf8')
print >>sys.stderr, exception
print >>sys.stderr, formatted_traceback.encode('utf8')
msg = u'<p><b>%s</b>: '%(exception.__class__.__name__,) + unicode(str(exception), 'utf8', 'replace') + u'</p>'
msg = u'<p><b>%s</b>: %s</p>'%exception
msg += u'<p>Failed to perform <b>job</b>: '+description
msg += u'<p>Detailed <b>traceback</b>:<pre>'
msg += formatted_traceback + '</pre>'
msg += '<p><b>Log:</b></p><pre>'
msg += log
if log:
msg += log
ConversionErrorDialog(self, 'Conversion Error', msg, show=True)

View file

@ -17,7 +17,7 @@
from libprs500.gui2.dialogs.password import PasswordDialog
from libprs500.ebooks.lrf.web import builtin_profiles, available_profiles
from libprs500.gui2.dialogs.user_profiles import UserProfiles
from libprs500.ebooks.lrf.web.profiles import create_class
class NewsAction(QAction):
@ -59,6 +59,9 @@ def fetch_news(self, profile, module=None):
module = profile.title
username = password = None
fetch = True
if isinstance(profile, basestring):
module = profile
profile = create_class(module)
if profile.needs_subscription:
d = PasswordDialog(self, module + ' info dialog',
'<p>Please enter your username and password for %s<br>If you do not have one, please subscribe to get access to the articles.<br/> Click OK to proceed.'%(profile.title,))
@ -68,7 +71,7 @@ def fetch_news(self, profile, module=None):
else:
fetch = False
if fetch:
data = dict(profile=profile, title=profile.title, username=username, password=password)
data = dict(profile=module, title=profile.title, username=username, password=password)
self.emit(SIGNAL('fetch_news(PyQt_PyObject)'), data)
def set_custom_feeds(self, feeds):
@ -90,7 +93,7 @@ def __init__(self):
self.connect(self, SIGNAL('triggered(QAction*)'), self.launch)
def launch(self, action):
profile = UserProfiles.create_class(action.script)
profile = action.script
self.emit(SIGNAL('start_news_fetch(PyQt_PyObject, PyQt_PyObject)'),
profile, None)

111
src/libprs500/parallel.py Normal file
View file

@ -0,0 +1,111 @@
## Copyright (C) 2008 Kovid Goyal kovid@kovidgoyal.net
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License along
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
'''
Used to run jobs in parallel in separate processes.
'''
import re, sys, tempfile, os, subprocess, cPickle, cStringIO, traceback, atexit, time, binascii
from functools import partial
from libprs500.ebooks.lrf.any.convert_from import main as any2lrf
from libprs500.ebooks.lrf.web.convert_from import main as web2lrf
from libprs500 import iswindows
PARALLEL_FUNCS = {
'any2lrf' : partial(any2lrf, gui_mode=True),
'web2lrf' : web2lrf,
}
Popen = subprocess.Popen
python = sys.executable
if iswindows:
import win32con
Popen = partial(Popen, creationflags=win32con.CREATE_NO_WINDOW)
if hasattr(sys, 'frozen'):
python = os.path.join(os.path.dirname(python), 'parallel.exe')
else:
python = os.path.join(os.path.dirname(python), 'Scripts\\parallel.exe')
def cleanup(tdir):
try:
import shutil
shutil.rmtree(tdir, True)
except:
pass
class Server(object):
def __init__(self):
self.tdir = tempfile.mkdtemp('', 'libprs500_IPC_')
atexit.register(cleanup, self.tdir)
self.stdout = {}
def run(self, job_id, func, args=(), kwdargs={}):
job_id = str(job_id)
job_dir = os.path.join(self.tdir, job_id)
if os.path.exists(job_dir):
raise ValueError('Cannot run job. The job_id %s has already been used.')
os.mkdir(job_dir)
self.stdout[job_id] = cStringIO.StringIO()
job_data = os.path.join(job_dir, 'job_data.pickle')
cPickle.dump((func, args, kwdargs), open(job_data, 'wb'), -1)
prefix = ''
if hasattr(sys, 'frameworks_dir'):
fd = getattr(sys, 'frameworks_dir')
prefix = 'import sys; sys.frameworks_dir = "%s"; sys.frozen = "macosx_app"; '%fd
if fd not in os.environ['PATH']:
os.environ['PATH'] += ':'+fd
cmd = prefix + 'from libprs500.parallel import run_job; run_job(\'%s\')'%binascii.hexlify(job_data)
p = Popen((python, '-c', cmd), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while p.returncode is None:
self.stdout[job_id].write(p.stdout.readline())
p.poll()
time.sleep(0.5) # Wait for half a second
self.stdout[job_id].write(p.stdout.read())
job_result = os.path.join(job_dir, 'job_result.pickle')
if not os.path.exists(job_result):
result, exception, traceback = None, ('ParallelRuntimeError', 'The worker process died unexpectedly.'), ''
else:
result, exception, traceback = cPickle.load(open(job_result, 'rb'))
log = self.stdout[job_id].getvalue()
self.stdout.pop(job_id)
return result, exception, traceback, log
def run_job(job_data):
job_data = binascii.unhexlify(job_data)
job_result = os.path.join(os.path.dirname(job_data), 'job_result.pickle')
func, args, kwdargs = cPickle.load(open(job_data, 'rb'))
func = PARALLEL_FUNCS[func]
exception, tb = None, None
try:
result = func(*args, **kwdargs)
except (Exception, SystemExit), err:
result = None
exception = (err.__class__.__name__, unicode(str(err), 'utf-8', 'replace'))
tb = traceback.format_exc()
cPickle.dump((result, exception, tb), open(job_result, 'wb'))
def main():
src = sys.argv[2]
job_data = re.search(r'run_job\(\'([a-f0-9A-F]+)\'\)', src).group(1)
run_job(job_data)
return 0
if __name__ == '__main__':
sys.exit(main())