# File listing metadata: 212 lines, 8.0 KiB, Python

"""Time model for the Lore Engine POC.
Implements ``time_in_window(at_time, valid_from, valid_until, current_time=None)`` in pure Python.
A UDF-shaped helper that matches the spec in ``docs/02-time-model.md`` lines 15-99
of the parent lore-engine design.
Canonical time format: ``{era}.{unit}``
- ``3rd_age.year_345`` — year granularity
- ``3rd_age.age_of_iron.year_3`` — sub-era
- ``3rd_age`` — era-only (matches any descendant)
- ``current`` — reserved token (resolved by caller)
- ``null`` / ``none`` / ``infinity`` (case-insensitive) / ``None`` / ``""`` — open-ended bound
Era hierarchy is represented as dotted prefixes. The helper uses
prefix matching to handle ``3rd_age`` matching ``3rd_age.year_345``
and ``3rd_age.age_of_iron`` matching ``3rd_age.age_of_iron.year_3``.
"""
from __future__ import annotations
from typing import Optional
# Reserved tokens.
# ``CURRENT`` is the placeholder for "the world clock"; it is resolved by
# ``time_in_window`` using its ``current_time`` argument.
CURRENT = "current"
# Spellings that mean "no bound" (open-ended). ``None`` is the in-memory form;
# the strings are the textual forms. Each value is listed exactly once.
NULL_TOKENS = {None, "", "null", "none", "infinity"}
def normalize(t: Optional[str]) -> Optional[str]:
    """Canonicalize a time value before it is compared.

    The input is stringified, stripped of surrounding whitespace and
    lowercased, then classified.

    Args:
        t: Raw time value, or None.

    Returns:
        ``None`` for an open-ended bound (``None``, a blank string, or one of
        ``null`` / ``none`` / ``infinity`` in any letter case); ``"current"``
        for the reserved token; otherwise the lowercased time atom.

    Raises:
        ValueError: If the lowercased text contains any character that is
            not a lowercase letter, a digit, ``_`` or ``.``. This rejects
            typos such as ``"3rd age year 345"`` early instead of letting
            them silently produce False from ``time_in_window``. Only the
            character set is checked, so a bare word such as ``"foo"`` is
            accepted as an atom.
    """
    if t is None:
        return None

    token = str(t).strip().lower()
    if token in ("", "null", "none", "infinity"):
        return None
    if token == "current":
        return token

    # Time atoms are lowercase letters, digits, underscores and dots only.
    for ch in token:
        if not (ch.islower() or ch.isdigit() or ch in "._"):
            raise ValueError(f"not a valid time atom: {t!r}")
    return token
def is_descendant(child: Optional[str], ancestor: Optional[str]) -> bool:
    """Report whether ``child`` equals ``ancestor`` or lies beneath it.

    Eras form a tree spelled as dotted paths, so ``child`` is a descendant
    exactly when the dotted segments of ``ancestor`` are a leading run of
    the dotted segments of ``child``. A missing (None) argument never
    matches.

    Examples:
        is_descendant("3rd_age.year_345", "3rd_age") -> True
        is_descendant("3rd_age.age_of_iron", "3rd_age") -> True
        is_descendant("3rd_age.age_of_iron.year_3", "3rd_age.age_of_iron") -> True
        is_descendant("2nd_age.year_5", "3rd_age") -> False
    """
    if child is None or ancestor is None:
        return False
    ancestor_path = ancestor.split(".")
    child_path = child.split(".")
    # Whole-segment prefix match: "3rd_age_x" is not under "3rd_age".
    return child_path[: len(ancestor_path)] == ancestor_path
def _cmp_atoms(a: str, b: str) -> int:
"""Compare two time atoms by era-then-year structure.
Splits on dots, then for each segment tries to extract a numeric tail
(e.g. ``year_345`` -> 345) so that ``year_9 < year_10`` works.
Falls back to lexical compare when neither side parses.
"""
a_parts = a.split(".")
b_parts = b.split(".")
for ap, bp in zip(a_parts, b_parts):
an = _trailing_int(ap)
bn = _trailing_int(bp)
if an is not None and bn is not None and ap.split("_")[0] == bp.split("_")[0]:
if an != bn:
return -1 if an < bn else 1
elif ap != bp:
return -1 if ap < bp else 1
if len(a_parts) != len(b_parts):
return -1 if len(a_parts) < len(b_parts) else 1
return 0
def _trailing_int(s: str) -> Optional[int]:
"""Return the trailing integer of ``s`` if one is present."""
i = 0
while i < len(s) and not s[i].isdigit():
i += 1
if i == len(s):
return None
tail = s[i:]
try:
return int(tail)
except ValueError:
return None
def _in_window_open(at: str, hi: str) -> bool:
    """Decide whether ``at`` lies strictly before the exclusive bound ``hi``.

    The era hierarchy is treated as containment, checked in this order:
      1. ``at`` equals ``hi`` or sits inside the era named by ``hi``:
         the bound is exclusive and caps that whole era -> False.
      2. ``at`` is an ancestor of ``hi`` (a coarser label that contains
         the bound) -> True.
      3. Otherwise the atoms are ordered structurally with ``_cmp_atoms``
         and the result is True when ``at`` sorts before ``hi``.

    Args:
        at: Normalized time being tested.
        hi: Normalized exclusive upper bound.
    """
    capped = at == hi or is_descendant(at, hi)
    if capped:
        return False
    return is_descendant(hi, at) or _cmp_atoms(at, hi) < 0
