Files
lore-engine-poc-v3/scripts/01_ingest.py
Lore Engine Dev 299ad5c146 slice 5.6: 01_ingest.py --write-neo4j dual-write flag
After the in-memory graph + pickle are written, the new flag
mirrors the full graph into the Neo4j 5 container at
$LORE_NEO4J_URI (default bolt://127.0.0.1:7687). The flag
is opt-in (default off) so the existing test suite's
invocations of 01_ingest.py without Docker still work.

The mirror logic:

  * Pre-pass for LoreSource nodes (full metadata via
    add_lore_source so SOURCED_FROM links find them with
    name, source_type, reliability, source_confidence).
  * Pre-pass for bare names (names that are not the subject
    of any edge, skipping LoreSource keys — keeps :Entity
    count in sync with in-memory all_names()).
  * Then the edges, add()-ed one by one.

Failure semantics:

  * Neo4j unreachable at startup → log + exit 3.
  * neo4j_graph not importable → log + exit 2.
  * Pickle is always written before the mirror attempt, so
    a flaky Neo4j container never loses the in-memory state.

Consistency runner stability:

  _detect_contradictions Pattern 2 (same object, different
  subjects) now sorts the two claims alphabetically so
  claim_a / claim_b are stable across runs. The
  graph.all_names() set iteration order is otherwise
  non-deterministic across Python processes and across
  the in-memory / Neo4j backends, and the original
  dict-iteration insertion order broke when slice 5.4
  migrated to all_names().

Tests:

  * tests/test_scripts/test_ingest_neo4j.py — 5 docker-gated
    tests (exits zero, entity count, relation count,
    default-off untouched, fails loud on unreachable URI).
  * tests/test_consistency/test_runner_categories.py — one
    test updated to assert claim_a/claim_b as a set rather
    than a specific order (matches the runner's new
    lexicographic-sort contract).

Suite: 619 -> 624 passed (+5 ingest-neo4j tests, all 559
baseline + 32 Neo4j + consistency + ingest tests preserved).
2026-06-18 23:01:35 -04:00

285 lines
10 KiB
Python

"""01_ingest — load the codex and build the in-memory graph + (best-effort) Cognee index.

Run:

    python3 scripts/01_ingest.py [--codex lore_engine_poc/seed] [--skip-cognee] [--write-neo4j]

The built graph, the parsed entities and the de-duplicated triples are
pickled to ``lore_engine_poc/.graph.pkl``.

Slice 3 adds an optional LLM-extraction step. The deterministic
parsers (markdown + structured YAML) always run; the LLM path
runs only when ``LORE_INGEST_LLM=1``. The LLM path can use a
``FakeProvider`` for tests (``LORE_INGEST_FAKE_LLM=1`` +
``LORE_INGEST_FAKE_LLM_SCRIPT=...``) or the real
``OllamaCloudProvider`` (default).

Slice 5.6 adds ``--write-neo4j`` (default off): after the pickle is
written, the graph is mirrored into Neo4j at ``$LORE_NEO4J_URI``
(default ``bolt://127.0.0.1:7687``). Exit code 2 means the Neo4j
module is not importable; exit code 3 means the connection object
could not be constructed.
"""
from __future__ import annotations
import argparse
import os
import sys
from pathlib import Path
# Allow ``python3 scripts/01_ingest.py`` from the project root.
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
from lore_engine_poc.parsers import iter_codex, extract_triples
from lore_engine_poc.parsers import iter_structured_yaml
from lore_engine_poc.tools import build_graph
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the 01_ingest command line.

    Args:
        argv: Arguments to parse, without the program name. ``None``
            (the default) makes argparse read ``sys.argv[1:]``, which is
            what the script entry point relies on; passing a list lets
            callers drive the parser programmatically.

    Returns:
        A namespace with ``codex`` (directory path as ``str``),
        ``skip_cognee`` (bool) and ``write_neo4j`` (bool).
    """
    p = argparse.ArgumentParser()
    p.add_argument(
        "--codex",
        default=str(ROOT / "lore_engine_poc" / "seed"),
        help="Codex directory to ingest (markdown files plus an optional yaml/ subdir).",
    )
    p.add_argument(
        "--skip-cognee",
        action="store_true",
        help="Skip the Cognee cognify step (use when no LLM endpoint is configured).",
    )
    p.add_argument(
        "--write-neo4j",
        action="store_true",
        help=(
            "After building the in-memory graph + pickle, mirror it "
            "into the Neo4j 5 container at $LORE_NEO4J_URI "
            "(default bolt://127.0.0.1:7687). Default off — the "
            "pickle path is the integration-test fixture bootstrap."
        ),
    )
    return p.parse_args(argv)
def main() -> int:
    """Ingest the codex, build the graph, persist it, then run optional mirrors.

    Steps, in order:

    1. Parse the markdown codex (plus ``<codex>/yaml`` when present) into
       triples; add LLM-extracted triples when ``LORE_INGEST_LLM=1``.
    2. Build the in-memory graph and print a per-relation summary of the
       de-duplicated triples.
    3. Persist ``{"graph", "entities", "triples"}`` to
       ``lore_engine_poc/.graph.pkl``.
    4. With ``--write-neo4j``, mirror the graph into Neo4j.
    5. Unless ``--skip-cognee``, run the best-effort Cognee step.

    Returns:
        The process exit code: 0 on success, 1 when ``--codex`` is not a
        directory, 2 when ``--write-neo4j`` was requested but
        ``lore_engine_poc.neo4j_graph`` cannot be imported, 3 when the
        Neo4j connection object cannot be constructed. An exception
        raised while writing to Neo4j propagates (after the connection
        is closed).
    """
    args = parse_args()
    print(f"[01_ingest] reading codex from {args.codex}")
    if not os.path.isdir(args.codex):
        print(f" ERROR: {args.codex} is not a directory", file=sys.stderr)
        return 1

    entities = list(iter_codex(args.codex))
    md_triples = extract_triples(entities)

    # Slice 1: also walk the structured-YAML files alongside the
    # markdown codex. Markdown triples are the prose path (no
    # bounds); YAML triples are the structured path (with bounds).
    # ``build_graph`` merges them, and the structured bounds win
    # when both paths produce the same fact.
    yaml_root = os.path.join(args.codex, "yaml")
    yaml_triples = []
    if os.path.isdir(yaml_root):
        yaml_triples = list(iter_structured_yaml(yaml_root))
        print(f"[01_ingest] structured YAML triples: {len(yaml_triples)}")
    else:
        print("[01_ingest] no yaml/ subdir; structured path skipped")
    triples = md_triples + yaml_triples

    # Slice 3: optional LLM extraction. The deterministic path is
    # authoritative; LLM triples are added on top. Failure of the
    # LLM step is non-fatal (``_run_llm_extraction`` degrades to
    # "no LLM triples" and the script still builds a graph).
    if os.environ.get("LORE_INGEST_LLM") == "1":
        print("[01_ingest] LLM extraction: enabled")
        llm_triples = _run_llm_extraction(entities)
        triples = triples + llm_triples
        print(f"[01_ingest] LLM triples added: {len(llm_triples)}")
    else:
        print("[01_ingest] LLM extraction: skipped (set LORE_INGEST_LLM=1 to enable)")

    graph = build_graph(entities, triples)
    uniq = _dedupe_triples(triples)
    _print_summary(entities, uniq)

    # Persist the graph for the demo script. Written before any
    # Neo4j/Cognee work so a failure there never loses this state.
    out_path = ROOT / "lore_engine_poc" / ".graph.pkl"
    _write_pickle_atomic(
        out_path, {"graph": graph, "entities": entities, "triples": uniq}
    )
    print(f"[01_ingest] graph cached to {out_path}")

    # Slice 5.6: optional Neo4j mirror. A failure here is loud and
    # non-zero so CI catches a stale Neo4j state, but the pickle is
    # already on disk.
    if args.write_neo4j:
        rc = _mirror_to_neo4j(graph)
        if rc:
            return rc

    if args.skip_cognee:
        print("[01_ingest] --skip-cognee set; skipping Cognee cognify.")
        return 0
    _run_cognee(args.codex)
    return 0


def _dedupe_triples(triples) -> list:
    """Return *triples* without repeated (subject, relation, object) keys.

    The first occurrence of each key wins, and first-occurrence order is
    kept (dicts preserve insertion order).
    """
    first_seen: dict = {}
    for t in triples:
        first_seen.setdefault((t.subject, t.relation, t.object), t)
    return list(first_seen.values())


def _print_summary(entities, uniq) -> None:
    """Print entity/triple counts and per-relation totals, largest first."""
    from collections import Counter

    by_type = Counter(t.relation for t in uniq)
    print(f"[01_ingest] entities: {len(entities)}")
    print(f"[01_ingest] unique triples: {len(uniq)}")
    # most_common() sorts by count descending and is stable, so ties keep
    # first-seen relation order.
    for rel, n in by_type.most_common():
        print(f" {rel:20s} {n}")


def _write_pickle_atomic(out_path, payload) -> None:
    """Pickle *payload* to *out_path* without ever leaving a truncated file.

    The data is dumped to a sibling ``.tmp`` file and then moved into place
    with ``os.replace``, so an interrupted dump leaves any previous cache
    untouched. The temp file is removed if the dump or the rename fails,
    and the error is re-raised.
    """
    import pickle
    from contextlib import suppress

    tmp_path = out_path.with_name(out_path.name + ".tmp")
    try:
        with open(tmp_path, "wb") as f:
            pickle.dump(payload, f)
        os.replace(tmp_path, out_path)
    except BaseException:
        with suppress(OSError):
            os.remove(tmp_path)
        raise


def _mirror_to_neo4j(graph) -> int:
    """Mirror *graph* into Neo4j at ``$LORE_NEO4J_URI``.

    Returns:
        0 on success, 2 if ``lore_engine_poc.neo4j_graph`` is not
        importable, 3 if ``Neo4jGraph(uri)`` raises. Exceptions raised by
        the write calls propagate after the connection is closed.
    """
    neo_uri = os.environ.get("LORE_NEO4J_URI", "bolt://127.0.0.1:7687")
    try:
        from lore_engine_poc.neo4j_graph import Neo4jGraph
    except ImportError as exc:
        print(
            f"[01_ingest] --write-neo4j requested but neo4j_graph "
            f"is not importable: {exc}",
            file=sys.stderr,
        )
        return 2
    try:
        neo = Neo4jGraph(neo_uri)
    except Exception as exc:
        print(
            f"[01_ingest] --write-neo4j failed to connect to "
            f"{neo_uri}: {exc}",
            file=sys.stderr,
        )
        return 3
    try:
        neo.ensure_schema()
        # Pre-pass: LoreSource nodes, so SOURCED_FROM links find them
        # with full metadata when add() runs.
        for src in graph.lore_sources.values():
            neo.add_lore_source(src)
        # Outgoing edges per name, fetched once and reused by both passes.
        edges_by_subject = {
            name: list(graph.edges_for_subject(name))
            for name in graph.all_names()
        }
        # Pre-pass: names that are not the subject of any edge still need
        # an :Entity node (LoreSource keys are skipped, as before).
        for name, edges in edges_by_subject.items():
            if not edges and name not in graph.lore_sources:
                neo.register_name(name)
        # All edges.
        n_edges = 0
        for edges in edges_by_subject.values():
            for edge in edges:
                neo.add(edge)
                n_edges += 1
        print(
            f"[01_ingest] neo4j mirror: {n_edges} edges written "
            f"to {neo_uri}"
        )
    finally:
        neo.close()
    return 0


def _run_cognee(codex: str) -> None:
    """Best-effort Cognee cognify over every ``.md`` file under *codex*.

    A missing ``cognee`` install or any failure during the async run is
    reported on stdout and otherwise ignored — the in-memory graph is
    sufficient for the slice.
    """
    try:
        import cognee
    except ImportError as e:
        print(f"[01_ingest] cognee not installed: {e}")
        return
    try:
        import asyncio

        async def _run() -> None:
            await cognee.prune.prune_data()
            await cognee.prune.prune_system(metadata=True)
            md_files = []
            for dirpath, _dirs, files in os.walk(codex):
                for fn in files:
                    if fn.lower().endswith(".md"):
                        md_files.append(os.path.join(dirpath, fn))
            for path in md_files:
                # Read and close the file before awaiting the upload.
                with open(path, "r", encoding="utf-8", errors="replace") as f:
                    text = f.read()
                await cognee.add(text, dataset_name="codex")
            await cognee.cognify()
            print(f"[01_ingest] cognee.cognify() complete over {len(md_files)} files")

        asyncio.run(_run())
    except Exception as e:
        print(f"[01_ingest] cognee step failed (non-fatal): {e}")
def _run_llm_extraction(entities) -> list:
    """Slice 3: run the LLM extractor over each entity and return its triples.

    Failure-tolerant end to end: if the extractor module cannot be
    imported or the provider cannot be constructed, the problem is logged
    to stderr and ``[]`` is returned. A per-entity extraction failure is
    logged and that entity is skipped. The deterministic graph is never
    touched.

    Provider selection (in order, see ``_build_provider``):
      1. ``LORE_INGEST_FAKE_LLM=1`` → :class:`FakeProvider`, script loaded
         from ``LORE_INGEST_FAKE_LLM_SCRIPT`` (a JSON file). Test-only.
      2. Otherwise → :class:`OllamaCloudProvider`, which requires
         ``$OLLAMA_API_KEY`` in the env. If unset, no triples are produced.

    Args:
        entities: Parsed codex entities; each is passed to
            ``extract_from_chunk`` and its ``slug`` is used in error logs.

    Returns:
        All triples produced by successful extractions, in entity order.
    """
    # Setup failures (import error, bad fake script, provider ctor raising)
    # must not abort the whole ingest: the caller treats this step as
    # non-fatal and still builds the deterministic graph.
    try:
        from lore_engine_poc.extraction import extract_from_chunk

        provider = _build_provider()
    except Exception as exc:
        print(
            f"[01_ingest] LLM provider setup failed; LLM extraction skipped: {exc}",
            file=sys.stderr,
        )
        return []
    if provider is None:
        return []
    out: list = []
    for ent in entities:
        try:
            triples = extract_from_chunk(ent, provider)
        except Exception as exc:
            print(
                f"[01_ingest] LLM extract failed on {ent.slug}: {exc}",
                file=sys.stderr,
            )
            continue
        out.extend(triples)
    return out
def _build_provider():
    """Construct the LLM provider selected by the environment.

    ``LORE_INGEST_FAKE_LLM=1`` selects a scripted fake provider loaded
    from the file named by ``LORE_INGEST_FAKE_LLM_SCRIPT``. Any other
    value selects ``OllamaCloudProvider``, which requires
    ``OLLAMA_API_KEY`` to be set.

    Returns:
        The provider instance, or ``None`` when a required env var or the
        script file is missing (a message is printed to stderr).
    """
    from lore_engine_poc.llm import (
        OllamaCloudProvider,
        fake_provider_from_script_file,
    )

    use_fake = os.environ.get("LORE_INGEST_FAKE_LLM") == "1"

    if not use_fake:
        # Real provider path: only viable with an API key.
        if os.environ.get("OLLAMA_API_KEY"):
            return OllamaCloudProvider()
        print(
            "[01_ingest] LORE_INGEST_LLM=1 but $OLLAMA_API_KEY is not set; "
            "LLM extraction skipped. Set OLLAMA_API_KEY (or "
            "LORE_INGEST_FAKE_LLM=1 + LORE_INGEST_FAKE_LLM_SCRIPT=...) "
            "to enable.",
            file=sys.stderr,
        )
        return None

    # Fake provider path: needs an existing script file.
    script = os.environ.get("LORE_INGEST_FAKE_LLM_SCRIPT")
    if not script:
        problem = (
            "[01_ingest] LORE_INGEST_FAKE_LLM=1 but "
            "LORE_INGEST_FAKE_LLM_SCRIPT is not set; "
            "LLM extraction skipped."
        )
    elif not os.path.isfile(script):
        problem = f"[01_ingest] FakeProvider script not found: {script}"
    else:
        return fake_provider_from_script_file(script)
    print(problem, file=sys.stderr)
    return None
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())