Switch to using libusb1 to enumerate devices on a linux system

This commit is contained in:
Kovid Goyal 2009-12-11 14:16:56 -07:00
parent 0a7c55588f
commit ee8b98a1e3
13 changed files with 287 additions and 114 deletions

View file

@ -42,6 +42,8 @@
- title: "E-book viewer: Scroll past page-break to maintain reading flow"
tickets: [3328]
- title: "Linux device detection: Switch to using libusb1 to enumerate devices on system."
bug fixes:
- title: "LRF Viewer: Handle LRF files with corrupted end-of-stream tags"
@ -73,6 +75,9 @@
- title: "Linux source install: Write path to bin dir into launcher scripts to make IPC more robust"
- title: "Fix PocketBook 360 driver on windows when no SD card is inserted"
tickets: [4182]
new recipes:
- title: Rzeczpospolita OnLine

View file

@ -136,6 +136,10 @@ def __init__(self, name, sources, **kwargs):
['calibre/devices/usbobserver/usbobserver.c'],
ldflags=['-framework', 'IOKit'])
)
if islinux:
extensions.append(Extension('libusb',
['calibre/devices/libusb.c'],
ldflags=['-lusb-1.0']))
if isunix:

View file

@ -57,6 +57,7 @@ def load_plugins():
for plugin in ['pictureflow', 'lzx', 'msdes', 'podofo', 'cPalmdoc',
'fontconfig', 'pdfreflow', 'progress_indicator'] + \
(['winutil'] if iswindows else []) + \
(['libusb'] if islinux else []) + \
(['usbobserver'] if isosx else []):
try:
p, err = __import__(plugin), ''

View file

@ -6,7 +6,7 @@
Embedded console for debugging.
'''
import sys, os
import sys, os, pprint
from calibre.utils.config import OptionParser
from calibre.constants import iswindows, isosx
from calibre import prints
@ -65,7 +65,7 @@ def debug_device_driver():
from calibre.devices.scanner import DeviceScanner
s = DeviceScanner()
s.scan()
print 'USB devices on system:', repr(s.devices)
print 'USB devices on system:\n', pprint.pprint(s.devices)
if iswindows:
wmi = __import__('wmi', globals(), locals(), [], -1)
drives = []
@ -91,7 +91,7 @@ def debug_device_driver():
connected_devices = []
for dev in device_plugins():
print 'Looking for', dev.__class__.__name__
connected = s.is_device_connected(dev)
connected = s.is_device_connected(dev, debug=True)
if connected:
connected_devices.append(dev)

View file

@ -97,9 +97,3 @@ class POCKETBOOK360(EB600):
OSX_MAIN_MEM = 'Philips Mass Storge Media'
OSX_CARD_A_MEM = 'Philips Mass Storge Media'
def windows_open_callback(self, drives):
if 'main' not in drives and 'carda' in drives:
drives['main'] = drives.pop('carda')
return drives

View file

@ -8,7 +8,8 @@
import os
from calibre.customize import Plugin
from calibre.constants import iswindows
from calibre.constants import iswindows, islinux
from calibre.devices.libusb1 import info
class DevicePlugin(Plugin):
"""
@ -88,7 +89,7 @@ def test_bcd(cls, bcdDevice, bcd):
return False
@classmethod
def is_usb_connected(cls, devices_on_system):
def is_usb_connected(cls, devices_on_system, debug=False):
'''
Return True if a device handled by this plugin is currently connected.
@ -116,7 +117,8 @@ def is_usb_connected(cls, devices_on_system):
else:
cbcd = cls.BCD
if cls.test_bcd(bcd, cbcd) and cls.can_handle((vid,
pid, bcd)):
pid, bcd),
debug=debug):
return True
return False
@ -138,7 +140,7 @@ def get_fdi(cls):
return ''
@classmethod
def can_handle(cls, device_info):
def can_handle(cls, device_info, debug=False):
'''
Optional method to perform further checks on a device to see if this driver
is capable of handling it. If it is not it should return False. This method
@ -149,6 +151,14 @@ def can_handle(cls, device_info):
:param device_info: On windows a device ID string. On Unix a tuple of
``(vendor_id, product_id, bcd)``.
'''
if islinux:
try:
if debug:
dev = info(*device_info)
print '\t', repr(dev)
except:
import traceback
traceback.print_exc()
return True
def open(self):

