Source code for websweep.utils.json_io

import json
from pathlib import Path
from typing import Iterable

try:
    import orjson
except Exception:
    orjson = None


[docs] def json_dumps(obj) -> bytes: """Serialize an object to UTF-8 JSON bytes (orjson when available).""" if orjson is not None: return orjson.dumps(obj) return json.dumps(obj, ensure_ascii=False).encode("utf-8")
[docs] def json_loads(value): """Parse JSON from bytes or text using the active JSON backend.""" if orjson is not None: return orjson.loads(value) if isinstance(value, (bytes, bytearray)): value = value.decode("utf-8") return json.loads(value)
[docs] def append_jsonl(path, records: Iterable[dict]) -> None: """Append dictionaries to a NDJSON file, one JSON object per line.""" path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) with path.open("ab") as handle: for row in records: handle.write(json_dumps(row) + b"\n")