"""This module provides the Consolidator model-controller."""
import dataclasses
from collections import Counter
from typing import List, Generator, Dict, Any, Optional, Union
from itertools import islice
from pathlib import Path
import tempfile
from urllib.parse import urlparse
# Optional dependency: when tldextract cannot be imported, the name is bound
# to None and no extractor is built below.
try:
    import tldextract
except Exception:
    tldextract = None

# Optional dependency: without tqdm, fall back to a pass-through callable with
# a compatible call shape so call sites do not need to branch.
try:
    from tqdm import tqdm
except Exception:
    def tqdm(iterable, **_kwargs):
        # Keyword arguments are accepted and ignored.
        return iterable

# Project helpers: try the top-level module names first, then the
# package-relative ``..utils`` location.
try:
    from json_io import json_dumps, json_loads
    from public_suffix import build_tldextract_extractor
except Exception:
    from ..utils.json_io import json_dumps, json_loads
    from ..utils.public_suffix import build_tldextract_extractor

# Module-wide extractor; None signals that tldextract is unavailable.
_TLD_EXTRACTOR = (
    build_tldextract_extractor(tldextract) if tldextract is not None else None
)
# Constants
# Field names of a page-level record. Not referenced in the visible part of
# this module -- presumably consumed by other modules (TODO confirm).
COLUMNS_KEEP = [
    "domain",
    "identifier",
    "level",
    "website",
    "date",
    "path",
    "phone",
    "email",
    "fax",
    "zipcode",
    "address",
    "text",
]

# The subset of fields that hold contact details.
CONTACT = [
    "phone",
    "email",
    "fax",
    "zipcode",
    "address",
]
@dataclasses.dataclass
class Domain:
    """
    A data class representing a domain with various attributes.

    Only ``domain`` and ``identifier`` are required. The counter fields
    default to empty counters and ``text`` defaults to an empty string, so a
    record that lacks some optional fields can still be loaded with
    :meth:`from_dict`.

    Attributes:
        domain (str): The domain name.
        identifier (str): The identifier of the domain.
        phone (Counter): A counter for phone numbers.
        email (Counter): A counter for email addresses.
        fax (Counter): A counter for fax numbers.
        zipcode (Counter): A counter for zip codes.
        address (Counter): A counter for addresses.
        kvk (Counter): A counter for KVK numbers.
        btw (Counter): A counter for BTW numbers.
        text (str): The text associated with the domain.
    """

    domain: str
    identifier: str
    phone: Counter = dataclasses.field(default_factory=Counter)
    email: Counter = dataclasses.field(default_factory=Counter)
    fax: Counter = dataclasses.field(default_factory=Counter)
    zipcode: Counter = dataclasses.field(default_factory=Counter)
    address: Counter = dataclasses.field(default_factory=Counter)
    kvk: Counter = dataclasses.field(default_factory=Counter)
    btw: Counter = dataclasses.field(default_factory=Counter)
    text: str = ""

    # Names of the Counter-valued fields. Unannotated on purpose so the
    # dataclass machinery does not treat it as a field.
    _COUNTER_FIELDS = ("phone", "email", "fax", "zipcode", "address", "kvk", "btw")

    def __add__(self, other):
        """
        Adds two Domain objects together, combining their attributes.

        Counters are summed and the texts are joined with a single space. The
        identifier of the left operand is kept.

        Args:
            other (Domain): Another Domain object to add.

        Returns:
            Domain: A new Domain object with combined attributes, or
            ``NotImplemented`` when ``other`` is not a Domain.

        Raises:
            ValueError: If the domains of the two objects are different.
        """
        if not isinstance(other, Domain):
            # Let Python raise its standard TypeError for unsupported operands.
            return NotImplemented
        if self.domain != other.domain:
            raise ValueError("Cannot add domains with different names")
        return Domain(
            domain=self.domain,
            identifier=self.identifier,
            phone=self.phone + other.phone,
            email=self.email + other.email,
            fax=self.fax + other.fax,
            zipcode=self.zipcode + other.zipcode,
            address=self.address + other.address,
            kvk=self.kvk + other.kvk,
            btw=self.btw + other.btw,
            text=self.text + " " + other.text,
        )

    def __post_init__(self):
        """
        Normalizes the counter fields and the text after construction.

        Each counter field is passed through ``Counter(...)``, so a list of
        values, a ``{value: count}`` mapping (as produced by :meth:`to_dict`),
        or ``None`` are all accepted. A ``None`` text becomes ``""`` so that
        concatenation in :meth:`__add__` cannot fail.
        """
        for name in self._COUNTER_FIELDS:
            setattr(self, name, Counter(getattr(self, name)))
        if self.text is None:
            self.text = ""

    def to_dict(self) -> Dict[str, Any]:
        """
        Converts the Domain object into a dictionary.

        Counters are converted to plain ``dict`` objects.

        Returns:
            Dict[str, Any]: A dictionary representation of the Domain object.
        """
        return {
            "domain": self.domain,
            "identifier": self.identifier,
            "phone": dict(self.phone),
            "email": dict(self.email),
            "fax": dict(self.fax),
            "zipcode": dict(self.zipcode),
            "address": dict(self.address),
            "kvk": dict(self.kvk),
            "btw": dict(self.btw),
            "text": self.text,
        }

    @classmethod
    def from_dict(cls, d: Dict[str, Any]):
        """
        Creates a Domain object from a dictionary.

        Args:
            d (Dict[str, Any]): A dictionary containing Domain attributes.
                ``domain`` and ``identifier`` are required; missing counter
                fields and ``text`` fall back to their defaults.

        Returns:
            Domain: A new Domain object created from the dictionary.

        Raises:
            TypeError: If a required key is missing or an unknown key is given.
        """
        return cls(**d)