View file

@ -0,0 +1,195 @@
/*
:mod:`libusb` -- Pythonic interface to libusb
=====================================================
.. module:: fontconfig
:platform: Linux
:synopsis: Pythonic interface to the libusb library
.. moduleauthor:: Kovid Goyal <kovid@kovidgoyal.net> Copyright 2009
*/
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <libusb-1.0/libusb.h>
libusb_context *ctxt = NULL;
void cleanup() {
if (ctxt != NULL) {
libusb_exit(ctxt);
}
}
PyObject*
py_libusb_scan(PyObject *self, PyObject *args) {
libusb_device **list = NULL;
struct libusb_device_descriptor dev;
ssize_t ret = 0, i = 0;
PyObject *ans, *pydev, *t;
if (ctxt == NULL) return PyErr_NoMemory();
ret = libusb_get_device_list(ctxt, &list);
if (ret == LIBUSB_ERROR_NO_MEM) return PyErr_NoMemory();
ans = PyTuple_New(ret);
if (ans == NULL) return PyErr_NoMemory();
for (i = 0; i < ret; i++) {
if (libusb_get_device_descriptor(list[i], &dev) != 0) {
PyTuple_SET_ITEM(ans, i, Py_None);
continue;
}
pydev = PyTuple_New(3);
if (pydev == NULL) return PyErr_NoMemory();
t = PyInt_FromLong(dev.idVendor);
if (t == NULL) return PyErr_NoMemory();
PyTuple_SET_ITEM(pydev, 0, t);
t = PyInt_FromLong(dev.idProduct);
if (t == NULL) return PyErr_NoMemory();
PyTuple_SET_ITEM(pydev, 1, t);
t = PyInt_FromLong(dev.bcdDevice);
if (t == NULL) return PyErr_NoMemory();
PyTuple_SET_ITEM(pydev, 2, t);
PyTuple_SET_ITEM(ans, i, pydev);
}
libusb_free_device_list(list, 1);
return ans;
}
PyObject*
py_libusb_info(PyObject *self, PyObject *args) {
unsigned long idVendor, idProduct, bcdDevice;
ssize_t ret = 0, i = 0; int err = 0, n;
libusb_device **list = NULL;
libusb_device_handle *handle = NULL;
struct libusb_device_descriptor dev;
PyObject *ans, *t;
unsigned char data[1000];
if (ctxt == NULL) return PyErr_NoMemory();
if (!PyArg_ParseTuple(args, "LLL", &idVendor, &idProduct, &bcdDevice))
return NULL;
ret = libusb_get_device_list(ctxt, &list);
if (ret == LIBUSB_ERROR_NO_MEM) return PyErr_NoMemory();
ans = PyDict_New();
if (ans == NULL) return PyErr_NoMemory();
for (i = 0; i < ret; i++) {
if (libusb_get_device_descriptor(list[i], &dev) != 0) continue;
if (idVendor == dev.idVendor && idProduct == dev.idProduct && bcdDevice == dev.bcdDevice) {
err = libusb_open(list[i], &handle);
if (!err) {
if (dev.iManufacturer) {
n = libusb_get_string_descriptor_ascii(handle, dev.iManufacturer, data, 1000);
if (n == LIBUSB_ERROR_TIMEOUT) {
libusb_close(handle);
err = libusb_open(list[i], &handle);
if (err) break;
n = libusb_get_string_descriptor_ascii(handle, dev.iManufacturer, data, 1000);
}
if (n > 0) {
t = PyBytes_FromStringAndSize((const char*)data, n);
if (t == NULL) return PyErr_NoMemory();
//Py_INCREF(t);
if (PyDict_SetItemString(ans, "manufacturer", t) != 0) return PyErr_NoMemory();
}
}
if (dev.iProduct) {
n = libusb_get_string_descriptor_ascii(handle, dev.iProduct, data, 1000);
if (n == LIBUSB_ERROR_TIMEOUT) {
libusb_close(handle);
err = libusb_open(list[i], &handle);
if (err) break;
n = libusb_get_string_descriptor_ascii(handle, dev.iManufacturer, data, 1000);
}
if (n > 0) {
t = PyBytes_FromStringAndSize((const char*)data, n);
if (t == NULL) return PyErr_NoMemory();
//Py_INCREF(t);
if (PyDict_SetItemString(ans, "product", t) != 0) return PyErr_NoMemory();
}
}
if (dev.iSerialNumber) {
n = libusb_get_string_descriptor_ascii(handle, dev.iSerialNumber, data, 1000);
if (n == LIBUSB_ERROR_TIMEOUT) {
libusb_close(handle);
err = libusb_open(list[i], &handle);
if (err) break;
n = libusb_get_string_descriptor_ascii(handle, dev.iManufacturer, data, 1000);
}
if (n > 0) {
t = PyBytes_FromStringAndSize((const char*)data, n);
if (t == NULL) return PyErr_NoMemory();
//Py_INCREF(t);
if (PyDict_SetItemString(ans, "serial", t) != 0) return PyErr_NoMemory();
}
}
libusb_close(handle);
}
break;
}
}
libusb_free_device_list(list, 1);
if (err != 0) {
switch (err) {
case LIBUSB_ERROR_NO_MEM:
return PyErr_NoMemory();
case LIBUSB_ERROR_ACCESS:
PyErr_SetString(PyExc_ValueError, "Dont have permission to access this device");
return NULL;
case LIBUSB_ERROR_NO_DEVICE:
PyErr_SetString(PyExc_ValueError, "Device disconnected");
return NULL;
default:
PyErr_SetString(PyExc_ValueError, "Failed to open device");
return NULL;
}
}
return ans;
}
static
PyMethodDef libusb_methods[] = {
{"scan", py_libusb_scan, METH_VARARGS,
"scan()\n\n"
"Return USB devices currently connected to system as a tuple of "
"3-tuples. Each 3-tuple has (idVendor, idProduct, bcdDevice)."
},
{"info", py_libusb_info, METH_VARARGS,
"info(idVendor, idProduct, bcdDevice)\n\n"
"Return extra information about the specified device. "
},
{NULL, NULL, 0, NULL}
};
PyMODINIT_FUNC
initlibusb(void) {
PyObject *m;
m = Py_InitModule3(
"libusb", libusb_methods,
"Interface with USB devices on system."
);
if (m == NULL) return;
if (libusb_init(&ctxt) != 0) ctxt = NULL;
Py_AtExit(cleanup);
}

