Bring in dependencies from ffnet-chrome-cache-fetch

Jim Miller 2021-01-31 16:32:34 -06:00
commit 5ce7c00ac3
23 changed files with 2934 additions and 0 deletions

@@ -0,0 +1,27 @@
import os
from .basebrowsercache import BrowserCacheException, BaseBrowserCache
from .simplecache import SimpleCache
from .chromediskcache import ChromeDiskCache
class BrowserCache:
"""Class to read web browser cache"""
def __init__(self, cache_dir=None):
"""Constructor for BrowserCache"""
        # imports of child classes have to be inside the def to avoid circular import errors
for browser_cache_class in [SimpleCache, ChromeDiskCache]:
self.browser_cache = browser_cache_class.new_browser_cache(cache_dir)
if self.browser_cache is not None:
break
if self.browser_cache is None:
raise BrowserCacheException("Directory does not contain a known browser cache type: '%s",
os.path.abspath(cache_dir))
def get_data(self, url):
d = self.browser_cache.get_data(url)
if not d:
            ## Newer browser caches prefix cache keys with the calling
            ## domain so that fast retrieval can't leak which pages were
            ## previously visited from other sites.
d = self.browser_cache.get_data("_dk_https://fanfiction.net https://fanfiction.net "+url)
return d
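
A minimal usage sketch, assuming the package import path below and a Chrome profile cache directory (path and URL are hypothetical):

from fanficfare.browsercache import BrowserCache

cache = BrowserCache("~/.config/google-chrome/Default/Cache")  # hypothetical path
data = cache.get_data("https://www.fanfiction.net/s/1234567/1/")  # hypothetical URL
if data is not None:
    print("%d bytes recovered from the browser cache" % len(data))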

@@ -0,0 +1,37 @@
import os
class BrowserCacheException(Exception):
pass
class BaseBrowserCache:
"""Base class to read various formats of web browser cache file"""
def __init__(self, cache_dir=None):
"""Constructor for BaseBrowserCache"""
if cache_dir is None:
raise BrowserCacheException("BrowserCache must be initialized with a valid browser cache directory path")
self.cache_dir = os.path.realpath(os.path.expanduser(cache_dir))
if not os.path.isdir(self.cache_dir):
raise BrowserCacheException("BrowserCache cache_dir does not exist: '%s (%s)'" %
(cache_dir, self.cache_dir))
@staticmethod
def is_cache_dir(cache_dir):
return os.path.isdir(cache_dir) # This method only makes sense when overridden
@classmethod
def new_browser_cache(cls, cache_dir):
"""Return new instance of this BrowserCache class, or None if supplied directory not the correct cache type"""
cache_dir = os.path.realpath(os.path.expanduser(cache_dir))
if cls.is_cache_dir(cache_dir):
try:
return cls(cache_dir)
except BrowserCacheException:
return None
return None
def get_data(self, url):
""" Return decoded data for specified key (a URL string) or None """
return None # must be overridden
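
A hedged sketch of the subclass contract that new_browser_cache() relies on: override is_cache_dir() as a cheap structural probe and get_data() as the actual lookup (the class below is illustrative, not part of this commit):

class ExampleCache(BaseBrowserCache):
    """Illustrative subclass; a real one parses an actual on-disk format."""
    @staticmethod
    def is_cache_dir(cache_dir):
        # Probe cheaply and return False rather than raising.
        return os.path.isfile(os.path.join(cache_dir, "index"))
    def get_data(self, url):
        return None  # a real subclass returns the decoded body bytes

# BrowserCache then tries each class in turn:
# ExampleCache.new_browser_cache(path) returns an instance or None.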

@@ -0,0 +1,47 @@
import os
import struct
from ..chromagnon.cacheParse import ChromeCache
from . import BrowserCacheException, BaseBrowserCache
class ChromeDiskCacheException(BrowserCacheException):
pass
INDEX_MAGIC_NUMBER = 0xC103CAC3
BLOCK_MAGIC_NUMBER = 0xC104CAC3
class ChromeDiskCache(BaseBrowserCache):
"""Class to access data stream in Chrome Disk Cache format cache files"""
def __init__(self, cache_dir=None):
"""Constructor for ChromeDiskCache"""
BaseBrowserCache.__init__(self,cache_dir)
if not self.is_cache_dir(cache_dir):
raise ChromeDiskCacheException("Directory does not contain a Chrome Disk Cache: '%s'" % cache_dir)
self.chromagnon_cache = ChromeCache(cache_dir)
@staticmethod
def is_cache_dir(cache_dir):
"""Return True only if a directory is a valid Cache for this class"""
if not os.path.isdir(cache_dir):
return False
index_path = os.path.join(cache_dir, "index")
if not os.path.isfile(index_path):
return False
with open(index_path, 'rb') as index_file:
if struct.unpack('I', index_file.read(4))[0] != INDEX_MAGIC_NUMBER:
return False
data0_path = os.path.join(cache_dir, "data_0")
if not os.path.isfile(data0_path):
return False
with open(data0_path, 'rb') as data0_file:
if struct.unpack('I', data0_file.read(4))[0] != BLOCK_MAGIC_NUMBER:
return False
return True
def get_data(self, url):
""" Return decoded data for specified key (a URL string) or None """
return self.chromagnon_cache.get_cached_file(url)

@@ -0,0 +1,178 @@
import os
import struct
import hashlib
import gzip
import zlib
import glob
from . import BaseBrowserCache, BrowserCacheException
from ..six import ensure_text
import logging
logger = logging.getLogger(__name__)
class SimpleCacheException(BrowserCacheException):
pass
try:
from brotli import decompress as brotli_decompress
except ImportError:
# Calibre doesn't include brotli, so use packaged brotlipython
# which is waaaay slower, but pure python.
from calibre_plugins.fanficfare_plugin.brotlidecpy import decompress as brotli_decompress
# def brotli_decompress(inbuf):
# # wants the output, too, but returns it
# return brotlidec(inbuf, [])
SIMPLE_EOF = struct.Struct('<QLLLL') # magic_number, flags, crc32, stream_size, padding
SIMPLE_EOF_SIZE = SIMPLE_EOF.size
FLAG_HAS_SHA256 = 2
META_HEADER = struct.Struct('<LLQQL')
META_HEADER_SIZE = META_HEADER.size
ENTRY_MAGIC_NUMBER = 0xfcfb6d1ba7725c30
EOF_MAGIC_NUMBER = 0xf4fa6f45970d41d8
THE_REAL_INDEX_MAGIC_NUMBER = 0x656e74657220796f
class SimpleCache(BaseBrowserCache):
"""Class to access data stream in Chrome Simple Cache format cache files"""
def __init__(self, cache_dir=None):
"""Constructor for SimpleCache"""
BaseBrowserCache.__init__(self,cache_dir)
## already called from parent.new_browser_cache()
# if not self.is_cache_dir(cache_dir):
# raise SimpleCacheException("Directory does not contain a Chrome Simple Cache: '%s'" % cache_dir)
@staticmethod
def is_cache_dir(cache_dir):
"""Return True only if a directory is a valid Cache for this class"""
if not os.path.isdir(cache_dir):
return False
index_file = os.path.join(cache_dir, "index")
if not (os.path.isfile(index_file) and os.path.getsize(index_file) == 24):
return False
real_index_file = os.path.join(cache_dir, "index-dir", "the-real-index")
if not os.path.isfile(real_index_file):
return False
with open(real_index_file, 'rb') as index_file:
if struct.unpack('QQ', index_file.read(16))[1] != THE_REAL_INDEX_MAGIC_NUMBER:
return False
try:
# logger.debug("\n\nStarting cache check\n\n")
for en_fl in glob.iglob(os.path.join(cache_dir, '????????????????_?')):
k = _validate_entry_file(en_fl)
# if b'fanfiction.net/' in k:
# logger.debug("file:%s"%en_fl)
# logger.debug("_validate_entry_file:%s"%k)
                ## Is this return meant to be inside the loop? As written it
                ## only validates the first entry file; but validating every
                ## file seems excessive.
return True
except SimpleCacheException:
return False
return False
def get_data(self, url):
""" Return decoded data for specified key (a URL string) or None """
if isinstance(url, str):
url = url.encode('utf-8')
glob_pattern = os.path.join(self.cache_dir, _key_hash(url) + '_?')
# logger.debug("url key hash:%s"%_key_hash(url))
# logger.debug("glob pattern:%s"%glob_pattern)
        # Hash collisions are rare enough that this glob usually matches zero
        # or one file, so reading the index file first would save little over
        # going straight to the entry files.
for en_fl in glob.glob(glob_pattern):
try:
# logger.debug("en_fl:%s"%en_fl)
file_key = _validate_entry_file(en_fl)
if file_key == url:
return _get_decoded_data(en_fl)
except SimpleCacheException:
pass
return None
# Here come the utility functions for the class
import codecs
def _key_hash(key):
"""Compute hash of key as used to generate name of cache entry file"""
# py2 lacks convenient .hex() method on bytes
return ensure_text(codecs.encode(hashlib.sha1(key).digest()[7::-1],'hex'))
# return hashlib.sha1(key).digest()[7::-1].hex()
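# Illustrative example (not part of this module): the entry file name is
# the first 8 bytes of SHA-1(key), byte-reversed, rendered as lower-case
# hex, plus an '_0' (or '_1', ...) suffix, e.g.:
#   url = b'https://www.fanfiction.net/s/1234567/1/'  # hypothetical key
#   _key_hash(url) + '_0'  # -> '<16 hex digits>_0', the glob target above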
def _validate_entry_file(path):
"""Validate that a file is a cache entry file, return the URL (key) if valid"""
# read from path into SimpleFileHeader, use key_length field to determine size of key, return key as byte string
shformat = struct.Struct('<QLLLL')
shformat_size = shformat.size
with open(path, "rb") as entry_file:
data = entry_file.read(shformat_size)
(magic, version, key_length, key_hash, padding) = shformat.unpack(data)
if magic != ENTRY_MAGIC_NUMBER:
raise SimpleCacheException("Supposed cache entry file did not start with correct magic number: "
"'%s'" % path)
key = entry_file.read(key_length)
if _key_hash(key) != os.path.basename(path).split('_')[0]:
raise SimpleCacheException("Cache entry file name '%s' does not match hash of key '%s'" %
os.path.basename(path), key)
return key
def _skip_to_start_of_stream(entry_file):
"""Assuming reader is at end of a stream back up to beginning of stream, returning size of data in stream"""
entry_file.seek(-SIMPLE_EOF_SIZE, os.SEEK_CUR)
data = entry_file.read(SIMPLE_EOF_SIZE)
(magic, flags, crc32, stream_size, padding) = SIMPLE_EOF.unpack(data)
if magic != EOF_MAGIC_NUMBER:
raise SimpleCacheException("Supposed cache entry file did not end with EOF header with correct magic "
"number: '%s'" % entry_file.name)
seek_back = stream_size + SIMPLE_EOF_SIZE
if flags & FLAG_HAS_SHA256:
seek_back += 32
entry_file.seek(-seek_back, os.SEEK_CUR)
return stream_size
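# Entry file layout implied by the seek arithmetic above (a sketch; see
# Chromium's net/disk_cache/simple/simple_entry_format.h for the source
# of truth):
#   [header][key][stream 1: body][EOF record 1]
#   [stream 0: HTTP info][optional SHA-256][EOF record 0]
# Each EOF record is SIMPLE_EOF (24 bytes) and records its stream's size,
# which is how far back that stream's data begins.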
def _get_data_from_entry_file(path):
""" Read the contents portion (stream 1 data) from the instance's cache entry file. Return a byte string """
with open(path, "rb") as entry_file:
entry_file.seek(0, os.SEEK_END)
_skip_to_start_of_stream(entry_file)
stream_size = _skip_to_start_of_stream(entry_file)
ret = entry_file.read(stream_size)
return ret
def _get_headers(path):
""" Read the HTTP header (stream 0 data) from a cache entry file """
with open(path, "rb") as entry_file:
entry_file.seek(0, os.SEEK_END)
_skip_to_start_of_stream(entry_file)
# read stream 0 meta header:
# uint32 info_size, uint32 flags, uint64 request_time, uint64 response_time, uint32 header_size
data = entry_file.read(META_HEADER_SIZE)
(info_size, flags, request_time, response_time, header_size) = META_HEADER.unpack(data)
# read header_size bytes to get the raw bytes of the HTTP headers
# parse the raw bytes into a HttpHeader structure:
        # It is a series of null-terminated strings; the first is the status line, e.g. "HTTP/1.1 200",
# the rest are name:value pairs used to populate the headers dict.
strings = entry_file.read(header_size).decode('utf-8').split('\0')
headers = dict(s.split(':', 1) for s in strings[1:] if ':' in s)
return headers
def _get_decoded_data(path):
""" Read and decompress if necessary data from a cache entry file. Returns a byte string """
headers = _get_headers(path)
encoding = headers.get('content-encoding', '').strip().lower()
data = _get_data_from_entry_file(path)
if encoding == 'gzip':
return gzip.decompress(data)
elif encoding == 'br':
return brotli_decompress(data)
elif encoding == 'deflate':
return zlib.decompress(data)
return data

@@ -0,0 +1,24 @@
Copyright (c) 2012, Jean-Rémy Bancel <jean-remy.bancel@telecom-paristech.org>
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the Chromagon Project nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL Jean-Rémy Bancel BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@@ -0,0 +1,89 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2012, Jean-Rémy Bancel <jean-remy.bancel@telecom-paristech.org>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the Chromagon Project nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL Jean-Rémy Bancel BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Python implementation of SuperFastHash algorithm
Maybe it is better to use c_uint32 to limit the size of variables to 32bits
instead of using 0xFFFFFFFF mask.
"""
from __future__ import absolute_import
from __future__ import print_function
import binascii
import sys
def get16bits(data):
"""Returns the first 16bits of a string"""
return int(binascii.hexlify(data[1::-1]), 16)
def superFastHash(data):
hash = length = len(data)
if length == 0:
return 0
rem = length & 3
length >>= 2
while length > 0:
hash += get16bits(data) & 0xFFFFFFFF
        tmp = (get16bits(data[2:]) << 11) ^ hash
hash = ((hash << 16) & 0xFFFFFFFF) ^ tmp
data = data[4:]
hash += hash >> 11
hash = hash & 0xFFFFFFFF
length -= 1
if rem == 3:
        hash += get16bits(data)
hash ^= (hash << 16) & 0xFFFFFFFF
hash ^= (int(binascii.hexlify(data[2:]), 16) << 18) & 0xFFFFFFFF
hash += hash >> 11
elif rem == 2:
        hash += get16bits(data)
hash ^= (hash << 11) & 0xFFFFFFFF
hash += hash >> 17
elif rem == 1:
hash += int(binascii.hexlify(data[0:]), 16)
hash ^= (hash << 10) & 0xFFFFFFFF
hash += hash >> 1
hash = hash & 0xFFFFFFFF
hash ^= (hash << 3) & 0xFFFFFFFF
hash += hash >> 5
hash = hash & 0xFFFFFFFF
hash ^= (hash << 4) & 0xFFFFFFFF
hash += hash >> 17
hash = hash & 0xFFFFFFFF
hash ^= (hash << 25) & 0xFFFFFFFF
hash += hash >> 6
hash = hash & 0xFFFFFFFF
return hash
if __name__ == "__main__":
print("%08x"%superFastHash(sys.argv[1]))

@@ -0,0 +1,92 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2012, Jean-Rémy Bancel <jean-remy.bancel@telecom-paristech.org>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the Chromagon Project nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL Jean-Rémy Bancel BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Chrome Cache Address
See /net/disk_cache/addr.h for design details
"""
class CacheAddressError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class CacheAddress():
"""
Object representing a Chrome Cache Address
"""
SEPARATE_FILE = 0
RANKING_BLOCK = 1
BLOCK_256 = 2
BLOCK_1024 = 3
BLOCK_4096 = 4
typeArray = [("Separate file", 0),
("Ranking block file", 36),
("256 bytes block file", 256),
("1k bytes block file", 1024),
("4k bytes block file", 4096)]
def __init__(self, uint_32, path):
"""
Parse the 32 bits of the uint_32
"""
if uint_32 == 0:
raise CacheAddressError("Null Address")
#XXX Is self.binary useful ??
self.addr = uint_32
self.path = path
# Checking that the MSB is set
self.binary = bin(uint_32)
if len(self.binary) != 34:
raise CacheAddressError("Uninitialized Address")
self.blockType = int(self.binary[3:6], 2)
# If it is an address of a separate file
if self.blockType == CacheAddress.SEPARATE_FILE:
self.fileSelector = "f_%06x" % int(self.binary[6:], 2)
elif self.blockType == CacheAddress.RANKING_BLOCK:
self.fileSelector = "data_" + str(int(self.binary[10:18], 2))
else:
self.entrySize = CacheAddress.typeArray[self.blockType][1]
self.contiguousBlock = int(self.binary[8:10], 2)
self.fileSelector = "data_" + str(int(self.binary[10:18], 2))
self.blockNumber = int(self.binary[18:], 2)
def __str__(self):
string = hex(self.addr) + " ("
if self.blockType >= CacheAddress.BLOCK_256:
string += str(self.contiguousBlock) +\
" contiguous blocks in "
string += CacheAddress.typeArray[self.blockType][0] +\
" : " + self.fileSelector + ")"
return string
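
A worked example (hedged; not part of the file) decoding one of the addresses used in the unit tests below:

addr = CacheAddress(0xA001135C, path='/tmp/cache')  # hypothetical path
# MSB set -> initialized; type bits 010 -> BLOCK_256; file number -> "data_1";
# low 16 bits -> block number 0x135C
print(addr)  # 0xa001135c (0 contiguous blocks in 256 bytes block file : data_1)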

@@ -0,0 +1,64 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2012, Jean-Rémy Bancel <jean-remy.bancel@telecom-paristech.org>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the Chromagon Project nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL Jean-Rémy Bancel BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from __future__ import absolute_import
import unittest
from . import cacheAddress
class CacheAddressTest(unittest.TestCase):
    def testFileType(self):
        """Parse Block Type From Address"""
        # CacheAddress now requires the cache directory path; '.' is a
        # placeholder since these tests never open the underlying files.
        address = cacheAddress.CacheAddress(0x8000002A, path='.')
        self.assertEqual(address.blockType,
                         cacheAddress.CacheAddress.SEPARATE_FILE)
        address = cacheAddress.CacheAddress(0x9DFF0000, path='.')
        self.assertEqual(address.blockType,
                         cacheAddress.CacheAddress.RANKING_BLOCK)
        address = cacheAddress.CacheAddress(0xA0010003, path='.')
        self.assertEqual(address.blockType,
                         cacheAddress.CacheAddress.BLOCK_256)
        address = cacheAddress.CacheAddress(0xBDFF0108, path='.')
        self.assertEqual(address.blockType,
                         cacheAddress.CacheAddress.BLOCK_1024)
        address = cacheAddress.CacheAddress(0xCDFF0108, path='.')
        self.assertEqual(address.blockType,
                         cacheAddress.CacheAddress.BLOCK_4096)
    def testFilename(self):
        """Parse Filename from Address"""
        address = cacheAddress.CacheAddress(0x8000002A, path='.')
        self.assertEqual(address.fileSelector,
                         "f_00002a")  # "f_%06x" % 0x2A
        address = cacheAddress.CacheAddress(0xA001135C, path='.')
        self.assertEqual(address.fileSelector,
                         "data_1")
if __name__ == "__main__":
unittest.main()

@@ -0,0 +1,85 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2012, Jean-Rémy Bancel <jean-remy.bancel@telecom-paristech.org>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the Chromagon Project nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL Jean-Rémy Bancel BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Parse the header of a Chrome Cache File
See http://www.chromium.org/developers/design-documents/network-stack/disk-cache
for design details
"""
from __future__ import absolute_import
import struct
from six.moves import range
class CacheBlock():
"""
Object representing a block of the cache. It can be the index file or any
other block type : 256B, 1024B, 4096B, Ranking Block.
See /net/disk_cache/disk_format.h for details.
"""
INDEX_MAGIC = 0xC103CAC3
BLOCK_MAGIC = 0xC104CAC3
INDEX = 0
BLOCK = 1
def __init__(self, filename):
"""
Parse the header of a cache file
"""
with open(filename, 'rb') as header:
# Read Magic Number
magic = struct.unpack('I', header.read(4))[0]
# print("magic number:%s"%hex(magic))
if magic == CacheBlock.BLOCK_MAGIC:
self.type = CacheBlock.BLOCK
header.seek(2, 1)
self.version = struct.unpack('h', header.read(2))[0]
self.header = struct.unpack('h', header.read(2))[0]
self.nextFile = struct.unpack('h', header.read(2))[0]
self.blockSize = struct.unpack('I', header.read(4))[0]
self.entryCount = struct.unpack('I', header.read(4))[0]
self.entryMax = struct.unpack('I', header.read(4))[0]
self.empty = []
for _ in range(4):
self.empty.append(struct.unpack('I', header.read(4))[0])
self.position = []
for _ in range(4):
self.position.append(struct.unpack('I', header.read(4))[0])
elif magic == CacheBlock.INDEX_MAGIC:
self.type = CacheBlock.INDEX
header.seek(2, 1)
self.version = struct.unpack('h', header.read(2))[0]
self.entryCount = struct.unpack('I', header.read(4))[0]
self.byteCount = struct.unpack('I', header.read(4))[0]
self.lastFileCreated = "f_%06x" % \
struct.unpack('I', header.read(4))[0]
header.seek(4*2, 1)
self.tableSize = struct.unpack('I', header.read(4))[0]
else:
raise Exception("Invalid Chrome Cache File")

@@ -0,0 +1,124 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2012, Jean-Rémy Bancel <jean-remy.bancel@telecom-paristech.org>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the Chromagon Project nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL Jean-Rémy Bancel BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Stores the data fetched in the cache.
Parse the HTTP header if asked.
"""
from __future__ import absolute_import
import re
import shutil
import struct
import os
from . import cacheAddress
from six.moves import range
class CacheData():
"""
Retrieve data at the given address
Can save it to a separate file for export
"""
HTTP_HEADER = 0
UNKNOWN = 1
def __init__(self, address, size, isHTTPHeader=False):
"""
        It is a lazy-evaluation object: the file is opened only when
        needed. It can parse the HTTP header if asked to do so.
See net/http/http_util.cc LocateStartOfStatusLine and
LocateEndOfHeaders for details.
"""
self.size = size
self.address = address
self.type = CacheData.UNKNOWN
if isHTTPHeader and\
self.address.blockType != cacheAddress.CacheAddress.SEPARATE_FILE:
            # Get the raw data in a single read; unpacking it byte by byte
            # with struct is unnecessary.
            with open(os.path.join(self.address.path, self.address.fileSelector), 'rb') as block:
                block.seek(8192 + self.address.blockNumber*self.address.entrySize)
                string = block.read(self.size)
# Finding the beginning of the request
start = re.search(b"HTTP", string)
            if start is None:
return
else:
string = string[start.start():]
# Finding the end (some null characters : verified by experience)
end = re.search(b"\x00\x00", string)
            if end is None:
return
else:
string = string[:end.end()-2]
# Creating the dictionary of headers
self.headers = {}
for line in string.split(b'\0'):
stripped = line.split(b':')
self.headers[stripped[0].lower()] = \
b':'.join(stripped[1:]).strip()
self.type = CacheData.HTTP_HEADER
def save(self, filename=None):
"""Save the data to the specified filename"""
if self.address.blockType == cacheAddress.CacheAddress.SEPARATE_FILE:
shutil.copy(os.path.join(self.address.path,self.address.fileSelector),
filename)
else:
with open(filename, 'wb') as output, open(os.path.join(self.address.path,self.address.fileSelector), 'rb') as block:
block.seek(8192 + self.address.blockNumber*self.address.entrySize)
output.write(block.read(self.size))
def data(self):
"""Returns a string representing the data"""
if self.address.blockType == cacheAddress.CacheAddress.SEPARATE_FILE:
with open(os.path.join(self.address.path,self.address.fileSelector), 'rb') as infile:
data = infile.read()
else:
with open(os.path.join(self.address.path,self.address.fileSelector), 'rb') as block:
block.seek(8192 + self.address.blockNumber*self.address.entrySize)
data = block.read(self.size)#.decode('utf-8',errors='ignore')
return data
def __str__(self):
"""
Display the type of cacheData
"""
if self.type == CacheData.HTTP_HEADER:
if 'content-type' in self.headers:
return "HTTP Header %s" % self.headers['content-type']
else:
return "HTTP Header"
else:
return "Data"

View file

@ -0,0 +1,152 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2012, Jean-Rémy Bancel <jean-remy.bancel@telecom-paristech.org>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the Chromagon Project nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL Jean-Rémy Bancel BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Chrome Cache Entry
See http://www.chromium.org/developers/design-documents/network-stack/disk-cache
for design details
"""
from __future__ import absolute_import
import datetime
import struct
import os
import re
from . import cacheAddress
from . import cacheData
from six.moves import range
class CacheEntry():
"""
See /net/disk_cache/disk_format.h for details.
"""
STATE = ["Normal",
"Evicted (data were deleted)",
"Doomed (shit happened)"]
def __init__(self, address):
"""
Parse a Chrome Cache Entry at the given address
"""
self.httpHeader = None
with open(os.path.join(address.path,address.fileSelector), 'rb') as block:
# Going to the right entry
block.seek(8192 + address.blockNumber*address.entrySize)
# Parsing basic fields
self.hash = struct.unpack('I', block.read(4))[0]
self.next = struct.unpack('I', block.read(4))[0]
self.rankingNode = struct.unpack('I', block.read(4))[0]
self.usageCounter = struct.unpack('I', block.read(4))[0]
self.reuseCounter = struct.unpack('I', block.read(4))[0]
self.state = struct.unpack('I', block.read(4))[0]
self.creationTime = datetime.datetime(1601, 1, 1) + \
datetime.timedelta(microseconds=\
struct.unpack('Q', block.read(8))[0])
self.keyLength = struct.unpack('I', block.read(4))[0]
self.keyAddress = struct.unpack('I', block.read(4))[0]
dataSize = []
for _ in range(4):
dataSize.append(struct.unpack('I', block.read(4))[0])
self.data = []
for index in range(4):
addr = struct.unpack('I', block.read(4))[0]
try:
addr = cacheAddress.CacheAddress(addr, address.path)
self.data.append(cacheData.CacheData(addr, dataSize[index],
True))
except cacheAddress.CacheAddressError:
pass
# Find the HTTP header if there is one
for data in self.data:
if data.type == cacheData.CacheData.HTTP_HEADER:
self.httpHeader = data
break
self.flags = struct.unpack('I', block.read(4))[0]
# Skipping pad
block.seek(5*4, 1)
# Reading local key
if self.keyAddress == 0:
self.key = block.read(self.keyLength).decode('ascii')
# Key stored elsewhere
else:
addr = cacheAddress.CacheAddress(self.keyAddress, address.path)
# It is probably an HTTP header
self.key = cacheData.CacheData(addr, self.keyLength, True)
# print("cacheEntry key:%s"%self.key)
# try:
# # Some keys seem to be '_dk_http://example.com https://example.com https://www.example.com/full/url/path'
# # fix those up so the actual URL will work as a hash key
# # in our table if key has whitespace followed by final
# # http[s]://something, substitute, otherwise this leaves
# # it unchanged
# self.key = re.sub(r'^.*\s(https?://\S+)$', r'\1', self.key)
# except TypeError:
# ## Some 'keys' are not bytes or text types. No idea why
# ## not.
# # print(self.key)
# pass
def keyToStr(self):
"""
        Since the key can be a string or a CacheData object, this function
        is a utility to display the key's content whatever its type.
"""
if self.keyAddress == 0:
return self.key
else:
return self.key.data().decode('utf-8')
def __str__(self):
string = "Hash: 0x%08x" % self.hash + '\n'
if self.next != 0:
string += "Next: 0x%08x" % self.next + '\n'
string += "Usage Counter: %d" % self.usageCounter + '\n'\
"Reuse Counter: %d" % self.reuseCounter + '\n'\
"Creation Time: %s" % self.creationTime + '\n'
if self.keyAddress != 0:
string += "Key Address: 0x%08x" % self.keyAddress + '\n'
string += "Key: %s" % self.key + '\n'
if self.flags != 0:
string += "Flags: 0x%08x" % self.flags + '\n'
string += "State: %s" % CacheEntry.STATE[self.state]
for data in self.data:
string += "\nData (%d bytes) at 0x%08x : %s" % (data.size,
data.address.addr,
data)
return string

@@ -0,0 +1,126 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2012, Jean-Rémy Bancel <jean-remy.bancel@telecom-paristech.org>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the Chromagon Project nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL Jean-Rémy Bancel BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Parse the Chrome Cache File
See http://www.chromium.org/developers/design-documents/network-stack/disk-cache
for design details
"""
from __future__ import absolute_import
from __future__ import print_function
import gzip
import os
import struct
import sys
import re
import time
# def do_cprofile(func):
# def profiled_func(*args, **kwargs):
# t=0
# try:
# t = time.time()
# result = func(*args, **kwargs)
# t = time.time() - t
# return result
# finally:
# print("time:%s"%t)
# return profiled_func
try:
from brotli import decompress
except ImportError:
# Calibre doesn't include brotli, so use packaged brotlipython
# which is waaaay slower, but pure python.
from calibre_plugins.fanficfare_plugin.brotlidecpy import decompress
#as brotli_decompress
#@do_cprofile
def brotli_decompress(inbuf):
return decompress(inbuf)
from . import SuperFastHash
from .cacheAddress import CacheAddress
from .cacheBlock import CacheBlock
from .cacheData import CacheData
from .cacheEntry import CacheEntry
from ..six.moves import range
from ..six import ensure_binary, ensure_text
class ChromeCache(object):
def __init__(self,path):
self.path = os.path.abspath(path)
self.cacheBlock = CacheBlock(os.path.join(path, "index"))
# Checking type
if self.cacheBlock.type != CacheBlock.INDEX:
raise Exception("Invalid Index File")
def get_cache_entry(self,url):
url = ensure_binary(url,'utf8')
        # Compute the hash key and seek to its bucket
# print("url:%s"%url)
hash = SuperFastHash.superFastHash(url)
# print("superFastHash:%s"%hash)
key = hash & (self.cacheBlock.tableSize - 1)
with open(os.path.join(self.path, "index"), 'rb') as index:
index.seek(92*4 + key*4)
addr = struct.unpack('I', index.read(4))[0]
# Checking if the address is initialized (i.e. used)
if addr & 0x80000000 == 0:
print("%s is not in the cache" % url, file=sys.stderr)
# Follow the chained list in the bucket
else:
entry = CacheEntry(CacheAddress(addr, path=self.path))
while entry.hash != hash and entry.next != 0:
entry = CacheEntry(CacheAddress(entry.next, path=self.path))
if entry.hash == hash:
return entry
def get_cached_file(self,url):
entry = self.get_cache_entry(url)
if entry:
# entry = self.hash_cache[url]
for i in range(len(entry.data)):
if entry.data[i].type == CacheData.UNKNOWN:
# Extracting data into a file
data = entry.data[i].data()
# print("content-encoding:%s"%entry.httpHeader.headers.get(b'content-encoding',''))
                    if entry.httpHeader is not None and \
b'content-encoding' in entry.httpHeader.headers:
if entry.httpHeader.headers[b'content-encoding'] == b"gzip":
data = gzip.decompress(data)
elif entry.httpHeader.headers[b'content-encoding'] == b"br":
data = brotli_decompress(data)
return data
return None
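
An end-to-end sketch, assuming path names a Chrome disk-cache directory holding the page (URL hypothetical):

cache = ChromeCache(path)
body = cache.get_cached_file('https://www.fanfiction.net/s/1234567/1/')
# body is the decoded bytes (gzip/brotli already undone), or None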

@@ -0,0 +1,19 @@
Copyright (c) 2021 by Sidney Markowitz.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

@@ -0,0 +1,6 @@
from __future__ import absolute_import
__version__ = "1.0.0"
# noinspection PyUnresolvedReferences
from .decode import brotli_decompress_buffer as decompress

@@ -0,0 +1,96 @@
# Copyright 2021 Sidney Markowitz All Rights Reserved.
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
BROTLI_READ_SIZE = 4096
BROTLI_IBUF_SIZE = (2 * BROTLI_READ_SIZE + 32)
BROTLI_IBUF_MASK = (2 * BROTLI_READ_SIZE - 1)
kBitMask = [
0, 1, 3, 7, 15, 31, 63, 127, 255, 511, 1023, 2047, 4095, 8191, 16383, 32767,
65535, 131071, 262143, 524287, 1048575, 2097151, 4194303, 8388607, 16777215
]
class BrotliBitReader:
def __init__(self, input_stream):
self.buf_ = bytearray([0] * BROTLI_IBUF_SIZE)
self.input_ = input_stream # input stream
self.buf_ptr_ = 0 # next input will write here
self.val_ = 0 # pre-fetched bits
self.pos_ = 0 # byte position in stream
self.bit_pos_ = 0 # current bit-reading position in val_
self.bit_end_pos_ = 0 # bit-reading end position from LSB of val_
self.eos_ = 0 # input stream is finished
self.reset()
READ_SIZE = BROTLI_READ_SIZE
IBUF_MASK = BROTLI_IBUF_MASK
def reset(self):
self.buf_ptr_ = 0 # next input will write here
self.val_ = 0 # pre-fetched bits
self.pos_ = 0 # byte position in stream
self.bit_pos_ = 0 # current bit-reading position in val_
self.bit_end_pos_ = 0 # bit-reading end position from LSB of val_
self.eos_ = 0 # input stream is finished
self.read_more_input()
for i in range(0, 4):
self.val_ |= self.buf_[self.pos_] << (8 * i)
self.pos_ += 1
return self.bit_end_pos_ > 0
def read_more_input(self):
""" Fills up the input ringbuffer by calling the input callback.
Does nothing if there are at least 32 bytes present after current position.
Returns 0 if either:
- the input callback returned an error, or
- there is no more input and the position is past the end of the stream.
After encountering the end of the input stream, 32 additional zero bytes are
copied to the ringbuffer, therefore it is safe to call this function after
every 32 bytes of input is read"""
if self.bit_end_pos_ > 256:
return
elif self.eos_:
if self.bit_pos_ > self.bit_end_pos_:
raise Exception('Unexpected end of input %s %s' % (self.bit_pos_, self.bit_end_pos_))
else:
dst = self.buf_ptr_
bytes_read = self.input_.readinto(memoryview(self.buf_)[dst:dst+BROTLI_READ_SIZE])
if bytes_read < 0:
raise Exception('Unexpected end of input')
if bytes_read < BROTLI_READ_SIZE:
self.eos_ = 1
# Store 32 bytes of zero after the stream end
for p in range(0, 32):
self.buf_[dst + bytes_read + p] = 0
if dst == 0:
# Copy the head of the ringbuffer to the slack region
for p in range(0, 32):
self.buf_[(BROTLI_READ_SIZE << 1) + p] = self.buf_[p]
self.buf_ptr_ = BROTLI_READ_SIZE
else:
self.buf_ptr_ = 0
self.bit_end_pos_ += bytes_read << 3
def fill_bit_window(self):
"""Guarantees that there are at least 24 bits in the buffer"""
while self.bit_pos_ >= 8:
self.val_ >>= 8
self.val_ |= self.buf_[self.pos_ & BROTLI_IBUF_MASK] << 24
self.pos_ += 1
self.bit_pos_ -= 8
self.bit_end_pos_ -= 8
def read_bits(self, n_bits):
if 32 - self.bit_pos_ < n_bits:
self.fill_bit_window()
val = ((self.val_ >> self.bit_pos_) & kBitMask[n_bits])
self.bit_pos_ += n_bits
return val
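
A small illustration (not part of the module): bits are consumed LSB-first from a little-endian prefetch, so for the two input bytes b'\xb1\x05' (value 0x05b1):

from io import BytesIO
br = BrotliBitReader(BytesIO(b'\xb1\x05'))
assert br.read_bits(4) == 0x1   # low nibble of 0xb1
assert br.read_bits(8) == 0x5b  # next 8 bits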

File diff suppressed because one or more lines are too long

@@ -0,0 +1,243 @@
# Copyright 2021 Sidney Markowitz All Rights Reserved.
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
# Much of the following is copied from content copyright 2013 Google Inc, also distributed under MIT license
class Context:
""" Lookup table to map the previous two bytes to a context id.
There are four different context modeling modes defined here:
CONTEXT_LSB6: context id is the least significant 6 bits of the last byte,
CONTEXT_MSB6: context id is the most significant 6 bits of the last byte,
CONTEXT_UTF8: second-order context model tuned for UTF8-encoded text,
CONTEXT_SIGNED: second-order context model tuned for signed integers.
The context id for the UTF8 context model is calculated as follows. If p1
and p2 are the previous two bytes, we calculate the context as
context = kContextLookup[p1] | kContextLookup[p2 + 256].
If the previous two bytes are ASCII characters (i.e. < 128), this will be
equivalent to
context = 4 * context1(p1) + context2(p2),
where context1 is based on the previous byte in the following way:
0 : non-ASCII control
1 : \t, \n, \r
2 : space
3 : other punctuation
4 : " '
5 : %
6 : ( < [ {
7 : ) > ] }
8 : , ; :
9 : .
10 : =
11 : number
12 : upper-case vowel
13 : upper-case consonant
14 : lower-case vowel
15 : lower-case consonant
and context2 is based on the second last byte:
0 : control, space
1 : punctuation
2 : upper-case letter, number
3 : lower-case letter
If the last byte is ASCII, and the second last byte is not (in a valid UTF8
stream it will be a continuation byte, value between 128 and 191), the
context is the same as if the second last byte was an ASCII control or space.
If the last byte is a UTF8 lead byte (value >= 192), then the next byte will
be a continuation byte and the context id is 2 or 3 depending on the LSB of
the last byte and to a lesser extent on the second last byte if it is ASCII.
If the last byte is a UTF8 continuation byte, the second last byte can be:
- continuation byte: the next byte is probably ASCII or lead byte (assuming
4-byte UTF8 characters are rare) and the context id is 0 or 1.
- lead byte (192 - 207): next byte is ASCII or lead byte, context is 0 or 1
- lead byte (208 - 255): next byte is continuation byte, context is 2 or 3
The possible value combinations of the previous two bytes, the range of
context ids and the type of the next byte is summarized in the table below:
|--------|-----------------------------------------------------------------|
| | Last byte |
| Second |---------------------------------------------------------------|
| last byte | ASCII | cont. byte | lead byte |
| | (0-127) | (128-191) | (192-) |
|=============|===================|=====================|==================|
| ASCII | next: ASCII/lead | not valid | next: cont. |
| (0-127) | context: 4 - 63 | | context: 2 - 3 |
|-------------|-------------------|---------------------|------------------|
| cont. byte | next: ASCII/lead | next: ASCII/lead | next: cont. |
| (128-191) | context: 4 - 63 | context: 0 - 1 | context: 2 - 3 |
|-------------|-------------------|---------------------|------------------|
| lead byte | not valid | next: ASCII/lead | not valid |
| (192-207) | | context: 0 - 1 | |
|-------------|-------------------|---------------------|------------------|
| lead byte | not valid | next: cont. | not valid |
| (208-) | | context: 2 - 3 | |
|-------------|-------------------|---------------------|------------------|
The context id for the signed context mode is calculated as:
context = (kContextLookup[512 + p1] << 3) | kContextLookup[512 + p2].
For any context modeling modes, the context ids can be calculated by |-ing
together two lookups from one table using context model dependent offsets:
context = kContextLookup[offset1 + p1] | kContextLookup[offset2 + p2].
where offset1 and offset2 are dependent on the context mode.
"""
def __init__(self):
pass
CONTEXT_LSB6 = 0
CONTEXT_MSB6 = 1
CONTEXT_UTF8 = 2
CONTEXT_SIGNED = 3
lookup = bytearray([
# CONTEXT_UTF8, last byte
# ASCII range
0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 4, 0, 0, 4, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
8, 12, 16, 12, 12, 20, 12, 16, 24, 28, 12, 12, 32, 12, 36, 12,
44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 32, 32, 24, 40, 28, 12,
12, 48, 52, 52, 52, 48, 52, 52, 52, 48, 52, 52, 52, 52, 52, 48,
52, 52, 52, 52, 52, 48, 52, 52, 52, 52, 52, 24, 12, 28, 12, 12,
12, 56, 60, 60, 60, 56, 60, 60, 60, 56, 60, 60, 60, 60, 60, 56,
60, 60, 60, 60, 60, 56, 60, 60, 60, 60, 60, 24, 12, 28, 12, 0,
# UTF8 continuation byte range
0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1,
0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1,
0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1,
0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1,
# UTF8 lead byte range
2, 3, 2, 3, 2, 3, 2, 3, 2, 3, 2, 3, 2, 3, 2, 3,
2, 3, 2, 3, 2, 3, 2, 3, 2, 3, 2, 3, 2, 3, 2, 3,
2, 3, 2, 3, 2, 3, 2, 3, 2, 3, 2, 3, 2, 3, 2, 3,
2, 3, 2, 3, 2, 3, 2, 3, 2, 3, 2, 3, 2, 3, 2, 3,
# CONTEXT_UTF8 second last byte
# ASCII range
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1,
1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1,
1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 1, 1, 1, 1, 0,
# UTF8 continuation byte range
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
# UTF8 lead byte range
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
# CONTEXT_SIGNED, second last byte
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 7,
# CONTEXT_SIGNED, last byte, same as the above values shifted by 3 bits
0, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8,
16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16,
16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16,
16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16,
24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24,
24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24,
24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24,
24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24,
32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32,
32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32,
32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32,
32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32,
40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40,
40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40,
40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40,
48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 56,
# CONTEXT_LSB6, last byte
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47,
48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63,
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47,
48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63,
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47,
48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63,
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47,
48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63,
# CONTEXT_MSB6, last byte
0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3,
4, 4, 4, 4, 5, 5, 5, 5, 6, 6, 6, 6, 7, 7, 7, 7,
8, 8, 8, 8, 9, 9, 9, 9, 10, 10, 10, 10, 11, 11, 11, 11,
12, 12, 12, 12, 13, 13, 13, 13, 14, 14, 14, 14, 15, 15, 15, 15,
16, 16, 16, 16, 17, 17, 17, 17, 18, 18, 18, 18, 19, 19, 19, 19,
20, 20, 20, 20, 21, 21, 21, 21, 22, 22, 22, 22, 23, 23, 23, 23,
24, 24, 24, 24, 25, 25, 25, 25, 26, 26, 26, 26, 27, 27, 27, 27,
28, 28, 28, 28, 29, 29, 29, 29, 30, 30, 30, 30, 31, 31, 31, 31,
32, 32, 32, 32, 33, 33, 33, 33, 34, 34, 34, 34, 35, 35, 35, 35,
36, 36, 36, 36, 37, 37, 37, 37, 38, 38, 38, 38, 39, 39, 39, 39,
40, 40, 40, 40, 41, 41, 41, 41, 42, 42, 42, 42, 43, 43, 43, 43,
44, 44, 44, 44, 45, 45, 45, 45, 46, 46, 46, 46, 47, 47, 47, 47,
48, 48, 48, 48, 49, 49, 49, 49, 50, 50, 50, 50, 51, 51, 51, 51,
52, 52, 52, 52, 53, 53, 53, 53, 54, 54, 54, 54, 55, 55, 55, 55,
56, 56, 56, 56, 57, 57, 57, 57, 58, 58, 58, 58, 59, 59, 59, 59,
60, 60, 60, 60, 61, 61, 61, 61, 62, 62, 62, 62, 63, 63, 63, 63,
# CONTEXT_{M,L}SB6, second last byte
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
])
lookupOffsets = [
# CONTEXT_LSB6
1024, 1536,
# CONTEXT_MSB6
1280, 1536,
# CONTEXT_UTF8
0, 256,
# CONTEXT_SIGNED
768, 512
]
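
An illustration (not part of the module) of the two-lookup scheme described in the docstring, for CONTEXT_UTF8 with last byte p1 = ' ' and second-last byte p2 = 't' (text ending "t "):

p1, p2 = ord(' '), ord('t')
offset1 = Context.lookupOffsets[2 * Context.CONTEXT_UTF8]      # 0
offset2 = Context.lookupOffsets[2 * Context.CONTEXT_UTF8 + 1]  # 256
context = Context.lookup[offset1 + p1] | Context.lookup[offset2 + p2]
assert context == 4 * 2 + 3  # context1(space) = 2, context2('t') = 3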

@@ -0,0 +1,684 @@
# Copyright 2021 Sidney Markowitz All Rights Reserved.
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
from .huffman import HuffmanCode, brotli_build_huffman_table
from .prefix import Prefix, kBlockLengthPrefixCode, kInsertLengthPrefixCode, kCopyLengthPrefixCode
from .bit_reader import BrotliBitReader
from .dictionary import BrotliDictionary
from .context import Context
from .transform import Transform, kNumTransforms
from io import BytesIO
kDefaultCodeLength = 8
kCodeLengthRepeatCode = 16
kNumLiteralCodes = 256
kNumInsertAndCopyCodes = 704
kNumBlockLengthCodes = 26
kLiteralContextBits = 6
kDistanceContextBits = 2
HUFFMAN_TABLE_BITS = 8
HUFFMAN_TABLE_MASK = 0xff
# Maximum possible Huffman table size for an alphabet size of 704, max code length 15 and root table bits 8.
HUFFMAN_MAX_TABLE_SIZE = 1080
CODE_LENGTH_CODES = 18
kCodeLengthCodeOrder = bytearray([1, 2, 3, 4, 0, 5, 17, 6, 16, 7, 8, 9, 10, 11, 12, 13, 14, 15])
NUM_DISTANCE_SHORT_CODES = 16
kDistanceShortCodeIndexOffset = bytearray([3, 2, 1, 0, 3, 3, 3, 3, 3, 3, 2, 2, 2, 2, 2, 2])
kDistanceShortCodeValueOffset = [0, 0, 0, 0, -1, 1, -2, 2, -3, 3, -1, 1, -2, 2, -3, 3]
kMaxHuffmanTableSize = [256, 402, 436, 468, 500, 534, 566, 598, 630, 662, 694, 726, 758, 790, 822, 854, 886, 920, 952,
984, 1016, 1048, 1080]
def decode_window_bits(br):
if br.read_bits(1) == 0:
return 16
n = br.read_bits(3)
if n > 0:
return 17 + n
n = br.read_bits(3)
if n > 0:
return 8 + n
return 17
def decode_var_len_uint8(br):
"""Decodes a number in the range [0..255], by reading 1 - 11 bits"""
if br.read_bits(1):
nbits = br.read_bits(3)
if nbits == 0:
return 1
return br.read_bits(nbits) + (1 << nbits)
return 0
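# Worked examples: bit '0' -> 0; bit '1' with nbits == 0 -> 1; bit '1' with
# nbits == 3 plus 3 extra bits x -> 8 + x, covering the range 8..15.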
class MetaBlockLength:
def __init__(self):
self.meta_block_length = 0
self.input_end = 0
self.is_uncompressed = 0
self.is_metadata = False
def decode_meta_block_length(br):
out = MetaBlockLength()
out.input_end = br.read_bits(1)
if out.input_end and br.read_bits(1):
return out
size_nibbles = br.read_bits(2) + 4
if size_nibbles == 7:
out.is_metadata = True
if br.read_bits(1) != 0:
raise Exception('Invalid reserved bit')
size_bytes = br.read_bits(2)
if size_bytes == 0:
return out
for i in range(0, size_bytes):
next_byte = br.read_bits(8)
if i + 1 == size_bytes and size_bytes > 1 and next_byte == 0:
raise Exception('Invalid size byte')
out.meta_block_length |= next_byte << (i * 8)
else:
for i in range(0, size_nibbles):
next_nibble = br.read_bits(4)
if i + 1 == size_nibbles and size_nibbles > 4 and next_nibble == 0:
raise Exception('Invalid size nibble')
out.meta_block_length |= next_nibble << (i * 4)
out.meta_block_length += 1
if not out.input_end and not out.is_metadata:
out.is_uncompressed = br.read_bits(1)
return out
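# Worked example: input_end = 0, size_nibbles = 4, nibbles 0xF 0xF 0xF 0xF
# -> meta_block_length = 0xFFFF + 1 = 65536 bytes in the meta-block.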
def read_symbol(table, index, br):
"""Decodes the next Huffman code from bit-stream."""
br.fill_bit_window()
index += (br.val_ >> br.bit_pos_) & HUFFMAN_TABLE_MASK
nbits = table[index].bits - HUFFMAN_TABLE_BITS
if nbits > 0:
br.bit_pos_ += HUFFMAN_TABLE_BITS
index += table[index].value
index += (br.val_ >> br.bit_pos_) & ((1 << nbits) - 1)
br.bit_pos_ += table[index].bits
return table[index].value
def read_huffman_code_lengths(code_length_code_lengths, num_symbols, code_lengths, br):
symbol = 0
prev_code_len = kDefaultCodeLength
repeat = 0
repeat_code_len = 0
space = 32768
table = [HuffmanCode(0, 0) for _ in range(0, 32)]
brotli_build_huffman_table(table, 0, 5, code_length_code_lengths, CODE_LENGTH_CODES)
while (symbol < num_symbols) and (space > 0):
p = 0
br.read_more_input()
br.fill_bit_window()
p += (br.val_ >> br.bit_pos_) & 31
br.bit_pos_ += table[p].bits
code_len = table[p].value & 0xff
if code_len < kCodeLengthRepeatCode:
repeat = 0
code_lengths[symbol] = code_len
symbol += 1
if code_len != 0:
prev_code_len = code_len
space -= 32768 >> code_len
else:
extra_bits = code_len - 14
new_len = 0
if code_len == kCodeLengthRepeatCode:
new_len = prev_code_len
if repeat_code_len != new_len:
repeat = 0
repeat_code_len = new_len
old_repeat = repeat
if repeat > 0:
repeat -= 2
repeat <<= extra_bits
repeat += br.read_bits(extra_bits) + 3
repeat_delta = repeat - old_repeat
if symbol + repeat_delta > num_symbols:
raise Exception('[read_huffman_code_lengths] symbol + repeat_delta > num_symbols')
for x in range(0, repeat_delta):
code_lengths[symbol + x] = repeat_code_len
symbol += repeat_delta
if repeat_code_len != 0:
space -= repeat_delta << (15 - repeat_code_len)
if space != 0:
raise Exception('[read_huffman_code_lengths] space = %s' % space)
for i in range(symbol, num_symbols):
code_lengths[i] = 0
def read_huffman_code(alphabet_size, tables, table, br):
code_lengths = bytearray([0] * alphabet_size)
br.read_more_input()
# simple_code_or_skip is used as follows:
# 1 for simple code
# 0 for no skipping, 2 skips 2 code lengths, 3 skips 3 code lengths
simple_code_or_skip = br.read_bits(2)
if simple_code_or_skip == 1:
# Read symbols, codes & code lengths directly.
max_bits_counter = alphabet_size - 1
max_bits = 0
symbols = [0, 0, 0, 0]
num_symbols = br.read_bits(2) + 1
while max_bits_counter:
max_bits_counter >>= 1
max_bits += 1
for i in range(0, num_symbols):
symbols[i] = br.read_bits(max_bits) % alphabet_size
code_lengths[symbols[i]] = 2
code_lengths[symbols[0]] = 1
if num_symbols == 2:
if symbols[0] == symbols[1]:
raise Exception('[read_huffman_code] invalid symbols')
code_lengths[symbols[1]] = 1
elif num_symbols == 3:
if symbols[0] == symbols[1] or symbols[0] == symbols[2] or symbols[1] == symbols[2]:
raise Exception('[read_huffman_code] invalid symbols')
elif num_symbols == 4:
if symbols[0] == symbols[1] or symbols[0] == symbols[2] or symbols[0] == symbols[3] or symbols[1] == \
symbols[2] or symbols[1] == symbols[3] or symbols[2] == symbols[3]:
raise Exception('[read_huffman_code] invalid symbols')
if br.read_bits(1):
code_lengths[symbols[2]] = 3
code_lengths[symbols[3]] = 3
else:
code_lengths[symbols[0]] = 2
else: # Decode Huffman-coded code lengths
code_length_code_lengths = bytearray([0] * CODE_LENGTH_CODES)
space = 32
num_codes = 0
# Static Huffman code for the code length code lengths
huff = [HuffmanCode(2, 0), HuffmanCode(2, 4), HuffmanCode(2, 3), HuffmanCode(3, 2),
HuffmanCode(2, 0), HuffmanCode(2, 4), HuffmanCode(2, 3), HuffmanCode(4, 1),
HuffmanCode(2, 0), HuffmanCode(2, 4), HuffmanCode(2, 3), HuffmanCode(3, 2),
HuffmanCode(2, 0), HuffmanCode(2, 4), HuffmanCode(2, 3), HuffmanCode(4, 5)]
for i in range(simple_code_or_skip, CODE_LENGTH_CODES):
if space <= 0:
break
code_len_idx = kCodeLengthCodeOrder[i]
p = 0
br.fill_bit_window()
p += (br.val_ >> br.bit_pos_) & 15
br.bit_pos_ += huff[p].bits
v = huff[p].value
code_length_code_lengths[code_len_idx] = v
if v != 0:
space -= (32 >> v)
num_codes += 1
if num_codes != 1 and space != 0:
raise Exception('[read_huffman_code] invalid num_codes or space')
read_huffman_code_lengths(code_length_code_lengths, alphabet_size, code_lengths, br)
table_size = brotli_build_huffman_table(tables, table, HUFFMAN_TABLE_BITS, code_lengths, alphabet_size)
if table_size == 0:
raise Exception('[read_huffman_code] brotli_build_huffman_table failed')
return table_size
def read_block_length(table, index, br):
code = read_symbol(table, index, br)
nbits = kBlockLengthPrefixCode[code].nbits
return kBlockLengthPrefixCode[code].offset + br.read_bits(nbits)
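# Convert a distance code into a distance: codes 0-15 index the
# last-four-distances ring buffer (with small +/- adjustments); larger codes
# map directly to code - 15.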
def translate_short_codes(code, ringbuffer, index):
if code < NUM_DISTANCE_SHORT_CODES:
index += kDistanceShortCodeIndexOffset[code]
index &= 3
val = ringbuffer[index] + kDistanceShortCodeValueOffset[code]
else:
val = code - NUM_DISTANCE_SHORT_CODES + 1
return val
def move_to_front(v, index):
v.insert(0, v.pop(index))
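# Example: with mtf starting as [0, 1, 2, ...], the input [1, 1] decodes in
# place to [1, 0]: the first 1 pulls value 1 to the front, so the next
# index 1 now names value 0.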
def inverse_move_to_front_transform(v, v_len):
mtf = list(range(0, 256))
for i in range(0, v_len):
index = v[i]
v[i] = mtf[index]
if index:
move_to_front(mtf, index)
# Contains a collection of huffman trees with the same alphabet size.
class HuffmanTreeGroup:
def __init__(self, alphabet_size, num_huff_trees):
self.alphabet_size = alphabet_size
self.num_huff_trees = num_huff_trees
self.codes = [0] * (num_huff_trees + num_huff_trees * kMaxHuffmanTableSize[(alphabet_size + 31) >> 5])
self.huff_trees = [0] * num_huff_trees
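# self.codes holds all trees' lookup tables back to back; huff_trees[i] is
# the offset of tree i's root table within codes.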
def decode(self, br):
next_entry = 0
for i in range(0, self.num_huff_trees):
self.huff_trees[i] = next_entry
table_size = read_huffman_code(self.alphabet_size, self.codes, next_entry, br)
next_entry += table_size
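# A context map assigns one of num_huff_trees Huffman trees to each
# (block type, context id) pair; runs of zeros may be run-length coded and
# the whole map may be stored move-to-front transformed.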
class DecodeContextMap:
def __init__(self, context_map_size, br):
max_run_length_prefix = 0
br.read_more_input()
self.num_huff_trees = decode_var_len_uint8(br) + 1
self.context_map = bytearray([0] * context_map_size)
if self.num_huff_trees <= 1:
return
use_rle_for_zeros = br.read_bits(1)
if use_rle_for_zeros:
max_run_length_prefix = br.read_bits(4) + 1
table = [HuffmanCode(0, 0) for _ in range(0, HUFFMAN_MAX_TABLE_SIZE)]
read_huffman_code(self.num_huff_trees + max_run_length_prefix, table, 0, br)
i = 0
while i < context_map_size:
br.read_more_input()
code = read_symbol(table, 0, br)
if code == 0:
self.context_map[i] = 0
i += 1
elif code <= max_run_length_prefix:
for reps in range((1 << code) + br.read_bits(code), 0, -1):
if i >= context_map_size:
raise Exception('[DecodeContextMap] i >= context_map_size')
self.context_map[i] = 0
i += 1
else:
self.context_map[i] = code - max_run_length_prefix
i += 1
if br.read_bits(1):
inverse_move_to_front_transform(self.context_map, context_map_size)
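# Block-type symbols 0 and 1 mean "previous type" and "second-previous
# type + 1"; anything else is an explicit type (symbol - 2). The two most
# recent types per category live in ring_buffers.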
def decode_block_type(max_block_type, trees, tree_type, block_types, ring_buffers, indexes, br):
ringbuffer = tree_type * 2
index = tree_type
type_code = read_symbol(trees, tree_type * HUFFMAN_MAX_TABLE_SIZE, br)
if type_code == 0:
block_type = ring_buffers[ringbuffer + (indexes[index] & 1)]
elif type_code == 1:
block_type = ring_buffers[ringbuffer + ((indexes[index] - 1) & 1)] + 1
else:
block_type = type_code - 2
if block_type >= max_block_type:
block_type -= max_block_type
block_types[tree_type] = block_type
ring_buffers[ringbuffer + (indexes[index] & 1)] = block_type
indexes[index] += 1
def copy_uncompressed_block_to_output(output, length, pos, ringbuffer, ringbuffer_mask, br):
rb_size = ringbuffer_mask + 1
rb_pos = pos & ringbuffer_mask
br_pos = br.pos_ & BrotliBitReader.IBUF_MASK
# For short lengths copy byte-by-byte
if (length < 8) or (br.bit_pos_ + (length << 3) < br.bit_end_pos_):
for i in range(0, length):
br.read_more_input()
ringbuffer[rb_pos] = br.read_bits(8)
rb_pos += 1
if rb_pos == rb_size:
output.write(ringbuffer[:rb_size])
rb_pos = 0
return
if br.bit_end_pos_ < 32:
raise Exception('[copy_uncompressed_block_to_output] br.bit_end_pos_ < 32')
# Copy remaining 0-4 bytes from br.val_ to ringbuffer.
while br.bit_pos_ < 32:
ringbuffer[rb_pos] = (br.val_ >> br.bit_pos_) & 0xff
br.bit_pos_ += 8
rb_pos += 1
length -= 1
# Copy remaining bytes from br.buf_ to ringbuffer.
num_bytes = (br.bit_end_pos_ - br.bit_pos_) >> 3
if br_pos + num_bytes > BrotliBitReader.IBUF_MASK:
tail = BrotliBitReader.IBUF_MASK + 1 - br_pos
for x in range(0, tail):
ringbuffer[rb_pos + x] = br.buf_[br_pos + x]
num_bytes -= tail
rb_pos += tail
length -= tail
br_pos = 0
for x in range(0, num_bytes):
ringbuffer[rb_pos + x] = br.buf_[br_pos + x]
rb_pos += num_bytes
length -= num_bytes
# If we wrote past the logical end of the ringbuffer, copy the tail of the
# ringbuffer to its beginning and flush the ringbuffer to the output.
if rb_pos >= rb_size:
output.write(ringbuffer[:rb_size])
rb_pos -= rb_size
for x in range(0, rb_pos):
ringbuffer[x] = ringbuffer[rb_size + x]
# If we have more to copy than the remaining size of the ringbuffer, then we first
# fill the ringbuffer from the input and then flush the ringbuffer to the output
while rb_pos + length >= rb_size:
num_bytes = rb_size - rb_pos
if br.input_.readinto(memoryview(ringbuffer)[rb_pos:rb_pos+num_bytes]) < num_bytes:
raise Exception('[copy_uncompressed_block_to_output] not enough bytes')
output.write(ringbuffer[:rb_size])
length -= num_bytes
rb_pos = 0
# Copy straight from the input onto the ringbuffer. The ringbuffer will be flushed to the output at a later time.
if br.input_.readinto(memoryview(ringbuffer)[rb_pos:rb_pos+length]) < length:
raise Exception('[copy_uncompressed_block_to_output] not enough bytes')
# Restore the state of the bit reader.
br.reset()
def jump_to_byte_boundary(br):
"""Advances the bit reader position to the next byte boundary and verifies that any skipped bits are set to zero"""
new_bit_pos = (br.bit_pos_ + 7) & ~7
pad_bits = br.read_bits(new_bit_pos - br.bit_pos_)
return pad_bits == 0
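# Note: this reports only the first meta-block's MLEN, so it is the full
# decompressed size only for single-meta-block streams.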
def brotli_decompressed_size(input_buffer):
with BytesIO(input_buffer) as input_stream:
br = BrotliBitReader(input_stream)
decode_window_bits(br)
out = decode_meta_block_length(br)
return out.meta_block_length
def brotli_decompress_buffer(input_buffer):
with BytesIO(input_buffer) as input_stream:
with BytesIO() as output_stream:
brotli_decompress(input_stream, output_stream)
return output_stream.getvalue()
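# Example round trip (names hypothetical): given `compressed` bytes produced
# by any standard brotli encoder,
#     plain = brotli_decompress_buffer(compressed)
# yields the original bytes.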
def brotli_decompress(input_stream, output_stream):
pos = 0
input_end = 0
max_distance = 0
# This ring buffer holds a few past copy distances that will be used by some special distance codes.
dist_rb = [16, 15, 11, 4]
dist_rb_idx = 0
hgroup = [HuffmanTreeGroup(0, 0), HuffmanTreeGroup(0, 0), HuffmanTreeGroup(0, 0)]
# We need the slack region for the following reasons:
# - always doing two 8-byte copies for fast backward copying
# - transforms
# - flushing the input ringbuffer when decoding uncompressed blocks
_ring_buffer_write_ahead_slack = 128 + BrotliBitReader.READ_SIZE
br = BrotliBitReader(input_stream)
# Decode window size.
window_bits = decode_window_bits(br)
max_backward_distance = (1 << window_bits) - 16
ringbuffer_size = 1 << window_bits
ringbuffer_mask = ringbuffer_size - 1
ringbuffer = bytearray(
[0] * (ringbuffer_size + _ring_buffer_write_ahead_slack + BrotliDictionary.maxDictionaryWordLength))
ringbuffer_end = ringbuffer_size
block_type_trees = [HuffmanCode(0, 0) for _ in range(0, 3 * HUFFMAN_MAX_TABLE_SIZE)]
block_len_trees = [HuffmanCode(0, 0) for _ in range(0, 3 * HUFFMAN_MAX_TABLE_SIZE)]
while not input_end:
block_length = [1 << 28, 1 << 28, 1 << 28]
block_type = [0] * 3
num_block_types = [1] * 3
block_type_rb = [0, 1, 0, 1, 0, 1]
block_type_rb_index = [0] * 3
for i in range(0, 3):
hgroup[i].codes = None
hgroup[i].huff_trees = None
br.read_more_input()
_out = decode_meta_block_length(br)
meta_block_remaining_len = _out.meta_block_length
input_end = _out.input_end
is_uncompressed = _out.is_uncompressed
if _out.is_metadata:
jump_to_byte_boundary(br)
while meta_block_remaining_len > 0:
br.read_more_input()
# Read one byte and ignore it
br.read_bits(8)
meta_block_remaining_len -= 1
continue
if meta_block_remaining_len == 0:
continue
if is_uncompressed:
br.bit_pos_ = (br.bit_pos_ + 7) & ~7
copy_uncompressed_block_to_output(output_stream, meta_block_remaining_len, pos, ringbuffer,
ringbuffer_mask, br)
pos += meta_block_remaining_len
continue
for i in range(0, 3):
num_block_types[i] = decode_var_len_uint8(br) + 1
if num_block_types[i] >= 2:
read_huffman_code(num_block_types[i] + 2, block_type_trees, i * HUFFMAN_MAX_TABLE_SIZE, br)
read_huffman_code(kNumBlockLengthCodes, block_len_trees, i * HUFFMAN_MAX_TABLE_SIZE, br)
block_length[i] = read_block_length(block_len_trees, i * HUFFMAN_MAX_TABLE_SIZE, br)
block_type_rb_index[i] = 1
br.read_more_input()
distance_postfix_bits = br.read_bits(2)
num_direct_distance_codes = NUM_DISTANCE_SHORT_CODES + (br.read_bits(4) << distance_postfix_bits)
distance_postfix_mask = (1 << distance_postfix_bits) - 1
num_distance_codes = (num_direct_distance_codes + (48 << distance_postfix_bits))
context_modes = bytearray([0] * num_block_types[0])
for i in range(0, num_block_types[0]):
br.read_more_input()
context_modes[i] = (br.read_bits(2) << 1)
_o1 = DecodeContextMap(num_block_types[0] << kLiteralContextBits, br)
num_literal_huff_trees = _o1.num_huff_trees
context_map = _o1.context_map
_o2 = DecodeContextMap(num_block_types[2] << kDistanceContextBits, br)
num_dist_huff_trees = _o2.num_huff_trees
dist_context_map = _o2.context_map
hgroup[0] = HuffmanTreeGroup(kNumLiteralCodes, num_literal_huff_trees)
hgroup[1] = HuffmanTreeGroup(kNumInsertAndCopyCodes, num_block_types[1])
hgroup[2] = HuffmanTreeGroup(num_distance_codes, num_dist_huff_trees)
for i in range(0, 3):
hgroup[i].decode(br)
context_map_slice = 0
dist_context_map_slice = 0
context_mode = context_modes[block_type[0]]
context_lookup_offset1 = Context.lookupOffsets[context_mode]
context_lookup_offset2 = Context.lookupOffsets[context_mode + 1]
huff_tree_command = hgroup[1].huff_trees[0]
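# Command loop: each insert-and-copy command yields some literals, then a
# backward copy either from the sliding window or, when the distance exceeds
# max_distance, from the static dictionary.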
while meta_block_remaining_len > 0:
br.read_more_input()
if block_length[1] == 0:
decode_block_type(num_block_types[1], block_type_trees, 1, block_type, block_type_rb,
block_type_rb_index, br)
block_length[1] = read_block_length(block_len_trees, HUFFMAN_MAX_TABLE_SIZE, br)
huff_tree_command = hgroup[1].huff_trees[block_type[1]]
block_length[1] -= 1
cmd_code = read_symbol(hgroup[1].codes, huff_tree_command, br)
range_idx = cmd_code >> 6
distance_code = 0
if range_idx >= 2:
range_idx -= 2
distance_code = -1
insert_code = Prefix.kInsertRangeLut[range_idx] + ((cmd_code >> 3) & 7)
copy_code = Prefix.kCopyRangeLut[range_idx] + (cmd_code & 7)
insert_length = kInsertLengthPrefixCode[insert_code].offset + br.read_bits(
kInsertLengthPrefixCode[insert_code].nbits)
copy_length = kCopyLengthPrefixCode[copy_code].offset + br.read_bits(
kCopyLengthPrefixCode[copy_code].nbits)
prev_byte1 = ringbuffer[(pos - 1) & ringbuffer_mask]
prev_byte2 = ringbuffer[(pos - 2) & ringbuffer_mask]
for j in range(0, insert_length):
br.read_more_input()
if block_length[0] == 0:
decode_block_type(num_block_types[0], block_type_trees, 0, block_type, block_type_rb,
block_type_rb_index, br)
block_length[0] = read_block_length(block_len_trees, 0, br)
context_offset = block_type[0] << kLiteralContextBits
context_map_slice = context_offset
context_mode = context_modes[block_type[0]]
context_lookup_offset1 = Context.lookupOffsets[context_mode]
context_lookup_offset2 = Context.lookupOffsets[context_mode + 1]
context = Context.lookup[context_lookup_offset1 + prev_byte1] | Context.lookup[
context_lookup_offset2 + prev_byte2]
literal_huff_tree_index = context_map[context_map_slice + context]
block_length[0] -= 1
prev_byte2 = prev_byte1
prev_byte1 = read_symbol(hgroup[0].codes, hgroup[0].huff_trees[literal_huff_tree_index], br)
ringbuffer[pos & ringbuffer_mask] = prev_byte1
if (pos & ringbuffer_mask) == ringbuffer_mask:
output_stream.write(ringbuffer[:ringbuffer_size])
pos += 1
meta_block_remaining_len -= insert_length
if meta_block_remaining_len <= 0:
break
if distance_code < 0:
br.read_more_input()
if block_length[2] == 0:
decode_block_type(num_block_types[2], block_type_trees, 2, block_type, block_type_rb,
block_type_rb_index, br)
block_length[2] = read_block_length(block_len_trees, 2 * HUFFMAN_MAX_TABLE_SIZE, br)
dist_context_offset = block_type[2] << kDistanceContextBits
dist_context_map_slice = dist_context_offset
block_length[2] -= 1
context = (3 if copy_length > 4 else copy_length - 2) & 0xff
dist_huff_tree_index = dist_context_map[dist_context_map_slice + context]
distance_code = read_symbol(hgroup[2].codes, hgroup[2].huff_trees[dist_huff_tree_index], br)
if distance_code >= num_direct_distance_codes:
distance_code -= num_direct_distance_codes
postfix = distance_code & distance_postfix_mask
distance_code >>= distance_postfix_bits
nbits = (distance_code >> 1) + 1
offset = ((2 + (distance_code & 1)) << nbits) - 4
distance_code = num_direct_distance_codes + (
(offset + br.read_bits(nbits)) << distance_postfix_bits) + postfix
# Convert distance code to actual distance by possibly looking up past distances from the ringbuffer
distance = translate_short_codes(distance_code, dist_rb, dist_rb_idx)
if distance < 0:
raise Exception('[brotli_decompress] invalid distance')
if pos < max_backward_distance and max_distance != max_backward_distance:
max_distance = pos
else:
max_distance = max_backward_distance
copy_dst = pos & ringbuffer_mask
if distance > max_distance:
if BrotliDictionary.minDictionaryWordLength <= copy_length <= BrotliDictionary.maxDictionaryWordLength:
offset = BrotliDictionary.offsetsByLength[copy_length]
word_id = distance - max_distance - 1
shift = BrotliDictionary.sizeBitsByLength[copy_length]
mask = (1 << shift) - 1
word_idx = word_id & mask
transform_idx = word_id >> shift
offset += word_idx * copy_length
if transform_idx < kNumTransforms:
length = Transform.transformDictionaryWord(ringbuffer, copy_dst, offset, copy_length,
transform_idx)
copy_dst += length
pos += length
meta_block_remaining_len -= length
if copy_dst >= ringbuffer_end:
output_stream.write(ringbuffer[:ringbuffer_size])
for _x in range(0, copy_dst - ringbuffer_end):
ringbuffer[_x] = ringbuffer[ringbuffer_end + _x]
else:
raise Exception("Invalid backward reference. pos: %s distance: %s len: %s bytes left: %s" % (
pos, distance, copy_length, meta_block_remaining_len))
else:
raise Exception("Invalid backward reference. pos: %s distance: %s len: %s bytes left: %s" % (
pos, distance, copy_length, meta_block_remaining_len))
else:
if distance_code > 0:
dist_rb[dist_rb_idx & 3] = distance
dist_rb_idx += 1
if copy_length > meta_block_remaining_len:
raise Exception("Invalid backward reference. pos: %s distance: %s len: %s bytes left: %s" % (
pos, distance, copy_length, meta_block_remaining_len))
for j in range(0, copy_length):
ringbuffer[pos & ringbuffer_mask] = ringbuffer[(pos - distance) & ringbuffer_mask]
if (pos & ringbuffer_mask) == ringbuffer_mask:
output_stream.write(ringbuffer[:ringbuffer_size])
pos += 1
meta_block_remaining_len -= 1
# Protect pos from overflow, wrap it around at every GB of input data
pos &= 0x3fffffff
output_stream.write(ringbuffer[:pos & ringbuffer_mask])

View file

@ -0,0 +1,27 @@
# Copyright 2021 Sidney Markowitz All Rights Reserved.
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
import pkgutil
class BrotliDictionary:
def __init__(self):
pass
offsetsByLength = [
0, 0, 0, 0, 0, 4096, 9216, 21504, 35840, 44032,
53248, 63488, 74752, 87040, 93696, 100864, 104704, 106752, 108928, 113536,
115968, 118528, 119872, 121280, 122016
]
sizeBitsByLength = bytearray([
0, 0, 0, 0, 10, 10, 11, 11, 10, 10,
10, 10, 10, 9, 9, 8, 7, 7, 8, 7,
7, 6, 6, 5, 5
])
minDictionaryWordLength = 4
maxDictionaryWordLength = 24
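# The raw static dictionary bytes, shipped as package data; offsetsByLength
# and sizeBitsByLength locate the words of each length inside it.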
dictionary = pkgutil.get_data('brotlidecpy', 'brotli-dict')

View file

@ -0,0 +1,121 @@
# Copyright 2021 Sidney Markowitz All Rights Reserved.
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
MAX_LENGTH = 15
def _get_next_key(key, length):
"""Returns reverse(reverse(key, len) + 1, len), where reverse(key, len) is the
bit-wise reversal of the length least significant bits of key"""
step = 1 << (length - 1)
while key & step:
step >>= 1
return (key & (step - 1)) + step
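# Example with length 3: key 0b100 is the bit-reversed code 001; the next
# code is 010, whose reversal is 0b010, and indeed _get_next_key(0b100, 3)
# returns 0b010.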
def _replicate_value(table, i, step, end, code):
"""Stores code in table[0], table[step], table[2*step], ..., table[end] Assumes end is integer multiple of step"""
for index in range(i+end-step, i - step, -step):
table[index] = HuffmanCode(code.bits, code.value)
def _next_table_bit_size(count, length, root_bits):
"""Returns the table width of the next 2nd level table. count is the histogram of bit lengths for the
remaining symbols, len is the code length of the next processed symbol"""
left = 1 << (length - root_bits)
while length < MAX_LENGTH:
left -= count[length]
if left <= 0:
break
length += 1
left <<= 1
return length - root_bits
class HuffmanCode:
def __init__(self, bits, value):
self.bits = bits # number of bits used for this symbol
self.value = value # symbol value or table offset
def brotli_build_huffman_table(root_table, table, root_bits, code_lengths, code_lengths_size):
start_table = table
# Local variables used:
#   code           current table entry
#   length         current code length
#   symbol         symbol index in original or sorted table
#   key            reversed prefix code
#   step           step size to replicate values in current table
#   low            low bits for current root entry
#   mask           mask for low bits
#   table_bits     key length of current table
#   table_size     size of current table
#   total_size     sum of root table size and 2nd level table sizes
#   sorted_symbols symbols sorted by code length
count = [0] * (MAX_LENGTH + 1) # number of codes of each length
offset = [0] * (MAX_LENGTH + 1) # offsets in sorted table for each length
sorted_symbols = [0] * code_lengths_size
# build histogram of code lengths
for symbol in range(0, code_lengths_size):
count[code_lengths[symbol]] += 1
# generate offsets into sorted symbol table by code length
offset[1] = 0
for length in range(1, MAX_LENGTH):
offset[length + 1] = offset[length] + count[length]
# sort symbols by length, by symbol order within each length
for symbol in range(0, code_lengths_size):
length = code_lengths[symbol]
if length != 0:
sorted_symbols[offset[length]] = symbol
offset[length] += 1
table_bits = root_bits
table_size = 1 << table_bits
total_size = table_size
# special case code with only one value
if offset[MAX_LENGTH] == 1:
for key in range(0, total_size):
root_table[table + key] = HuffmanCode(0, sorted_symbols[0] & 0xffff)
return total_size
# fill in root table
key = 0
symbol = 0
step = 2
for length in range(1, root_bits+1):
while count[length] > 0:
code = HuffmanCode(length & 0xff, sorted_symbols[symbol] & 0xffff)
symbol += 1
_replicate_value(root_table, table + key, step, table_size, code)
key = _get_next_key(key, length)
count[length] -= 1
step <<= 1
# fill in 2nd level tables and add pointers to root table
mask = total_size - 1
low = -1
step = 2
for length in range(root_bits + 1, MAX_LENGTH+1):
while count[length] > 0:
if (key & mask) != low:
table += table_size
table_bits = _next_table_bit_size(count, length, root_bits)
table_size = 1 << table_bits
total_size += table_size
low = key & mask
root_table[start_table + low] = HuffmanCode((table_bits + root_bits) & 0xff,
((table - start_table) - low) & 0xffff)
code = HuffmanCode((length - root_bits) & 0xff, sorted_symbols[symbol] & 0xffff)
symbol += 1
_replicate_value(root_table, table + (key >> root_bits), step, table_size, code)
key = _get_next_key(key, length)
count[length] -= 1
step <<= 1
return total_size

View file

@ -0,0 +1,39 @@
# Copyright 2021 Sidney Markowitz All Rights Reserved.
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
class Prefix:
"""Represents the range of values belonging to a prefix code: [offset, offset + 2^nbits)"""
def __init__(self, offset, nbits):
self.offset = offset
self.nbits = nbits
kInsertRangeLut = [0, 0, 8, 8, 0, 16, 8, 16, 16]
kCopyRangeLut = [0, 8, 0, 8, 16, 0, 16, 8, 16]
kBlockLengthPrefixCode = [
Prefix(1, 2), Prefix(5, 2), Prefix(9, 2), Prefix(13, 2),
Prefix(17, 3), Prefix(25, 3), Prefix(33, 3), Prefix(41, 3),
Prefix(49, 4), Prefix(65, 4), Prefix(81, 4), Prefix(97, 4),
Prefix(113, 5), Prefix(145, 5), Prefix(177, 5), Prefix(209, 5),
Prefix(241, 6), Prefix(305, 6), Prefix(369, 7), Prefix(497, 8),
Prefix(753, 9), Prefix(1265, 10), Prefix(2289, 11), Prefix(4337, 12),
Prefix(8433, 13), Prefix(16625, 24)]
kInsertLengthPrefixCode = [
Prefix(0, 0), Prefix(1, 0), Prefix(2, 0), Prefix(3, 0),
Prefix(4, 0), Prefix(5, 0), Prefix(6, 1), Prefix(8, 1),
Prefix(10, 2), Prefix(14, 2), Prefix(18, 3), Prefix(26, 3),
Prefix(34, 4), Prefix(50, 4), Prefix(66, 5), Prefix(98, 5),
Prefix(130, 6), Prefix(194, 7), Prefix(322, 8), Prefix(578, 9),
Prefix(1090, 10), Prefix(2114, 12), Prefix(6210, 14), Prefix(22594, 24)]
kCopyLengthPrefixCode = [
Prefix(2, 0), Prefix(3, 0), Prefix(4, 0), Prefix(5, 0),
Prefix(6, 0), Prefix(7, 0), Prefix(8, 0), Prefix(9, 0),
Prefix(10, 1), Prefix(12, 1), Prefix(14, 2), Prefix(18, 2),
Prefix(22, 3), Prefix(30, 3), Prefix(38, 4), Prefix(54, 4),
Prefix(70, 5), Prefix(102, 5), Prefix(134, 6), Prefix(198, 7),
Prefix(326, 8), Prefix(582, 9), Prefix(1094, 10), Prefix(2118, 24)]

View file

@ -0,0 +1,222 @@
# Copyright 2021 Sidney Markowitz All Rights Reserved.
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
from .dictionary import BrotliDictionary
"""
Transformations on dictionary words
"""
class Transform:
def __init__(self, prefix, transform, suffix):
self.prefix = bytearray(prefix)
self.transform = transform
self.suffix = bytearray(suffix)
@staticmethod
def transformDictionaryWord(dst, idx, word, length, transform):
prefix = kTransforms[transform].prefix
suffix = kTransforms[transform].suffix
t = kTransforms[transform].transform
skip = 0 if t < kOmitFirst1 else t - (kOmitFirst1 - 1)
start_idx = idx
if skip > length:
skip = length
prefix_pos = 0
while prefix_pos < len(prefix):
dst[idx] = prefix[prefix_pos]
idx += 1
prefix_pos += 1
word += skip
length -= skip
if t <= kOmitLast9:
length -= t
for i in range(0, length):
dst[idx] = BrotliDictionary.dictionary[word + i]
idx += 1
uppercase = idx - length
if t == kUppercaseFirst:
_to_upper_case(dst, uppercase)
elif t == kUppercaseAll:
while length > 0:
step = _to_upper_case(dst, uppercase)
uppercase += step
length -= step
suffix_pos = 0
while suffix_pos < len(suffix):
dst[idx] = suffix[suffix_pos]
idx += 1
suffix_pos += 1
return idx - start_idx
kIdentity = 0
kOmitLast1 = 1
kOmitLast2 = 2
kOmitLast3 = 3
kOmitLast4 = 4
kOmitLast5 = 5
kOmitLast6 = 6
kOmitLast7 = 7
kOmitLast8 = 8
kOmitLast9 = 9
kUppercaseFirst = 10
kUppercaseAll = 11
kOmitFirst1 = 12
kOmitFirst2 = 13
kOmitFirst3 = 14
kOmitFirst4 = 15
kOmitFirst5 = 16
kOmitFirst6 = 17
kOmitFirst7 = 18
kOmitFirst8 = 19
kOmitFirst9 = 20
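# The transform list from RFC 7932 Appendix B: each entry applies a prefix,
# an elementary transform (identity, omit first/last N, or uppercasing) and
# a suffix to a dictionary word.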
kTransforms = [
Transform(b"", kIdentity, b""),
Transform(b"", kIdentity, b" "),
Transform(b" ", kIdentity, b" "),
Transform(b"", kOmitFirst1, b""),
Transform(b"", kUppercaseFirst, b" "),
Transform(b"", kIdentity, b" the "),
Transform(b" ", kIdentity, b""),
Transform(b"s ", kIdentity, b" "),
Transform(b"", kIdentity, b" of "),
Transform(b"", kUppercaseFirst, b""),
Transform(b"", kIdentity, b" and "),
Transform(b"", kOmitFirst2, b""),
Transform(b"", kOmitLast1, b""),
Transform(b", ", kIdentity, b" "),
Transform(b"", kIdentity, b", "),
Transform(b" ", kUppercaseFirst, b" "),
Transform(b"", kIdentity, b" in "),
Transform(b"", kIdentity, b" to "),
Transform(b"e ", kIdentity, b" "),
Transform(b"", kIdentity, b"\""),
Transform(b"", kIdentity, b"."),
Transform(b"", kIdentity, b"\">"),
Transform(b"", kIdentity, b"\n"),
Transform(b"", kOmitLast3, b""),
Transform(b"", kIdentity, b"]"),
Transform(b"", kIdentity, b" for "),
Transform(b"", kOmitFirst3, b""),
Transform(b"", kOmitLast2, b""),
Transform(b"", kIdentity, b" a "),
Transform(b"", kIdentity, b" that "),
Transform(b" ", kUppercaseFirst, b""),
Transform(b"", kIdentity, b". "),
Transform(b".", kIdentity, b""),
Transform(b" ", kIdentity, b", "),
Transform(b"", kOmitFirst4, b""),
Transform(b"", kIdentity, b" with "),
Transform(b"", kIdentity, b"'"),
Transform(b"", kIdentity, b" from "),
Transform(b"", kIdentity, b" by "),
Transform(b"", kOmitFirst5, b""),
Transform(b"", kOmitFirst6, b""),
Transform(b" the ", kIdentity, b""),
Transform(b"", kOmitLast4, b""),
Transform(b"", kIdentity, b". The "),
Transform(b"", kUppercaseAll, b""),
Transform(b"", kIdentity, b" on "),
Transform(b"", kIdentity, b" as "),
Transform(b"", kIdentity, b" is "),
Transform(b"", kOmitLast7, b""),
Transform(b"", kOmitLast1, b"ing "),
Transform(b"", kIdentity, b"\n\t"),
Transform(b"", kIdentity, b":"),
Transform(b" ", kIdentity, b". "),
Transform(b"", kIdentity, b"ed "),
Transform(b"", kOmitFirst9, b""),
Transform(b"", kOmitFirst7, b""),
Transform(b"", kOmitLast6, b""),
Transform(b"", kIdentity, b"("),
Transform(b"", kUppercaseFirst, b", "),
Transform(b"", kOmitLast8, b""),
Transform(b"", kIdentity, b" at "),
Transform(b"", kIdentity, b"ly "),
Transform(b" the ", kIdentity, b" of "),
Transform(b"", kOmitLast5, b""),
Transform(b"", kOmitLast9, b""),
Transform(b" ", kUppercaseFirst, b", "),
Transform(b"", kUppercaseFirst, b"\""),
Transform(b".", kIdentity, b"("),
Transform(b"", kUppercaseAll, b" "),
Transform(b"", kUppercaseFirst, b"\">"),
Transform(b"", kIdentity, b"=\""),
Transform(b" ", kIdentity, b"."),
Transform(b".com/", kIdentity, b""),
Transform(b" the ", kIdentity, b" of the "),
Transform(b"", kUppercaseFirst, b"'"),
Transform(b"", kIdentity, b". This "),
Transform(b"", kIdentity, b","),
Transform(b".", kIdentity, b" "),
Transform(b"", kUppercaseFirst, b"("),
Transform(b"", kUppercaseFirst, b"."),
Transform(b"", kIdentity, b" not "),
Transform(b" ", kIdentity, b"=\""),
Transform(b"", kIdentity, b"er "),
Transform(b" ", kUppercaseAll, b" "),
Transform(b"", kIdentity, b"al "),
Transform(b" ", kUppercaseAll, b""),
Transform(b"", kIdentity, b"='"),
Transform(b"", kUppercaseAll, b"\""),
Transform(b"", kUppercaseFirst, b". "),
Transform(b" ", kIdentity, b"("),
Transform(b"", kIdentity, b"ful "),
Transform(b" ", kUppercaseFirst, b". "),
Transform(b"", kIdentity, b"ive "),
Transform(b"", kIdentity, b"less "),
Transform(b"", kUppercaseAll, b"'"),
Transform(b"", kIdentity, b"est "),
Transform(b" ", kUppercaseFirst, b"."),
Transform(b"", kUppercaseAll, b"\">"),
Transform(b" ", kIdentity, b"='"),
Transform(b"", kUppercaseFirst, b","),
Transform(b"", kIdentity, b"ize "),
Transform(b"", kUppercaseAll, b"."),
Transform(b"\xc2\xa0", kIdentity, b""),
Transform(b" ", kIdentity, b","),
Transform(b"", kUppercaseFirst, b"=\""),
Transform(b"", kUppercaseAll, b"=\""),
Transform(b"", kIdentity, b"ous "),
Transform(b"", kUppercaseAll, b", "),
Transform(b"", kUppercaseFirst, b"='"),
Transform(b" ", kUppercaseFirst, b","),
Transform(b" ", kUppercaseAll, b"=\""),
Transform(b" ", kUppercaseAll, b", "),
Transform(b"", kUppercaseAll, b","),
Transform(b"", kUppercaseAll, b"("),
Transform(b"", kUppercaseAll, b". "),
Transform(b" ", kUppercaseAll, b"."),
Transform(b"", kUppercaseAll, b"='"),
Transform(b" ", kUppercaseAll, b". "),
Transform(b" ", kUppercaseFirst, b"=\""),
Transform(b" ", kUppercaseAll, b"='"),
Transform(b" ", kUppercaseFirst, b"='")
]
kNumTransforms = len(kTransforms)
def _to_upper_case(p, i):
"""Overly simplified model of uppercase in utf-8, but what RFC7932 specifies to use"""
if p[i] < 0xc0:
if 97 <= p[i] <= 122:
p[i] ^= 32
return 1
if p[i] < 0xe0:
p[i + 1] ^= 32
return 2
p[i + 2] ^= 5
return 3