Per AC 6.5, 6.6 — adds a keyword-only 'setting' parameter to: - lookup (filter on matched name's setting membership) - entity_context (entity-level filter; empty shape if excluded) - was_true_at (both subject and object must be in setting) - true_during (subject must be in setting) - entities_present (located entity must be in setting) - events_during (event subject must be in setting) The filter resolves via graph.setting_entities(setting_id) — O(1) reverse lookup from slice 6.2's Protocol methods. An unknown setting returns empty results (defensive). Omitting 'setting' preserves slice 4 / 9 behaviour (back-compat fence). MCP tool schemas updated for all 6 entries to expose 'setting' as an optional [string, null] parameter; opt_string_params toggled so 'null' is coerced to None by the dispatcher. The cross-setting fact test (Roland ENCOUNTERED The Wanderer) is the canonical LLM-target: with setting='mardonari', Roland's home setting, the answer is was_true=False because The Wanderer is in the_wild_dream. +8 tests (739 → 747). All green. No regressions.
837 lines
30 KiB
Python
837 lines
30 KiB
Python
"""Lore Engine POC — read-side tools (slice 4).
|
|
|
|
The 16 read tools promised by slice 4 (`docs/04-slice-tools.md`).
|
|
Each is a pure-Python function of (:class:`Graph`, ...) — no
|
|
side effects, no global state. Response shapes go through
|
|
:mod:`lore_engine_poc.responses` so the contract is centralised.
|
|
|
|
The slice-0 ``was_true_at`` tool stays in ``tools.py`` for
|
|
backward compatibility; everything new lives here.
|
|
|
|
Tool groups:
|
|
|
|
* Group 1 (Identity): :func:`lookup`, :func:`entity_context`
|
|
* Group 2 (Time-aware): :func:`true_during`, :func:`entities_present`,
|
|
:func:`timeline`. ``state_at`` deferred (4.6+).
|
|
* Group 3 (Lineage): :func:`list_lineage`, :func:`list_offspring`,
|
|
:func:`ancestors_of`, :func:`descendants_of`,
|
|
:func:`location_hierarchy`
|
|
* Group 4 (Events): :func:`event_chain`, :func:`events_during`
|
|
* Group 5 (Lore): :func:`lore_about`. ``cite`` deferred (4.7).
|
|
|
|
The slice-4.7 write tools live in :mod:`lore_engine_poc.write_tools`.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Optional
|
|
|
|
from .responses import edge_to_fact, entity_summary
|
|
from .time_model import time_in_window
|
|
from .tools import Edge, Graph
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Group 1 — Identity & disambiguation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def lookup(
|
|
graph: Graph,
|
|
query: str,
|
|
type_: Optional[str] = None,
|
|
*,
|
|
setting: Optional[str] = None,
|
|
) -> list[dict]:
|
|
"""Find entities whose name matches ``query`` (case-insensitive,
|
|
substring OK).
|
|
|
|
Returns ``[{name, type, match_confidence}]``. The
|
|
``match_confidence`` is 1.0 for an exact (case-insensitive)
|
|
match; partial matches score lower (we use the substring
|
|
overlap ratio: ``min(len(query), len(name)) / max(len(query),
|
|
len(name))``). When ``type_`` is supplied, the result is
|
|
filtered to entities of that type.
|
|
|
|
Slice 6.5 — ``setting`` (keyword-only) restricts the
|
|
result to entities that ``EXISTS_IN`` the named
|
|
setting. ``None`` (default) means no setting filter —
|
|
slice 4 / 9 behaviour is preserved. An unknown setting
|
|
returns ``[]`` (defensive — the LLM caller can
|
|
disambiguate by also calling ``find_setting``).
|
|
"""
|
|
if not query:
|
|
return []
|
|
q = query.strip().lower()
|
|
if not q:
|
|
return []
|
|
# Slice 6.5 — resolve the setting's membership once.
|
|
# An unknown setting (``find_setting`` returns None) is
|
|
# treated as "no entities" → the result is empty.
|
|
if setting is not None and graph.find_setting(setting) is None:
|
|
return []
|
|
setting_members = (
|
|
graph.setting_entities(setting) if setting is not None else None
|
|
)
|
|
out: list[dict] = []
|
|
# If ``query`` matches exactly (case-insensitive), every name
|
|
# passes; otherwise we substring-match against ``graph.all_names()``.
|
|
for name in graph.all_names():
|
|
# Slice 6.5 — drop names that aren't in the named setting.
|
|
if setting_members is not None and name not in setting_members:
|
|
continue
|
|
low = name.lower()
|
|
if low == q:
|
|
score = 1.0
|
|
elif q in low:
|
|
score = len(q) / len(low)
|
|
else:
|
|
continue
|
|
# Resolve the entity's type from the index. We try the
|
|
# requested ``type_`` first (which might be the canonical
|
|
# PascalCase label like ``"Person"``); if that doesn't
|
|
# match, we fall through to *any* type. The filter is
|
|
# applied *after* scoring so a wrong-type match is still
|
|
# reported (with ``match_confidence``) — just ranked lower.
|
|
entity_type = _lookup_entity_type(graph, name)
|
|
if type_ is not None and entity_type != type_:
|
|
continue
|
|
out.append({"name": name, "type": entity_type, "match_confidence": round(score, 4)})
|
|
# Sort: exact match first, then highest score, then alphabetical.
|
|
out.sort(key=lambda d: (-d["match_confidence"], d["name"]))
|
|
return out
|
|
|
|
|
|
def _lookup_entity_type(graph: Graph, name: str) -> str:
|
|
"""The canonical type for ``name`` from ``entities_by_type``.
|
|
|
|
The type index can hold the same name under multiple keys
|
|
(e.g. ``"npc"`` and ``"Person"``). We pick the PascalCase
|
|
variant when present — that's what LLM-facing tools want.
|
|
"""
|
|
# Prefer a PascalCase type over the lowercase markdown-style.
|
|
candidates = [
|
|
t for t in graph.all_entity_types()
|
|
if name in graph.entities_of_type(t)
|
|
]
|
|
if not candidates:
|
|
return ""
|
|
pascal = [t for t in candidates if t[:1].isupper()]
|
|
return pascal[0] if pascal else candidates[0]
|
|
|
|
|
|
def _empty_entity_context(at_time: Optional[str]) -> dict:
|
|
"""The empty-context response shape.
|
|
|
|
Used by :func:`entity_context` when the entity is unknown
|
|
OR when a ``setting`` filter excludes the entity. Keeping
|
|
the shape centralised prevents the two branches from
|
|
drifting apart.
|
|
"""
|
|
return {
|
|
"entity": None,
|
|
"at_time": at_time,
|
|
"factions": [],
|
|
"locations": [],
|
|
"items_possessed": [],
|
|
"alive": False,
|
|
"lifespan": {"from": None, "until": None},
|
|
}
|
|
|
|
|
|
def entity_context(
|
|
graph: Graph,
|
|
name: str,
|
|
at_time: Optional[str] = None,
|
|
*,
|
|
setting: Optional[str] = None,
|
|
) -> dict:
|
|
"""One-hop summary for an entity, optionally time-bucketed.
|
|
|
|
The result shape::
|
|
|
|
{
|
|
"entity": {"name": str, "type": str} | None,
|
|
"at_time": str | None,
|
|
"factions": [{"name", "valid_from", "valid_until", "sources"}],
|
|
"locations": [{"name", "valid_from", "valid_until", "sources"}],
|
|
"items_possessed": [...],
|
|
"alive": bool,
|
|
"lifespan": {"from": str | None, "until": str | None},
|
|
}
|
|
|
|
When ``at_time`` is provided, only edges whose
|
|
``[valid_from, valid_until]`` window contains the time are
|
|
counted (per :func:`time_in_window`). When omitted, all
|
|
edges are listed (this is the "current state" view, but
|
|
without a fixed current time it spans the whole timeline).
|
|
|
|
Unknown entity → ``entity: None`` and empty lists. Never raises.
|
|
|
|
Slice 6.5 — ``setting`` (keyword-only) restricts the
|
|
response to entities that ``EXISTS_IN`` the named
|
|
setting. An entity outside the setting returns the same
|
|
shape as "unknown entity" (``entity: None``). ``None``
|
|
(default) means no filter — slice 4 / 9 behaviour.
|
|
"""
|
|
canonical = graph.by_name(name)
|
|
# Slice 6.5 — entity-level setting filter.
|
|
if canonical is None:
|
|
return _empty_entity_context(at_time)
|
|
if setting is not None:
|
|
if graph.find_setting(setting) is None:
|
|
return _empty_entity_context(at_time)
|
|
if canonical not in graph.setting_entities(setting):
|
|
return _empty_entity_context(at_time)
|
|
# Outgoing edges from the entity (subject side). Slice 5.4
|
|
# routes through the GraphBackend Protocol method: pull all
|
|
# edges for the subject, then group by relation in-process.
|
|
from collections import defaultdict
|
|
rel_map: dict[str, list] = defaultdict(list)
|
|
for e in graph.edges_for_subject(canonical):
|
|
rel_map[e.relation].append(e)
|
|
factions = _collect_relations(graph, rel_map, "MEMBER_OF", at_time, canonical)
|
|
locations = _collect_relations(graph, rel_map, "LOCATED_IN", at_time, canonical)
|
|
items = _collect_relations(graph, rel_map, "POSSESSES", at_time, canonical)
|
|
# Lifespan from MEMBER_OF(Lineage) edges (slice-2 helper).
|
|
from .consistency_runner import _infer_person_lifespan
|
|
birth, death = _infer_person_lifespan(
|
|
[e for sub_edges in rel_map.values() for e in sub_edges]
|
|
)
|
|
alive = death is None # no inferred death ⇒ still alive
|
|
return {
|
|
"entity": entity_summary(canonical, _lookup_entity_type(graph, canonical)),
|
|
"at_time": at_time,
|
|
"factions": factions,
|
|
"locations": locations,
|
|
"items_possessed": items,
|
|
"alive": alive,
|
|
"lifespan": {"from": birth, "until": death},
|
|
}
|
|
|
|
|
|
def _collect_relations(graph, rel_map, relation, at_time, subject):
|
|
"""Helper for ``entity_context``: pull edges of one relation type,
|
|
time-filtered when ``at_time`` is given.
|
|
"""
|
|
out = []
|
|
for e in rel_map.get(relation, []):
|
|
if at_time is not None and not time_in_window(
|
|
at_time, e.valid_from, e.valid_until
|
|
):
|
|
continue
|
|
out.append({
|
|
"name": e.object,
|
|
"valid_from": e.valid_from,
|
|
"valid_until": e.valid_until,
|
|
"sources": list(e.sources),
|
|
})
|
|
return out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Group 2 — Time-aware queries (partial; state_at deferred to 4.6)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def true_during(
|
|
graph: Graph,
|
|
relation: str,
|
|
subject: str,
|
|
era: str,
|
|
object_: Optional[str] = None,
|
|
*,
|
|
setting: Optional[str] = None,
|
|
) -> list[dict]:
|
|
"""Edges of ``relation`` from ``subject`` that were active
|
|
*somewhere* inside ``era``.
|
|
|
|
The era string is a canonical time atom (e.g. ``"3rd_age"``
|
|
or ``"3rd_age.year_345"``). An edge qualifies when its
|
|
``[valid_from, valid_until]`` window intersects the era's
|
|
own window (via :func:`time_in_window`). The half-open
|
|
semantics inherit from the time model.
|
|
|
|
When ``object_`` is provided, only edges to that object are
|
|
returned; otherwise all objects for the subject+relation.
|
|
|
|
Slice 6.5 — ``setting`` (keyword-only) restricts the
|
|
result to edges whose ``subject`` ``EXISTS_IN`` the
|
|
named setting. An unknown setting returns ``[]``. ``None``
|
|
(default) means no filter — slice 4 / 9 behaviour.
|
|
"""
|
|
canonical = graph.by_name(subject)
|
|
if canonical is None:
|
|
return []
|
|
# Slice 6.5 — subject-level setting filter. Unlike
|
|
# ``was_true_at`` (which checks both endpoints), this
|
|
# filter is on the subject only — the object is the
|
|
# answer's payload.
|
|
if setting is not None:
|
|
if graph.find_setting(setting) is None:
|
|
return []
|
|
if canonical not in graph.setting_entities(setting):
|
|
return []
|
|
out = []
|
|
for e in graph.edges_for_subject(canonical, relation):
|
|
if object_ is not None:
|
|
obj = graph.by_name(object_)
|
|
if obj is None or e.object != obj:
|
|
continue
|
|
# The era qualifies the edge if its window contains *some*
|
|
# point inside the era. We use ``era`` as the ``at_time``
|
|
# against the edge's window — when the era is at the
|
|
# top of the tree (e.g. ``"3rd_age"``), ``time_in_window``
|
|
# accepts any edge that touches that subtree.
|
|
if not time_in_window(era, e.valid_from, e.valid_until):
|
|
continue
|
|
out.append(edge_to_fact(e))
|
|
return out
|
|
|
|
|
|
def entities_present(
|
|
graph: Graph,
|
|
location: str,
|
|
at_time: str,
|
|
type_: Optional[str] = None,
|
|
*,
|
|
setting: Optional[str] = None,
|
|
) -> list[dict]:
|
|
"""Entities in ``location`` at ``at_time``, time-filtered.
|
|
|
|
Returns entities via ``LOCATED_IN`` edges (Person/Creature/Item)
|
|
or ``CONTROLS`` (Faction) whose window contains ``at_time``.
|
|
When ``type_`` is supplied, the result is filtered to entities
|
|
of that type.
|
|
|
|
Slice 6.5 — ``setting`` (keyword-only) restricts the
|
|
result to entities that ``EXISTS_IN`` the named setting.
|
|
The filter is on the *located entity*, not the location
|
|
itself — Roland is in mardonari, so a search for
|
|
entities in Mardsville under
|
|
``setting="the_wild_dream"`` returns ``[]``. ``None``
|
|
(default) means no filter — slice 4 / 9 behaviour.
|
|
"""
|
|
loc = graph.by_name(location)
|
|
if loc is None:
|
|
return []
|
|
# Slice 6.5 — resolve setting membership once. An
|
|
# unknown setting is treated as "no entities".
|
|
if setting is not None and graph.find_setting(setting) is None:
|
|
return []
|
|
setting_members = (
|
|
graph.setting_entities(setting) if setting is not None else None
|
|
)
|
|
out: list[dict] = []
|
|
# Slice 5.4: route through the GraphBackend Protocol. Edges
|
|
# pointing at ``loc`` are the incoming edges; filter to the
|
|
# two relations of interest.
|
|
for e in graph.edges_for_object(loc):
|
|
if e.relation not in ("LOCATED_IN", "CONTROLS"):
|
|
continue
|
|
# Slice 6.5 — filter on the located entity's setting
|
|
# membership. ``setting_members`` is None when no
|
|
# filter is requested, so the membership check is
|
|
# a no-op in that case.
|
|
if setting_members is not None and e.subject not in setting_members:
|
|
continue
|
|
if not time_in_window(at_time, e.valid_from, e.valid_until):
|
|
continue
|
|
if type_ is not None and _lookup_entity_type(graph, e.subject) != type_:
|
|
continue
|
|
out.append({
|
|
"name": e.subject,
|
|
"type": _lookup_entity_type(graph, e.subject),
|
|
"via": e.relation,
|
|
"valid_from": e.valid_from,
|
|
"valid_until": e.valid_until,
|
|
})
|
|
return out
|
|
|
|
|
|
def timeline(
|
|
graph: Graph,
|
|
entity: str,
|
|
relation_type: Optional[str] = None,
|
|
start_time: Optional[str] = None,
|
|
end_time: Optional[str] = None,
|
|
) -> list[dict]:
|
|
"""All edges touching ``entity`` (as subject or object), optionally
|
|
time- and relation-type-filtered, sorted chronologically by
|
|
``valid_from``.
|
|
|
|
Each entry is an ``edge_to_fact`` dict plus the direction the
|
|
entity participates in (``"outgoing"`` or ``"incoming"``).
|
|
"""
|
|
canonical = graph.by_name(entity)
|
|
if canonical is None:
|
|
return []
|
|
facts: list[dict] = []
|
|
# Outgoing edges (entity as subject). Slice 5.4 routes through
|
|
# the GraphBackend Protocol method.
|
|
for e in graph.edges_for_subject(canonical):
|
|
if relation_type is not None and e.relation != relation_type:
|
|
continue
|
|
if not _within_range(e.valid_from, e.valid_until, start_time, end_time):
|
|
continue
|
|
f = edge_to_fact(e)
|
|
f["direction"] = "outgoing"
|
|
facts.append(f)
|
|
# Incoming edges (entity as object).
|
|
for e in graph.edges_for_object(canonical):
|
|
if relation_type is not None and e.relation != relation_type:
|
|
continue
|
|
# Skip the outgoing-side duplicate: an edge indexed at
|
|
# *both* endpoints in ``edges_by_object`` would double-
|
|
# count. Filter by direction.
|
|
if e.subject == canonical:
|
|
continue
|
|
if not _within_range(e.valid_from, e.valid_until, start_time, end_time):
|
|
continue
|
|
f = edge_to_fact(e)
|
|
f["direction"] = "incoming"
|
|
facts.append(f)
|
|
# Sort by valid_from (None sorts first to keep undated facts visible).
|
|
facts.sort(key=lambda f: (f["valid_from"] or "", f["valid_until"] or ""))
|
|
return facts
|
|
|
|
|
|
def _within_range(valid_from, valid_until, start_time, end_time) -> bool:
|
|
"""The edge's window overlaps ``[start_time, end_time]``.
|
|
|
|
Used by ``timeline`` and ``events_during``. Empty bounds on
|
|
either side are open. We don't use the canonical
|
|
``time_in_window`` here because that helper answers "is X
|
|
inside [lo, hi]?" — we need the symmetric interval-overlap
|
|
test instead.
|
|
"""
|
|
if start_time is None and end_time is None:
|
|
return True
|
|
# An edge entirely before ``start_time`` is excluded.
|
|
if valid_until is not None and start_time is not None:
|
|
from .time_model import _cmp_atoms
|
|
if _cmp_atoms(valid_until, start_time) < 0:
|
|
return False
|
|
# An edge entirely after ``end_time`` is excluded.
|
|
if valid_from is not None and end_time is not None:
|
|
from .time_model import _cmp_atoms
|
|
if _cmp_atoms(valid_from, end_time) > 0:
|
|
return False
|
|
return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Group 3 — Lineage & hierarchy
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def list_lineage(graph: Graph, person: str, depth: int = 2) -> dict:
|
|
"""The lineage ``person`` belongs to, plus members within ``depth``.
|
|
|
|
Returns ``{lineage: {name, founding_ancestor} | None, members:
|
|
[{name, relation}], cadet_branches: [...], depth_covered: int}``.
|
|
"""
|
|
canonical = graph.by_name(person)
|
|
if canonical is None:
|
|
return {"lineage": None, "members": [], "cadet_branches": [], "depth_covered": 0}
|
|
# The person is in a Lineage via MEMBER_OF(Lineage).
|
|
lineage_name = None
|
|
for e in graph.edges_for_subject(canonical, "MEMBER_OF"):
|
|
if _lookup_entity_type(graph, e.object) == "Lineage":
|
|
lineage_name = e.object
|
|
break
|
|
if lineage_name is None:
|
|
return {"lineage": None, "members": [], "cadet_branches": [], "depth_covered": 0}
|
|
# Find the founding ancestor.
|
|
founding = None
|
|
for e in graph.edges_for_subject(lineage_name, "FOUNDED_BY"):
|
|
founding = e.object
|
|
break
|
|
# Members via reverse MEMBER_OF edges.
|
|
members: list[dict] = []
|
|
seen: set[str] = set()
|
|
for e in graph.edges_for_object(lineage_name):
|
|
if e.relation != "MEMBER_OF":
|
|
continue
|
|
if e.subject == canonical or e.subject in seen:
|
|
continue
|
|
seen.add(e.subject)
|
|
members.append({"name": e.subject, "relation": "MEMBER_OF"})
|
|
return {
|
|
"lineage": {"name": lineage_name, "founding_ancestor": founding},
|
|
"members": members,
|
|
"cadet_branches": [], # slice 4 keeps this empty; richer output is slice 4.x
|
|
"depth_covered": depth,
|
|
}
|
|
|
|
|
|
def list_offspring(graph: Graph, person: str) -> list[str]:
|
|
"""Direct children of ``person`` via ``PARENT_OF``.
|
|
|
|
Returns the children's canonical names. The in-memory graph
|
|
doesn't carry ``generations`` (each ``PARENT_OF`` is one hop);
|
|
for multi-hop use :func:`descendants_of`.
|
|
"""
|
|
canonical = graph.by_name(person)
|
|
if canonical is None:
|
|
return []
|
|
out = []
|
|
seen: set[str] = set()
|
|
for e in graph.edges_for_subject(canonical, "PARENT_OF"):
|
|
if e.object in seen:
|
|
continue
|
|
seen.add(e.object)
|
|
out.append(e.object)
|
|
return out
|
|
|
|
|
|
def ancestors_of(graph: Graph, person: str, generations: int = 3) -> list[str]:
|
|
"""Ancestors of ``person`` via reverse ``PARENT_OF`` walks.
|
|
|
|
Bounded by ``generations`` (default 3). Returns ancestors in
|
|
BFS order (closest first). Cycle-safe via ``seen``.
|
|
"""
|
|
canonical = graph.by_name(person)
|
|
if canonical is None:
|
|
return []
|
|
seen: set[str] = {canonical}
|
|
frontier: list[str] = [canonical]
|
|
out: list[str] = []
|
|
for _ in range(generations):
|
|
next_frontier: list[str] = []
|
|
for n in frontier:
|
|
for e in graph.edges_for_object(n):
|
|
if e.relation != "PARENT_OF":
|
|
continue
|
|
if e.subject in seen:
|
|
continue
|
|
seen.add(e.subject)
|
|
out.append(e.subject)
|
|
next_frontier.append(e.subject)
|
|
frontier = next_frontier
|
|
if not frontier:
|
|
break
|
|
return out
|
|
|
|
|
|
def descendants_of(graph: Graph, person: str, generations: int = 3) -> list[str]:
|
|
"""Descendants of ``person`` via forward ``PARENT_OF`` walks.
|
|
|
|
Bounded by ``generations`` (default 3). Returns descendants
|
|
in BFS order.
|
|
"""
|
|
canonical = graph.by_name(person)
|
|
if canonical is None:
|
|
return []
|
|
seen: set[str] = {canonical}
|
|
frontier: list[str] = [canonical]
|
|
out: list[str] = []
|
|
for _ in range(generations):
|
|
next_frontier: list[str] = []
|
|
for n in frontier:
|
|
for e in graph.edges_for_subject(n, "PARENT_OF"):
|
|
if e.object in seen:
|
|
continue
|
|
seen.add(e.object)
|
|
out.append(e.object)
|
|
next_frontier.append(e.object)
|
|
frontier = next_frontier
|
|
if not frontier:
|
|
break
|
|
return out
|
|
|
|
|
|
def location_hierarchy(
|
|
graph: Graph,
|
|
location: str,
|
|
direction: str = "up",
|
|
) -> list[dict]:
|
|
"""The ``PART_OF`` chain above or below ``location``.
|
|
|
|
``direction="up"`` walks ``PART_OF`` edges (location →
|
|
parent). ``direction="down"`` walks reverse ``PART_OF``
|
|
(location → children). For ``up``, the result is a chain
|
|
(one parent per level); for ``down``, every direct child
|
|
is listed at its level before descending further.
|
|
|
|
Returns ``[{name, valid_from, valid_until}]``.
|
|
"""
|
|
canonical = graph.by_name(location)
|
|
if canonical is None:
|
|
return []
|
|
if direction not in ("up", "down"):
|
|
raise ValueError(f"direction must be 'up' or 'down', got {direction!r}")
|
|
out: list[dict] = []
|
|
seen: set[str] = {canonical}
|
|
if direction == "up":
|
|
# Chain walk: location → parent → grandparent ...
|
|
current = canonical
|
|
while True:
|
|
edges = graph.edges_for_subject(current, "PART_OF")
|
|
if not edges:
|
|
break
|
|
e = edges[0]
|
|
if e.object in seen:
|
|
break
|
|
seen.add(e.object)
|
|
out.append({
|
|
"name": e.object,
|
|
"valid_from": e.valid_from,
|
|
"valid_until": e.valid_until,
|
|
})
|
|
current = e.object
|
|
else:
|
|
# Tree walk: list every direct child of ``canonical``
|
|
# before descending. This is the "what's in region X?" query.
|
|
current = canonical
|
|
direct_children = [
|
|
e.subject
|
|
for e in graph.edges_for_object(current)
|
|
if e.relation == "PART_OF" and e.subject != current
|
|
]
|
|
for child in direct_children:
|
|
if child in seen:
|
|
continue
|
|
seen.add(child)
|
|
# Find the edge for valid_from/until.
|
|
child_edge = next(
|
|
(e for e in graph.edges_for_object(current)
|
|
if e.relation == "PART_OF" and e.subject == child),
|
|
None,
|
|
)
|
|
out.append({
|
|
"name": child,
|
|
"valid_from": child_edge.valid_from if child_edge else None,
|
|
"valid_until": child_edge.valid_until if child_edge else None,
|
|
})
|
|
return out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Group 4 — Causal & event chains
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Edges that imply "this thing led to that thing" for event_chain.
|
|
CAUSAL_RELATIONS = frozenset({"CAUSED", "PRECEDED", "CONCURRENT_WITH"})
|
|
|
|
|
|
def event_chain(graph: Graph, event: str, depth: int = 2) -> dict:
|
|
"""Bounded multi-hop walk of ``CAUSED`` / ``PRECEDED`` /
|
|
``CONCURRENT_WITH`` edges from ``event``.
|
|
|
|
Returns ``{event: str, causes: [...], effects: [...],
|
|
concurrent: [...], depth_covered: int}``. Each cause /
|
|
effect / concurrent entry is a chain node::
|
|
|
|
{"event": str, "via": "CAUSED", "depth": int, "sources": [...]}
|
|
"""
|
|
canonical = graph.by_name(event)
|
|
if canonical is None:
|
|
return {"event": None, "causes": [], "effects": [], "concurrent": [], "depth_covered": 0}
|
|
|
|
def walk(direction: str) -> list[dict]:
|
|
out: list[dict] = []
|
|
seen: set[str] = {canonical}
|
|
frontier = [(canonical, 0)]
|
|
for _ in range(depth):
|
|
next_frontier: list[tuple[str, int]] = []
|
|
for current, d in frontier:
|
|
if direction == "effects":
|
|
edges_iter = (
|
|
(e, e.object)
|
|
for e in graph.edges_for_subject(current)
|
|
)
|
|
else:
|
|
edges_iter = (
|
|
(e, e.subject)
|
|
for e in graph.edges_for_object(current)
|
|
if e.subject != current
|
|
)
|
|
for e, other in edges_iter:
|
|
if e.relation not in CAUSAL_RELATIONS:
|
|
continue
|
|
if other in seen:
|
|
continue
|
|
seen.add(other)
|
|
out.append({
|
|
"event": other,
|
|
"via": e.relation,
|
|
"depth": d + 1,
|
|
"sources": list(e.sources),
|
|
})
|
|
next_frontier.append((other, d + 1))
|
|
frontier = next_frontier
|
|
if not frontier:
|
|
break
|
|
return out
|
|
|
|
causes = walk("causes")
|
|
effects = walk("effects")
|
|
# CONCURRENT_WITH is symmetric: any node it points at is a peer.
|
|
concurrent: list[dict] = []
|
|
for e in graph.edges_for_subject(canonical, "CONCURRENT_WITH"):
|
|
concurrent.append({
|
|
"event": e.object,
|
|
"via": "CONCURRENT_WITH",
|
|
"depth": 0,
|
|
"sources": list(e.sources),
|
|
})
|
|
return {
|
|
"event": canonical,
|
|
"causes": causes,
|
|
"effects": effects,
|
|
"concurrent": concurrent,
|
|
"depth_covered": depth,
|
|
}
|
|
|
|
|
|
def events_during(
|
|
graph: Graph,
|
|
era: str,
|
|
location: Optional[str] = None,
|
|
type_: Optional[str] = None,
|
|
start_time: Optional[str] = None,
|
|
end_time: Optional[str] = None,
|
|
*,
|
|
setting: Optional[str] = None,
|
|
) -> list[dict]:
|
|
"""Events whose ``OCCURRED_DURING`` window intersects ``era``.
|
|
|
|
Filters: ``location`` (events must have ``OCCURRED_AT`` to
|
|
that location), ``type_`` (Event / Battle / Ceremony /
|
|
etc.), and an optional ``[start_time, end_time]`` range that
|
|
narrows the era window. Returns events sorted by
|
|
``valid_from``.
|
|
|
|
Slice 6.5 — ``setting`` (keyword-only) restricts the
|
|
result to events whose subject ``EXISTS_IN`` the named
|
|
setting. The filter is on the *event subject*, not on
|
|
the era — eras are not setting-scoped (the design
|
|
allows the same era id to exist in multiple settings).
|
|
An unknown setting returns ``[]``. ``None`` (default)
|
|
means no filter — slice 4 / 9 behaviour.
|
|
"""
|
|
# Locate the era node.
|
|
era_name = graph.by_name(era) or era
|
|
# Slice 6.5 — resolve setting membership once. An
|
|
# unknown setting is treated as "no events".
|
|
if setting is not None and graph.find_setting(setting) is None:
|
|
return []
|
|
setting_members = (
|
|
graph.setting_entities(setting) if setting is not None else None
|
|
)
|
|
# Events are nodes that have an outgoing ``OCCURRED_DURING``
|
|
# edge to the era (or any descendant of it).
|
|
out: list[dict] = []
|
|
# Slice 5.4: route through the GraphBackend Protocol. For
|
|
# each name in the graph, look at its OCCURRED_DURING /
|
|
# OCCURRED_AT edges.
|
|
for subject in graph.all_names():
|
|
# Slice 6.5 — drop events whose subject isn't in the
|
|
# named setting.
|
|
if setting_members is not None and subject not in setting_members:
|
|
continue
|
|
sub_edges = graph.edges_for_subject(subject)
|
|
occurred_during = [e for e in sub_edges if e.relation == "OCCURRED_DURING"]
|
|
if not occurred_during:
|
|
continue
|
|
for e in occurred_during:
|
|
if e.object != era_name:
|
|
# Descendant check via the time tree would be
|
|
# cleaner; for the POC we accept exact-match only.
|
|
continue
|
|
# Pull the event's other metadata: OCCURRED_AT for
|
|
# the location filter, type for the type filter.
|
|
event_loc = None
|
|
for e2 in sub_edges:
|
|
if e2.relation == "OCCURRED_AT":
|
|
event_loc = e2.object
|
|
break
|
|
event_type = _lookup_entity_type(graph, subject)
|
|
if location is not None:
|
|
loc_canonical = graph.by_name(location)
|
|
if loc_canonical is None or event_loc != loc_canonical:
|
|
continue
|
|
if type_ is not None and event_type != type_:
|
|
continue
|
|
if not _within_range(e.valid_from, e.valid_until, start_time, end_time):
|
|
continue
|
|
out.append({
|
|
"name": subject,
|
|
"type": event_type,
|
|
"location": event_loc,
|
|
"valid_from": e.valid_from,
|
|
"valid_until": e.valid_until,
|
|
})
|
|
out.sort(key=lambda d: (d["valid_from"] or "", d["valid_until"] or ""))
|
|
return out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Group 5 — Knowledge & lore
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def lore_about(
|
|
graph: Graph,
|
|
entity: str,
|
|
type_: Optional[str] = None,
|
|
limit: int = 10,
|
|
) -> list[dict]:
|
|
"""``LoreSource`` documents that mention ``entity``.
|
|
|
|
The in-memory graph stores a ``SOURCED_FROM`` edge per typed
|
|
edge in the structured-YAML path, and every markdown entity
|
|
carries a ``LoreSource`` on ``entity.sources``. We pull the
|
|
unique source paths from those edges, join against
|
|
``graph.lore_sources``, and return one entry per source.
|
|
|
|
Sort order: ``source_confidence`` descending (we trust the
|
|
canonical sources most). Capped at ``limit`` (default 10).
|
|
"""
|
|
canonical = graph.by_name(entity)
|
|
if canonical is None:
|
|
return []
|
|
paths: set[str] = set()
|
|
# Outgoing edges. Slice 5.4: use the GraphBackend Protocol.
|
|
for e in graph.edges_for_subject(canonical):
|
|
for src in e.sources:
|
|
paths.add(src)
|
|
# Incoming edges.
|
|
for e in graph.edges_for_object(canonical):
|
|
for src in e.sources:
|
|
paths.add(src)
|
|
out: list[dict] = []
|
|
for path in paths:
|
|
ls = graph.lore_source(path)
|
|
if ls is None:
|
|
continue
|
|
if type_ is not None and ls.source_type != type_:
|
|
continue
|
|
out.append({
|
|
"path": ls.path,
|
|
"title": ls.name,
|
|
"source_type": ls.source_type,
|
|
"reliability": ls.reliability,
|
|
"source_confidence": ls.source_confidence,
|
|
})
|
|
out.sort(key=lambda d: (-d["source_confidence"], d["path"]))
|
|
return out[:limit]
|
|
|
|
|
|
__all__ = [
|
|
"lookup",
|
|
"entity_context",
|
|
"true_during",
|
|
"entities_present",
|
|
"timeline",
|
|
"list_lineage",
|
|
"list_offspring",
|
|
"ancestors_of",
|
|
"descendants_of",
|
|
"location_hierarchy",
|
|
"event_chain",
|
|
"events_during",
|
|
"lore_about",
|
|
] |