View file

@ -0,0 +1,27 @@
#!/usr/bin/env python
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import with_statement
__license__ = 'GPL v3'
__copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
from calibre.constants import plugins
libusb, libusb_err = plugins['libusb']
def scan():
if libusb_err:
raise RuntimeError('Failed to load libusb1: '+libusb_err)
return set([x for x in libusb.scan() if x is not None])
def info(vendor, product, bcd):
if libusb_err:
raise RuntimeError('Failed to load libusb1: '+libusb_err)
a = libusb.info(vendor, product, bcd)
ans = {}
for k, v in a.items():
ans[k] = v.decode('ascii', 'replace')
return ans

View file

@ -47,9 +47,4 @@ def windows_sort_drives(self, drives):
return drives
def windows_open_callback(self, drives):
if 'main' not in drives and 'carda' in drives:
drives['main'] = drives.pop('carda')
return drives

View file

@ -5,10 +5,10 @@
manner.
'''
import sys, re, os
import sys
from calibre import iswindows, isosx, plugins
from calibre.devices import libusb
from calibre import iswindows, isosx, plugins, islinux
from calibre.devices import libusb1
osx_scanner = win_scanner = linux_scanner = None
@ -24,42 +24,15 @@
raise RuntimeError('Failed to load the usbobserver plugin: %s'%plugins['usbobserver'][1])
_usb_re = re.compile(r'Vendor\s*=\s*([0-9a-fA-F]+)\s+ProdID\s*=\s*([0-9a-fA-F]+)\s+Rev\s*=\s*([0-9a-fA-f.]+)')
_DEVICES = '/proc/bus/usb/devices'
def _linux_scanner():
raw = open(_DEVICES).read()
devices = []
device = None
for x in raw.splitlines():
x = x.strip()
if x.startswith('T:'):
if device:
devices.append(device)
device = []
if device is not None and x.startswith('P:'):
match = _usb_re.search(x)
if match is not None:
ven, prod, bcd = match.group(1), match.group(2), match.group(3)
ven, prod, bcd = int(ven, 16), int(prod, 16), int(bcd.replace('.', ''), 16)
device = [ven, prod, bcd]
if device:
devices.append(device)
return devices
if libusb.has_library:
linux_scanner = libusb.get_devices
else:
linux_scanner = _linux_scanner
linux_scanner = libusb1.scan
class DeviceScanner(object):
def __init__(self, *args):
if isosx and osx_scanner is None:
raise RuntimeError('The Python extension usbobserver must be available on OS X.')
if not (isosx or iswindows) and (not os.access(_DEVICES, os.R_OK) and not libusb.has_library):
raise RuntimeError('DeviceScanner requires %s or libusb to work.'%_DEVICES)
if islinux and libusb1.libusb_err:
raise RuntimeError('DeviceScanner requires libusb1 to work.')
self.scanner = win_scanner if iswindows else osx_scanner if isosx else linux_scanner
self.devices = []
@ -67,8 +40,8 @@ def scan(self):
'''Fetch list of connected USB devices from operating system'''
self.devices = self.scanner()
def is_device_connected(self, device):
return device.is_usb_connected(self.devices)
def is_device_connected(self, device, debug=False):
return device.is_usb_connected(self.devices, debug=debug)
def main(args=sys.argv):

View file

@ -295,9 +295,12 @@ def matches_q(drive, attr):
# This is typically needed when the device has the same
# WINDOWS_MAIN_MEM and WINDOWS_CARD_A_MEM in which case
# if teh devices is connected without a crad, the above
# if the devices is connected without a crad, the above
# will incorrectly identify the main mem as carda
# See for example the driver for the Nook
if 'main' not in drives and 'carda' in drives:
drives['main'] = drives.pop('carda')
drives = self.windows_open_callback(drives)
if drives.get('main', None) is None:

View file

@ -9,6 +9,8 @@
from calibre.customize.conversion import InputFormatPlugin, OptionRecommendation
from calibre.ebooks.pdf.pdftohtml import pdftohtml
from calibre.ebooks.metadata.opf2 import OPFCreator
from calibre.constants import plugins
pdfreflow, pdfreflow_err = plugins['pdfreflow']
class PDFInput(InputFormatPlugin):
@ -24,12 +26,27 @@ class PDFInput(InputFormatPlugin):
help=_('Scale used to determine the length at which a line should '
'be unwrapped. Valid values are a decimal between 0 and 1. The '
'default is 0.5, this is the median line length.')),
OptionRecommendation(name='new_pdf_engine', recommended_value=False,
help=_('Use the new PDF conversion engine.'))
])
def convert_new(self, stream, accelerators):
from calibre.ebooks.pdf.reflow import PDFDocument
if pdfreflow_err:
raise RuntimeError('Failed to load pdfreflow: ' + pdfreflow_err)
pdfreflow.reflow(stream.read())
xml = open('index.xml', 'rb').read()
PDFDocument(xml, self.opts, self.log)
return os.path.join(os.getcwd(), 'metadata.opf')
def convert(self, stream, options, file_ext, log,
accelerators):
log.debug('Converting file to html...')
# The main html file will be named index.html
self.opts, self.log = options, log
if options.new_pdf_engine:
return self.convert_new(stream, accelerators)
pdftohtml(os.getcwd(), stream.name, options.no_images)
from calibre.ebooks.metadata.meta import get_metadata

View file

@ -6,9 +6,6 @@
__copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import sys, os
from copy import deepcopy
from lxml import etree
class Font(object):
@ -23,7 +20,7 @@ class Text(object):
A = etree.XPath('descendant::a[@href]')
def __init__(self, text, font_map, classes, opts, log):
def __init__(self, text, font_map, opts, log):
self.opts, self.log = opts, log
self.font_map = font_map
self.top, self.left, self.width, self.height = map(float, map(text.get,
@ -33,38 +30,23 @@ def __init__(self, text, font_map, classes, opts, log):
self.color = self.font.color
self.font_family = self.font.family
for a in self.A(text):
href = a.get('href')
if href.startswith('index.'):
href = href.split('#')[-1]
a.set('href', '#page'+href)
self.text = etree.Element('span')
css = {'font_size':'%.1fpt'%self.font_size, 'color': self.color}
if css not in classes:
classes.append(css)
idx = classes.index(css)
self.text.set('class', 't%d'%idx)
if text.text:
self.text.text = text.text
for x in text:
self.text.append(deepcopy(x))
#print etree.tostring(self.text, encoding='utf-8', with_tail=False)
self.text_as_string = etree.tostring(text, method='text',
encoding=unicode)
class Page(object):
def __init__(self, page, font_map, classes, opts, log):
def __init__(self, page, font_map, opts, log):
self.opts, self.log = opts, log
self.font_map = font_map
self.number = int(page.get('number'))
self.top, self.left, self.width, self.height = map(float, map(page.get,
('top', 'left', 'width', 'height')))
self.width, self.height = map(float, map(page.get,
('width', 'height')))
self.id = 'page%d'%self.number
self.texts = []
for text in page.xpath('descendant::text'):
self.texts.append(Text(text, self.font_map, classes, self.opts, self.log))
self.texts.append(Text(text, self.font_map, self.opts, self.log))
class PDFDocument(object):
@ -77,51 +59,18 @@ def __init__(self, xml, opts, log):
self.fonts = []
self.font_map = {}
for spec in self.root.xpath('//fontspec'):
for spec in self.root.xpath('//fonts'):
self.fonts.append(Font(spec))
self.font_map[self.fonts[-1].id] = self.fonts[-1]
self.pages = []
self.page_map = {}
self.classes = []
for page in self.root.xpath('//page'):
page = Page(page, self.font_map, self.classes, opts, log)
page = Page(page, self.font_map, opts, log)
self.page_map[page.id] = page
self.pages.append(page)
def run(opts, pathtopdf, log):
from calibre.constants import plugins
pdfreflow, err = plugins['pdfreflow']
if pdfreflow is None:
raise RuntimeError('Failed to load PDF Reflow plugin: '+err)
data = open(pathtopdf, 'rb').read()
pdfreflow.reflow(data)
index = os.path.join(os.getcwdu(), 'index.xml')
xml = open(index, 'rb').read()
PDFDocument(xml, opts, log)
def option_parser():
from optparse import OptionParser
p = OptionParser()
p.add_option('-v', '--verbose', action='count', default=0)
return p
def main(args=sys.argv):
p = option_parser()
opts, args = p.parse_args(args)
from calibre.utils.logging import default_log
if len(args) < 2:
p.print_help()
default_log('No input PDF file specified', file=sys.stderr)
return 1
run(opts, args[1], default_log)
return 0