mirror of
https://github.com/JimmXinu/FanFicFare.git
synced 2025-12-15 21:32:28 +01:00
63 lines
3 KiB
Python
63 lines
3 KiB
Python
# Copyright 2021 Sidney Markowitz All Rights Reserved.
|
|
# Distributed under MIT license.
|
|
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
|
|
|
|
|
|
class BrotliBitReader:
    """Wrap a bytes buffer to enable reading 0 < n <= 24 bits at a time, or transfer of arbitrary number of bytes.

    The reader keeps a private ``bytearray`` copy of the input plus a cursor
    made of a byte index (``pos_``) and a bit offset inside that byte
    (``bit_pos_``). Bits are consumed starting from the least significant bit
    of each byte.
    """

    # kBitMask[n] has the low n bits set, for 0 <= n <= 24. read_bits() indexes
    # this table with n_bits, which is what limits a single read to 24 bits.
    kBitMask = [
        0x000000, 0x000001, 0x000003, 0x000007, 0x00000f, 0x00001f, 0x00003f, 0x00007f,
        0x0000ff, 0x0001ff, 0x0003ff, 0x0007ff, 0x000fff, 0x001fff, 0x003fff, 0x007fff,
        0x00ffff, 0x01ffff, 0x03ffff, 0x07ffff, 0x0fffff, 0x1fffff, 0x3fffff, 0x7fffff,
        0xffffff
    ]

    def __init__(self, input_buffer):
        """Create a reader positioned at the first bit of input_buffer.

        input_buffer: anything accepted by bytearray(), e.g. bytes, bytearray,
                      a memoryview, or an iterable of ints in range(256).
                      The data is copied, so later changes to the caller's
                      object do not affect the reader.
        """
        self.buf_ = bytearray(input_buffer)
        # Measure the private copy rather than the argument: for a multi-byte
        # memoryview/array len() counts items, not bytes, and a generator has
        # no len() at all. Either would leave buf_len_ out of step with buf_.
        self.buf_len_ = len(self.buf_)
        self.pos_ = 0  # byte position in stream
        self.bit_pos_ = 0  # current bit-reading position in current byte (number bits already read from byte, 0-7)

    def reset(self):
        """Reset an initialized BrotliBitReader to start of input buffer"""
        self.pos_ = 0
        self.bit_pos_ = 0

    def read_bits(self, n_bits, bits_to_skip=None):
        """Get n_bits unsigned integer treating input as little-endian byte stream, maybe advancing input buffer pointer

        n_bits: is number of bits to read from input buffer. Set to None or 0 to seek ahead ignoring the value.
                Must not exceed 24, the largest width present in kBitMask.
        bits_to_skip: number of bits to advance in input_buffer, defaults to n_bits if it is None
            pass in 0 to peek at the next n_bits of value without advancing
        It is ok to have n_bits and bits_to_skip be different non-zero values if that is what is wanted
        Returns: the next n_bits from the buffer as a little-endian integer, 0 if n_bits is None or 0
        """
        val = 0
        if bits_to_skip is None:
            bits_to_skip = n_bits
        if n_bits:
            # Gather whole bytes, lowest byte first, until the accumulated
            # value covers the bits already consumed in the current byte plus
            # the n_bits that were requested.
            bytes_shift = 0
            buf_pos = self.pos_
            bit_pos_when_done = n_bits + self.bit_pos_
            while bytes_shift < bit_pos_when_done:
                if buf_pos >= self.buf_len_:
                    break  # if hit end of buffer, this simulates zero padding after end, which is correct
                val |= self.buf_[buf_pos] << bytes_shift
                bytes_shift += 8
                buf_pos += 1
            # Drop the bits of the first byte that were read earlier, then keep
            # only the requested width.
            val = (val >> self.bit_pos_) & self.kBitMask[n_bits]
        if bits_to_skip:
            # Advance the cursor: the low 3 bits of the new absolute bit offset
            # become the in-byte position, the rest are whole bytes to step over.
            next_in_bits = self.bit_pos_ + bits_to_skip
            self.bit_pos_ = next_in_bits & 7
            self.pos_ += next_in_bits >> 3
        return val

    def copy_bytes(self, dest_buffer, dest_pos, n_bytes):
        """Copy bytes from input buffer. This will first skip to next byte boundary if not already on one

        dest_buffer: writable bytes-like object that receives the data
        dest_pos: index in dest_buffer where the first copied byte is stored
        n_bytes: number of bytes to copy; 0 (or less) only aligns the reader to the next byte boundary
        """
        if self.bit_pos_ != 0:
            # Discard the unread remainder of a partially consumed byte.
            self.bit_pos_ = 0
            self.pos_ += 1
        if n_bytes > 0:  # call with n_bytes == 0 to just skip to next byte boundary
            new_pos = self.pos_ + n_bytes
            # Slice assignment through a memoryview writes into dest_buffer in
            # place; the cursor only moves once the copy has succeeded.
            memoryview(dest_buffer)[dest_pos:dest_pos+n_bytes] = self.buf_[self.pos_:new_pos]
            self.pos_ = new_pos