212 lines
8.0 KiB
Python
212 lines
8.0 KiB
Python
"""Time model for the Lore Engine POC.
|
|
|
|
Implements ``time_in_window(at_time, valid_from, valid_until)`` in pure Python.
|
|
A UDF-shaped helper that matches the spec in ``docs/02-time-model.md`` lines 15-99
|
|
of the parent lore-engine design.
|
|
|
|
Canonical time format: ``{era}.{unit}``
|
|
- ``3rd_age.year_345`` — year granularity
|
|
- ``3rd_age.age_of_iron.year_3`` — sub-era
|
|
- ``3rd_age`` — era-only (matches any descendant)
|
|
- ``current`` — reserved token (resolved by caller)
|
|
- ``null`` / ``None`` / ``""`` — open-ended bound
|
|
|
|
Era hierarchy is represented as dotted prefixes. The helper uses
|
|
prefix matching to handle ``3rd_age`` matching ``3rd_age.year_345``
|
|
and ``3rd_age.age_of_iron`` matching ``3rd_age.age_of_iron.year_3``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Optional
|
|
|
|
# Reserved tokens
|
|
CURRENT = "current"
|
|
NULL_TOKENS = {None, "", "null", "none", "∞", "infinity"}
|
|
|
|
|
|
def normalize(t: Optional[str]) -> Optional[str]:
|
|
"""Normalize a time string. Returns None for open-ended, 'current' for the
|
|
reserved token, or a lowercased canonical string. Raises ValueError on a
|
|
non-empty string that doesn't look like a time atom (e.g. ``"foo"``).
|
|
|
|
The format is loose: lowercase letters, digits, underscores, and dots
|
|
are allowed; whitespace, punctuation, and other characters are
|
|
rejected. This catches typos like ``"3rd age year 345"`` early
|
|
instead of silently producing False from ``time_in_window``.
|
|
"""
|
|
if t is None:
|
|
return None
|
|
s = str(t).strip()
|
|
if not s:
|
|
return None
|
|
low = s.lower()
|
|
if low in {"null", "none", "∞", "infinity"}:
|
|
return None
|
|
if low == "current":
|
|
return "current"
|
|
# Time atoms are lowercase letters, digits, underscores, dots only.
|
|
if not all(c.islower() or c.isdigit() or c in "._" for c in low):
|
|
raise ValueError(f"not a valid time atom: {t!r}")
|
|
return low
|
|
|
|
|
|
def is_descendant(child: Optional[str], ancestor: Optional[str]) -> bool:
|
|
"""True if ``child`` is ``ancestor`` or a descendant in the era tree.
|
|
|
|
Examples:
|
|
is_descendant("3rd_age.year_345", "3rd_age") -> True
|
|
is_descendant("3rd_age.age_of_iron", "3rd_age") -> True
|
|
is_descendant("3rd_age.age_of_iron.year_3", "3rd_age.age_of_iron") -> True
|
|
is_descendant("2nd_age.year_5", "3rd_age") -> False
|
|
"""
|
|
if child is None or ancestor is None:
|
|
return False
|
|
if child == ancestor:
|
|
return True
|
|
return child.startswith(ancestor + ".")
|
|
|
|
|
|
def _cmp_atoms(a: str, b: str) -> int:
|
|
"""Compare two time atoms by era-then-year structure.
|
|
|
|
Splits on dots, then for each segment tries to extract a numeric tail
|
|
(e.g. ``year_345`` -> 345) so that ``year_9 < year_10`` works.
|
|
Falls back to lexical compare when neither side parses.
|
|
"""
|
|
a_parts = a.split(".")
|
|
b_parts = b.split(".")
|
|
for ap, bp in zip(a_parts, b_parts):
|
|
an = _trailing_int(ap)
|
|
bn = _trailing_int(bp)
|
|
if an is not None and bn is not None and ap.split("_")[0] == bp.split("_")[0]:
|
|
if an != bn:
|
|
return -1 if an < bn else 1
|
|
elif ap != bp:
|
|
return -1 if ap < bp else 1
|
|
if len(a_parts) != len(b_parts):
|
|
return -1 if len(a_parts) < len(b_parts) else 1
|
|
return 0
|
|
|
|
|
|
def _trailing_int(s: str) -> Optional[int]:
|
|
"""Return the trailing integer of ``s`` if one is present."""
|
|
i = 0
|
|
while i < len(s) and not s[i].isdigit():
|
|
i += 1
|
|
if i == len(s):
|
|
return None
|
|
tail = s[i:]
|
|
try:
|
|
return int(tail)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _in_window_open(at: str, hi: str) -> bool:
|
|
"""at < hi, treating era hierarchy as containment.
|
|
|
|
Semantics: at is "before" hi if either
|
|
(a) at == hi (treat as not before — exclusive), or
|
|
(b) hi is an ancestor of at (at is inside a capped era -> false), or
|
|
(c) at is an ancestor of hi (at is coarser than hi -> true), or
|
|
(d) structural compare: at < hi.
|
|
"""
|
|
if at == hi:
|
|
return False
|
|
if is_descendant(at, hi):
|
|
return False
|
|
if is_descendant(hi, at):
|
|
return True
|
|
return _cmp_atoms(at, hi) < 0
|
|
|
|
|
|
def time_in_window(
|
|
at_time: Optional[str],
|
|
valid_from: Optional[str],
|
|
valid_until: Optional[str],
|
|
current_time: Optional[str] = None,
|
|
) -> bool:
|
|
"""Is ``at_time`` inside the half-open window ``[valid_from, valid_until)``?
|
|
|
|
Open-ended bounds (None) extend to infinity in that direction.
|
|
``current`` is resolved against the caller's ``current_time`` argument.
|
|
Era-tree membership: a coarse valid_from like ``3rd_age`` accepts any
|
|
descendant time (``3rd_age.year_345``, ``3rd_age.age_of_iron.year_3``).
|
|
|
|
Args:
|
|
at_time: The time being tested. ``"current"`` is allowed.
|
|
valid_from: Inclusive lower bound. None = -infinity.
|
|
valid_until: Exclusive upper bound. None = +infinity.
|
|
current_time: The world clock value. Required iff ``at_time``,
|
|
``valid_from``, or ``valid_until`` is ``"current"``.
|
|
|
|
Returns:
|
|
True if ``at_time`` falls inside the window.
|
|
"""
|
|
at = normalize(at_time)
|
|
lo = normalize(valid_from)
|
|
hi = normalize(valid_until)
|
|
|
|
# Resolve "current" tokens using the world clock.
|
|
def resolve(t: Optional[str]) -> Optional[str]:
|
|
if t == "current":
|
|
if current_time is None:
|
|
raise ValueError("current_time required when resolving 'current'")
|
|
return normalize(current_time)
|
|
return t
|
|
|
|
at = resolve(at)
|
|
lo = resolve(lo)
|
|
hi = resolve(hi)
|
|
|
|
if at is None:
|
|
return lo is None and hi is None
|
|
|
|
if lo is not None:
|
|
if at == lo:
|
|
pass # inclusive
|
|
elif is_descendant(at, lo):
|
|
pass # at is inside the era lo defines
|
|
elif is_descendant(lo, at):
|
|
# at is coarser than lo. Conservatively accept: if the era that
|
|
# lo points to is a descendant of the era at points to, the two
|
|
# refer to the same time hierarchy. The caller asked about a
|
|
# coarse label that contains the lower bound.
|
|
pass
|
|
elif _cmp_atoms(at, lo) < 0:
|
|
return False
|
|
if hi is not None:
|
|
if not _in_window_open(at, hi):
|
|
return False
|
|
return True
|
|
|
|
|
|
# A handful of self-tests run on import if executed as a script.
|
|
if __name__ == "__main__":
|
|
cases = [
|
|
# (at, lo, hi, current, expected, description)
|
|
("3rd_age.year_345", "3rd_age.year_340", "3rd_age.year_352", None, True, "year inside year window"),
|
|
("3rd_age.year_352", "3rd_age.year_340", "3rd_age.year_352", None, False, "year at exclusive upper"),
|
|
("3rd_age.year_340", "3rd_age.year_340", "3rd_age.year_352", None, True, "year at inclusive lower"),
|
|
("3rd_age", "3rd_age.year_1", "3rd_age.year_999", None, True, "era ancestor of lo"),
|
|
("3rd_age.year_5", "3rd_age", "3rd_age.year_10", None, True, "at is descendant of lo"),
|
|
("3rd_age.age_of_iron.year_3", "3rd_age.age_of_iron.year_1", "3rd_age.age_of_iron.year_5", None, True, "sub-era window"),
|
|
("3rd_age.age_of_iron.year_6", "3rd_age.age_of_iron.year_1", "3rd_age.age_of_iron.year_5", None, False, "sub-era past upper"),
|
|
(None, "3rd_age.year_1", "3rd_age.year_10", None, False, "open at, bounded"),
|
|
("3rd_age.year_5", None, "3rd_age.year_10", None, True, "open lower"),
|
|
("3rd_age.year_50", "3rd_age.year_10", None, None, True, "open upper"),
|
|
("current", "3rd_age.year_300", "3rd_age.year_400", "3rd_age.year_345", True, "current inside"),
|
|
("current", "3rd_age.year_300", "3rd_age.year_400", "3rd_age.year_500", False, "current outside"),
|
|
("2nd_age.year_5", "3rd_age", "4th_age", None, False, "different era"),
|
|
]
|
|
fail = 0
|
|
for at, lo, hi, cur, exp, desc in cases:
|
|
got = time_in_window(at, lo, hi, cur)
|
|
ok = got == exp
|
|
if not ok:
|
|
fail += 1
|
|
print(f" {'OK' if ok else 'FAIL'} {desc}: got={got} want={exp}")
|
|
print(f"\n{len(cases) - fail}/{len(cases)} passed")
|
|
raise SystemExit(1 if fail else 0)
|