[docs]
class Consolidator:
"""
Process domain-level information from NDJSON files.
The consolidator reads extracted page-level records in chunks, aggregates
values per domain, and writes a merged domain-level output file.
"""
def __init__(
self,
input_file: Optional[Union[str, Path]] = None,
target_folder_path: Optional[Union[str, Path]] = None,
output_file: Optional[Union[str, Path]] = None,
chunk_size: int = 10000,
):
"""
Initialize consolidator path settings.
Args:
input_file: Optional extracted NDJSON path. If omitted, the latest
file in ``<target_folder_path>/extracted_data`` is used.
target_folder_path: Optional project output folder. Used to resolve
default input/output locations.
output_file: Optional consolidated NDJSON path. If omitted, defaults
to ``<target_folder_path>/consolidated_data/consolidated.ndjson``.
chunk_size: Number of extracted rows processed per chunk.
"""
self.input_file = Path(input_file) if input_file is not None else None
self.target_folder_path = (
Path(target_folder_path) if target_folder_path is not None else None
)
self.output_file = Path(output_file) if output_file is not None else None
self.chunk_size = max(1, int(chunk_size))
def _resolve_input_file(self) -> Path:
"""Resolve the extracted NDJSON input path."""
if self.input_file is not None:
input_path = Path(self.input_file)
if not input_path.exists() or not input_path.is_file():
raise FileNotFoundError(f"Input extracted NDJSON does not exist: {input_path}")
return input_path
if self.target_folder_path is None:
raise ValueError(
"Consolidator requires either input_file or target_folder_path."
)
extracted_dir = self.target_folder_path / "extracted_data"
extracted_files = sorted(
extracted_dir.glob("*.ndjson"),
key=lambda p: p.stat().st_mtime,
)
if not extracted_files:
raise FileNotFoundError(
f"No extracted NDJSON files found in: {extracted_dir}"
)
return extracted_files[-1]
def _resolve_output_file(
self,
input_path: Path,
final_output: Optional[Union[str, Path]] = None,
) -> Path:
"""Resolve the consolidated NDJSON output path."""
if final_output is not None:
return Path(final_output)
if self.output_file is not None:
return Path(self.output_file)
# Preferred default: target folder consolidated output.
if self.target_folder_path is not None:
return self.target_folder_path / "consolidated_data" / "consolidated.ndjson"
# Backward-compatible fallback when only input_file was provided.
# Expected input structure: <target_folder>/extracted_data/*.ndjson
if input_path.parent.name == "extracted_data":
return input_path.parent.parent / "consolidated_data" / "consolidated.ndjson"
raise ValueError(
"Consolidator could not infer output path. Provide output_file or final_output."
)
[docs]
def save_orjson_loads(self, line: str) -> Dict[str, Any]:
"""
Loads a line from an ndjson file using orjson.
Args:
line (str): A line from an ndjson file.
Returns:
Dict[str, Any]: A dictionary representing the line.
"""
try:
return json_loads(line)
except Exception:
return None
[docs]
def read_ndjson_in_chunks(self) -> Generator[List[Dict[str, Any]], None, None]:
"""
Reads an ndjson file in chunks.
Yields:
Generator[List[Dict[str, Any]], None, None]: A generator that yields lists of dictionaries, each representing a line in the ndjson file.
"""
input_file = self._resolve_input_file()
with input_file.open('rb') as f:
while True:
lines_gen = list(islice(f, self.chunk_size))
if not lines_gen:
break
jsons = [self.save_orjson_loads(line) for line in lines_gen]
yield [_ for _ in jsons if _ is not None]
[docs]
def create_domain_info(self, chunk: List[Dict[str, Any]], output_file: str):
"""
Creates domain information from sorted chunks and writes to an output file.
Args:
chunk (List[Dict[str, Any]]): A list of dictionaries, each representing a domain.
output_file (str): The path to the output file where the domain information will be written.
"""
domain = None
domain_info = None
with open(output_file, "wb") as f:
for site in tqdm(sorted(chunk, key=lambda d: self._clean_domain(d.get("domain", "")))):
new_domain = self._clean_domain(site["domain"])
if domain is None or new_domain != domain:
if domain_info is not None:
self._dump_domain_to_file(domain_info, f)
domain = new_domain
domain_info = self._initialize_domain_counters(site)
self._update_domain_counters(domain_info, site)
# except Exception as e:
# # Handle the timeout exception, perhaps by logging it
# print(f"Processing of {site['domain']} timed out.")
if domain_info is not None:
self._dump_domain_to_file(domain_info, f)
[docs]
def merge_domain_files(self, input_files: List[str], final_output: str):
"""
Merges multiple domain files into a single file.
Args:
input_files (List[str]): A list of file paths to be merged.
final_output (str): Path to the final output file.
"""
final_output_path = Path(final_output)
final_output_path.parent.mkdir(parents=True, exist_ok=True)
if not input_files:
final_output_path.write_bytes(b"")
return
files = [open(file, "rb") for file in input_files]
domains_objects = [Domain.from_dict(json_loads(file.readline())) for file in files]
domains = [domain.domain for domain in domains_objects]
smallest_value = "__first_domain"
d = None
with final_output_path.open("wb") as f:
while domains:
new_smallest_value = min(domains)
indexes = [i for i, x in enumerate(domains) if x == new_smallest_value]
if new_smallest_value != smallest_value:
if smallest_value != "__first_domain":
f.write(json_dumps(d.to_dict()) + b"\n")
smallest_value = new_smallest_value
smallest_index = indexes[0]
d = domains_objects[smallest_index]
if len(indexes) > 1:
for i in indexes[1:]:
d += domains_objects[i]
else:
for i in indexes:
d += domains_objects[i]
# Read new lines for those indexes and update domains and domains_objects
for i in indexes[::-1]:
line = files[i].readline()
if line:
domains_objects[i] = Domain.from_dict(json_loads(line))
domains[i] = domains_objects[i].domain
else:
domains.pop(i)
domains_objects.pop(i)
files[i].close()
files.pop(i)
f.write(json_dumps(d.to_dict()) + b"\n")
# # Delete temporary files
for file in input_files:
Path(file).unlink()
[docs]
def consolidate(self, final_output: Optional[Union[str, Path]] = None):
"""Run full consolidation: chunk, aggregate per chunk, then merge chunks."""
input_path = self._resolve_input_file()
output_path = self._resolve_output_file(input_path, final_output=final_output)
output_path.parent.mkdir(parents=True, exist_ok=True)
chunk_files: List[str] = []
with tempfile.TemporaryDirectory(prefix="websweep_consolidator_") as tmpdir:
# Read NDJSON in chunks and organize per domain
for i, chunk in enumerate(self.read_ndjson_in_chunks()):
chunk_file = str(Path(tmpdir) / f"temp_chunk_{i}.ndjson")
self.create_domain_info(chunk, chunk_file)
chunk_files.append(chunk_file)
# Merge sorted files
self.merge_domain_files(chunk_files, final_output=str(output_path))
def _clean_domain(self, domain: str) -> str:
"""
Cleans a domain name.
Args:
domain (str): The domain name to clean.
Returns:
str: The cleaned domain name.
"""
if _TLD_EXTRACTOR is not None:
extracted = _TLD_EXTRACTOR(domain)
registered = (
getattr(extracted, "top_domain_under_public_suffix", None)
or getattr(extracted, "registered_domain", "")
)
if registered:
return registered
host = urlparse(domain).netloc or str(domain)
host = host.split("/", 1)[0].split(":", 1)[0].lower().replace("www.", "")
parts = [part for part in host.split(".") if part]
if parts and all(part.isdigit() for part in parts):
return ".".join(parts)
if len(parts) >= 2:
return ".".join(parts[-2:])
return host
def _initialize_domain_counters(self, site: Dict[str, Any]) -> Domain:
"""
Initializes counters for a new domain.
Args:
site (Dict[str, Any]): A dictionary representing a site.
Returns:
Domain: A new Domain object with initialized counters.
"""
return Domain(
domain=self._clean_domain(site["domain"]),
identifier=site["identifier"],
phone=Counter(),
email=Counter(),
fax=Counter(),
zipcode=Counter(),
address=Counter(),
kvk=Counter(),
btw=Counter(),
text=""
)
def _update_domain_counters(self, domain: Domain, site: Dict[str, Any]):
"""
Updates the counters of a domain with information from a site.
Args:
domain (Domain): The Domain object to update.
site (Dict[str, Any]): A dictionary representing a site.
"""
domain.phone.update(site.get("phone") or [])
domain.email.update(site.get("email") or [])
domain.fax.update(site.get("fax") or [])
domain.zipcode.update(site.get("zipcode") or [])
domain.address.update(site.get("address") or [])
domain.btw.update(site.get("btw") or [])
domain.kvk.update(site.get("kvk") or [])
domain.text += " " + (site.get("text") or "")
def _dump_domain_to_file(self, domain: Domain, file_object):
"""
Writes a domain's information to a file.
Args:
domain (Domain): The Domain object to write to the file.
file_object: The file object to write to.
"""
file_object.write(json_dumps(domain.to_dict()) + b"\n")