Files
Lore Engine Dev 66e0104b6d slice 5.4: reified :Relation reads + full read-tool parity
Implements the read + write surface of Neo4jGraph against the
reified :Relation shape (ADR 0009). The read tools (slice 4) and
the consistency runner / ontology rules (slice 2) are migrated
to use only GraphBackend Protocol methods, so the same Python
code works against both InMemoryGraph and Neo4jGraph.

Reads (Neo4jGraph):
  * edges_for_subject(name, relation=None) -> list[Edge]
  * edges_for_object(name) -> list[Edge]
  * find_edge_by_id(edge_id) -> Edge | None
  * by_name, all_names, all_entity_types, entities_of_type,
    lore_source (slice 5.3)
  * Round-trip: each Edge field is stored as a :Relation node
    property and rehydrated on read; Cypher ORDER BY edge_id
    so list order matches the in-memory insertion order

Writes (Neo4jGraph):
  * add(edge): MERGE subject + object :Entity nodes, upsert
    :Relation (id-keyed), link :FROM/:TO, link :SOURCED_FROM
    to each :LoreSource in the edge's sources list
  * replace_edge(old_id, new_edge): in-place property update
    for same (subject, relation, object); drop+re-add for
    different endpoints (preserves edge_id for retcon audit)
  * remove_entity(name): DETACH DELETE the :Entity + alias
    cleanup; returns the number of edges that were attached
  * remove_entity_of_type(name, type_): REMOVE n:Label
  * rename_entity(old, new): rename + register old as alias
  * resolve_alias, register_name, register_alias, add_lore_source,
    add_entity_of_type (slice 5.3)

Migrations (read tools + consistency + ontology):
  * tools.py: was_true_at uses graph.edges_for_subject(...)
  * read_tools.py: 22 sites of graph.edges_by_subject.get /
    .items / .values / graph.edges_by_object.get / graph.entities_by_type
    .items / graph.lore_sources.get / graph.names migrated to the
    Protocol methods
  * consistency_runner.py: 4 sites (all_edges flatten,
    anachronism detector, orphan detector)
  * ontology_rules.py: 13 sites (10 ontology rules + helper)
  * write_tools.py: 3 sites (label membership check, era walk)

CI fence (test_graph_backend_writes.py):
  test_no_direct_dict_access_outside_graph_backend now greps for
  the broader pattern (bracket, .get, .items, .values, .keys on
  graph.edges_by_*, entities_by_type, lore_sources, aliases; and
  bare graph.names). Fails the build on regression.

Parity tests (test_neo4j_read_tools_parity.py): 15 docker-gated
tests, one per read tool, asserting the in-memory and Neo4j
backends produce matching answers for a known fixture.

Suite: 596 -> 611 passed (+15 parity tests, 559 baseline preserved)
2026-06-18 22:32:47 -04:00

883 lines
31 KiB
Python

"""Lore Engine POC — write-side tools (slice 4.7).

The minimal world-builder surface. Slice 4.7 introduced three
tools: add_entity, add_relation, add_lore_source. Later slices
added further write tools lower down in this module. Each tool
returns the standard ``{ok, data}`` / ``{ok, error}`` envelope from
:mod:`lore_engine_poc.responses`.

Out of scope for slice 4.7 itself (deferred to later sub-slices;
the slice-10 sections of this module now implement the first two
groups):
* update_entity, delete_entity, retcon, mark_verified,
  merge_entities, set_alias
* define_era, define_calendar, define_date
* bulk / batched writes
* persistence to disk — slice 4.7 writes are in-memory only;
  the next ``01_ingest.py`` rebuilds from codex.

The write tools mutate the graph in place and return the
envelope for transport.
"""
from __future__ import annotations
import datetime
import uuid
from typing import Optional
from .parsers import LoreSource
from .responses import envelope
from .time_model import normalize, time_in_window
from .tools import Edge, Graph, replace
def _new_id(prefix: str) -> str:
    """Return a short id of the form ``<prefix>-<8 hex digits>``.

    The suffix is the first eight hex characters of a random UUID4, so
    each call produces a fresh value.
    """
    suffix = uuid.uuid4().hex[:8]
    return "{}-{}".format(prefix, suffix)