def time_in_window(
    at_time: Optional[str],
    valid_from: Optional[str],
    valid_until: Optional[str],
    current_time: Optional[str] = None,
) -> bool:
    """Is ``at_time`` inside the half-open window ``[valid_from, valid_until)``?

    All three values are passed through ``normalize`` first. Open-ended
    bounds (None) extend to infinity in that direction. The reserved token
    ``current`` is replaced by the normalized ``current_time``.

    Era-tree membership: a coarse ``valid_from`` like ``3rd_age`` accepts any
    descendant time (``3rd_age.year_345``, ``3rd_age.age_of_iron.year_3``).
    A coarse ``at_time`` that is an ancestor of a bound is conservatively
    accepted for that bound, because the label contains the bound.

    Args:
        at_time: The time being tested. ``"current"`` is allowed.
        valid_from: Inclusive lower bound. None = -infinity.
        valid_until: Exclusive upper bound. None = +infinity.
        current_time: The world clock value. Required iff ``at_time``,
            ``valid_from``, or ``valid_until`` is ``"current"``; it is not
            looked at otherwise.

    Returns:
        True if ``at_time`` falls inside the window. An open-ended
        ``at_time`` (None) is only inside a window that is open on both sides.

    Raises:
        ValueError: If any value is not a valid time atom, or if a
            ``"current"`` token has to be resolved but ``current_time`` is
            missing, blank / open-ended, or itself ``"current"``.
    """
    at = normalize(at_time)
    lo = normalize(valid_from)
    hi = normalize(valid_until)

    # Resolve "current" tokens using the world clock. This is done lazily so
    # that an unused (even malformed) current_time never affects the result.
    if CURRENT in (at, lo, hi):
        clock = normalize(current_time)
        if clock is None:
            # A blank or "null" clock must not silently turn "current" into
            # an open-ended bound.
            raise ValueError("current_time required when resolving 'current'")
        if clock == CURRENT:
            raise ValueError("current_time must be a concrete time, not 'current'")
        at, lo, hi = (clock if t == CURRENT else t for t in (at, lo, hi))

    if at is None:
        return lo is None and hi is None

    if lo is not None:
        # Inclusive lower bound: equal, inside the era lo names, or a coarser
        # label containing lo all pass; only unrelated atoms are ordered.
        related = is_descendant(at, lo) or is_descendant(lo, at)
        if not related and _cmp_atoms(at, lo) < 0:
            return False

    if hi is not None and not _in_window_open(at, hi):
        return False
    return True
# Self-tests: executed only when this file is run as a script.
if __name__ == "__main__":
    # Each row: (at, lo, hi, current, expected, description)
    cases = [
        ("3rd_age.year_345", "3rd_age.year_340", "3rd_age.year_352", None, True, "year inside year window"),
        ("3rd_age.year_352", "3rd_age.year_340", "3rd_age.year_352", None, False, "year at exclusive upper"),
        ("3rd_age.year_340", "3rd_age.year_340", "3rd_age.year_352", None, True, "year at inclusive lower"),
        ("3rd_age", "3rd_age.year_1", "3rd_age.year_999", None, True, "era ancestor of lo"),
        ("3rd_age.year_5", "3rd_age", "3rd_age.year_10", None, True, "at is descendant of lo"),
        ("3rd_age.age_of_iron.year_3", "3rd_age.age_of_iron.year_1", "3rd_age.age_of_iron.year_5", None, True, "sub-era window"),
        ("3rd_age.age_of_iron.year_6", "3rd_age.age_of_iron.year_1", "3rd_age.age_of_iron.year_5", None, False, "sub-era past upper"),
        (None, "3rd_age.year_1", "3rd_age.year_10", None, False, "open at, bounded"),
        ("3rd_age.year_5", None, "3rd_age.year_10", None, True, "open lower"),
        ("3rd_age.year_50", "3rd_age.year_10", None, None, True, "open upper"),
        ("current", "3rd_age.year_300", "3rd_age.year_400", "3rd_age.year_345", True, "current inside"),
        ("current", "3rd_age.year_300", "3rd_age.year_400", "3rd_age.year_500", False, "current outside"),
        ("2nd_age.year_5", "3rd_age", "4th_age", None, False, "different era"),
    ]
    total = len(cases)
    passed = 0
    for at, lo, hi, cur, exp, desc in cases:
        got = time_in_window(at, lo, hi, cur)
        ok = got == exp
        if ok:
            passed += 1
        print(f" {'OK' if ok else 'FAIL'} {desc}: got={got} want={exp}")
    print(f"\n{passed}/{total} passed")
    # Exit status 1 signals at least one failing case.
    raise SystemExit(0 if passed == total else 1)