Source code for websweep.extractor.extractor

"""This module provides the Extracter model-controller."""
import os
import shutil
import sqlite3 as sql
import time
import unicodedata
from collections import deque
from datetime import date as datelib
from pathlib import Path
from typing import Optional

import zipfile
from bs4 import BeautifulSoup
try:
    from multiprocess import Pool
except Exception:
    from multiprocessing import Pool

try:
    import tqdm
except Exception:

    class _NoOpProgress:
        """Silent stand-in for a ``tqdm.tqdm`` progress bar when tqdm is unavailable.

        Only the surface used by this module is provided: context-manager support,
        a ``total`` attribute and an ``update`` method that ignores its arguments.
        """

        def __init__(self, total=None, **_ignored_options):
            self.total = total

        def update(self, *_ignored_args, **_ignored_kwargs):
            """Accept and discard a progress update."""
            return None

        def __enter__(self):
            return self

        def __exit__(self, *_exc_info):
            # Returning False means exceptions raised in the ``with`` body propagate.
            return False

    class _NoOpTqdmModule:
        """Mimic the ``tqdm`` module so ``tqdm.tqdm(...)`` call sites keep working."""

        @staticmethod
        def tqdm(*_ignored_args, **options):
            return _NoOpProgress(total=options.get("total"))

    tqdm = _NoOpTqdmModule()

try:
    from json_io import append_jsonl
    from backend import resolve_overview_backend
except Exception:
    from ..utils.json_io import append_jsonl
    from ..utils.backend import resolve_overview_backend

try:
    import re2 as re
except Exception:
    import regex as re

# Applied (in FileExtractor._extract_address) to the line of text that precedes a
# postcode. The single capture group is a run of letters, spaces or hyphens
# (including the À-ÿ range), then whitespace, then 1-9 digit/letter/hyphen/
# underscore/whitespace characters — presumably "<street name> <house number>".
# Up to five trailing separator characters (whitespace, '-', ',', '|') are consumed
# but not captured.
ADDRESS_PATTERN = re.compile(
    r"\b([ a-zA-ZÀ-ÿ\-]+\s+[\s0-9-_a-zA-Z]{1,9})[\s\-,\|]{0,5}"
)
# Case-insensitive whole words removed from address candidates before they are
# stored; these appear to be Dutch filler words ("gevestigd", "aan", "te").
ADDRESS_NOISE_PATTERN = re.compile(r"(?i)\b(?:gevestigd|aan|te)\b")


def _parse_html(markup: str) -> BeautifulSoup:
    """Build a BeautifulSoup tree for *markup*.

    The ``lxml`` parser is tried first. If constructing the soup with it raises
    anything (for example because lxml is not installed), the document is parsed
    again with the built-in ``html.parser`` backend.
    """
    preferred_parser, fallback_parser = "lxml", "html.parser"
    try:
        soup = BeautifulSoup(markup, preferred_parser)
    except Exception:
        soup = BeautifulSoup(markup, fallback_parser)
    return soup