# Closed set of labels the world-builder tools may create.
# A label outside this set is rejected with an error envelope;
# this is the minimal structural guard that stops the LLM from
# inventing new labels on the fly.
ALLOWED_LABELS = frozenset((
    "Person",
    "Faction",
    "Location",
    "Creature",
    "Item",
    "Title",
    "Language",
    "Deity",
    "Spell",
    "Material",
    "Era",
    "Calendar",
    "Lineage",
    "Culture",
    "MagicSystem",
    "Region",
    "Plane",
    "Setting",
    "NPC",
    "PC",
    "Human",
    "Date",
    "DomainEntity",
    "Relation",
    "TypeTemplate",
    "LoreSource",
    "Contradiction",
    "Anachronism",
    "Orphan",
    "OntologyViolation",
))
def add_entity(
    graph: Graph,
    label: str,
    name: str,
    properties: Optional[dict] = None,
) -> dict:
    """Register ``name`` in the graph under the type ``label``.

    ``properties`` is accepted but not stored: this function only
    records the (stripped) name against the label via
    ``graph.add_entity_of_type``.

    Returns ``{"ok": True, "data": {"id", "name", "label"}}`` on
    success. The ``id`` is freshly generated for the response on
    every call. Returns an error envelope when ``name`` is empty or
    whitespace-only, or when ``label`` is not in
    :data:`ALLOWED_LABELS`.
    """
    cleaned = name.strip() if name else ""
    if not cleaned:
        return envelope(ok=False, error="entity name is required")

    if label not in ALLOWED_LABELS:
        message = f"label {label!r} is not in the allowed list ({len(ALLOWED_LABELS)} labels; see 01-ontology.md)"
        return envelope(ok=False, error=message)

    graph.add_entity_of_type(cleaned, label)
    payload = {"id": _new_id("ent"), "name": cleaned, "label": label}
    return envelope(ok=True, data=payload)
def add_relation(
    graph: Graph,
    from_name: str,
    relation: str,
    to_name: str,
    valid_from: Optional[str] = None,
    valid_until: Optional[str] = None,
) -> dict:
    """Append a time-bounded edge to the graph.

    ``valid_from`` / ``valid_until`` are canonical time atoms
    (parsed by :func:`lore_engine_poc.time_model.normalize`).
    ``None``, ``""``, ``"null"`` and ``"none"`` all mean "no bound".
    Malformed bounds return an error envelope rather than raising.
    ``relation`` is free-form — no edge-type allowlist is enforced
    here; it is stored stripped of surrounding whitespace.

    Both endpoints are resolved through ``graph.by_name``; a name
    the graph does not resolve is used as given (stripped).

    Returns the standard envelope; on success ``data`` carries the
    new edge's ``edge_id`` plus the stored subject / relation /
    object / bounds. Errors:
    * empty / whitespace ``relation`` → ``relation type is required``.
    * empty / whitespace ``from_name`` → ``from_name is required``.
    * empty / whitespace ``to_name`` → ``to_name is required``.
    * malformed time bound → ``invalid time bound: ...``.
    * inverted bounds → ``valid_from ... is not before
      valid_until ...``.
    """
    if not relation or not relation.strip():
        return envelope(ok=False, error="relation type is required")
    # Endpoints get the same non-empty guard as the relation type.
    # Without it a blank name would be written into the graph as an
    # edge endpoint (and ``None`` would raise instead of returning an
    # error envelope).
    if not from_name or not from_name.strip():
        return envelope(ok=False, error="from_name is required")
    if not to_name or not to_name.strip():
        return envelope(ok=False, error="to_name is required")

    relation_clean = relation.strip()
    from_clean = from_name.strip()
    to_clean = to_name.strip()

    # Values that mean "leave this side of the window open".
    no_bound = (None, "", "null", "none")
    try:
        vfrom = normalize(valid_from) if valid_from not in no_bound else None
        vuntil = normalize(valid_until) if valid_until not in no_bound else None
    except ValueError as exc:
        return envelope(ok=False, error=f"invalid time bound: {exc}")

    # Sanity check when both sides are bounded: ask time_in_window
    # whether ``vfrom`` lies inside [vfrom, vuntil]. A False answer is
    # reported as inverted bounds (presumably it only happens when
    # ``vuntil`` precedes ``vfrom`` — confirm against time_model).
    if (
        vfrom is not None
        and vuntil is not None
        and not time_in_window(vfrom, vfrom, vuntil)
    ):
        return envelope(
            ok=False,
            error=f"valid_from {vfrom!r} is not before valid_until {vuntil!r}",
        )

    # Prefer the graph's canonical spelling (this is also how aliases
    # resolve); fall back to the stripped input for unknown names.
    from_canonical = graph.by_name(from_clean) or from_clean
    to_canonical = graph.by_name(to_clean) or to_clean

    edge = Edge(
        subject=from_canonical,
        relation=relation_clean,
        object=to_canonical,
        valid_from=vfrom,
        valid_until=vuntil,
        sources=["world_builder:add_relation"],
    )
    graph.add(edge)
    return envelope(
        ok=True,
        data={
            # Return the edge's own id so the caller can immediately
            # pass it to ``retcon`` / ``mark_verified``.
            "edge_id": edge.edge_id,
            "subject": from_canonical,
            "relation": relation_clean,
            "object": to_canonical,
            "valid_from": vfrom,
            "valid_until": vuntil,
        },
    )
