
Merge pull request #9 from Zomega/clickify

Switch from using raw argparse to using click.
David Lynch 2018-08-17 21:33:23 -05:00 committed by GitHub
commit fb8d6cf0d6
5 changed files with 243 additions and 87 deletions

README.markdown

@@ -19,10 +19,20 @@ My recommended setup process is:
Usage
---
Basic:

    $ python3 leech.py [[URL]]

A new file will appear named `Title of the Story.epub`.

This is equivalent to the slightly longer

    $ python3 leech.py download [[URL]]

Flushing the cache:

    $ python3 leech.py flush

If you want to put it on a Kindle you'll have to convert it. I'd recommend [Calibre](http://calibre-ebook.com/), though you could also try using [kindlegen](http://www.amazon.com/gp/feature.html?docId=1000765211) directly.
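For example, with Calibre installed, its `ebook-convert` command-line tool handles the conversion (filenames here are illustrative):

    $ ebook-convert "Title of the Story.epub" "Title of the Story.mobi"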
Supports

leech.py

@@ -1,84 +1,39 @@
#!/usr/bin/env python
#!/usr/bin/env python3
import argparse
import sys
import json
import click
import http.cookiejar
import json
import logging
import requests
import requests_cache
import sqlite3
from click_default_group import DefaultGroup
from functools import reduce
import sites
import ebook
import requests
import requests_cache
__version__ = 1
__version__ = 2
USER_AGENT = 'Leech/%s +http://davidlynch.org' % __version__
logger = logging.getLogger(__name__)
def leech(url, session, filename=None, args=None):
# we have: a page, which could be absolutely any part of a story, or not a story at all
# check a bunch of things which are completely ff.n specific, to get text from it
site, url = sites.get(url)
if not site:
raise Exception("No site handler found")
logger.info("Handler: %s (%s)", site, url)
handler = site(session, args=args)
with open('leech.json') as config_file:
config = json.load(config_file)
login = config.get('logins', {}).get(site.__name__, False)
if login:
handler.login(login)
cover_options = config.get('cover', {})
story = handler.extract(url)
if not story:
raise Exception("Couldn't extract story")
return ebook.generate_epub(story, filename, cover_options=cover_options)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('url', help="url of a story to fetch", nargs='?')
parser.add_argument('--filename', help="output filename (the title is used if this isn't provided)")
parser.add_argument('--no-cache', dest='cache', action='store_false')
parser.add_argument('--flush', dest='flush', action='store_true')
parser.add_argument('-v', '--verbose', help="verbose output", action='store_true', dest='verbose')
parser.set_defaults(cache=True, flush=False, verbose=False)
args, extra_args = parser.parse_known_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
def configure_logging(verbose):
if verbose:
logging.basicConfig(
level=logging.DEBUG,
format="[%(name)s @ %(levelname)s] %(message)s"
)
else:
logging.basicConfig(
level=logging.INFO,
format="[%(name)s] %(message)s"
)
if args.flush:
requests_cache.install_cache('leech')
requests_cache.clear()
conn = sqlite3.connect('leech.sqlite')
conn.execute("VACUUM")
conn.close()
logger.info("Flushed cache")
sys.exit()
if not args.url:
sys.exit("URL is required")
if args.cache:
def create_session(cache):
if cache:
session = requests_cache.CachedSession('leech', expire_after=4 * 3600)
else:
session = requests.Session()
@@ -92,6 +47,108 @@ if __name__ == '__main__':
session.headers.update({
'User-agent': USER_AGENT
})
return session
filename = leech(args.url, filename=args.filename, session=session, args=extra_args)
logger.info("File created: %s", filename)
def load_on_disk_options(site):
try:
with open('leech.json') as store_file:
store = json.load(store_file)
login = store.get('logins', {}).get(site.__name__, False)
configured_site_options = store.get('site_options', {}).get(site.__name__, {})
except FileNotFoundError:
logger.info("Unable to locate leech.json; continuing as if it does not exist.")
login = False
configured_site_options = {}
return configured_site_options, login
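For reference, a minimal leech.json consistent with the keys read above might look like the following; the site name and login format are illustrative assumptions, not a documented schema:

    {
        "logins": {
            "XenForo": ["username", "password"]
        },
        "site_options": {
            "XenForo": {"skip_spoilers": false}
        }
    }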
def create_options(site, site_options, unused_flags):
"""Compiles options provided from multiple different sources
(e.g. on disk, via flags, via defaults, via JSON provided as a flag value)
into a single options object."""
default_site_options = site.get_default_options()
flag_specified_site_options = site.interpret_site_specific_options(**unused_flags)
configured_site_options, login = load_on_disk_options(site)
overridden_site_options = json.loads(site_options)
# The final options dictionary is computed by layering the default,
# configured, overridden, and flag-specified options, in that order.
options = dict(
list(default_site_options.items()) +
list(configured_site_options.items()) +
list(overridden_site_options.items()) +
list(flag_specified_site_options.items())
)
return options, login
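As a sketch of that layering, with hypothetical values (later entries win on key collisions):

    default_site_options        = {'offset': None, 'skip_spoilers': True}
    configured_site_options     = {'offset': 2}              # from leech.json
    overridden_site_options     = {'skip_spoilers': False}   # from --site-options
    flag_specified_site_options = {'offset': 5}              # explicit CLI flags
    options = {**default_site_options, **configured_site_options,
               **overridden_site_options, **flag_specified_site_options}
    assert options == {'offset': 5, 'skip_spoilers': False}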
def open_story(site, url, session, login, options):
handler = site(
session,
options=options
)
if login:
handler.login(login)
story = handler.extract(url)
if not story:
raise Exception("Couldn't extract story")
return story
def site_specific_options(f):
option_list = sites.list_site_specific_options()
return reduce(lambda cmd, decorator: decorator(cmd), [f] + option_list)
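The reduce simply applies each option decorator to the command in turn; a runnable illustration with stand-in decorators (not part of the diff):

    from functools import reduce

    def tag(name):
        # Stand-in for a click.option decorator; records application order.
        def decorator(f):
            f.applied = getattr(f, 'applied', []) + [name]
            return f
        return decorator

    def cmd():
        pass

    reduce(lambda c, decorator: decorator(c), [cmd, tag('a'), tag('b')])
    assert cmd.applied == ['a', 'b']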
@click.group(cls=DefaultGroup, default='download', default_if_no_args=True)
def cli():
"""Top level click group. Uses click-default-group to preserve most behavior from leech v1."""
pass
@cli.command()
@click.option('--verbose', '-v', is_flag=True, help="verbose output")
def flush(verbose):
"""Flushes the contents of the cache."""
configure_logging(verbose)
requests_cache.install_cache('leech')
requests_cache.clear()
conn = sqlite3.connect('leech.sqlite')
conn.execute("VACUUM")
conn.close()
logger.info("Flushed cache")
@cli.command()
@click.argument('url')
@click.option(
'--site-options',
default='{}',
help='JSON object encoding any site-specific options.'
)
@click.option('--cache/--no-cache', default=True)
@click.option('--verbose', '-v', is_flag=True, help="Verbose debugging output")
@site_specific_options # Includes other click.options specific to sites
def download(url, site_options, cache, verbose, **other_flags):
"""Downloads a story and saves it on disk as an epub ebook."""
configure_logging(verbose)
session = create_session(cache)
site, url = sites.get(url)
options, login = create_options(site, site_options, other_flags)
story = open_story(site, url, session, login, options)
filename = ebook.generate_epub(story)
logger.info("File created: %s", filename)
if __name__ == '__main__':
cli()

requirements.txt

@@ -15,3 +15,5 @@ requests-cache==0.4.13
six==1.10.0
urllib3==1.22
webencodings==0.5.1
click==6.7
click-default-group==1.2

sites/__init__.py

@@ -1,7 +1,7 @@
import click
import glob
import os
import argparse
import uuid
import time
import logging
@@ -66,11 +66,44 @@ class Site:
extracting the content of a story from said site.
"""
session = attr.ib()
args = attr.ib()
footnotes = attr.ib(default=attr.Factory(list), init=False)
options = attr.ib(default=attr.Factory(
    lambda site: site.get_default_options(),
    takes_self=True  # the factory receives the Site instance
))
def __attrs_post_init__(self):
self.options = self._parse_args(self.args)
@staticmethod
def get_site_specific_option_defs():
"""Returns a list of click.option objects to add to CLI commands.
Option names should be reasonably unique so that they do not conflict
with core options or with other sites' options. It is OK for different
sites' options to share a name, but care should be taken to keep them
semantically similar.
"""
return []
@classmethod
def get_default_options(cls):
options = {}
for option in cls.get_site_specific_option_defs():
options[option.name] = option.default
return options
@classmethod
def interpret_site_specific_options(cls, **kwargs):
"""Returns options summarizing CLI flags provided.
Only includes entries the user explicitly provided as flags;
it will not contain default values. For those, use get_default_options().
"""
options = {}
for option in cls.get_site_specific_option_defs():
option_value = kwargs[option.name]
if option_value is not None:
options[option.name] = option_value
return options
@staticmethod
def matches(url):
@@ -92,14 +125,6 @@ class Site:
def login(self, login_details):
raise NotImplementedError()
def _parse_args(self, args):
parser = argparse.ArgumentParser()
self._add_arguments(parser)
return parser.parse_args(args)
def _add_arguments(self, parser):
pass
def _soup(self, url, method='html5lib', retry=3, retry_delay=10, **kw):
page = self.session.get(url, **kw)
if not page:
@@ -153,6 +178,32 @@ class Site:
return spoiler_link
@attr.s(hash=True)
class SiteSpecificOption:
"""Represents a site-specific option that can be configured.
Will be added to the CLI as a click.option -- many of these
fields correspond to click.option arguments."""
name = attr.ib()
flag_pattern = attr.ib()
type = attr.ib(default=None)
help = attr.ib(default=None)
default = attr.ib(default=None)
def as_click_option(self):
return click.option(
str(self.name),
str(self.flag_pattern),
type=self.type,
# Note: This default not matching self.default is intentional.
# It ensures that we know if a flag was explicitly provided,
# which keeps it from overriding options set in leech.json etc.
# Instead, default is used in site_cls.get_default_options()
default=None,
help=self.help if self.help is not None else ""
)
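A minimal sketch of the behaviour this buys, using a hypothetical flag outside this diff:

    import click

    @click.command()
    @click.option('--skip-spoilers/--include-spoilers', 'skip_spoilers', default=None)
    def demo(skip_spoilers):
        # None  -> neither flag passed; leech.json / site defaults apply
        # True  -> --skip-spoilers passed explicitly
        # False -> --include-spoilers passed explicitly
        click.echo(repr(skip_spoilers))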
class SiteException(Exception):
pass
@@ -166,10 +217,23 @@ def get(url):
for site_class in _sites:
match = site_class.matches(url)
if match:
logger.info("Handler: %s (%s)", site_class, match)
return site_class, match
raise NotImplementedError("Could not find a handler for " + url)
def list_site_specific_options():
"""Returns a list of all sites' click options, which will be presented to the user."""
# Ensures that duplicate options are not added twice.
# Especially important for subclassed sites (e.g. Xenforo sites)
options = set()
for site_class in _sites:
options.update(site_class.get_site_specific_option_defs())
return [option.as_click_option() for option in options]
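The set-based dedup works because SiteSpecificOption is declared with @attr.s(hash=True) above, so instances with equal field values compare and hash identically:

    a = SiteSpecificOption('offset', '--offset', type=int)
    b = SiteSpecificOption('offset', '--offset', type=int)
    assert a == b and hash(a) == hash(b)
    assert len({a, b}) == 1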
# And now, a particularly hacky take on a plugin system:
# Make an __all__ out of all the python files in this directory that don't start
# with __. Then import * them.

sites/xenforo.py

@@ -3,7 +3,7 @@
import datetime
import re
import logging
from . import register, Site, SiteException, Section, Chapter
from . import register, Site, SiteException, SiteSpecificOption, Section, Chapter
logger = logging.getLogger(__name__)
@@ -13,6 +13,35 @@ class XenForo(Site):
domain = False
@staticmethod
def get_site_specific_option_defs():
return [
SiteSpecificOption(
'include_index',
'--include-index/--no-include-index',
default=False,
help="If true, the post marked as an index will be included as a chapter."
),
SiteSpecificOption(
'skip_spoilers',
'--skip-spoilers/--include-spoilers',
default=True,
help="If true, do not transcribe any tags that are marked as a spoiler."
),
SiteSpecificOption(
'offset',
'--offset',
type=int,
help="The chapter index to start at in the chapter marks."
),
SiteSpecificOption(
'limit',
'--limit',
type=int,
help="The chapter index to end at in the chapter marks."
),
]
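With these definitions, a hypothetical invocation against a XenForo thread would look like:

    $ python3 leech.py download [[URL]] --include-index --offset 2 --limit 10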
@classmethod
def matches(cls, url):
match = re.match(r'^(https?://%s/threads/[^/]*\d+)/?.*' % cls.domain, url)
@@ -43,7 +72,7 @@ class XenForo(Site):
mark for mark in self._chapter_list(url)
if '/members' not in mark.get('href') and '/threadmarks' not in mark.get('href')
]
marks = marks[self.options.offset:self.options.limit]
marks = marks[self.options['offset']:self.options['limit']]
for idx, mark in enumerate(marks, 1):
href = mark.get('href')
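Worth noting: when neither flag is passed, offset and limit stay None, and a slice with None bounds is a no-op, so the full mark list is kept:

    marks = ['a', 'b', 'c']
    assert marks[None:None] == marks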
@@ -101,7 +130,7 @@ class XenForo(Site):
if not links:
raise SiteException("No links in index?")
if self.options.include_index:
if self.options['include_index']:
fake_link = self._new_tag('a', href=url)
fake_link.string = "Index"
links.insert(0, fake_link)
@@ -157,7 +186,7 @@ class XenForo(Site):
# spoilers don't work well, so turn them into epub footnotes
for idx, spoiler in enumerate(post.find_all(class_='ToggleTriggerAnchor')):
spoiler_title = spoiler.find(class_='SpoilerTitle')
if self.options.spoilers:
if self.options['skip_spoilers']:
link = self._footnote(spoiler.find(class_='SpoilerTarget').extract(), chapterid)
if spoiler_title:
link.string = spoiler_title.get_text()
@@ -180,12 +209,6 @@ class XenForo(Site):
return datetime.datetime.strptime(maybe_date['title'], "%b %d, %Y at %I:%M %p")
raise SiteException("No date", maybe_date)
def _add_arguments(self, parser):
parser.add_argument('--include-index', dest='include_index', action='store_true', default=False)
parser.add_argument('--offset', dest='offset', type=int, default=None)
parser.add_argument('--limit', dest='limit', type=int, default=None)
parser.add_argument('--skip-spoilers', dest='spoilers', action='store_false', default=True)
class XenForoIndex(XenForo):
@classmethod
@@ -204,8 +227,8 @@ class SpaceBattles(XenForo):
@register
class SpaceBattlesIndex(XenForoIndex):
domain = 'forums.spacebattles.com'
class SpaceBattlesIndex(SpaceBattles, XenForoIndex):
pass
@register