Files
Lore Engine Dev 81cca56e51 slice 6.5: setting filter on 6 read tools + MCP schema
Per AC 6.5, 6.6 — adds a keyword-only 'setting' parameter to:
  - lookup        (filter on matched name's setting membership)
  - entity_context (entity-level filter; empty shape if excluded)
  - was_true_at   (both subject and object must be in setting)
  - true_during   (subject must be in setting)
  - entities_present (located entity must be in setting)
  - events_during (event subject must be in setting)

The filter resolves via graph.setting_entities(setting_id) — O(1)
reverse lookup from slice 6.2's Protocol methods. An unknown
setting returns empty results (defensive). Omitting 'setting'
preserves slice 4 / 9 behaviour (back-compat fence).

MCP tool schemas updated for all 6 entries to expose 'setting'
as an optional [string, null] parameter; opt_string_params
toggled so 'null' is coerced to None by the dispatcher.

The cross-setting fact test (Roland ENCOUNTERED The Wanderer)
is the canonical LLM-target: with setting='mardonari', Roland's
home setting, the answer is was_true=False because The Wanderer
is in the_wild_dream.

+8 tests (739 → 747). All green. No regressions.
2026-06-19 14:26:05 -04:00

837 lines
30 KiB
Python

"""Lore Engine POC — read-side tools (slice 4).
The 16 read tools promised by slice 4 (`docs/04-slice-tools.md`).
Each is a pure-Python function of (:class:`Graph`, ...) — no
side effects, no global state. Response shapes go through
:mod:`lore_engine_poc.responses` so the contract is centralised.
The slice-0 ``was_true_at`` tool stays in ``tools.py`` for
backward compatibility; everything new lives here.
Tool groups:
* Group 1 (Identity): :func:`lookup`, :func:`entity_context`
* Group 2 (Time-aware): :func:`true_during`, :func:`entities_present`,
:func:`timeline`. ``state_at`` deferred (4.6+).
* Group 3 (Lineage): :func:`list_lineage`, :func:`list_offspring`,
:func:`ancestors_of`, :func:`descendants_of`,
:func:`location_hierarchy`
* Group 4 (Events): :func:`event_chain`, :func:`events_during`
* Group 5 (Lore): :func:`lore_about`. ``cite`` deferred (4.7).
The slice-4.7 write tools live in :mod:`lore_engine_poc.write_tools`.
"""
from __future__ import annotations
from typing import Optional
from .responses import edge_to_fact, entity_summary
from .time_model import time_in_window
from .tools import Edge, Graph
# ---------------------------------------------------------------------------
# Group 1 — Identity & disambiguation
# ---------------------------------------------------------------------------
def lookup(
graph: Graph,
query: str,
type_: Optional[str] = None,
*,
setting: Optional[str] = None,
) -> list[dict]:
"""Find entities whose name matches ``query`` (case-insensitive,
substring OK).
Returns ``[{name, type, match_confidence}]``. The
``match_confidence`` is 1.0 for an exact (case-insensitive)
match; partial matches score lower (we use the substring
overlap ratio: ``min(len(query), len(name)) / max(len(query),
len(name))``). When ``type_`` is supplied, the result is
filtered to entities of that type.
Slice 6.5 — ``setting`` (keyword-only) restricts the
result to entities that ``EXISTS_IN`` the named
setting. ``None`` (default) means no setting filter —
slice 4 / 9 behaviour is preserved. An unknown setting
returns ``[]`` (defensive — the LLM caller can
disambiguate by also calling ``find_setting``).
"""
if not query:
return []
q = query.strip().lower()
if not q:
return []
# Slice 6.5 — resolve the setting's membership once.
# An unknown setting (``find_setting`` returns None) is
# treated as "no entities" → the result is empty.
if setting is not None and graph.find_setting(setting) is None:
return []
setting_members = (
graph.setting_entities(setting) if setting is not None else None
)
out: list[dict] = []
# If ``query`` matches exactly (case-insensitive), every name
# passes; otherwise we substring-match against ``graph.all_names()``.
for name in graph.all_names():
# Slice 6.5 — drop names that aren't in the named setting.
if setting_members is not None and name not in setting_members:
continue
low = name.lower()
if low == q:
score = 1.0
elif q in low:
score = len(q) / len(low)
else:
continue
# Resolve the entity's type from the index. We try the
# requested ``type_`` first (which might be the canonical
# PascalCase label like ``"Person"``); if that doesn't
# match, we fall through to *any* type. The filter is
# applied *after* scoring so a wrong-type match is still
# reported (with ``match_confidence``) — just ranked lower.
entity_type = _lookup_entity_type(graph, name)
if type_ is not None and entity_type != type_:
continue
out.append({"name": name, "type": entity_type, "match_confidence": round(score, 4)})
# Sort: exact match first, then highest score, then alphabetical.
out.sort(key=lambda d: (-d["match_confidence"], d["name"]))
return out
def _lookup_entity_type(graph: Graph, name: str) -> str:
"""The canonical type for ``name`` from ``entities_by_type``.
The type index can hold the same name under multiple keys
(e.g. ``"npc"`` and ``"Person"``). We pick the PascalCase
variant when present — that's what LLM-facing tools want.
"""
# Prefer a PascalCase type over the lowercase markdown-style.
candidates = [
t for t in graph.all_entity_types()
if name in graph.entities_of_type(t)
]
if not candidates:
return ""
pascal = [t for t in candidates if t[:1].isupper()]
return pascal[0] if pascal else candidates[0]
def _empty_entity_context(at_time: Optional[str]) -> dict:
"""The empty-context response shape.
Used by :func:`entity_context` when the entity is unknown
OR when a ``setting`` filter excludes the entity. Keeping
the shape centralised prevents the two branches from
drifting apart.
"""
return {
"entity": None,
"at_time": at_time,
"factions": [],
"locations": [],
"items_possessed": [],
"alive": False,
"lifespan": {"from": None, "until": None},
}
def entity_context(
graph: Graph,
name: str,
at_time: Optional[str] = None,
*,
setting: Optional[str] = None,
) -> dict:
"""One-hop summary for an entity, optionally time-bucketed.
The result shape::
{
"entity": {"name": str, "type": str} | None,
"at_time": str | None,
"factions": [{"name", "valid_from", "valid_until", "sources"}],
"locations": [{"name", "valid_from", "valid_until", "sources"}],
"items_possessed": [...],
"alive": bool,
"lifespan": {"from": str | None, "until": str | None},
}
When ``at_time`` is provided, only edges whose
``[valid_from, valid_until]`` window contains the time are
counted (per :func:`time_in_window`). When omitted, all
edges are listed (this is the "current state" view, but
without a fixed current time it spans the whole timeline).
Unknown entity → ``entity: None`` and empty lists. Never raises.
Slice 6.5 — ``setting`` (keyword-only) restricts the
response to entities that ``EXISTS_IN`` the named
setting. An entity outside the setting returns the same
shape as "unknown entity" (``entity: None``). ``None``
(default) means no filter — slice 4 / 9 behaviour.
"""
canonical = graph.by_name(name)
# Slice 6.5 — entity-level setting filter.
if canonical is None:
return _empty_entity_context(at_time)
if setting is not None:
if graph.find_setting(setting) is None:
return _empty_entity_context(at_time)
if canonical not in graph.setting_entities(setting):
return _empty_entity_context(at_time)
# Outgoing edges from the entity (subject side). Slice 5.4
# routes through the GraphBackend Protocol method: pull all
# edges for the subject, then group by relation in-process.
from collections import defaultdict
rel_map: dict[str, list] = defaultdict(list)
for e in graph.edges_for_subject(canonical):
rel_map[e.relation].append(e)
factions = _collect_relations(graph, rel_map, "MEMBER_OF", at_time, canonical)
locations = _collect_relations(graph, rel_map, "LOCATED_IN", at_time, canonical)
items = _collect_relations(graph, rel_map, "POSSESSES", at_time, canonical)
# Lifespan from MEMBER_OF(Lineage) edges (slice-2 helper).
from .consistency_runner import _infer_person_lifespan
birth, death = _infer_person_lifespan(
[e for sub_edges in rel_map.values() for e in sub_edges]
)
alive = death is None # no inferred death ⇒ still alive
return {
"entity": entity_summary(canonical, _lookup_entity_type(graph, canonical)),
"at_time": at_time,
"factions": factions,
"locations": locations,
"items_possessed": items,
"alive": alive,
"lifespan": {"from": birth, "until": death},
}
def _collect_relations(graph, rel_map, relation, at_time, subject):
"""Helper for ``entity_context``: pull edges of one relation type,
time-filtered when ``at_time`` is given.
"""
out = []
for e in rel_map.get(relation, []):
if at_time is not None and not time_in_window(
at_time, e.valid_from, e.valid_until
):
continue
out.append({
"name": e.object,
"valid_from": e.valid_from,
"valid_until": e.valid_until,
"sources": list(e.sources),
})
return out
# ---------------------------------------------------------------------------
# Group 2 — Time-aware queries (partial; state_at deferred to 4.6)
# ---------------------------------------------------------------------------
def true_during(
graph: Graph,
relation: str,
subject: str,
era: str,
object_: Optional[str] = None,
*,
setting: Optional[str] = None,
) -> list[dict]:
"""Edges of ``relation`` from ``subject`` that were active
*somewhere* inside ``era``.
The era string is a canonical time atom (e.g. ``"3rd_age"``
or ``"3rd_age.year_345"``). An edge qualifies when its
``[valid_from, valid_until]`` window intersects the era's
own window (via :func:`time_in_window`). The half-open
semantics inherit from the time model.
When ``object_`` is provided, only edges to that object are
returned; otherwise all objects for the subject+relation.
Slice 6.5 — ``setting`` (keyword-only) restricts the
result to edges whose ``subject`` ``EXISTS_IN`` the
named setting. An unknown setting returns ``[]``. ``None``
(default) means no filter — slice 4 / 9 behaviour.
"""
canonical = graph.by_name(subject)
if canonical is None:
return []
# Slice 6.5 — subject-level setting filter. Unlike
# ``was_true_at`` (which checks both endpoints), this
# filter is on the subject only — the object is the
# answer's payload.
if setting is not None:
if graph.find_setting(setting) is None:
return []
if canonical not in graph.setting_entities(setting):
return []
out = []
for e in graph.edges_for_subject(canonical, relation):
if object_ is not None:
obj = graph.by_name(object_)
if obj is None or e.object != obj:
continue
# The era qualifies the edge if its window contains *some*
# point inside the era. We use ``era`` as the ``at_time``
# against the edge's window — when the era is at the
# top of the tree (e.g. ``"3rd_age"``), ``time_in_window``
# accepts any edge that touches that subtree.
if not time_in_window(era, e.valid_from, e.valid_until):
continue
out.append(edge_to_fact(e))
return out
def entities_present(
graph: Graph,
location: str,
at_time: str,
type_: Optional[str] = None,
*,
setting: Optional[str] = None,
) -> list[dict]:
"""Entities in ``location`` at ``at_time``, time-filtered.
Returns entities via ``LOCATED_IN`` edges (Person/Creature/Item)
or ``CONTROLS`` (Faction) whose window contains ``at_time``.
When ``type_`` is supplied, the result is filtered to entities
of that type.
Slice 6.5 — ``setting`` (keyword-only) restricts the
result to entities that ``EXISTS_IN`` the named setting.
The filter is on the *located entity*, not the location
itself — Roland is in mardonari, so a search for
entities in Mardsville under
``setting="the_wild_dream"`` returns ``[]``. ``None``
(default) means no filter — slice 4 / 9 behaviour.
"""
loc = graph.by_name(location)
if loc is None:
return []
# Slice 6.5 — resolve setting membership once. An
# unknown setting is treated as "no entities".
if setting is not None and graph.find_setting(setting) is None:
return []
setting_members = (
graph.setting_entities(setting) if setting is not None else None
)
out: list[dict] = []
# Slice 5.4: route through the GraphBackend Protocol. Edges
# pointing at ``loc`` are the incoming edges; filter to the
# two relations of interest.
for e in graph.edges_for_object(loc):
if e.relation not in ("LOCATED_IN", "CONTROLS"):
continue
# Slice 6.5 — filter on the located entity's setting
# membership. ``setting_members`` is None when no
# filter is requested, so the membership check is
# a no-op in that case.
if setting_members is not None and e.subject not in setting_members:
continue
if not time_in_window(at_time, e.valid_from, e.valid_until):
continue
if type_ is not None and _lookup_entity_type(graph, e.subject) != type_:
continue
out.append({
"name": e.subject,
"type": _lookup_entity_type(graph, e.subject),
"via": e.relation,
"valid_from": e.valid_from,
"valid_until": e.valid_until,
})
return out
def timeline(
graph: Graph,
entity: str,
relation_type: Optional[str] = None,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
) -> list[dict]:
"""All edges touching ``entity`` (as subject or object), optionally
time- and relation-type-filtered, sorted chronologically by
``valid_from``.
Each entry is an ``edge_to_fact`` dict plus the direction the
entity participates in (``"outgoing"`` or ``"incoming"``).
"""
canonical = graph.by_name(entity)
if canonical is None:
return []
facts: list[dict] = []
# Outgoing edges (entity as subject). Slice 5.4 routes through
# the GraphBackend Protocol method.
for e in graph.edges_for_subject(canonical):
if relation_type is not None and e.relation != relation_type:
continue
if not _within_range(e.valid_from, e.valid_until, start_time, end_time):
continue
f = edge_to_fact(e)
f["direction"] = "outgoing"
facts.append(f)
# Incoming edges (entity as object).
for e in graph.edges_for_object(canonical):
if relation_type is not None and e.relation != relation_type:
continue
# Skip the outgoing-side duplicate: an edge indexed at
# *both* endpoints in ``edges_by_object`` would double-
# count. Filter by direction.
if e.subject == canonical:
continue
if not _within_range(e.valid_from, e.valid_until, start_time, end_time):
continue
f = edge_to_fact(e)
f["direction"] = "incoming"
facts.append(f)
# Sort by valid_from (None sorts first to keep undated facts visible).
facts.sort(key=lambda f: (f["valid_from"] or "", f["valid_until"] or ""))
return facts
def _within_range(valid_from, valid_until, start_time, end_time) -> bool:
"""The edge's window overlaps ``[start_time, end_time]``.
Used by ``timeline`` and ``events_during``. Empty bounds on
either side are open. We don't use the canonical
``time_in_window`` here because that helper answers "is X
inside [lo, hi]?" — we need the symmetric interval-overlap
test instead.
"""
if start_time is None and end_time is None:
return True
# An edge entirely before ``start_time`` is excluded.
if valid_until is not None and start_time is not None:
from .time_model import _cmp_atoms
if _cmp_atoms(valid_until, start_time) < 0:
return False
# An edge entirely after ``end_time`` is excluded.
if valid_from is not None and end_time is not None:
from .time_model import _cmp_atoms
if _cmp_atoms(valid_from, end_time) > 0:
return False
return True
# ---------------------------------------------------------------------------
# Group 3 — Lineage & hierarchy
# ---------------------------------------------------------------------------
def list_lineage(graph: Graph, person: str, depth: int = 2) -> dict:
"""The lineage ``person`` belongs to, plus members within ``depth``.
Returns ``{lineage: {name, founding_ancestor} | None, members:
[{name, relation}], cadet_branches: [...], depth_covered: int}``.
"""
canonical = graph.by_name(person)
if canonical is None:
return {"lineage": None, "members": [], "cadet_branches": [], "depth_covered": 0}
# The person is in a Lineage via MEMBER_OF(Lineage).
lineage_name = None
for e in graph.edges_for_subject(canonical, "MEMBER_OF"):
if _lookup_entity_type(graph, e.object) == "Lineage":
lineage_name = e.object
break
if lineage_name is None:
return {"lineage": None, "members": [], "cadet_branches": [], "depth_covered": 0}
# Find the founding ancestor.
founding = None
for e in graph.edges_for_subject(lineage_name, "FOUNDED_BY"):
founding = e.object
break
# Members via reverse MEMBER_OF edges.
members: list[dict] = []
seen: set[str] = set()
for e in graph.edges_for_object(lineage_name):
if e.relation != "MEMBER_OF":
continue
if e.subject == canonical or e.subject in seen:
continue
seen.add(e.subject)
members.append({"name": e.subject, "relation": "MEMBER_OF"})
return {
"lineage": {"name": lineage_name, "founding_ancestor": founding},
"members": members,
"cadet_branches": [], # slice 4 keeps this empty; richer output is slice 4.x
"depth_covered": depth,
}
def list_offspring(graph: Graph, person: str) -> list[str]:
"""Direct children of ``person`` via ``PARENT_OF``.
Returns the children's canonical names. The in-memory graph
doesn't carry ``generations`` (each ``PARENT_OF`` is one hop);
for multi-hop use :func:`descendants_of`.
"""
canonical = graph.by_name(person)
if canonical is None:
return []
out = []
seen: set[str] = set()
for e in graph.edges_for_subject(canonical, "PARENT_OF"):
if e.object in seen:
continue
seen.add(e.object)
out.append(e.object)
return out
def ancestors_of(graph: Graph, person: str, generations: int = 3) -> list[str]:
"""Ancestors of ``person`` via reverse ``PARENT_OF`` walks.
Bounded by ``generations`` (default 3). Returns ancestors in
BFS order (closest first). Cycle-safe via ``seen``.
"""
canonical = graph.by_name(person)
if canonical is None:
return []
seen: set[str] = {canonical}
frontier: list[str] = [canonical]
out: list[str] = []
for _ in range(generations):
next_frontier: list[str] = []
for n in frontier:
for e in graph.edges_for_object(n):
if e.relation != "PARENT_OF":
continue
if e.subject in seen:
continue
seen.add(e.subject)
out.append(e.subject)
next_frontier.append(e.subject)
frontier = next_frontier
if not frontier:
break
return out
def descendants_of(graph: Graph, person: str, generations: int = 3) -> list[str]:
"""Descendants of ``person`` via forward ``PARENT_OF`` walks.
Bounded by ``generations`` (default 3). Returns descendants
in BFS order.
"""
canonical = graph.by_name(person)
if canonical is None:
return []
seen: set[str] = {canonical}
frontier: list[str] = [canonical]
out: list[str] = []
for _ in range(generations):
next_frontier: list[str] = []
for n in frontier:
for e in graph.edges_for_subject(n, "PARENT_OF"):
if e.object in seen:
continue
seen.add(e.object)
out.append(e.object)
next_frontier.append(e.object)
frontier = next_frontier
if not frontier:
break
return out
def location_hierarchy(
graph: Graph,
location: str,
direction: str = "up",
) -> list[dict]:
"""The ``PART_OF`` chain above or below ``location``.
``direction="up"`` walks ``PART_OF`` edges (location →
parent). ``direction="down"`` walks reverse ``PART_OF``
(location → children). For ``up``, the result is a chain
(one parent per level); for ``down``, every direct child
is listed at its level before descending further.
Returns ``[{name, valid_from, valid_until}]``.
"""
canonical = graph.by_name(location)
if canonical is None:
return []
if direction not in ("up", "down"):
raise ValueError(f"direction must be 'up' or 'down', got {direction!r}")
out: list[dict] = []
seen: set[str] = {canonical}
if direction == "up":
# Chain walk: location → parent → grandparent ...
current = canonical
while True:
edges = graph.edges_for_subject(current, "PART_OF")
if not edges:
break
e = edges[0]
if e.object in seen:
break
seen.add(e.object)
out.append({
"name": e.object,
"valid_from": e.valid_from,
"valid_until": e.valid_until,
})
current = e.object
else:
# Tree walk: list every direct child of ``canonical``
# before descending. This is the "what's in region X?" query.
current = canonical
direct_children = [
e.subject
for e in graph.edges_for_object(current)
if e.relation == "PART_OF" and e.subject != current
]
for child in direct_children:
if child in seen:
continue
seen.add(child)
# Find the edge for valid_from/until.
child_edge = next(
(e for e in graph.edges_for_object(current)
if e.relation == "PART_OF" and e.subject == child),
None,
)
out.append({
"name": child,
"valid_from": child_edge.valid_from if child_edge else None,
"valid_until": child_edge.valid_until if child_edge else None,
})
return out
# ---------------------------------------------------------------------------
# Group 4 — Causal & event chains
# ---------------------------------------------------------------------------
# Edges that imply "this thing led to that thing" for event_chain.
CAUSAL_RELATIONS = frozenset({"CAUSED", "PRECEDED", "CONCURRENT_WITH"})
def event_chain(graph: Graph, event: str, depth: int = 2) -> dict:
"""Bounded multi-hop walk of ``CAUSED`` / ``PRECEDED`` /
``CONCURRENT_WITH`` edges from ``event``.
Returns ``{event: str, causes: [...], effects: [...],
concurrent: [...], depth_covered: int}``. Each cause /
effect / concurrent entry is a chain node::
{"event": str, "via": "CAUSED", "depth": int, "sources": [...]}
"""
canonical = graph.by_name(event)
if canonical is None:
return {"event": None, "causes": [], "effects": [], "concurrent": [], "depth_covered": 0}
def walk(direction: str) -> list[dict]:
out: list[dict] = []
seen: set[str] = {canonical}
frontier = [(canonical, 0)]
for _ in range(depth):
next_frontier: list[tuple[str, int]] = []
for current, d in frontier:
if direction == "effects":
edges_iter = (
(e, e.object)
for e in graph.edges_for_subject(current)
)
else:
edges_iter = (
(e, e.subject)
for e in graph.edges_for_object(current)
if e.subject != current
)
for e, other in edges_iter:
if e.relation not in CAUSAL_RELATIONS:
continue
if other in seen:
continue
seen.add(other)
out.append({
"event": other,
"via": e.relation,
"depth": d + 1,
"sources": list(e.sources),
})
next_frontier.append((other, d + 1))
frontier = next_frontier
if not frontier:
break
return out
causes = walk("causes")
effects = walk("effects")
# CONCURRENT_WITH is symmetric: any node it points at is a peer.
concurrent: list[dict] = []
for e in graph.edges_for_subject(canonical, "CONCURRENT_WITH"):
concurrent.append({
"event": e.object,
"via": "CONCURRENT_WITH",
"depth": 0,
"sources": list(e.sources),
})
return {
"event": canonical,
"causes": causes,
"effects": effects,
"concurrent": concurrent,
"depth_covered": depth,
}
def events_during(
graph: Graph,
era: str,
location: Optional[str] = None,
type_: Optional[str] = None,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
*,
setting: Optional[str] = None,
) -> list[dict]:
"""Events whose ``OCCURRED_DURING`` window intersects ``era``.
Filters: ``location`` (events must have ``OCCURRED_AT`` to
that location), ``type_`` (Event / Battle / Ceremony /
etc.), and an optional ``[start_time, end_time]`` range that
narrows the era window. Returns events sorted by
``valid_from``.
Slice 6.5 — ``setting`` (keyword-only) restricts the
result to events whose subject ``EXISTS_IN`` the named
setting. The filter is on the *event subject*, not on
the era — eras are not setting-scoped (the design
allows the same era id to exist in multiple settings).
An unknown setting returns ``[]``. ``None`` (default)
means no filter — slice 4 / 9 behaviour.
"""
# Locate the era node.
era_name = graph.by_name(era) or era
# Slice 6.5 — resolve setting membership once. An
# unknown setting is treated as "no events".
if setting is not None and graph.find_setting(setting) is None:
return []
setting_members = (
graph.setting_entities(setting) if setting is not None else None
)
# Events are nodes that have an outgoing ``OCCURRED_DURING``
# edge to the era (or any descendant of it).
out: list[dict] = []
# Slice 5.4: route through the GraphBackend Protocol. For
# each name in the graph, look at its OCCURRED_DURING /
# OCCURRED_AT edges.
for subject in graph.all_names():
# Slice 6.5 — drop events whose subject isn't in the
# named setting.
if setting_members is not None and subject not in setting_members:
continue
sub_edges = graph.edges_for_subject(subject)
occurred_during = [e for e in sub_edges if e.relation == "OCCURRED_DURING"]
if not occurred_during:
continue
for e in occurred_during:
if e.object != era_name:
# Descendant check via the time tree would be
# cleaner; for the POC we accept exact-match only.
continue
# Pull the event's other metadata: OCCURRED_AT for
# the location filter, type for the type filter.
event_loc = None
for e2 in sub_edges:
if e2.relation == "OCCURRED_AT":
event_loc = e2.object
break
event_type = _lookup_entity_type(graph, subject)
if location is not None:
loc_canonical = graph.by_name(location)
if loc_canonical is None or event_loc != loc_canonical:
continue
if type_ is not None and event_type != type_:
continue
if not _within_range(e.valid_from, e.valid_until, start_time, end_time):
continue
out.append({
"name": subject,
"type": event_type,
"location": event_loc,
"valid_from": e.valid_from,
"valid_until": e.valid_until,
})
out.sort(key=lambda d: (d["valid_from"] or "", d["valid_until"] or ""))
return out
# ---------------------------------------------------------------------------
# Group 5 — Knowledge & lore
# ---------------------------------------------------------------------------
def lore_about(
graph: Graph,
entity: str,
type_: Optional[str] = None,
limit: int = 10,
) -> list[dict]:
"""``LoreSource`` documents that mention ``entity``.
The in-memory graph stores a ``SOURCED_FROM`` edge per typed
edge in the structured-YAML path, and every markdown entity
carries a ``LoreSource`` on ``entity.sources``. We pull the
unique source paths from those edges, join against
``graph.lore_sources``, and return one entry per source.
Sort order: ``source_confidence`` descending (we trust the
canonical sources most). Capped at ``limit`` (default 10).
"""
canonical = graph.by_name(entity)
if canonical is None:
return []
paths: set[str] = set()
# Outgoing edges. Slice 5.4: use the GraphBackend Protocol.
for e in graph.edges_for_subject(canonical):
for src in e.sources:
paths.add(src)
# Incoming edges.
for e in graph.edges_for_object(canonical):
for src in e.sources:
paths.add(src)
out: list[dict] = []
for path in paths:
ls = graph.lore_source(path)
if ls is None:
continue
if type_ is not None and ls.source_type != type_:
continue
out.append({
"path": ls.path,
"title": ls.name,
"source_type": ls.source_type,
"reliability": ls.reliability,
"source_confidence": ls.source_confidence,
})
out.sort(key=lambda d: (-d["source_confidence"], d["path"]))
return out[:limit]
__all__ = [
"lookup",
"entity_context",
"true_during",
"entities_present",
"timeline",
"list_lineage",
"list_offspring",
"ancestors_of",
"descendants_of",
"location_hierarchy",
"event_chain",
"events_during",
"lore_about",
]