After the in-memory graph + pickle are written, the new --write-neo4j flag
mirrors the full graph into the Neo4j 5 container at
$LORE_NEO4J_URI (default bolt://127.0.0.1:7687). The flag
is opt-in (default off) so the existing test suite's
invocations of 01_ingest.py without Docker still work.
The mirror logic:
* Pre-pass for LoreSource nodes (full metadata via
add_lore_source so SOURCED_FROM links find them with
name, source_type, reliability, source_confidence).
* Pre-pass for bare names (entities registered without
any edge participation — keeps :Entity count in sync
with in-memory all_names()).
* Then the edges, add()-ed one by one.
Failure semantics:
* Neo4j unreachable at startup → log + exit 3.
* neo4j_graph not importable → log + exit 2.
* Pickle is always written before the mirror attempt, so
a flaky Neo4j container never loses the in-memory state.
Consistency runner stability:
_detect_contradictions Pattern 2 (same object, different
subjects) now sorts the two claims alphabetically so
claim_a / claim_b are stable across runs. The
graph.all_names() set iteration order is otherwise
non-deterministic across Python processes and across
the in-memory / Neo4j backends, and the original
dict-iteration insertion order broke when slice 5.4
migrated to all_names().
Tests:
* tests/test_scripts/test_ingest_neo4j.py — 5 docker-gated
tests (exits zero, entity count, relation count,
default-off untouched, fails loud on unreachable URI).
* tests/test_consistency/test_runner_categories.py — one
test updated to assert claim_a/claim_b as a set rather
than a specific order (matches the runner's new
lexicographic-sort contract).
Suite: 619 -> 624 passed (+5 ingest-neo4j tests, all 559
baseline + 32 Neo4j + consistency + ingest tests preserved).
285 lines
10 KiB
Python
285 lines
10 KiB
Python
"""01_ingest — load the codex and build the in-memory graph + (best-effort) Cognee index.
|
|
|
|
Run:
|
|
python3 scripts/01_ingest.py [--codex lore_engine_poc/seed] [--skip-cognee] [--write-neo4j]
|
|
|
|
Slice 3 adds an optional LLM-extraction step. The deterministic
|
|
parsers (markdown + structured YAML) always run; the LLM path
|
|
runs only when ``LORE_INGEST_LLM=1``. The LLM path can use a
|
|
``FakeProvider`` for tests (``LORE_INGEST_FAKE_LLM=1`` +
|
|
``LORE_INGEST_FAKE_LLM_SCRIPT=...``) or the real
|
|
``OllamaCloudProvider`` (default).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Allow ``python3 scripts/01_ingest.py`` from the project root.
|
|
ROOT = Path(__file__).resolve().parent.parent
|
|
sys.path.insert(0, str(ROOT))
|
|
|
|
from lore_engine_poc.parsers import iter_codex, extract_triples
|
|
from lore_engine_poc.parsers import iter_structured_yaml
|
|
from lore_engine_poc.tools import build_graph
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command-line flags of the ingest script.

    Recognised flags:

    * ``--codex``: codex directory; defaults to ``lore_engine_poc/seed``
      under the project root.
    * ``--skip-cognee``: boolean switch, off by default.
    * ``--write-neo4j``: boolean switch, off by default.

    Returns:
        The namespace produced by ``ArgumentParser.parse_args()``.
    """
    default_codex = str(ROOT / "lore_engine_poc" / "seed")
    cognee_help = (
        "Skip the Cognee cognify step (use when no LLM endpoint is configured)."
    )
    neo4j_help = (
        "After building the in-memory graph + pickle, mirror it "
        "into the Neo4j 5 container at $LORE_NEO4J_URI "
        "(default bolt://127.0.0.1:7687). Default off — the "
        "pickle path is the integration-test fixture bootstrap."
    )

    parser = argparse.ArgumentParser()
    parser.add_argument("--codex", default=default_codex)
    parser.add_argument("--skip-cognee", action="store_true", help=cognee_help)
    parser.add_argument("--write-neo4j", action="store_true", help=neo4j_help)
    return parser.parse_args()
|
|
|
|
|
|
def main() -> int:
    """Run the full ingest pipeline and return the process exit code.

    Stages, in order:

    1. Read the codex and collect markdown + structured-YAML triples,
       plus LLM triples when ``LORE_INGEST_LLM=1``.
    2. Build the in-memory graph and print a per-relation summary.
    3. Pickle the graph to ``lore_engine_poc/.graph.pkl``.
    4. With ``--write-neo4j``, mirror the graph into Neo4j.
    5. Unless ``--skip-cognee``, run the best-effort Cognee step.

    Returns:
        ``0`` on success (a failed or unavailable Cognee step still
        returns ``0``), ``1`` when ``--codex`` is not a directory,
        ``2`` when ``neo4j_graph`` cannot be imported, ``3`` when
        constructing ``Neo4jGraph`` raises.
    """
    args = parse_args()

    print(f"[01_ingest] reading codex from {args.codex}")
    if not os.path.isdir(args.codex):
        print(f" ERROR: {args.codex} is not a directory", file=sys.stderr)
        return 1

    entities = list(iter_codex(args.codex))
    triples = _gather_triples(args.codex, entities)
    graph = build_graph(entities, triples)

    uniq = _dedupe_triples(triples)
    _print_summary(entities, uniq)

    # The pickle is written before any Neo4j work so the in-memory
    # state is already on disk if the mirror step fails.
    _cache_graph(graph, entities, uniq)

    if args.write_neo4j:
        status = _mirror_to_neo4j(graph)
        if status != 0:
            return status

    if args.skip_cognee:
        print("[01_ingest] --skip-cognee set; skipping Cognee cognify.")
        return 0

    _cognify_codex(args.codex)
    return 0


def _gather_triples(codex_dir: str, entities: list) -> list:
    """Collect markdown, structured-YAML and optional LLM triples."""
    md_triples = extract_triples(entities)

    # Slice 1: also walk the structured-YAML files alongside the
    # markdown codex. Markdown triples are the prose path (no
    # bounds); YAML triples are the structured path (with bounds).
    # ``build_graph`` merges them, and the structured bounds win
    # when both paths produce the same fact.
    yaml_root = os.path.join(codex_dir, "yaml")
    yaml_triples = []
    if os.path.isdir(yaml_root):
        yaml_triples = list(iter_structured_yaml(yaml_root))
        print(f"[01_ingest] structured YAML triples: {len(yaml_triples)}")
    else:
        print("[01_ingest] no yaml/ subdir; structured path skipped")

    triples = md_triples + yaml_triples

    # Slice 3: optional LLM extraction. The deterministic path is
    # authoritative; LLM triples are added on top. Failure of the
    # LLM step is non-fatal (a missing key, a bad response, or a
    # provider crash all degrade to "no LLM triples" and the script
    # still builds a graph).
    if os.environ.get("LORE_INGEST_LLM") == "1":
        print("[01_ingest] LLM extraction: enabled")
        llm_triples = _run_llm_extraction(entities)
        triples = triples + llm_triples
        print(f"[01_ingest] LLM triples added: {len(llm_triples)}")
    else:
        print("[01_ingest] LLM extraction: skipped (set LORE_INGEST_LLM=1 to enable)")

    return triples


def _dedupe_triples(triples: list) -> list:
    """Return triples unique by (subject, relation, object), first one wins."""
    first_seen: dict = {}
    for triple in triples:
        key = (triple.subject, triple.relation, triple.object)
        # setdefault keeps the first triple for a key; dict order
        # preserves the order in which keys were first encountered.
        first_seen.setdefault(key, triple)
    return list(first_seen.values())


def _print_summary(entities: list, uniq: list) -> None:
    """Print entity/triple totals and a per-relation count, largest first."""
    by_type: dict[str, int] = {}
    for triple in uniq:
        by_type[triple.relation] = by_type.get(triple.relation, 0) + 1

    print(f"[01_ingest] entities: {len(entities)}")
    print(f"[01_ingest] unique triples: {len(uniq)}")
    for rel, n in sorted(by_type.items(), key=lambda kv: -kv[1]):
        print(f" {rel:20s} {n}")


def _cache_graph(graph, entities: list, uniq: list) -> None:
    """Pickle the graph, entities and deduped triples for the demo script."""
    import pickle

    out_path = ROOT / "lore_engine_poc" / ".graph.pkl"
    with open(out_path, "wb") as f:
        pickle.dump({"graph": graph, "entities": entities, "triples": uniq}, f)
    print(f"[01_ingest] graph cached to {out_path}")


def _mirror_to_neo4j(graph) -> int:
    """Mirror the in-memory graph into Neo4j (slice 5.6).

    Returns ``0`` once every edge has been written, ``2`` when
    ``lore_engine_poc.neo4j_graph`` cannot be imported and ``3`` when
    constructing ``Neo4jGraph`` raises. Exceptions raised while
    writing are not caught here; the connection is still closed.
    """
    neo_uri = os.environ.get("LORE_NEO4J_URI", "bolt://127.0.0.1:7687")
    try:
        from lore_engine_poc.neo4j_graph import Neo4jGraph
    except ImportError as exc:
        print(
            f"[01_ingest] --write-neo4j requested but neo4j_graph "
            f"is not importable: {exc}",
            file=sys.stderr,
        )
        return 2

    try:
        neo = Neo4jGraph(neo_uri)
    except Exception as exc:
        print(
            f"[01_ingest] --write-neo4j failed to connect to "
            f"{neo_uri}: {exc}",
            file=sys.stderr,
        )
        return 3

    try:
        neo.ensure_schema()

        # Pre-pass: LoreSource nodes (so SOURCED_FROM links find
        # them with full metadata when add() runs).
        for _path, src in graph.lore_sources.items():
            neo.add_lore_source(src)

        # Pre-pass: bare names (entities that were registered
        # but never participate in any edge — the in-memory graph
        # keeps them in ``names``; Neo4j needs them as :Entity
        # nodes too).
        for name in graph.all_names():
            if name in graph.lore_sources:
                continue
            if not list(graph.edges_for_subject(name)):
                neo.register_name(name)

        # All edges.
        n_edges = 0
        for subject in graph.all_names():
            for edge in graph.edges_for_subject(subject):
                neo.add(edge)
                n_edges += 1
        print(
            f"[01_ingest] neo4j mirror: {n_edges} edges written "
            f"to {neo_uri}"
        )
    finally:
        neo.close()

    return 0


def _cognify_codex(codex_dir: str) -> None:
    """Best-effort Cognee step: add every ``.md`` file, then cognify.

    If Cognee is not installed or the run raises, the problem is
    printed and swallowed — the in-memory graph is sufficient for
    the slice.
    """
    try:
        import cognee
    except ImportError as e:
        print(f"[01_ingest] cognee not installed: {e}")
        return

    try:
        import asyncio

        async def _run() -> None:
            await cognee.prune.prune_data()
            await cognee.prune.prune_system(metadata=True)
            md_files = [
                os.path.join(dirpath, fn)
                for dirpath, _dirs, files in os.walk(codex_dir)
                for fn in files
                if fn.lower().endswith(".md")
            ]
            for path in md_files:
                with open(path, "r", encoding="utf-8", errors="replace") as f:
                    await cognee.add(f.read(), dataset_name="codex")
            await cognee.cognify()
            print(f"[01_ingest] cognee.cognify() complete over {len(md_files)} files")

        asyncio.run(_run())
    except Exception as e:
        # Deliberately broad: any Cognee failure is non-fatal.
        print(f"[01_ingest] cognee step failed (non-fatal): {e}")
|
|
|
|
|
|
def _run_llm_extraction(entities) -> list:
    """Slice 3: collect LLM-extracted triples for every entity.

    Each entity is handed to ``extract_from_chunk`` together with the
    provider returned by ``_build_provider``. An exception raised for
    one entity is reported on stderr and that entity is skipped; the
    remaining entities are still processed. The deterministic graph
    is untouched.

    Provider selection (in order):

    1. ``LORE_INGEST_FAKE_LLM=1`` → :class:`FakeProvider`,
       script loaded from ``LORE_INGEST_FAKE_LLM_SCRIPT``
       (a JSON file). Test-only.
    2. Otherwise → :class:`OllamaCloudProvider`, which
       requires ``$OLLAMA_API_KEY`` in the env. If unset,
       the script logs and returns ``[]`` (no triples).
    """
    from lore_engine_poc.extraction import extract_from_chunk

    llm = _build_provider()
    if llm is None:
        return []

    collected: list = []
    for entity in entities:
        try:
            chunk_triples = extract_from_chunk(entity, llm)
        except Exception as err:
            print(
                f"[01_ingest] LLM extract failed on {entity.slug}: {err}",
                file=sys.stderr,
            )
        else:
            collected.extend(chunk_triples)
    return collected
|
|
|
|
|
|
def _build_provider():
    """Construct the LLM provider selected by the environment.

    With ``LORE_INGEST_FAKE_LLM=1`` the provider is built from the
    script file named by ``LORE_INGEST_FAKE_LLM_SCRIPT``; otherwise an
    ``OllamaCloudProvider`` is returned when ``OLLAMA_API_KEY`` is set.

    Returns:
        The provider, or ``None`` (after printing the reason to
        stderr) when the required variable is unset or the fake
        script file does not exist.
    """
    from lore_engine_poc.llm import (
        OllamaCloudProvider,
        fake_provider_from_script_file,
    )

    env = os.environ

    if env.get("LORE_INGEST_FAKE_LLM") != "1":
        # Real provider path.
        if env.get("OLLAMA_API_KEY"):
            return OllamaCloudProvider()
        print(
            "[01_ingest] LORE_INGEST_LLM=1 but $OLLAMA_API_KEY is not set; "
            "LLM extraction skipped. Set OLLAMA_API_KEY (or "
            "LORE_INGEST_FAKE_LLM=1 + LORE_INGEST_FAKE_LLM_SCRIPT=...) "
            "to enable.",
            file=sys.stderr,
        )
        return None

    # Fake provider path (tests).
    script_path = env.get("LORE_INGEST_FAKE_LLM_SCRIPT")
    if not script_path:
        print(
            "[01_ingest] LORE_INGEST_FAKE_LLM=1 but "
            "LORE_INGEST_FAKE_LLM_SCRIPT is not set; "
            "LLM extraction skipped.",
            file=sys.stderr,
        )
        return None

    if os.path.isfile(script_path):
        return fake_provider_from_script_file(script_path)

    print(
        f"[01_ingest] FakeProvider script not found: {script_path}",
        file=sys.stderr,
    )
    return None
|
|
|
|
|
|
# Script entry point: the integer returned by main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
|