def add_lore_source(
    graph: Graph,
    title: str,
    source_type: str,
    content: str = "",
    author: Optional[str] = None,
) -> dict:
    """Build a :class:`LoreSource` record and register it in the graph.

    Only metadata is recorded. ``content`` is accepted but not used
    here (no chunking / embedding), and ``author`` is echoed back in
    the response without being stored on the ``LoreSource``.

    ``source_type`` must be one of ``prose`` / ``timeline`` /
    ``family_tree`` / ``gazetteer`` / ``bestiary`` / ``culture`` /
    ``magic_system`` / ``dialogue``.

    Returns the standard envelope; on success ``data["id"]`` is the
    synthesised path that keys the source. Errors:
    * empty / whitespace ``title`` → ``title is required``.
    * unknown ``source_type`` → ``source_type ... not in [...]``.
    """
    clean_title = title.strip() if title else ""
    if not clean_title:
        return envelope(ok=False, error="title is required")

    allowed_types = frozenset((
        "prose", "timeline", "family_tree", "gazetteer",
        "bestiary", "culture", "magic_system", "dialogue",
    ))
    if source_type not in allowed_types:
        message = f"source_type {source_type!r} not in {sorted(allowed_types)}"
        return envelope(ok=False, error=message)

    # Nothing is written to disk here; the path is a synthetic,
    # in-memory key for the source.
    path = f"world_builder/{_new_id('src')}.md"
    source = LoreSource(
        path=path,
        name=clean_title,
        source_type=source_type,
        # Sources created through this tool carry their own
        # reliability tag.
        reliability="world_builder",
        source_confidence=1.0,
    )
    # NOTE(review): per the original comment, graph.add_lore_source
    # also registers the title as a name so lookups can find it —
    # confirm in the graph backend.
    graph.add_lore_source(source)

    payload = {
        "id": path,
        "title": clean_title,
        "source_type": source_type,
        "author": author,
        "reliability": source.reliability,
    }
    return envelope(ok=True, data=payload)
# ---------------------------------------------------------------------------
# Slice 10 — deferred write tools (subset 1)
# ---------------------------------------------------------------------------
#
# These three were deferred from slice 4.7 (see the module
# docstring). They follow the same envelope contract as the
# slice-4.7 writes. Each mutates the in-memory graph; the next
# ``01_ingest.py`` rebuilds from codex, so writes are not
# persisted to disk.
def set_alias(
    graph: Graph,
    name: str,
    alias: str,
) -> dict:
    """Register ``alias`` as an alternative name for ``name``.

    The alias is added to the set stored under the canonical name in
    ``graph.aliases`` (a ``name → set`` map). The alias is stored
    stripped, with its case preserved.

    An alias that already resolves (via ``graph.by_name``) to a
    *different* entity is refused: accepting it would make lookups by
    that name ambiguous.

    Returns the standard envelope. Errors:
    * ``name`` not in the graph → ``name ... not found in graph``.
    * empty / whitespace ``alias`` → ``alias is required``.
    * alias already resolves to another entity → ``... already
      resolves to ...``.
    * duplicate alias on the same canonical → ``already
      registered as alias for ...``.
    """
    canonical = graph.by_name(name)
    if canonical is None:
        return envelope(ok=False, error=f"name {name!r} not found in graph")
    if not alias or not alias.strip():
        return envelope(ok=False, error="alias is required")
    alias_clean = alias.strip()

    # Refuse an alias that is already another entity's name or alias.
    # Folding two existing entities together is merge_entities' job.
    owner = graph.by_name(alias_clean)
    if owner is not None and owner != canonical:
        return envelope(
            ok=False,
            error=(
                f"{alias_clean!r} already resolves to {owner!r}; "
                f"cannot register it as alias for {canonical!r} "
                f"(use merge_entities to fold the two together)"
            ),
        )

    # NOTE(review): this mutates ``graph.aliases`` directly. If the
    # graph backend exposes a dedicated alias-registration method,
    # consider routing through it once its signature is confirmed.
    existing = graph.aliases.setdefault(canonical, set())
    if alias_clean in existing:
        return envelope(
            ok=False,
            error=f"{alias_clean!r} is already registered as alias for {canonical!r}",
        )
    existing.add(alias_clean)
    return envelope(
        ok=True,
        data={"name": canonical, "alias": alias_clean},
    )
