Implements the read + write surface of Neo4jGraph against the
reified :Relation shape (ADR 0009). The read tools (slice 4) and
the consistency runner / ontology rules (slice 2) are migrated
to use only GraphBackend Protocol methods, so the same Python
code works against both InMemoryGraph and Neo4jGraph.
Reads (Neo4jGraph):
* edges_for_subject(name, relation=None) -> list[Edge]
* edges_for_object(name) -> list[Edge]
* find_edge_by_id(edge_id) -> Edge | None
* by_name, all_names, all_entity_types, entities_of_type,
lore_source (slice 5.3)
* Round-trip: each Edge field is stored as a :Relation node
property and rehydrated on read; Cypher ORDER BY edge_id
so list order matches the in-memory insertion order
Writes (Neo4jGraph):
* add(edge): MERGE subject + object :Entity nodes, upsert
:Relation (id-keyed), link :FROM/:TO, link :SOURCED_FROM
to each :LoreSource in the edge's sources list
* replace_edge(old_id, new_edge): in-place property update
for same (subject, relation, object); drop+re-add for
different endpoints (preserves edge_id for retcon audit)
* remove_entity(name): DETACH DELETE the :Entity + alias
cleanup; returns the number of edges that were attached
* remove_entity_of_type(name, type_): REMOVE n:Label
* rename_entity(old, new): rename + register old as alias
* resolve_alias, register_name, register_alias, add_lore_source,
add_entity_of_type (slice 5.3)
Migrations (read tools + consistency + ontology):
* tools.py: was_true_at uses graph.edges_for_subject(...)
* read_tools.py: 22 sites of graph.edges_by_subject.get /
.items / .values / graph.edges_by_object.get / graph.entities_by_type
.items / graph.lore_sources.get / graph.names migrated to the
Protocol methods
* consistency_runner.py: 4 sites (all_edges flatten,
anachronism detector, orphan detector)
* ontology_rules.py: 13 sites (10 ontology rules + helper)
* write_tools.py: 3 sites (label membership check, era walk)
CI fence (test_graph_backend_writes.py):
test_no_direct_dict_access_outside_graph_backend now greps for
the broader pattern (bracket, .get, .items, .values, .keys on
graph.edges_by_*, entities_by_type, lore_sources, aliases; and
bare graph.names). Fails the build on regression.
Parity tests (test_neo4j_read_tools_parity.py): 15 docker-gated
tests, one per read tool, asserting the in-memory and Neo4j
backends produce matching answers for a known fixture.
Suite: 596 -> 611 passed (+15 parity tests, 596 baseline preserved)
883 lines
31 KiB
Python
883 lines
31 KiB
Python
"""Lore Engine POC — write-side tools (slice 4.7).

The minimal world-builder surface. Slice 4.7 ships three tools:
add_entity, add_relation, add_lore_source. Each returns the standard
``{ok, data}`` / ``{ok, error}`` envelope from
:mod:`lore_engine_poc.responses`.

Deferred from slice 4.7 and added by the later slice-10 sections of
this module:
  * update_entity, delete_entity, retcon, mark_verified,
    merge_entities, set_alias
  * define_era, define_calendar, define_date

Still out of scope:
  * bulk / batched writes
  * persistence to disk — slice 4.7 writes are in-memory only;
    the next ``01_ingest.py`` rebuilds from codex.

All functions mutate the graph in place and return the
envelope for transport.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
import uuid
|
|
from typing import Optional
|
|
|
|
from .parsers import LoreSource
|
|
from .responses import envelope
|
|
from .time_model import normalize, time_in_window
|
|
from .tools import Edge, Graph, replace
|
|
|
|
|
|
def _new_id(prefix: str) -> str:
|
|
"""Stable-ish id for write tool responses."""
|
|
return f"{prefix}-{uuid.uuid4().hex[:8]}"
|
|
|
|
|
|
# Closed set of node labels the world-builder may create. ``add_entity``
# and ``update_entity`` reject any label that is not a member, which is
# the minimum structural contract that stops the LLM from inventing
# labels of its own.
# NOTE(review): an earlier comment here spoke of preventing a "37th
# label", but this set holds 30 entries — confirm against 01-ontology.md.
ALLOWED_LABELS = frozenset((
    "Person",
    "Faction",
    "Location",
    "Creature",
    "Item",
    "Title",
    "Language",
    "Deity",
    "Spell",
    "Material",
    "Era",
    "Calendar",
    "Lineage",
    "Culture",
    "MagicSystem",
    "Region",
    "Plane",
    "Setting",
    "NPC",
    "PC",
    "Human",
    "Date",
    "DomainEntity",
    "Relation",
    "TypeTemplate",
    "LoreSource",
    "Contradiction",
    "Anachronism",
    "Orphan",
    "OntologyViolation",
))
|
|
|
|
|
|
def add_entity(
    graph: Graph,
    label: str,
    name: str,
    properties: Optional[dict] = None,
) -> dict:
    """Register an entity called ``name`` under the type ``label``.

    Only the (stripped) name and its label are recorded, via
    ``graph.add_entity_of_type``. ``properties`` is accepted but never
    read by this function; it is reserved for a later slice that adds
    TypeTemplate-driven storage.

    Returns ``{"ok": True, "data": {"id", "name", "label"}}`` on
    success. The ``id`` is a freshly generated ``ent-…`` value and is
    not stored on the graph by this function. Returns an
    ``{"ok": False, "error": ...}`` envelope when ``name`` is empty /
    whitespace-only or ``label`` is not in :data:`ALLOWED_LABELS`
    (the name check runs first).
    """
    canonical = name.strip() if name else ""
    if not canonical:
        return envelope(ok=False, error="entity name is required")

    if label not in ALLOWED_LABELS:
        message = (
            f"label {label!r} is not in the allowed list "
            f"({len(ALLOWED_LABELS)} labels; see 01-ontology.md)"
        )
        return envelope(ok=False, error=message)

    graph.add_entity_of_type(canonical, label)
    payload = {"id": _new_id("ent"), "name": canonical, "label": label}
    return envelope(ok=True, data=payload)
|
|
|
|
|
|
def add_relation(
    graph: Graph,
    from_name: str,
    relation: str,
    to_name: str,
    valid_from: Optional[str] = None,
    valid_until: Optional[str] = None,
) -> dict:
    """Append a time-bounded edge to the graph.

    ``valid_from`` / ``valid_until`` are time atoms passed through
    :func:`lore_engine_poc.time_model.normalize`; ``None``, ``""``,
    ``"null"`` and ``"none"`` mean "no bound". Malformed bounds return
    an error envelope rather than raising. ``relation`` is free-form
    (no edge-type allowlist is enforced here) but is stored stripped of
    surrounding whitespace, matching what ``retcon`` stores.

    Each endpoint is resolved through ``graph.by_name``; when the graph
    does not know the name, the stripped input is used as-is, so edges
    may reference names that were never registered.

    Returns the standard envelope; on success ``data`` carries the new
    edge's ``edge_id`` plus the stored subject / relation / object /
    bounds. Errors:
      * empty / whitespace ``relation`` → ``relation type is required``.
      * empty / whitespace ``from_name`` → ``from_name is required``.
      * empty / whitespace ``to_name`` → ``to_name is required``.
      * malformed time bound → ``invalid time bound: ...``.
      * inverted bounds → ``valid_from ... is not before
        valid_until ...``.
    """
    if not relation or not relation.strip():
        return envelope(ok=False, error="relation type is required")
    # Reject blank endpoints up front: otherwise the fallback below
    # would create an edge whose subject/object is the empty string
    # (or crash on ``None.strip()``).
    if not from_name or not from_name.strip():
        return envelope(ok=False, error="from_name is required")
    if not to_name or not to_name.strip():
        return envelope(ok=False, error="to_name is required")
    relation_clean = relation.strip()

    # Validate the time bounds (if given).
    unbounded = (None, "", "null", "none")
    try:
        vfrom = normalize(valid_from) if valid_from not in unbounded else None
        vuntil = normalize(valid_until) if valid_until not in unbounded else None
    except ValueError as exc:
        return envelope(ok=False, error=f"invalid time bound: {exc}")

    # Sanity check when both ends are bounded: ask time_model whether
    # ``vfrom`` falls inside the [vfrom, vuntil] window. Presumably this
    # is false only when the bounds are inverted — the exact boundary
    # semantics live in ``time_in_window``.
    if (
        vfrom is not None
        and vuntil is not None
        and not time_in_window(vfrom, vfrom, vuntil)
    ):
        return envelope(
            ok=False,
            error=f"valid_from {vfrom!r} is not before valid_until {vuntil!r}",
        )

    from_canonical = graph.by_name(from_name) or from_name.strip()
    to_canonical = graph.by_name(to_name) or to_name.strip()
    edge = Edge(
        subject=from_canonical,
        relation=relation_clean,
        object=to_canonical,
        valid_from=vfrom,
        valid_until=vuntil,
        sources=["world_builder:add_relation"],
    )
    graph.add(edge)
    return envelope(
        ok=True,
        data={
            # Return the edge's own ``edge_id`` (not a synthesised
            # uuid) so the caller can immediately pass it to
            # ``retcon`` / ``mark_verified``.
            "edge_id": edge.edge_id,
            "subject": from_canonical,
            "relation": relation_clean,
            "object": to_canonical,
            "valid_from": vfrom,
            "valid_until": vuntil,
        },
    )
|
|
|
|
|
|
def add_lore_source(
    graph: Graph,
    title: str,
    source_type: str,
    content: str = "",
    author: Optional[str] = None,
) -> dict:
    """Build a :class:`LoreSource` and hand it to ``graph.add_lore_source``.

    Only metadata is recorded. ``content`` is accepted but not read
    here (no chunking / embedding happens in this function), and
    ``author`` is echoed back in the response without being passed to
    the ``LoreSource`` constructor.

    ``source_type`` must be one of ``prose`` / ``timeline`` /
    ``family_tree`` / ``gazetteer`` / ``bestiary`` / ``culture`` /
    ``magic_system`` / ``dialogue``.

    Returns the standard envelope. On success ``data`` holds ``id``
    (the synthesised path), ``title``, ``source_type``, ``author`` and
    ``reliability``. Errors: empty / whitespace ``title`` →
    ``title is required``; unknown ``source_type`` →
    ``source_type ... not in [...]``.
    """
    clean_title = title.strip() if title else ""
    if not clean_title:
        return envelope(ok=False, error="title is required")

    allowed_types = {
        "prose",
        "timeline",
        "family_tree",
        "gazetteer",
        "bestiary",
        "culture",
        "magic_system",
        "dialogue",
    }
    if source_type not in allowed_types:
        problem = f"source_type {source_type!r} not in {sorted(allowed_types)}"
        return envelope(ok=False, error=problem)

    # Nothing is written to disk here; the synthesised path only acts
    # as the key under which the graph stores the source.
    path = f"world_builder/{_new_id('src')}.md"
    source = LoreSource(
        path=path,
        name=clean_title,
        source_type=source_type,
        reliability="world_builder",  # custom reliability stamp for world-builder writes
        source_confidence=1.0,
    )
    # Presumably this also makes the title discoverable by the read
    # tools — that behaviour lives in the graph backend, not here.
    graph.add_lore_source(source)

    payload = {
        "id": path,
        "title": clean_title,
        "source_type": source_type,
        "author": author,
        "reliability": source.reliability,
    }
    return envelope(ok=True, data=payload)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Slice 10 — deferred write tools (subset 1)
|
|
# ---------------------------------------------------------------------------
|
|
#
|
|
# These three were deferred from slice 4.7 (see the module
|
|
# docstring). They follow the same envelope contract as the
|
|
# slice-4.7 writes. Each mutates the in-memory graph; the next
|
|
# ``01_ingest.py`` rebuilds from codex, so writes are not
|
|
# persisted to disk.
|
|
|
|
|
|
def set_alias(
    graph: Graph,
    name: str,
    alias: str,
) -> dict:
    """Record ``alias`` as an alternative name for the entity ``name``.

    ``name`` is resolved with ``graph.by_name``; the stripped alias is
    then added to the set stored under that canonical name in
    ``graph.aliases`` (the set is created on first use). The duplicate
    check is an exact, case-sensitive membership test on that set.

    NOTE(review): this reaches into ``graph.aliases`` directly. If the
    graph backend offers a dedicated alias-registration method, routing
    through it would keep this tool backend-agnostic — confirm the
    method and its argument order before changing.

    Returns the standard envelope. Errors (checked in this order):
      * ``name`` not in the graph → ``name ... not found in graph``.
      * empty / whitespace ``alias`` → ``alias is required``.
      * alias already in the canonical's set → ``... is already
        registered as alias for ...``.
    """
    canonical = graph.by_name(name)
    if canonical is None:
        return envelope(ok=False, error=f"name {name!r} not found in graph")

    alias_clean = alias.strip() if alias else ""
    if not alias_clean:
        return envelope(ok=False, error="alias is required")

    known = graph.aliases.setdefault(canonical, set())
    if alias_clean in known:
        duplicate = f"{alias_clean!r} is already registered as alias for {canonical!r}"
        return envelope(ok=False, error=duplicate)

    known.add(alias_clean)
    return envelope(ok=True, data={"name": canonical, "alias": alias_clean})
|
|
|
|
|
|
def update_entity(
    graph: Graph,
    name: str,
    *,
    label: Optional[str] = None,
    name_: Optional[str] = None,
) -> dict:
    """Update an entity's label and/or canonical name.

    Pass ``label`` to change the entity's type (must be in
    :data:`ALLOWED_LABELS`): the entity is removed from every type the
    graph reports and then added under the new label. Pass ``name_`` to
    rename the entity via ``graph.rename_entity``; the value that call
    returns is reported as ``edges_repointed``. Renaming to the current
    canonical name is a successful no-op.

    The rename parameter is ``name_`` (trailing underscore) so it does
    not collide with the positional ``name`` argument.

    All inputs are validated *before* the graph is touched, so an
    error envelope always means nothing was changed.

    Returns the standard envelope with ``data = {"name", "label",
    "edges_repointed"}`` (``label`` is ``None`` when no label was
    supplied). Errors:
      * neither ``label`` nor ``name_`` supplied → ``nothing to
        update: ...``.
      * ``name`` not in the graph → ``name ... not found in graph``.
      * ``label`` not allowlisted → ``label ... is not in the allowed
        list``.
      * empty / whitespace ``name_`` → ``name must be non-empty``.
    """
    if label is None and name_ is None:
        return envelope(
            ok=False,
            error="nothing to update: supply label= and/or name=",
        )
    canonical = graph.by_name(name)
    if canonical is None:
        return envelope(ok=False, error=f"name {name!r} not found in graph")

    # --- Validation phase: no graph mutation may happen before every
    # argument has been checked, otherwise a bad ``name_`` would leave
    # a half-applied label change behind an error response.
    if label is not None and label not in ALLOWED_LABELS:
        return envelope(
            ok=False,
            error=f"label {label!r} is not in the allowed list",
        )
    new_clean: Optional[str] = None
    if name_ is not None:
        new_clean = name_.strip()
        if not new_clean:
            return envelope(ok=False, error="name must be non-empty")

    # --- Mutation phase.
    new_label = None
    if label is not None:
        # Snapshot the type list before mutating, then move the name
        # out of every type bucket and into the new one.
        for type_ in list(graph.all_entity_types()):
            graph.remove_entity_of_type(canonical, type_)
        graph.add_entity_of_type(canonical, label)
        new_label = label

    re_pointed = 0
    if new_clean is not None and new_clean != canonical:
        re_pointed = graph.rename_entity(canonical, new_clean)
        canonical = new_clean

    return envelope(
        ok=True,
        data={
            "name": canonical,
            "label": new_label,
            "edges_repointed": re_pointed,
        },
    )
|
|
|
|
|
|
def delete_entity(graph: Graph, name: str) -> dict:
    """Delete the entity that ``name`` resolves to.

    The name is resolved with ``graph.by_name`` and the canonical
    result is passed to ``graph.remove_entity``; whatever that call
    returns is reported as ``data.edges_removed`` so the caller can
    audit the blast radius. The cascade itself (edges, aliases) is the
    graph backend's responsibility.

    Returns the standard envelope; an unknown ``name`` yields
    ``name ... not found in graph``.
    """
    target = graph.by_name(name)
    if target is not None:
        edge_count = graph.remove_entity(target)
        return envelope(
            ok=True,
            data={"name": target, "edges_removed": edge_count},
        )
    return envelope(ok=False, error=f"name {name!r} not found in graph")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Slice 10.2 — deferred write tools (subset 2)
|
|
# ---------------------------------------------------------------------------
|
|
#
|
|
# retcon / mark_verified / merge_entities. These mutate existing
|
|
# edges (rather than creating new ones) and need ``edge_id`` for
|
|
# identity. The id reverse index on Graph keeps the cost O(1).
|
|
|
|
|
|
def _now_iso() -> str:
|
|
"""ISO-8601 UTC timestamp with second precision.
|
|
|
|
Stable for the audit log; we don't need microsecond resolution
|
|
and the test suite asserts only that the value is a non-empty
|
|
string. Uses ``datetime`` so tests don't depend on
|
|
``time.time`` mocking."""
|
|
return datetime.datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
|
|
|
|
|
|
def retcon(
    graph: Graph,
    edge_id: str,
    *,
    valid_from: Optional[str] = None,
    valid_until: Optional[str] = None,
    relation: Optional[str] = None,
    object_: Optional[str] = None,
    note: Optional[str] = None,
) -> dict:
    """Amend an existing edge's bounds, relation, or object.

    Loads the edge by id, validates any new time bounds via
    :func:`time_model.normalize`, rebuilds the edge with ``replace``
    and swaps it in through ``graph.replace_edge``. Every successful
    call stamps ``retcon_at``; ``retcon_note`` is overwritten only when
    ``note`` is supplied.

    Field handling:
      * ``valid_from`` / ``valid_until``: ``None``, ``""``, ``"null"``
        and ``"none"`` leave the existing bound unchanged.
      * ``relation`` / ``object_``: ``None`` leaves the field
        unchanged; a supplied value is stripped and must be non-empty.
      * A supplied ``object_`` is resolved through ``graph.by_name``
        (falling back to the stripped input when the graph does not
        know it), the same way ``add_relation`` resolves its endpoints,
        so retconning to an alias points the edge at the canonical
        entity rather than at the alias string.

    At least one of ``valid_from`` / ``valid_until`` / ``relation`` /
    ``object_`` / ``note`` must be supplied (i.e. be non-``None``).

    Returns the standard envelope. Errors:
      * nothing supplied → ``nothing to retcon: ...``.
      * ``edge_id`` not in the graph → ``edge ... not found in graph``.
      * malformed time bound → ``invalid time bound: ...``.
      * inverted bounds → ``valid_from ... is not before
        valid_until ...``.
      * blank relation → ``relation must be non-empty``.
      * blank object → ``object must be non-empty``.
    """
    if all(
        value is None
        for value in (valid_from, valid_until, relation, object_, note)
    ):
        return envelope(
            ok=False,
            error="nothing to retcon: supply valid_from= and/or valid_until= "
                  "and/or relation= and/or object= and/or note=",
        )
    edge = graph.find_edge_by_id(edge_id)
    if edge is None:
        return envelope(ok=False, error=f"edge {edge_id!r} not found in graph")

    # Validate time bounds (if given); "unset" markers keep the
    # edge's current bound.
    keep_current = (None, "", "null", "none")
    try:
        new_from = (
            normalize(valid_from)
            if valid_from not in keep_current
            else edge.valid_from
        )
        new_until = (
            normalize(valid_until)
            if valid_until not in keep_current
            else edge.valid_until
        )
    except ValueError as exc:
        return envelope(ok=False, error=f"invalid time bound: {exc}")
    if (
        new_from is not None
        and new_until is not None
        and not time_in_window(new_from, new_from, new_until)
    ):
        return envelope(
            ok=False,
            error=f"valid_from {new_from!r} is not before valid_until {new_until!r}",
        )

    new_relation = relation.strip() if relation is not None else edge.relation
    if not new_relation:
        return envelope(ok=False, error="relation must be non-empty")

    new_object = object_.strip() if object_ is not None else edge.object
    if not new_object:
        return envelope(ok=False, error="object must be non-empty")
    if object_ is not None:
        # Canonicalise a caller-supplied object so aliases resolve to
        # the entity they name; unknown names pass through unchanged.
        new_object = graph.by_name(new_object) or new_object

    new_edge = replace(
        edge,
        valid_from=new_from,
        valid_until=new_until,
        relation=new_relation,
        object=new_object,
        retcon_at=_now_iso(),
        retcon_note=note if note is not None else edge.retcon_note,
    )
    # Single chokepoint for edge mutation: the backend decides whether
    # this is an in-place swap or a drop-and-re-add.
    graph.replace_edge(edge_id, new_edge)
    return envelope(
        ok=True,
        data={
            "edge_id": new_edge.edge_id,
            "subject": new_edge.subject,
            "relation": new_edge.relation,
            "object": new_edge.object,
            "valid_from": new_edge.valid_from,
            "valid_until": new_edge.valid_until,
            "retcon_at": new_edge.retcon_at,
            "retcon_note": new_edge.retcon_note,
        },
    )
|
|
|
|
|
|
def mark_verified(
    graph: Graph,
    edge_id: str,
    *,
    verifier: str,
    note: Optional[str] = None,
) -> dict:
    """Stamp an edge as verified by a human.

    Builds a copy of the edge with ``1.0`` appended to both
    ``extraction_confidences`` and ``source_confidences`` and
    ``"human_verified"`` appended to ``reliabilities``, sets
    ``verified_by`` (stripped verifier) and ``verified_at`` (current
    UTC stamp), and swaps it in via ``graph.replace_edge``.
    ``verified_note`` is replaced only when ``note`` is supplied.
    The edge's time bounds are left alone.

    Returns the standard envelope with ``edge_id``, ``verified_by``,
    ``verified_at`` and ``verified_note``. Errors (in this order):
      * empty / whitespace ``verifier`` → ``verifier is required``.
      * ``edge_id`` not in the graph → ``edge ... not found in graph``.
    """
    verifier_clean = verifier.strip() if verifier else ""
    if not verifier_clean:
        return envelope(ok=False, error="verifier is required")

    edge = graph.find_edge_by_id(edge_id)
    if edge is None:
        return envelope(ok=False, error=f"edge {edge_id!r} not found in graph")

    audit_note = edge.verified_note if note is None else note
    new_edge = replace(
        edge,
        extraction_confidences=[*edge.extraction_confidences, 1.0],
        source_confidences=[*edge.source_confidences, 1.0],
        reliabilities=[*edge.reliabilities, "human_verified"],
        verified_by=verifier_clean,
        verified_at=_now_iso(),
        verified_note=audit_note,
    )
    # All edge swaps go through the backend's replace_edge chokepoint.
    graph.replace_edge(edge_id, new_edge)

    payload = {
        "edge_id": new_edge.edge_id,
        "verified_by": new_edge.verified_by,
        "verified_at": new_edge.verified_at,
        "verified_note": new_edge.verified_note,
    }
    return envelope(ok=True, data=payload)
|
|
|
|
|
|
def merge_entities(graph: Graph, from_name: str, to_name: str) -> dict:
    """Fold the entity ``from_name`` into the entity ``to_name``.

    Both names are resolved with ``graph.by_name``. The actual fold is
    delegated to ``graph.rename_entity(from, to)``; its return value is
    reported as ``edges_repointed``.

    Label guard: the merge is allowed when the source entity has no
    type labels, or when every label it carries is also carried by the
    target (a subset check, not strict equality). Anything else is
    refused so type information is not silently dropped.

    Returns the standard envelope with ``merged``, ``into`` and
    ``edges_repointed``. Errors:
      * ``from_name`` not in the graph → ``name ... not found in graph``.
      * ``to_name`` not in the graph → ``name ... not found in graph``.
      * source labels not a subset of target labels → ``cannot merge
        ... into ...: labels differ``.
    Both names resolving to the same canonical entity is a successful
    no-op with ``edges_repointed == 0``.
    """
    from_canonical = graph.by_name(from_name)
    if from_canonical is None:
        return envelope(ok=False, error=f"name {from_name!r} not found in graph")
    to_canonical = graph.by_name(to_name)
    if to_canonical is None:
        return envelope(ok=False, error=f"name {to_name!r} not found in graph")

    def _outcome(count):
        # Shared success payload for the no-op and the real merge.
        return envelope(
            ok=True,
            data={"merged": from_canonical, "into": to_canonical, "edges_repointed": count},
        )

    if from_canonical == to_canonical:
        return _outcome(0)

    def _labels_of(entity):
        # Every type label whose member collection contains ``entity``.
        return {
            label
            for label in graph.all_entity_types()
            if entity in graph.entities_of_type(label)
        }

    from_labels = _labels_of(from_canonical)
    to_labels = _labels_of(to_canonical)

    # An unlabelled source, or one whose labels the target already
    # covers, is safe to fold in; otherwise refuse.
    if from_labels and not from_labels <= to_labels:
        return envelope(
            ok=False,
            error=(
                f"cannot merge {from_canonical!r} (labels={sorted(from_labels)}) "
                f"into {to_canonical!r} (labels={sorted(to_labels)}): "
                f"labels differ"
            ),
        )

    return _outcome(graph.rename_entity(from_canonical, to_canonical))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Slice 10.3 — deferred write tools (subset 3)
|
|
# ---------------------------------------------------------------------------
|
|
#
|
|
# define_era / define_calendar / define_date. These add the
|
|
# time-domain metadata nodes (Calendar, Era, Date) that the
|
|
# rest of the graph can reference. The canonical time atom
|
|
# returned by ``define_date`` matches the format the rest of
|
|
# the time model uses (``{era}.{unit}``).
|
|
|
|
|
|
def define_calendar(
    graph: Graph,
    name: str,
    *,
    days_per_year: Optional[int] = None,
    months: Optional[int] = None,
    description: Optional[str] = None,
) -> dict:
    """Register a new calendar under the ``Calendar`` type.

    Only the stripped name is written to the graph (via
    ``graph.add_entity_of_type``). ``days_per_year``, ``months`` and
    ``description`` are validated where applicable and echoed back in
    the response, but this function does not store them.

    Returns the standard envelope. Errors (checked in this order):
      * empty / whitespace name → ``name is required``.
      * name already resolvable by ``graph.by_name`` → ``name ... is
        already registered``.
      * ``days_per_year`` given and ``<= 0`` → ``days_per_year must be
        positive``.
      * ``months`` given and ``<= 0`` → ``months must be positive``.
    """
    canonical = name.strip() if name else ""
    if not canonical:
        return envelope(ok=False, error="name is required")

    if graph.by_name(canonical) is not None:
        return envelope(
            ok=False,
            error=f"name {canonical!r} is already registered",
        )

    # Zero and negative counts are both rejected.
    if days_per_year is not None and not days_per_year > 0:
        return envelope(ok=False, error="days_per_year must be positive")
    if months is not None and not months > 0:
        return envelope(ok=False, error="months must be positive")

    graph.add_entity_of_type(canonical, "Calendar")
    summary = {
        "name": canonical,
        "days_per_year": days_per_year,
        "months": months,
        "description": description,
    }
    return envelope(ok=True, data=summary)
|
|
|
|
|
|
def define_era(
    graph: Graph,
    name: str,
    *,
    calendar: str,
    start: str,
    end: Optional[str] = None,
    description: Optional[str] = None,
) -> dict:
    """Create an Era node and link it to a Calendar.

    Registers ``name`` under the ``Era`` type and adds a ``PART_OF``
    edge from the era to the calendar, carrying the era's normalized
    start / end as the edge's ``valid_from`` / ``valid_until``.

    It then scans the calendar's other ``PART_OF`` edges for the era
    with the latest start that is strictly earlier than this era's
    start. If one is found, a ``PRECEDED`` edge is added from the new
    era to that prior era (no time bounds are set on that edge).
    Candidates whose start is missing or cannot be ordered against the
    new start are skipped.

    ``define_date`` uses an era's canonical name as the leading
    segment of the date atom it builds, so the era name doubles as the
    time-atom prefix.

    Returns the standard envelope with ``name``, ``calendar``,
    ``start``, ``end`` and ``description`` (``description`` is echoed,
    not stored here). Errors:
      * empty / whitespace name → ``name is required``.
      * empty / whitespace calendar → ``calendar is required``.
      * unknown calendar → ``calendar ... not found in graph``.
      * duplicate era name → ``name ... is already registered``.
      * malformed time bound → ``invalid time bound: ...``.
      * inverted bounds → ``valid_from ... is not before
        valid_until ...``.
    """
    # Imported once at function scope. It used to be imported inside
    # the scan loop under a guard, which left the name unbound on the
    # loop's other branch whenever that guard was skipped.
    from .time_model import _cmp_atoms

    if not name or not name.strip():
        return envelope(ok=False, error="name is required")
    if not calendar or not calendar.strip():
        return envelope(ok=False, error="calendar is required")
    canonical = name.strip()
    calendar_canonical = graph.by_name(calendar.strip())
    if calendar_canonical is None:
        return envelope(
            ok=False,
            error=f"calendar {calendar!r} not found in graph "
                  "(call define_calendar first)",
        )
    if graph.by_name(canonical) is not None:
        return envelope(
            ok=False,
            error=f"name {canonical!r} is already registered",
        )

    # Validate time bounds. ``end`` treats None / "" / "null" / "none"
    # as "open-ended".
    try:
        vfrom = normalize(start)
    except ValueError as exc:
        return envelope(ok=False, error=f"invalid time bound: {exc}")
    vuntil = None
    if end not in (None, "", "null", "none"):
        try:
            vuntil = normalize(end)
        except ValueError as exc:
            return envelope(ok=False, error=f"invalid time bound: {exc}")
    if (
        vfrom is not None
        and vuntil is not None
        and not time_in_window(vfrom, vfrom, vuntil)
    ):
        return envelope(
            ok=False,
            error=f"valid_from {vfrom!r} is not before valid_until {vuntil!r}",
        )

    graph.add_entity_of_type(canonical, "Era")
    # PART_OF Calendar
    graph.add(Edge(
        subject=canonical,
        relation="PART_OF",
        object=calendar_canonical,
        valid_from=vfrom,
        valid_until=vuntil,
        sources=["world_builder:define_era"],
    ))

    # Find the most recent prior era in the same calendar: the one
    # with the largest start that is strictly less than this era's.
    prior_era: Optional[str] = None
    prior_start: Optional[str] = None
    for e in graph.edges_for_object(calendar_canonical):
        # Skip non-era links and the PART_OF edge added just above.
        if e.relation != "PART_OF" or e.subject == canonical:
            continue
        if e.valid_from is None:
            continue
        if vfrom is not None:
            order = _cmp_atoms(e.valid_from, vfrom)
            # ``None`` means the two atoms cannot be ordered.
            if order is None or order >= 0:
                continue
        if prior_start is None:
            prior_start = e.valid_from
            prior_era = e.subject
        else:
            versus_best = _cmp_atoms(e.valid_from, prior_start)
            if versus_best is not None and versus_best > 0:
                prior_start = e.valid_from
                prior_era = e.subject

    if prior_era is not None:
        graph.add(Edge(
            subject=canonical,
            relation="PRECEDED",
            object=prior_era,
            sources=["world_builder:define_era"],
        ))

    return envelope(
        ok=True,
        data={
            "name": canonical,
            "calendar": calendar_canonical,
            "start": vfrom,
            "end": vuntil,
            "description": description,
        },
    )
|
|
|
|
|
|
def define_date(
    graph: Graph,
    *,
    calendar: str,
    year: int,
    month: Optional[int] = None,
    day: Optional[int] = None,
    era: Optional[str] = None,
    label: Optional[str] = None,
) -> dict:
    """Create a Date node for a point in a calendar.

    The canonical time atom is the dot-joined sequence of:
      * the era's canonical name, when an era is given;
      * ``year_{year}``;
      * ``month_{month}``, when a month is given;
      * ``day_{day}``, when a day is given.

    If ``graph.by_name`` already resolves that atom, nothing is
    written and the same success payload is returned. Otherwise the
    atom is registered under the ``Date`` type with an ``INSTANCE_OF``
    edge to the calendar and, when an era is given, a ``DURING`` edge
    to the era. Note the atom itself does not include the calendar
    name. ``label`` is only echoed back in the response.

    Returns the standard envelope with ``canonical``, ``calendar``,
    ``year``, ``month``, ``day``, ``era`` and ``label``. Errors
    (checked in this order):
      * empty calendar → ``calendar is required``.
      * unknown calendar → ``calendar ... not found in graph``.
      * non-integer (bools included) / negative year → ``year must be
        a non-negative integer``.
      * non-integer / out-of-range month → ``month must be in 1..12``
        (a fixed range; the calendar's own month count is not
        consulted).
      * non-integer / out-of-range day → ``day must be in 1..31``.
      * unknown era → ``era ... not found in graph``.
      * era given and year is 0 → ``year must be >= 1 when an era is
        given``.
    A whitespace-only ``era`` is treated as "no era".
    """
    def _plain_int(value) -> bool:
        # ``bool`` is an ``int`` subclass; True/False are not accepted.
        return isinstance(value, int) and not isinstance(value, bool)

    if not calendar or not calendar.strip():
        return envelope(ok=False, error="calendar is required")
    calendar_canonical = graph.by_name(calendar.strip())
    if calendar_canonical is None:
        return envelope(
            ok=False,
            error=f"calendar {calendar!r} not found in graph",
        )

    if not _plain_int(year) or year < 0:
        return envelope(
            ok=False,
            error="year must be a non-negative integer",
        )
    if month is not None and not (_plain_int(month) and 1 <= month <= 12):
        return envelope(ok=False, error="month must be in 1..12")
    if day is not None and not (_plain_int(day) and 1 <= day <= 31):
        return envelope(ok=False, error="day must be in 1..31")

    era_canonical: Optional[str] = None
    if era is not None and era.strip():
        era_canonical = graph.by_name(era.strip())
        if era_canonical is None:
            return envelope(
                ok=False,
                error=f"era {era!r} not found in graph",
            )
        if year < 1:
            return envelope(
                ok=False,
                error="year must be >= 1 when an era is given",
            )

    # Assemble the canonical time atom, coarsest segment first.
    segments: list[str] = [] if era_canonical is None else [era_canonical]
    segments.append(f"year_{year}")
    if month is not None:
        segments.append(f"month_{month}")
    if day is not None:
        segments.append(f"day_{day}")
    canonical = ".".join(segments)

    # The response is the same whether the node is new or pre-existing.
    result = envelope(
        ok=True,
        data={
            "canonical": canonical,
            "calendar": calendar_canonical,
            "year": year,
            "month": month,
            "day": day,
            "era": era_canonical,
            "label": label,
        },
    ) if graph.by_name(canonical) is not None else None
    if result is not None:
        # Already known: report it without touching the graph.
        return result

    graph.add_entity_of_type(canonical, "Date")
    links = [("INSTANCE_OF", calendar_canonical)]
    if era_canonical is not None:
        links.append(("DURING", era_canonical))
    for relation_name, target in links:
        graph.add(Edge(
            subject=canonical,
            relation=relation_name,
            object=target,
            sources=["world_builder:define_date"],
        ))

    return envelope(
        ok=True,
        data={
            "canonical": canonical,
            "calendar": calendar_canonical,
            "year": year,
            "month": month,
            "day": day,
            "era": era_canonical,
            "label": label,
        },
    )
|
|
|
|
|
|
# Public API of this module, grouped by the slice that introduced
# each tool. Order is kept stable for ``from ... import *`` consumers.
__all__ = [
    # Slice 4.7: core writes and the label allowlist.
    "add_entity", "add_lore_source", "add_relation", "ALLOWED_LABELS",
    # Slice 10 (subset 1): alias / update / delete.
    "set_alias", "update_entity", "delete_entity",
    # Slice 10.2: edge amendment, verification, entity merge.
    "retcon", "mark_verified", "merge_entities",
    # Slice 10.3: time-domain metadata nodes.
    "define_calendar", "define_era", "define_date",
]