More work on the completion worker

This commit is contained in:
Kovid Goyal 2014-12-19 18:09:43 +05:30
parent 8d16c274fe
commit e1d7ce455b

View file

@ -7,14 +7,18 @@
__copyright__ = '2014, Kovid Goyal <kovid at kovidgoyal.net>'
import cPickle, os, sys
from threading import Thread, Event
from threading import Thread, Event, Lock
from Queue import Queue
from contextlib import closing
from collections import namedtuple
from calibre.constants import iswindows
from calibre.gui2.tweak_book.completion.basic import Request
from calibre.utils.ipc import eintr_retry_call
COMPLETION_REQUEST = 'completion request'
CLEAR_REQUEST = 'clear request'
class CompletionWorker(Thread):
daemon = True
@ -24,9 +28,14 @@ def __init__(self, worker_entry_point='main'):
self.worker_entry_point = worker_entry_point
self.start()
self.main_queue = Queue()
self.result_queue = Queue()
self.reap_thread = None
self.shutting_down = False
self.connected = Event()
self.current_completion_request = None
self.latest_completion_request_id = None
self.request_count = 0
self.lock = Lock()
def launch_worker_process(self):
from calibre.utils.ipc.server import create_listener
@ -92,6 +101,35 @@ def run(self):
obj = self.main_queue.get()
if obj is None:
break
req_type, req_data = obj
try:
if req_type is COMPLETION_REQUEST:
with self.lock:
if self.current_completion_request is not None:
ccr, self.current_completion_request = self.current_completion_request, None
self.send_completion_request(ccr)
elif req_type is CLEAR_REQUEST:
self.send(req_data)
except EOFError:
break
except Exception:
import traceback
traceback.print_exc()
def send_completion_request(self, request):
self.send(request)
result = self.recv()
if result.request_id == self.latest_completion_request_id:
self.result_queue.put(result)
def clear_caches(self, cache_type=None):
self.main_queue.put((CLEAR_REQUEST, Request(None, 'clear_caches', cache_type, None)))
def queue_completion(self, request_id, completion_type, completion_data, query=None):
with self.lock:
self.current_completion_request = Request(request_id, completion_type, completion_data, query)
self.latest_completion_request_id = self.current_completion_request.id
self.main_queue.put((COMPLETION_REQUEST, None))
def shutdown(self):
self.shutting_down = True
@ -144,11 +182,12 @@ def main(control_conn, data_conn):
except Exception:
import traceback
ans, tb = None, traceback.format_exc()
result = Result(request.id, ans, tb)
try:
eintr_retry_call(control_conn.send, result)
except EOFError:
break
if request.id is not None:
result = Result(request.id, ans, tb)
try:
eintr_retry_call(control_conn.send, result)
except EOFError:
break
def test_main(control_conn, data_conn):
obj = control_conn.recv()