def update_entity(
    graph: Graph,
    name: str,
    *,
    label: Optional[str] = None,
    name_: Optional[str] = None,
) -> dict:
    """Update an entity's label and/or canonical name.

    Pass ``label`` to change the entity's type (must be in
    :data:`ALLOWED_LABELS`); the entity is removed from every type
    it currently has and added under the new one. Pass ``name_`` to
    rename the entity via ``graph.rename_entity``, whose return value
    is reported as ``edges_repointed``.

    The rename parameter is ``name_`` (with a trailing underscore)
    to avoid colliding with the positional ``name`` argument.

    All inputs are validated *before* the graph is touched, so an
    error envelope always means nothing was changed. At least one of
    ``label`` / ``name_`` must be supplied.

    Returns the standard envelope with ``data`` =
    ``{"name", "label", "edges_repointed"}``. Errors:
    * neither ``label`` nor ``name_`` given → ``nothing to update``.
    * ``name`` not in the graph → ``name ... not found in graph``.
    * ``label`` not allowlisted → ``label ... is not in the allowed
      list``.
    * blank ``name_`` → ``name must be non-empty``.
    * ``name_`` already resolves to a different entity → ``name ...
      is already registered``.
    """
    if label is None and name_ is None:
        return envelope(
            ok=False,
            error="nothing to update: supply label= and/or name=",
        )
    canonical = graph.by_name(name)
    if canonical is None:
        return envelope(ok=False, error=f"name {name!r} not found in graph")

    # ---- Validation phase: no graph mutation below until every
    # ---- argument has been checked.
    if label is not None and label not in ALLOWED_LABELS:
        return envelope(
            ok=False,
            error=f"label {label!r} is not in the allowed list",
        )

    rename_to: Optional[str] = None
    if name_ is not None:
        new_clean = name_.strip()
        if not new_clean:
            return envelope(ok=False, error="name must be non-empty")
        if new_clean != canonical:
            # Renaming onto another existing entity would silently
            # merge the two and skip merge_entities' label check.
            owner = graph.by_name(new_clean)
            if owner is not None and owner != canonical:
                return envelope(
                    ok=False,
                    error=(
                        f"name {new_clean!r} is already registered "
                        f"(resolves to {owner!r}); use merge_entities "
                        f"to fold entities together"
                    ),
                )
            rename_to = new_clean
        # new_clean == canonical is a no-op rename: report success
        # with zero edges re-pointed.

    # ---- Mutation phase.
    if label is not None:
        # Move the name across the type index: drop it from every
        # type first, then add it under the new label. The type list
        # is snapshotted because the removals may change it.
        for type_ in list(graph.all_entity_types()):
            graph.remove_entity_of_type(canonical, type_)
        graph.add_entity_of_type(canonical, label)

    re_pointed = 0
    if rename_to is not None:
        re_pointed = graph.rename_entity(canonical, rename_to)
        canonical = rename_to

    return envelope(
        ok=True,
        data={
            "name": canonical,
            "label": label,
            "edges_repointed": re_pointed,
        },
    )
def delete_entity(graph: Graph, name: str) -> dict:
    """Delete the entity that ``name`` resolves to.

    The name is resolved through ``graph.by_name`` and the removal
    is delegated to ``graph.remove_entity``. Whatever that call
    returns is reported as ``data.edges_removed`` so the
    world-builder can audit the blast radius.

    Returns the standard envelope. Errors:
    * ``name`` not in the graph → ``name ... not found in graph``.
    """
    resolved = graph.by_name(name)
    if resolved is not None:
        edge_count = graph.remove_entity(resolved)
        payload = {"name": resolved, "edges_removed": edge_count}
        return envelope(ok=True, data=payload)
    return envelope(ok=False, error=f"name {name!r} not found in graph")