class FileExtractor:
    """
    A class for extracting data from one specific file.

    This class is used by the extractor pipeline. Custom FileExtractor subclasses can
    be built to extend the data extracting functionalities: every callable attribute
    of a subclass whose name starts with ``_extract_`` (and that is not already
    defined on FileExtractor itself) is run by ``extract_custom_metadata()``. Its
    return value is stored under the method name without the ``_extract_`` prefix.
    Such methods run in alphabetical order, because ``dir()`` returns sorted names.

    Parameters:
        info: tuple
            A 6-item sequence ``(domain, identifier, level, website, date, path)``.
            ``path`` is either the stored path of a page inside an archived
            ``crawled_data`` zip, or an already parsed BeautifulSoup document.

    Methods:
        extracting()
            Extract metadata and text from the page and return the record
            dictionary. Calls extract_default_metadata() and
            extract_custom_metadata().
        extract_default_metadata()
            Populate the built-in default fields (zipcode, address).
        extract_custom_metadata()
            Run the ``_extract_*`` methods defined in subclasses.
    """

    # Prefix that marks a method as a metadata extractor.
    _EXTRACT_PREFIX = "_extract_"
    # Postcodes such as "1012 LG", "1012LG" or "NL-1012 LG"; compiled once per class.
    _ZIPCODE_PATTERN = re.compile(r"\b(?:NL-)?\d{4}\s?[A-Z]{2}\b")
    # Letter suffixes treated as non-feasible postcode endings and discarded.
    _EXCLUDED_ZIPCODE_SUFFIXES = frozenset({"SS", "SD", "SA"})
    # Meta tag names/properties that are kept: the most used tags (>10% of a
    # random subset of pages).
    _META_TAG_OPTIONS = frozenset({
        'og:title', 'article:publisher', 'msapplication-TileImage', 'robots',
        'og:description', 'author', 'og:image:type', 'format-detection',
        'og:image:width', 'msapplication-TileColor', 'twitter:title',
        'og:site_name', 'twitter:label1', 'theme-color', 'og:type',
        'twitter:description', 'og:image', 'generator', 'encoding',
        'twitter:data1', 'og:url', 'twitter:card', 'viewport',
        'article:modified_time', 'keywords', 'description', 'og:locale',
        'twitter:image', 'google-site-verification', 'og:image:height',
    })

    def __init__(self, info):
        self.metadata = dict()
        (
            self.metadata["domain"],
            self.metadata["identifier"],
            self.metadata["level"],
            self.metadata["website"],
            self.metadata["date"],
            self.metadata["path"],
        ) = info
        prefix = self._EXTRACT_PREFIX
        # Extractor methods already provided by this base class; overriding them
        # in a subclass changes the default path instead of adding a custom field.
        base_extractors = {
            name
            for name in dir(FileExtractor)
            if name.startswith(prefix) and callable(getattr(FileExtractor, name))
        }
        # Only names with the prefix are looked up, so unrelated properties on a
        # subclass are never evaluated here.
        self.child_methods = [
            name
            for name in dir(self)
            if name.startswith(prefix)
            and name not in base_extractors
            and callable(getattr(self, name))
        ]
        if isinstance(info[-1], BeautifulSoup):
            # Pre-parsed document: nothing to read from disk.
            self.soup = info[-1]
            self.metadata["path"] = ""
        else:
            # Read HTML from the archived domain zip.
            self.text = self._load_archived_html(str(self.metadata["path"]))
            self.soup = _parse_html(self.text)

    @staticmethod
    def _load_archived_html(stored_path):
        """
        Return the decoded HTML stored for *stored_path* inside its archive zip.

        The path component right after ``/crawled_data/`` names the zip
        (``<prefix>/crawled_data/<name>.zip``), and the remainder is the member
        inside it. If the marker is absent, ``find`` returns -1 and the search
        starts at index 13 (``len("crawled_data/")``). That keeps relative paths
        of the form ``crawled_data/<name>/...`` working.
        Bytes that are not valid UTF-8 are dropped.

        Raises:
            FileNotFoundError: the path has no member part or the zip is missing.
            KeyError: the member is not present in the zip (raised by zipfile).
        """
        marker = "/crawled_data/"
        next_slash = stored_path.find("/", stored_path.find(marker) + len(marker))
        zip_from_path = Path(stored_path[:next_slash] + ".zip")
        if next_slash == -1 or not zip_from_path.exists():
            raise FileNotFoundError(
                f"Could not resolve archived page in zip for path '{stored_path}'."
            )
        member_from_path = stored_path[next_slash + 1:]
        with zipfile.ZipFile(zip_from_path, "r") as zip_file:
            with zip_file.open(member_from_path) as file:
                return file.read().decode("utf-8", "ignore")

    def extracting(self):
        """Extract text and metadata for one page and return a record dictionary."""
        # Get <meta> tag metadata.
        self.metadata.update(self._extract_metadata())
        # Clean the HTML to raw text; the extractors below search this text.
        self.text = self._clean_html()
        # Built-in extraction steps.
        self.extract_default_metadata()
        # All _extract_* methods created in child classes of this base class.
        self.extract_custom_metadata()
        # Add the raw text to the metadata last.
        self.metadata["text"] = self.text
        return self.metadata

    def extract_default_metadata(self):
        """Populate the built-in default metadata fields."""
        # Core defaults are intentionally conservative.
        # Contact fields (phone/email/fax) are opt-in via custom _extract_* methods.
        # Order matters: _extract_address reads metadata["zipcode"].
        self.metadata["zipcode"] = self._extract_zipcode()
        self.metadata["address"] = self._extract_address()

    def extract_custom_metadata(self):
        """Run ``_extract_*`` methods defined by custom subclasses.

        Each result is stored under the method name minus the leading
        ``_extract_`` prefix (e.g. ``_extract_phone`` -> ``"phone"``).
        """
        prefix_length = len(self._EXTRACT_PREFIX)
        for method in self.child_methods:
            # Slice off only the leading prefix; splitting on it would truncate
            # names that contain "_extract_" a second time.
            self.metadata[method[prefix_length:]] = getattr(self, method)()

    def _extract_zipcode(self) -> list:
        """
        Return the distinct postcodes found in ``self.text``.

        Postcodes are listed in order of first appearance, so the output is
        deterministic across runs. Candidates ending in SS, SD or SA are discarded.
        """
        found = self._ZIPCODE_PATTERN.findall(self.text)
        return [
            zipcode
            for zipcode in dict.fromkeys(found)
            if zipcode[-2:] not in self._EXCLUDED_ZIPCODE_SUFFIXES
        ]

    def _extract_address(self) -> list:
        """
        Return at most one address candidate per postcode in ``self.metadata["zipcode"]``.

        For each postcode, the last 100 characters before its first occurrence are
        split into lines. The relevant line is matched against ADDRESS_PATTERN, and
        the last match, with ADDRESS_NOISE_PATTERN words removed, is kept. This
        method must run after ``_extract_zipcode`` has filled ``metadata["zipcode"]``.
        """
        add_found = []
        for zipcode in self.metadata["zipcode"]:
            before, separator, _after = self.text.partition(zipcode)
            if not separator:
                continue
            lines = before[-100:].rstrip().rsplit("\n", 2)
            # If the text before the postcode on its own line is very short
            # (presumably a prefix such as "NL-"), use the previous line instead.
            if len(lines[-1]) < 5 and len(lines) > 1:
                candidate = lines[-2]
            else:
                candidate = lines[-1]
            matches = ADDRESS_PATTERN.findall(candidate.strip())
            # Remove unwanted words from the matches and drop emptied ones.
            cleaned = [ADDRESS_NOISE_PATTERN.sub("", match).strip() for match in matches]
            cleaned = [match for match in cleaned if match]
            if cleaned:
                add_found.append(cleaned[-1])
        return add_found

    def _extract_metadata(self) -> dict:
        """
        Return selected ``<meta>`` tag values as a dictionary.

        ``charset`` is stored as ``meta_encoding``. Tags whose ``name`` or
        ``property`` is in ``_META_TAG_OPTIONS`` and that have a ``content``
        attribute are stored as ``meta_<name>``. Later tags overwrite earlier ones.
        """
        metadata = dict()
        for element in self.soup("meta"):
            attrs = element.attrs
            encoding = attrs.get("charset")
            if encoding is not None:
                metadata["meta_encoding"] = encoding
            for attr_name in ("name", "property"):
                tag_name = attrs.get(attr_name)
                if tag_name is not None and tag_name in self._META_TAG_OPTIONS:
                    content = attrs.get("content")
                    if content is not None:
                        metadata[f"meta_{tag_name}"] = content
                        # An element is not expected to carry both name and
                        # property, so move on to the next element.
                        break
        return metadata

    def _clean_html(self) -> str:
        """Return plain normalized text extracted from BeautifulSoup HTML."""
        text = self.soup.get_text(separator="\n", strip=True)
        # NOTE(review): NFKD decomposes accented letters (e.g. "é" -> "e" + U+0301),
        # so ADDRESS_PATTERN's À-ÿ range rarely matches them afterwards; confirm
        # whether NFKC was intended.
        return unicodedata.normalize("NFKD", text)
