diff --git a/src/calibre/gui2/main.py b/src/calibre/gui2/main.py index 5dafe3c683..e73ada9cb8 100644 --- a/src/calibre/gui2/main.py +++ b/src/calibre/gui2/main.py @@ -1241,9 +1241,10 @@ def main(args=sys.argv): if single_instance is not None and single_instance.is_running() and \ single_instance.send_message('launched:'+repr(args)): return 0 - + extra = '' if iswindows else \ + ('If you\'re sure it is not running, delete the file %s.'%os.path.expanduser('~/.calibre_calibre GUI.lock')) QMessageBox.critical(None, 'Cannot Start '+__appname__, - '
%s is already running.
'%__appname__) + '%s is already running. %s
'%(__appname__, extra)) return 1 initialize_file_icon_provider() try: diff --git a/src/calibre/parallel.py b/src/calibre/parallel.py index d3531c5525..d33728042b 100644 --- a/src/calibre/parallel.py +++ b/src/calibre/parallel.py @@ -6,7 +6,7 @@ ''' Used to run jobs in parallel in separate processes. Features output streaming, support for progress notification as well as job killing. The worker processes -are controlled via a simple protocol run over TCP/IP sockets. The control happens +are controlled via a simple protocol run over sockets. The control happens mainly in two class, :class:`Server` and :class:`Overseer`. The worker is encapsulated in the function :function:`worker`. Every worker process has the environment variable :envvar:`CALIBRE_WORKER` defined. @@ -25,7 +25,7 @@ is buffered and asynchronous to prevent the job from being IO bound. ''' import sys, os, gc, cPickle, traceback, atexit, cStringIO, time, signal, \ - subprocess, socket, collections, binascii, re, tempfile, thread + subprocess, socket, collections, binascii, re, tempfile, thread, tempfile from select import select from functools import partial from threading import RLock, Thread, Event @@ -33,6 +33,7 @@ from calibre.ptempfile import PersistentTemporaryFile from calibre import iswindows, detect_ncpus, isosx +DEBUG = False #: A mapping from job names to functions that perform the jobs PARALLEL_FUNCS = { @@ -51,11 +52,14 @@ isfrozen = hasattr(sys, 'frozen') +isworker = False win32event = __import__('win32event') if iswindows else None win32process = __import__('win32process') if iswindows else None msvcrt = __import__('msvcrt') if iswindows else None +SOCKET_TYPE = socket.AF_UNIX if not iswindows else socket.AF_INET + class WorkerStatus(object): ''' A platform independent class to control child processes. Provides the @@ -223,6 +227,7 @@ def spawn_free_spirit_windows(self, arg, type='free_spirit'): mother = WorkerMother() +_comm_lock = RLock() def write(socket, msg, timeout=5): ''' Write a message on socket. If `msg` is unicode, it is encoded in utf-8. @@ -230,22 +235,29 @@ def write(socket, msg, timeout=5): `msg` is broken into chunks of size 4096 and sent. The :function:`read` function automatically re-assembles the chunks into whole message. ''' - if isinstance(msg, unicode): - msg = msg.encode('utf-8') - length = None - while len(msg) > 0: - if length is None: - length = len(msg) - chunk = ('%-12d'%length) + msg[:4096-12] - msg = msg[4096-12:] - else: - chunk, msg = msg[:4096], msg[4096:] - w = select([], [socket], [], timeout)[1] - if not w: - raise RuntimeError('Write to socket timed out') - if socket.sendall(chunk) is not None: - raise RuntimeError('Failed to write chunk to socket') - + if isworker: + _comm_lock.acquire() + try: + if isinstance(msg, unicode): + msg = msg.encode('utf-8') + if DEBUG: + print >>sys.__stdout__, 'write(%s):'%('worker' if isworker else 'overseer'), repr(msg) + length = None + while len(msg) > 0: + if length is None: + length = len(msg) + chunk = ('%-12d'%length) + msg[:4096-12] + msg = msg[4096-12:] + else: + chunk, msg = msg[:4096], msg[4096:] + w = select([], [socket], [], timeout)[1] + if not w: + raise RuntimeError('Write to socket timed out') + if socket.sendall(chunk) is not None: + raise RuntimeError('Failed to write chunk to socket') + finally: + if isworker: + _comm_lock.release() def read(socket, timeout=5): ''' @@ -253,24 +265,33 @@ def read(socket, timeout=5): function. Raises a `RuntimeError` if the message is corrpted. Can return an empty string. ''' - buf = cStringIO.StringIO() - length = None - while select([socket],[],[],timeout)[0]: - msg = socket.recv(4096) - if not msg: - break - if length is None: - length, msg = int(msg[:12]), msg[12:] - buf.write(msg) - if buf.tell() >= length: - break - if not length: - return '' - msg = buf.getvalue()[:length] - if len(msg) < length: - raise RuntimeError('Corrupted packet received') - - return msg + if isworker: + _comm_lock.acquire() + try: + buf = cStringIO.StringIO() + length = None + while select([socket],[],[],timeout)[0]: + msg = socket.recv(4096) + if not msg: + break + if length is None: + length, msg = int(msg[:12]), msg[12:] + buf.write(msg) + if buf.tell() >= length: + break + if not length: + if DEBUG: + print >>sys.__stdout__, 'read(%s):'%('worker' if isworker else 'overseer'), 'nothing' + return '' + msg = buf.getvalue()[:length] + if len(msg) < length: + raise RuntimeError('Corrupted packet received') + if DEBUG: + print >>sys.__stdout__, 'read(%s):'%('worker' if isworker else 'overseer'), repr(msg) + return msg + finally: + if isworker: + _comm_lock.release() class RepeatingTimer(Thread): ''' @@ -306,11 +327,13 @@ class Overseer(object): INTERVAL = 0.1 def __init__(self, server, port, timeout=5): - self.worker_status = mother.spawn_worker('127.0.0.1:%d'%port) + self.worker_status = mother.spawn_worker('127.0.0.1:'+str(port)) self.socket = server.accept()[0] # Needed if terminate called hwen interpreter is shutting down self.os = os self.signal = signal + self.on_probation = False + self.terminated = False self.working = False self.timeout = timeout @@ -329,6 +352,7 @@ def __init__(self, server, port, timeout=5): def terminate(self): 'Kill worker process.' + self.terminated = True try: if self.socket: self.write('STOP:') @@ -363,7 +387,9 @@ def read(self, timeout=None): def __eq__(self, other): return hasattr(other, 'process') and hasattr(other, 'worker_pid') and self.worker_pid == other.worker_pid - def __bool__(self): + def is_viable(self): + if self.terminated: + return False return self.worker_status.is_alive() def select(self, timeout=0): @@ -386,6 +412,7 @@ def initialize_job(self, job): self.output = job.output if callable(job.output) else sys.stdout.write self.progress = job.progress if callable(job.progress) else None self.job = job + self.last_report = time.time() def control(self): ''' @@ -397,8 +424,21 @@ def control(self): ''' if select([self.socket],[],[],0)[0]: msg = self.read() + if msg: + self.on_probation = False + self.last_report = time.time() + else: + if self.on_probation: + self.terminate() + return Result(None, ControlError('Worker process died unexpectedly'), '') + else: + self.on_probation = True + return word, msg = msg.partition(':')[0], msg.partition(':')[-1] - if word == 'RESULT': + if word == 'PING': + self.write('OK') + return + elif word == 'RESULT': self.write('OK') return Result(cPickle.loads(msg), None, None) elif word == 'OUTPUT': @@ -421,11 +461,11 @@ def control(self): return Result(None, *cPickle.loads(msg)) else: self.terminate() - return Result(None, ControlError('Worker sent invalid msg: %s', repr(msg)), '') - if not self.worker_status.is_alive(): - return Result(None, ControlError('Worker process died unexpectedly with returncode: %d'%self.process.returncode), '') + return Result(None, ControlError('Worker sent invalid msg: %s'%repr(msg)), '') + if not self.worker_status.is_alive() or time.time() - self.last_report > 180: + self.terminate() + return Result(None, ControlError('Worker process died unexpectedly with returncode: %s'%str(self.process.returncode)), '') - class Job(object): @@ -458,18 +498,23 @@ class Server(Thread): KILL_RESULT = Overseer.KILL_RESULT START_PORT = 10013 + PID = os.getpid() + def __init__(self, number_of_workers=detect_ncpus()): Thread.__init__(self) self.setDaemon(True) - self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.port = self.START_PORT + self.server_socket = socket.socket(SOCKET_TYPE, socket.SOCK_STREAM) + self.port = tempfile.mktemp(prefix='calibre_server')+'_%d_'%self.PID if not iswindows else self.START_PORT while True: try: - self.server_socket.bind(('localhost', self.port)) + address = ('localhost', self.port) if iswindows else self.port + self.server_socket.bind(address) break - except: - self.port += 1 + except socket.error: + self.port += (1 if iswindows else '1') + if not iswindows: + atexit.register(os.unlink, self.port) self.server_socket.listen(5) self.number_of_workers = number_of_workers self.pool, self.jobs, self.working, self.results = [], collections.deque(), [], {} @@ -525,7 +570,7 @@ def run(self): res = Result(None, unicode(err), traceback.format_exc()) job.done(res) o = None - if o: + if o and o.is_viable(): with self.working_lock: self.working.append(o) @@ -542,7 +587,7 @@ def run(self): done.append(o) for o in done: self.working.remove(o) - if o: + if o and o.is_viable(): with self.pool_lock: self.pool.append(o) @@ -601,9 +646,11 @@ def __init__(self, socket): self.socket = socket self.wbuf, self.pbuf = [], [] self.wlock, self.plock = RLock(), RLock() + self.last_report = None self.timer = RepeatingTimer(0.5, self.send, 'BufferedSender') self.timer.start() + def write(self, msg): if not isinstance(msg, basestring): msg = unicode(msg) @@ -623,20 +670,31 @@ def send(self): if not select([], [self.socket], [], 30)[1]: print >>sys.__stderr__, 'Cannot pipe to overseer' return - + + reported = False with self.wlock: if self.wbuf: msg = cPickle.dumps(self.wbuf, -1) self.wbuf = [] write(self.socket, 'OUTPUT:'+msg) read(self.socket, 10) + reported = True with self.plock: if self.pbuf: msg = cPickle.dumps(self.pbuf, -1) self.pbuf = [] write(self.socket, 'PROGRESS:'+msg) - read(self.socket, 10) + read(self.socket, 10) + reported = True + + if self.last_report is not None: + if reported: + self.last_report = time.time() + elif time.time() - self.last_report > 60: + write(self.socket, 'PING:') + read(self.socket, 10) + self.last_report = time.time() def notify(self, percent, msg=''): with self.plock: @@ -652,19 +710,25 @@ def get_func(name): return func, kwdargs, notification def work(client_socket, func, args, kwdargs): - func, kargs, notification = get_func(func) - if notification is not None and hasattr(sys.stdout, 'notify'): - kargs[notification] = sys.stdout.notify - kargs.update(kwdargs) - res = func(*args, **kargs) - if hasattr(sys.stdout, 'send'): - sys.stdout.send() - return res + sys.stdout.last_report = time.time() + try: + func, kargs, notification = get_func(func) + if notification is not None and hasattr(sys.stdout, 'notify'): + kargs[notification] = sys.stdout.notify + kargs.update(kwdargs) + res = func(*args, **kargs) + if hasattr(sys.stdout, 'send'): + sys.stdout.send() + return res + finally: + sys.stdout.last_report = None + time.sleep(5) # Give any in progress BufferedSend time to complete def worker(host, port): - client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - client_socket.connect((host, port)) + client_socket = socket.socket(SOCKET_TYPE, socket.SOCK_STREAM) + address = (host, port) if iswindows else port + client_socket.connect(address) write(client_socket, 'CALIBRE_WORKER:%d'%os.getpid()) msg = read(client_socket, timeout=10) if msg != 'OK': @@ -685,10 +749,11 @@ def worker(host, port): try: result = work(client_socket, func, args, kwdargs) write(client_socket, 'RESULT:'+ cPickle.dumps(result)) - except (Exception, SystemExit), err: + except BaseException, err: exception = (err.__class__.__name__, unicode(str(err), 'utf-8', 'replace')) tb = traceback.format_exc() - write(client_socket, 'ERROR:'+cPickle.dumps((exception, tb),-1)) + msg = 'ERROR:'+cPickle.dumps((exception, tb),-1) + write(client_socket, msg) if read(client_socket, 10) != 'OK': break gc.collect() @@ -714,11 +779,13 @@ def free_spirit(path): func(*args, **kargs) def main(args=sys.argv): + global isworker + isworker = True args = args[1].split(':') if len(args) == 1: free_spirit(binascii.unhexlify(re.sub(r'[^a-f0-9A-F]', '', args[0]))) else: - worker(args[0].replace("'", ''), int(args[1])) + worker(args[0].replace("'", ''), int(args[1]) if iswindows else args[1]) return 0 if __name__ == '__main__': diff --git a/src/calibre/utils/fontconfig.py b/src/calibre/utils/fontconfig.py index 3e74362720..4275d03479 100644 --- a/src/calibre/utils/fontconfig.py +++ b/src/calibre/utils/fontconfig.py @@ -130,36 +130,40 @@ class FcValue(Structure): _init_error = None _initialized = False -from threading import Timer -def _do_init(): - # Initialize the fontconfig library. This has to be done manually - # for the OS X bundle as it may have its own private fontconfig. - if hasattr(sys, 'frameworks_dir'): - config_dir = os.path.join(os.path.dirname(getattr(sys, 'frameworks_dir')), 'Resources', 'fonts') - if isinstance(config_dir, unicode): - config_dir = config_dir.encode(sys.getfilesystemencoding()) - config = lib.FcConfigCreate() - if not lib.FcConfigParseAndLoad(config, os.path.join(config_dir, 'fonts.conf'), 1): - _init_error = 'Could not parse the fontconfig configuration' +from threading import Thread + +class FontScanner(Thread): + def run(self): + # Initialize the fontconfig library. This has to be done manually + # for the OS X bundle as it may have its own private fontconfig. + if getattr(sys, 'frameworks_dir', False): + config_dir = os.path.join(os.path.dirname(getattr(sys, 'frameworks_dir')), 'Resources', 'fonts') + if isinstance(config_dir, unicode): + config_dir = config_dir.encode(sys.getfilesystemencoding()) + config = lib.FcConfigCreate() + if not lib.FcConfigParseAndLoad(config, os.path.join(config_dir, 'fonts.conf'), 1): + _init_error = 'Could not parse the fontconfig configuration' + return + if not lib.FcConfigBuildFonts(config): + _init_error = 'Could not build fonts' + return + if not lib.FcConfigSetCurrent(config): + _init_error = 'Could not set font config' + return + elif not lib.FcInit(): + _init_error = _('Could not initialize the fontconfig library') return - if not lib.FcConfigBuildFonts(config): - _init_error = 'Could not build fonts' - return - if not lib.FcConfigSetCurrent(config): - _init_error = 'Could not set font config' - return - elif not lib.FcInit(): - _init_error = _('Could not initialize the fontconfig library') - return - global _initialized - _initialized = True + global _initialized + _initialized = True -_init_timer = Timer(0.1, _do_init) -_init_timer.start() +_scanner = FontScanner() +_scanner.start() def join(): - _init_timer.join() + _scanner.join(120) + if _scanner.isAlive(): + raise RuntimeError('Scanning for system fonts seems to have hung. Try again in a little while.') if _init_error is not None: raise RuntimeError(_init_error)