# ---------------------------------------------------------------------------
# Slice 10.2 — deferred write tools (subset 2)
# ---------------------------------------------------------------------------
#
# retcon / mark_verified / merge_entities. These mutate existing
# edges (rather than creating new ones) and need ``edge_id`` for
# identity. The id reverse index on Graph keeps the cost O(1).
def _now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``.

    Second precision is enough for the audit stamps this module
    writes, so microseconds are dropped. The clock is read with an
    explicit UTC timezone because ``datetime.utcnow()`` is deprecated
    since Python 3.12. The tzinfo is then removed so the rendered
    text keeps the bare ``...Z`` suffix instead of ``+00:00``.
    """
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    return now_utc.replace(microsecond=0, tzinfo=None).isoformat() + "Z"
def retcon(
    graph: Graph,
    edge_id: str,
    *,
    valid_from: Optional[str] = None,
    valid_until: Optional[str] = None,
    relation: Optional[str] = None,
    object_: Optional[str] = None,
    note: Optional[str] = None,
) -> dict:
    """Amend an existing edge's bounds, relation, or object.

    Loads the edge by id, validates any new time bounds via
    :func:`time_model.normalize`, and stamps ``retcon_at`` /
    ``retcon_note`` for the audit log. The edge is rebuilt with
    ``replace`` and handed to ``graph.replace_edge``.

    At least one of ``valid_from`` / ``valid_until`` /
    ``relation`` / ``object_`` / ``note`` must be supplied.

    Field handling:
    * ``valid_from`` / ``valid_until``: ``""``, ``"null"`` and
      ``"none"`` mean "leave that bound unchanged".
    * ``relation`` / ``object_``: when supplied they are stripped and
      must be non-empty. A new object is resolved through
      ``graph.by_name`` first, as ``add_relation`` does for its
      endpoints, so the edge lands on the canonical entity rather
      than on an alias.
    * ``note``: replaces the stored note when given, otherwise the
      existing ``retcon_note`` is kept.

    Returns the standard envelope. Errors:
    * nothing supplied → ``nothing to retcon: ...``.
    * ``edge_id`` not in the graph → ``edge ... not found in graph``.
    * malformed time bound → ``invalid time bound: ...``.
    * inverted bounds → ``valid_from ... is not before
      valid_until ...``.
    * blank relation → ``relation must be non-empty``.
    * blank object → ``object must be non-empty``.
    """
    supplied = (valid_from, valid_until, relation, object_, note)
    if all(value is None for value in supplied):
        return envelope(
            ok=False,
            error="nothing to retcon: supply valid_from= and/or valid_until= "
                  "and/or relation= and/or object= and/or note=",
        )
    edge = graph.find_edge_by_id(edge_id)
    if edge is None:
        return envelope(ok=False, error=f"edge {edge_id!r} not found in graph")

    # Values that mean "keep the edge's current bound".
    keep_bound = (None, "", "null", "none")
    try:
        new_from = (
            normalize(valid_from)
            if valid_from not in keep_bound
            else edge.valid_from
        )
        new_until = (
            normalize(valid_until)
            if valid_until not in keep_bound
            else edge.valid_until
        )
    except ValueError as exc:
        return envelope(ok=False, error=f"invalid time bound: {exc}")

    # Same window sanity check add_relation uses: a False answer from
    # time_in_window is reported as inverted bounds.
    if (
        new_from is not None
        and new_until is not None
        and not time_in_window(new_from, new_from, new_until)
    ):
        return envelope(
            ok=False,
            error=f"valid_from {new_from!r} is not before valid_until {new_until!r}",
        )

    new_relation = relation.strip() if relation is not None else edge.relation
    if not new_relation:
        return envelope(ok=False, error="relation must be non-empty")

    if object_ is not None:
        object_clean = object_.strip()
        # Resolve to the canonical spelling when the graph knows the
        # name; otherwise keep the stripped input. A blank value
        # skips the lookup and is rejected below.
        new_object = (
            (graph.by_name(object_clean) or object_clean)
            if object_clean
            else ""
        )
    else:
        new_object = edge.object
    if not new_object:
        return envelope(ok=False, error="object must be non-empty")

    new_edge = replace(
        edge,
        valid_from=new_from,
        valid_until=new_until,
        relation=new_relation,
        object=new_object,
        retcon_at=_now_iso(),
        retcon_note=note if note is not None else edge.retcon_note,
    )
    # ``replace_edge`` is the single chokepoint for edge mutation.
    # It decides between an in-place swap and a drop-and-re-add,
    # depending on whether subject / relation / object changed.
    graph.replace_edge(edge_id, new_edge)
    return envelope(
        ok=True,
        data={
            "edge_id": new_edge.edge_id,
            "subject": new_edge.subject,
            "relation": new_edge.relation,
            "object": new_edge.object,
            "valid_from": new_edge.valid_from,
            "valid_until": new_edge.valid_until,
            "retcon_at": new_edge.retcon_at,
            "retcon_note": new_edge.retcon_note,
        },
    )
def mark_verified(
    graph: Graph,
    edge_id: str,
    *,
    verifier: str,
    note: Optional[str] = None,
) -> dict:
    """Stamp an edge as verified by a human.

    Appends one extra source entry to the edge — extraction
    confidence ``1.0``, source confidence ``1.0``, reliability
    ``"human_verified"`` — and sets ``verified_by`` /
    ``verified_at`` / ``verified_note``. An omitted ``note`` keeps
    the edge's existing ``verified_note``. The validity window is
    left alone; changing it is ``retcon``'s job.

    Returns the standard envelope. Errors:
    * empty / whitespace ``verifier`` → ``verifier is required``.
    * ``edge_id`` not in the graph → ``edge ... not found in graph``.
    """
    verifier_clean = verifier.strip() if verifier else ""
    if not verifier_clean:
        return envelope(ok=False, error="verifier is required")

    edge = graph.find_edge_by_id(edge_id)
    if edge is None:
        return envelope(ok=False, error=f"edge {edge_id!r} not found in graph")

    changes = {
        "extraction_confidences": [*edge.extraction_confidences, 1.0],
        "source_confidences": [*edge.source_confidences, 1.0],
        "reliabilities": [*edge.reliabilities, "human_verified"],
        "verified_by": verifier_clean,
        "verified_at": _now_iso(),
        "verified_note": edge.verified_note if note is None else note,
    }
    verified_edge = replace(edge, **changes)

    # All edge swaps go through the graph's replace_edge chokepoint.
    graph.replace_edge(edge_id, verified_edge)

    payload = {
        "edge_id": verified_edge.edge_id,
        "verified_by": verified_edge.verified_by,
        "verified_at": verified_edge.verified_at,
        "verified_note": verified_edge.verified_note,
    }
    return envelope(ok=True, data=payload)
def merge_entities(graph: Graph, from_name: str, to_name: str) -> dict:
    """Fold ``from_name`` into ``to_name``.

    Both names are resolved through ``graph.by_name``; the fold
    itself is delegated to ``graph.rename_entity(from, to)``, whose
    return value is reported as ``edges_repointed``.

    Label rule: the merge is allowed when ``from_name`` has no type
    labels at all, or when every label it has is also a label of
    ``to_name``. Anything else is refused so that type information
    is not silently dropped.

    Returns the standard envelope. Errors:
    * ``from_name`` not in the graph → ``name ... not found in graph``.
    * ``to_name`` not in the graph → ``name ... not found in graph``.
    * label rule violated → ``cannot merge ... into ...: labels
      differ``.
    * both resolve to the same canonical name → no-op success
      with ``edges_repointed == 0``.
    """

    def labels_of(entity: str) -> set:
        # Every type whose member collection contains ``entity``.
        return {
            kind for kind in graph.all_entity_types()
            if entity in graph.entities_of_type(kind)
        }

    source = graph.by_name(from_name)
    if source is None:
        return envelope(ok=False, error=f"name {from_name!r} not found in graph")
    target = graph.by_name(to_name)
    if target is None:
        return envelope(ok=False, error=f"name {to_name!r} not found in graph")

    if source == target:
        unchanged = {"merged": source, "into": target, "edges_repointed": 0}
        return envelope(ok=True, data=unchanged)

    source_labels = labels_of(source)
    target_labels = labels_of(target)
    compatible = not source_labels or source_labels.issubset(target_labels)
    if not compatible:
        message = (
            f"cannot merge {source!r} (labels={sorted(source_labels)}) "
            f"into {target!r} (labels={sorted(target_labels)}): "
            f"labels differ"
        )
        return envelope(ok=False, error=message)

    moved = graph.rename_entity(source, target)
    merged = {"merged": source, "into": target, "edges_repointed": moved}
    return envelope(ok=True, data=merged)
# ---------------------------------------------------------------------------
# Slice 10.3 — deferred write tools (subset 3)
# ---------------------------------------------------------------------------
#
# define_era / define_calendar / define_date. These add the
# time-domain metadata nodes (Calendar, Era, Date) that the
# rest of the graph can reference. The canonical time atom
# returned by ``define_date`` matches the format the rest of
# the time model uses (``{era}.{unit}``).
def define_calendar(
    graph: Graph,
    name: str,
    *,
    days_per_year: Optional[int] = None,
    months: Optional[int] = None,
    description: Optional[str] = None,
) -> dict:
    """Register a new ``Calendar`` entity.

    Only the (stripped) name is written to the graph, under the
    ``Calendar`` type. ``days_per_year``, ``months`` and
    ``description`` are optional metadata that this function
    validates and echoes back in the response but does not store.

    Returns the standard envelope. Errors:
    * empty / whitespace name → ``name is required``.
    * name already resolves in the graph → ``name ... is already
      registered``.
    * ``days_per_year`` zero or negative → ``days_per_year must be
      positive``.
    * ``months`` zero or negative → ``months must be positive``.
    """
    canonical = name.strip() if name else ""
    if not canonical:
        return envelope(ok=False, error="name is required")

    if graph.by_name(canonical) is not None:
        return envelope(
            ok=False,
            error=f"name {canonical!r} is already registered",
        )

    # Optional numeric metadata: when present it must be > 0.
    positive_checks = (
        (days_per_year, "days_per_year must be positive"),
        (months, "months must be positive"),
    )
    for value, message in positive_checks:
        if value is not None and value <= 0:
            return envelope(ok=False, error=message)

    graph.add_entity_of_type(canonical, "Calendar")
    payload = {
        "name": canonical,
        "days_per_year": days_per_year,
        "months": months,
        "description": description,
    }
    return envelope(ok=True, data=payload)
def define_era(
    graph: Graph,
    name: str,
    *,
    calendar: str,
    start: str,
    end: Optional[str] = None,
    description: Optional[str] = None,
) -> dict:
    """Create an Era entity and link it to a Calendar.

    Registers ``name`` under the ``Era`` type and adds a ``PART_OF``
    edge from the era to the calendar, bounded by the era's start /
    end. If the calendar already has other eras, the new era also
    gets a ``PRECEDED`` edge pointing at the prior era with the
    latest start that is strictly before this era's start (simple
    linear ordering; the world-builder can amend it with ``retcon``).

    ``end`` values of ``None``, ``""``, ``"null"`` and ``"none"``
    mean the era is open-ended. ``description`` is echoed back in
    the response but not stored.

    Returns the standard envelope. Errors:
    * empty / whitespace name → ``name is required``.
    * empty / whitespace calendar → ``calendar is required``.
    * unknown calendar → ``calendar ... not found in graph``.
    * duplicate era name → ``name ... is already registered``.
    * malformed time bound → ``invalid time bound: ...``.
    * inverted bounds → ``valid_from ... is not before
      valid_until ...``.
    """
    if not name or not name.strip():
        return envelope(ok=False, error="name is required")
    if not calendar or not calendar.strip():
        return envelope(ok=False, error="calendar is required")
    canonical = name.strip()
    calendar_canonical = graph.by_name(calendar.strip())
    if calendar_canonical is None:
        return envelope(
            ok=False,
            error=f"calendar {calendar!r} not found in graph "
                  "(call define_calendar first)",
        )
    if graph.by_name(canonical) is not None:
        return envelope(
            ok=False,
            error=f"name {canonical!r} is already registered",
        )

    # Validate the time bounds before touching the graph.
    try:
        vfrom = normalize(start)
        vuntil = (
            normalize(end)
            if end not in (None, "", "null", "none")
            else None
        )
    except ValueError as exc:
        return envelope(ok=False, error=f"invalid time bound: {exc}")
    if (
        vfrom is not None
        and vuntil is not None
        and not time_in_window(vfrom, vfrom, vuntil)
    ):
        return envelope(
            ok=False,
            error=f"valid_from {vfrom!r} is not before valid_until {vuntil!r}",
        )

    graph.add_entity_of_type(canonical, "Era")
    graph.add(Edge(
        subject=canonical,
        relation="PART_OF",
        object=calendar_canonical,
        valid_from=vfrom,
        valid_until=vuntil,
        sources=["world_builder:define_era"],
    ))

    prior_era = _latest_prior_era(graph, calendar_canonical, canonical, vfrom)
    if prior_era is not None:
        # NOTE(review): the edge runs new-era → prior-era with the
        # relation name PRECEDED; confirm readers of this edge
        # interpret the direction that way.
        graph.add(Edge(
            subject=canonical,
            relation="PRECEDED",
            object=prior_era,
            sources=["world_builder:define_era"],
        ))

    return envelope(
        ok=True,
        data={
            "name": canonical,
            "calendar": calendar_canonical,
            "start": vfrom,
            "end": vuntil,
            "description": description,
        },
    )


def _latest_prior_era(
    graph: Graph,
    calendar_name: str,
    era_name: str,
    era_start: Optional[str],
) -> Optional[str]:
    """Pick the era of ``calendar_name`` that most recently precedes ``era_name``.

    Walks the ``PART_OF`` edges pointing at the calendar, skipping
    ``era_name`` itself and eras without a start. An era qualifies
    when its start compares strictly before ``era_start``; among the
    qualifiers the one with the latest start wins. Returns ``None``
    when no era qualifies.
    """
    # Imported once, up front. Importing it inside the loop under a
    # condition could leave the name unbound by the time the
    # "latest so far" comparison below needs it.
    from .time_model import _cmp_atoms

    best_era: Optional[str] = None
    best_start: Optional[str] = None
    for edge in graph.edges_for_object(calendar_name):
        if edge.relation != "PART_OF" or edge.subject == era_name:
            continue
        if edge.valid_from is None:
            continue
        if era_start is not None:
            order = _cmp_atoms(edge.valid_from, era_start)
            # Incomparable (None) or not strictly earlier: not a
            # prior era.
            if order is None or order >= 0:
                continue
        if best_start is None:
            best_start, best_era = edge.valid_from, edge.subject
            continue
        # Keep whichever candidate starts later.
        later = _cmp_atoms(edge.valid_from, best_start)
        if later is not None and later > 0:
            best_start, best_era = edge.valid_from, edge.subject
    return best_era
def define_date(
    graph: Graph,
    *,
    calendar: str,
    year: int,
    month: Optional[int] = None,
    day: Optional[int] = None,
    era: Optional[str] = None,
    label: Optional[str] = None,
) -> dict:
    """Create a Date entity for a point in a calendar.

    The canonical time atom is the dot-joined sequence of whichever
    of these parts are present, in this order: the era's canonical
    name, ``year_{year}``, ``month_{month}``, ``day_{day}``.

    A new date is registered under the ``Date`` type with an
    ``INSTANCE_OF`` edge to the calendar and, when an era is given,
    a ``DURING`` edge to the era. If the atom already resolves to a
    name in the graph, nothing is added and the same response is
    returned. That existence check is by name only. ``label`` is
    echoed back in the response but not stored.

    Returns the standard envelope. Errors:
    * empty calendar → ``calendar is required``.
    * unknown calendar → ``calendar ... not found in graph``.
    * non-integer / negative year → ``year must be a
      non-negative integer``.
    * non-integer / out-of-range month → ``month must be in
      1..12`` (fixed range; the calendar's own month count is not
      consulted).
    * non-integer / out-of-range day → ``day must be in 1..31``.
    * unknown era → ``era ... not found in graph``.
    * era given but year is 0 → ``year must be >= 1 when an
      era is given``.
    """

    def is_plain_int(value: object) -> bool:
        # bool is a subclass of int; it is rejected on purpose.
        return isinstance(value, int) and not isinstance(value, bool)

    if not calendar or not calendar.strip():
        return envelope(ok=False, error="calendar is required")
    calendar_canonical = graph.by_name(calendar.strip())
    if calendar_canonical is None:
        return envelope(
            ok=False,
            error=f"calendar {calendar!r} not found in graph",
        )

    if not (is_plain_int(year) and year >= 0):
        return envelope(
            ok=False,
            error="year must be a non-negative integer",
        )
    if month is not None and not (is_plain_int(month) and 1 <= month <= 12):
        return envelope(ok=False, error="month must be in 1..12")
    if day is not None and not (is_plain_int(day) and 1 <= day <= 31):
        return envelope(ok=False, error="day must be in 1..31")

    era_canonical: Optional[str] = None
    if era is not None and era.strip():
        era_canonical = graph.by_name(era.strip())
        if era_canonical is None:
            return envelope(
                ok=False,
                error=f"era {era!r} not found in graph",
            )
        if year < 1:
            return envelope(
                ok=False,
                error="year must be >= 1 when an era is given",
            )

    # Assemble the atom from the parts that are present.
    segments = [
        era_canonical,
        f"year_{year}",
        None if month is None else f"month_{month}",
        None if day is None else f"day_{day}",
    ]
    canonical = ".".join(part for part in segments if part is not None)

    # The response is the same whether or not the date already exists.
    payload = {
        "canonical": canonical,
        "calendar": calendar_canonical,
        "year": year,
        "month": month,
        "day": day,
        "era": era_canonical,
        "label": label,
    }

    if graph.by_name(canonical) is None:
        source_tag = "world_builder:define_date"
        graph.add_entity_of_type(canonical, "Date")
        graph.add(Edge(
            subject=canonical,
            relation="INSTANCE_OF",
            object=calendar_canonical,
            sources=[source_tag],
        ))
        if era_canonical is not None:
            graph.add(Edge(
                subject=canonical,
                relation="DURING",
                object=era_canonical,
                sources=[source_tag],
            ))

    return envelope(ok=True, data=payload)
# Public API of this module: the names a star-import exports.
# Underscore-prefixed helpers are deliberately left out.
__all__ = [
    # --- Slice 4.7: original write tools + the label allowlist ---
    "add_entity",
    "add_lore_source",
    "add_relation",
    "ALLOWED_LABELS",
    # --- Slice 10.1: deferred write tools (subset 1) ---
    "set_alias",
    "update_entity",
    "delete_entity",
    # --- Slice 10.2: deferred write tools (subset 2) ---
    "retcon",
    "mark_verified",
    "merge_entities",
    # --- Slice 10.3: deferred write tools (subset 3) ---
    "define_calendar",
    "define_era",
    "define_date",
]