class Extractor:
    """
    A class for extracting data from files and storing it in the target folder.

    Parameters:
        target_folder_path: str
            Folder containing the crawl overview (``overview_urls.*``) and
            ``crawled_data``. Extracted records are written to its
            ``extracted_data`` subfolder.
        use_database: bool, optional
            Whether or not to use a database backend (duckdb/sqlite) for the
            overview file. If False, TSV is used. Default is True.
        extractor_delete_files: bool, optional
            Whether or not to delete the dated ``crawled_data`` subfolders inside
            the date window after extracting data. Default is False.
        start_date, end_date: str, optional
            Inclusive date window; compared as strings against the session date.
        file_extractor: type, optional
            A custom FileExtractor subclass (the class itself, not an instance)
            used to extract data from files. Default is None, in which case the
            default FileExtractor class is used.
        overview_backend: str, optional
            Backend override passed to ``resolve_overview_backend``.
        workers: int, optional
            Number of worker processes (minimum 1). Defaults to the CPU count.
        imap_chunksize: int, optional
            Stored (minimum 1); not used by this class's extraction loop.
        maxtasksperchild: int, optional
            Tasks a worker process handles before it is replaced (minimum 1).
        extract_timeout_seconds: int, optional
            Seconds a page may run, counted from submission to the pool, before
            it is recorded as "Timeout extracting" (minimum 1).
        use_sqlite: bool, optional
            Legacy keyword alias for ``use_database``.

    Methods:
        _create_results(path)
            Extracts the data from one specific file.
        extract_urls()
            Start the extracting of all data from the files.
    """

    def __init__(
        self,
        target_folder_path,
        use_database=True,
        extractor_delete_files=False,
        start_date="0000-01-01",
        end_date="9999-01-01",
        file_extractor: "Optional[type[FileExtractor]]" = None,
        overview_backend: Optional[str] = None,
        workers: Optional[int] = None,
        imap_chunksize: int = 50,
        maxtasksperchild: int = 1000,
        extract_timeout_seconds: int = 10,
        **kwargs,
    ):
        self.target_folder_path = Path(target_folder_path)
        legacy_use_sqlite = kwargs.pop("use_sqlite", None)
        if kwargs:
            raise TypeError(f"Unexpected keyword argument(s): {', '.join(kwargs.keys())}")
        if legacy_use_sqlite is not None:
            use_database = bool(legacy_use_sqlite)
        self.use_database = bool(use_database)
        self.overview_backend = resolve_overview_backend(
            base_folder=self.target_folder_path,
            use_database=self.use_database,
            override_backend=overview_backend,
        )
        self.extractor_delete_files = extractor_delete_files
        self.file_extractor = file_extractor
        self.start_date = str(start_date)
        self.end_date = str(end_date)
        # Cumulative over the lifetime of this instance.
        self.number_error = 0
        self.workers = max(1, int(workers)) if workers is not None else max(1, (os.cpu_count() or 1))
        self.imap_chunksize = max(1, int(imap_chunksize))
        self.maxtasksperchild = max(1, int(maxtasksperchild))
        self.extract_timeout_seconds = max(1, int(extract_timeout_seconds))

    def _connect_overview_db(self):
        """Open a connection to the configured overview backend database."""
        if self.overview_backend == "duckdb":
            try:
                import duckdb
            except Exception as exc:
                raise RuntimeError(
                    "DuckDB backend requested, but dependency 'duckdb' is not installed."
                ) from exc
            return duckdb.connect(os.path.join(self.target_folder_path, "overview_urls.duckdb"))
        return sql.connect(os.path.join(self.target_folder_path, "overview_urls.db"))

    @staticmethod
    def _error_metadata(path, reason="Error extracting"):
        """Build a minimal metadata record for failed extraction attempts."""
        domain, identifier, level, url, date, _ = path
        return {
            "domain": domain,
            "identifier": identifier,
            "level": level,
            "website": url,
            "date": date,
            "path": reason,
        }

    @staticmethod
    def _is_error_metadata(metadata: dict) -> bool:
        """Return whether a metadata record represents an extraction failure."""
        return metadata.get("path") in {"Error extracting", "Timeout extracting"}

    def _create_results(self, path):
        """
        Extract one result record from a crawled page metadata tuple.

        *path* is ``(domain, identifier, level, url, date, stored_path)``. Any
        exception raised by the file extractor, or a ``None`` result, becomes an
        "Error extracting" record, so one bad page cannot abort the run.
        """
        [domain, identifier, level, url, date, stored_path] = path
        try:
            extractor_class = self.file_extractor or FileExtractor
            extractor_instance = extractor_class(
                [domain, identifier, level, url, date, stored_path]
            )
            metadata = extractor_instance.extracting()
            if metadata is None:
                return self._error_metadata(path)
            return metadata
        except Exception:
            return self._error_metadata(path)

    def _new_pool(self):
        """Create a worker pool using this extractor's process settings."""
        return Pool(processes=self.workers, maxtasksperchild=self.maxtasksperchild)

    def _iter_chunk_results(self, chunk):
        """
        Yield extracted records for a chunk with per-task timeout enforcement.

        At most ``self.workers`` tasks are in flight at once. When any task exceeds
        ``extract_timeout_seconds``, the following happens:
        - a "Timeout extracting" record is yielded for it;
        - the whole pool is terminated, which is the only reliable way to stop a
          stuck worker;
        - the other in-flight items are resubmitted to a fresh pool.
        """
        if not chunk:
            return
        pending = {}
        to_submit = deque(chunk)
        pool = None
        try:
            pool = self._new_pool()
            while to_submit or pending:
                # Keep one task per worker so submission time approximates start time.
                while to_submit and len(pending) < self.workers:
                    item = to_submit.popleft()
                    async_result = pool.apply_async(self._create_results, (item,))
                    pending[async_result] = (item, time.monotonic())

                yielded_result = False
                for async_result, (item, _) in list(pending.items()):
                    if not async_result.ready():
                        continue
                    try:
                        metadata = async_result.get()
                    except Exception:
                        metadata = self._error_metadata(item)
                    del pending[async_result]
                    yield metadata
                    yielded_result = True
                if yielded_result:
                    continue

                now = time.monotonic()
                timed_out = {
                    async_result
                    for async_result, (_, started_at) in pending.items()
                    if (now - started_at) > self.extract_timeout_seconds
                }
                if timed_out:
                    survivors = []
                    for async_result, (item, _) in list(pending.items()):
                        if async_result in timed_out:
                            yield self._error_metadata(item, reason="Timeout extracting")
                        else:
                            survivors.append(item)
                        del pending[async_result]
                    # Reset the worker pool to reliably kill stuck tasks and keep
                    # processing the remaining queue without global monkey patching.
                    pool.terminate()
                    pool.join()
                    pool = None
                    pool = self._new_pool()
                    # Put the interrupted survivors back at the front, in order.
                    to_submit.extendleft(reversed(survivors))
                    continue
                time.sleep(0.01)
        finally:
            if pool is not None:
                if pending:
                    # Early exit (consumer stopped iterating, or an error): tasks may
                    # be stuck, so a graceful close()+join() could block forever.
                    pool.terminate()
                else:
                    try:
                        pool.close()
                    except Exception:
                        pool.terminate()
                pool.join()

    def _load_overview_rows(self):
        """Return ``[domain, identifier, level, url, date, path]`` rows with status 200 in the date window."""
        if self.overview_backend in {"sqlite", "duckdb"}:
            query = (
                "SELECT domain, identifier, level, url, session_date, path "
                "FROM Overview WHERE (session_date >= ?) AND (session_date <= ?) AND (status == '200')"
            )
            connection = self._connect_overview_db()
            try:
                cursor = connection.cursor()
                return cursor.execute(query, (self.start_date, self.end_date)).fetchall()
            finally:
                # Close even if the query fails, so the database file is not left open.
                connection.close()

        rows = []
        with open(os.path.join(self.target_folder_path, "overview_urls.tsv")) as handle:
            handle.readline()  # header
            for line in handle:
                if not line.strip():
                    # Blank lines carry no record; unpacking them would raise.
                    continue
                domain, identifier, level, url, status, date, _, path = line.split("\t")
                if self.start_date <= date <= self.end_date and status == "200":
                    rows.append([domain, identifier, level, url, date, path.strip()])
        return rows

    def _delete_extracted_session_dirs(self):
        """Remove ``YYYY-MM-DD``-named folders under ``crawled_data`` that fall inside the date window."""
        for root, dirnames, _files in os.walk(self.target_folder_path / "crawled_data"):
            kept = []
            for dirname in dirnames:
                if (
                    re.match(r"\d{4}-\d{2}-\d{2}", dirname)
                    and self.start_date <= dirname <= self.end_date
                ):
                    shutil.rmtree(os.path.join(root, dirname))
                else:
                    kept.append(dirname)
            # Prune in place so os.walk does not try to descend into removed folders.
            dirnames[:] = kept

    def extract_urls(self):
        """
        Extract all successful crawl rows for the configured date window.

        Records are appended to ``extracted_data/extracted_data_<today>_<i>-<i+n>.ndjson``
        files of at most one million records each. The summary line reports the
        errors of this run; ``self.number_error`` keeps the cumulative count.
        """
        start = time.time()
        results = self._load_overview_rows()

        if not results:
            print("Extracted data from 0 pages (0 errors) in 0.0 seconds.")
            return

        # Chunk the output into files of at most this many records.
        chunk_size = 1_000_000
        run_errors = 0
        with tqdm.tqdm(total=len(results), leave=True, miniters=1) as pbar:
            for i in range(0, len(results), chunk_size):
                file_res = (
                    self.target_folder_path
                    / "extracted_data"
                    / f"extracted_data_{datelib.today()}_{i}-{i + chunk_size}.ndjson"
                )
                file_res.parent.mkdir(parents=True, exist_ok=True)
                for json_dict in self._iter_chunk_results(results[i:i + chunk_size]):
                    if self._is_error_metadata(json_dict):
                        self.number_error += 1
                        run_errors += 1
                    append_jsonl(file_res, [json_dict])
                    pbar.update()

        if self.extractor_delete_files:
            self._delete_extracted_session_dirs()

        print(
            f"Extracted data from {len(results)} pages ({run_errors} errors) in {time.time() - start:2.1f} seconds."
        )