FanFicFare/fanficfare/browsercache/browsercache_blockfile.py
# -*- coding: utf-8 -*-
# Copyright 2022 FanFicFare team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
import os
import struct
import time, datetime
# note share_open (on windows CLI) is implicitly readonly.
from .share_open import share_open
from .chromagnon import SuperFastHash
from .chromagnon.cacheAddress import CacheAddress
from .chromagnon.cacheBlock import CacheBlock
from .chromagnon.cacheData import CacheData
from .chromagnon.cacheEntry import CacheEntry
from .chromagnon.cacheParse import parse
from ..six.moves import range
from ..six import ensure_text
from .base_chromium import BaseChromiumCache
import logging
logger = logging.getLogger(__name__)
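# Magic numbers expected at the start of a blockfile cache's "index" file
# and its "data_N" block files respectively (checked in is_cache_dir below).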
INDEX_MAGIC_NUMBER = 0xC103CAC3
BLOCK_MAGIC_NUMBER = 0xC104CAC3
class BlockfileCache(BaseChromiumCache):
    """Class to access data stream in Chrome Disk Blockfile Cache format cache files"""

    def __init__(self, *args, **kargs):
        """Constructor for BlockfileCache"""
        super(BlockfileCache,self).__init__(*args, **kargs)
        self.cacheBlock = CacheBlock(os.path.join(self.cache_dir, "index"))
        # Checking type
        if self.cacheBlock.type != CacheBlock.INDEX:
            raise Exception("Invalid Index File")
        logger.debug("Using BlockfileCache")
        # self.scan_cache_keys()
        # 1/0

    def scan_cache_keys(self):
        """
        Scan index file and cache entries to save entries in this cache.
        Saving uint32 address as key--hashing to find key later proved
        unreliable.
        """
        logger.debug("scan_cache_keys")
        with share_open(os.path.join(self.cache_dir, "index"), 'rb') as index:
            # Skipping Header
            index.seek(92*4)
            self.cache_keys = set()
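            # Walk every slot of the hash table; each non-zero uint32 is the
            # cache address of the first entry in that slot's bucket.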
            for key in range(self.cacheBlock.tableSize):
                raw = struct.unpack('I', index.read(4))[0]
                if raw != 0:
                    ## 0 == unused hash index slot. I think.
                    cacheaddr = CacheAddress(raw, path=self.cache_dir)
                    # logger.debug("cacheaddr? %s"%cacheaddr)
                    entry = CacheEntry(cacheaddr)
                    # Checking if there is a next item in the bucket because
                    # such entries are not stored in the Index File so they will
                    # be ignored during iterative lookup in the hash table
                    while entry.next != 0:
                        # logger.debug("spinning on entry linked list?")
                        self.add_key_mapping_entry(entry)
                        cacheaddr = CacheAddress(entry.next, path=self.cache_dir)
                        # logger.debug("cacheaddr? %s"%cacheaddr)
                        entry = CacheEntry(cacheaddr)
                    self.add_key_mapping_entry(entry)

    def add_key_mapping_entry(self,entry):
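        # Debugging hook for scan_cache_keys(): currently it only logs
        # entries whose key contains a hard-coded id and does not record
        # anything in self.cache_keys.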
        if '/14161667/' in entry.keyToStr():
            logger.debug(entry)
            logger.debug("data length:%s"%len(entry.data))

    @staticmethod
    def is_cache_dir(cache_dir):
        """Return True only if a directory is a valid Cache for this class"""
        if not os.path.isdir(cache_dir):
            return False
        index_path = os.path.join(cache_dir, "index")
        if not os.path.isfile(index_path):
            return False
        with share_open(index_path, 'rb') as index_file:
            if struct.unpack('I', index_file.read(4))[0] != INDEX_MAGIC_NUMBER:
                return False
        data0_path = os.path.join(cache_dir, "data_0")
        if not os.path.isfile(data0_path):
            return False
        with share_open(data0_path, 'rb') as data0_file:
            if struct.unpack('I', data0_file.read(4))[0] != BLOCK_MAGIC_NUMBER:
                return False
        return True

    def get_data_key_impl(self, url, key):
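        # Look the key up via chromagnon's parse(); on an exact key match,
        # return (redirect location, age, content-encoding, raw body),
        # otherwise None.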
        entry = None
        entrys = parse(self.cache_dir,[key.encode('utf8')])
        logger.debug(entrys)
        for entry in entrys:
            entry_name = entry.keyToStr()
            logger.debug("Name: %s"%entry_name)
            logger.debug("Hash: 0x%08x"%entry.hash)
            logger.debug("Usage Counter: %d"%entry.usageCounter)
            logger.debug("Reuse Counter: %d"%entry.reuseCounter)
            logger.debug("Creation Time: %s"%entry.creationTime)
            # logger.debug("Creation Time: %s"%datetime.datetime.fromtimestamp(int(entry.creationTime/1000000)-EPOCH_DIFFERENCE))
            logger.debug("Key: %s"%entry.keyToStr())
            logger.debug("b'location':%s"%entry.httpHeader.headers.get(b'location','(no location)'))
            if entry_name == key:
                location = ensure_text(entry.httpHeader.headers.get(b'location',''))
                rawdata = None if location else self.get_raw_data(entry)
                return (
                    location,
                    self.make_age(entry.creationTime),
                    ensure_text(entry.httpHeader.headers.get(b'content-encoding','')),
                    rawdata)
        return None

    def get_raw_data(self,entry):
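        # Return the first data stream chromagnon left typed as
        # CacheData.UNKNOWN, which for these entries is the raw response body.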
        for i in range(len(entry.data)):
            # logger.debug("data loop i:%s"%i)
            # logger.debug("entry.data[i].type:%s"%entry.data[i].type)
            if entry.data[i].type == CacheData.UNKNOWN:
                # Extract the data stream as bytes
                data = entry.data[i].data()
                # logger.debug("type = UNKNOWN, data len:%s"%len(data))
                # logger.debug("entry.httpHeader:%s"%entry.httpHeader)
                return data