feat(session): atomicMutate (per-threadId mutex) + Redis key registry (Story 5.2)

sessionManager.atomicMutate(threadId, mutator): atomic read-modify-write
serialized per threadId via an in-process promise chain. Single-process
Node: the only concurrency is across await boundaries in one event loop, so
an in-process per-key mutex prevents lost updates (no Lua/WATCH — swap path
documented for a future multi-instance). update() retained (constant patches
with no concurrent read) but flagged as non-atomic.

Migrate every read-derived / concurrency-sensitive sessionManager.update()
call site to atomicMutate: pendingSkillCheck set/clear (skillCheckEmit,
rollHandler, messageRouter roll-resolve + auto-cancel), pendingSkillCheckAttempts
increment (messageRouter pending-block — now reads current count inside the
mutator so two concurrent pending messages can't both read a stale count),
heldMessages append/clear, players join (re-checks presence inside the
mutator), addMessage history (trim inside the lock), goal_register spec
update, encounter/turn resolve. Closes the TOCTOU windows the group-check
multi-player fan-out will stress.

Add src/db/keys.ts: the Redis key registry (single source for key shapes,
documented owner/TTL/sweep table) — groupcheck/lobby/encounter:active/
character_status/campaign key builders for the upcoming feature stories.

Tests: 432 unit pass (5 new atomicMutate concurrency tests + key-registry
shape test); migrated test mocks to expose atomicMutate; tsc clean.

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2026-06-21 00:09:51 +00:00
parent 9f401692c8
commit 86d4354b51
10 changed files with 217 additions and 84 deletions

View File

@@ -392,11 +392,11 @@ async function handleEnd(interaction: ChatInputCommandInteraction): Promise<void
summary = dmNotes || `Encounter ended by ${interaction.user.username}.`;
}
await sessionManager.update(channel.id, {
await sessionManager.atomicMutate(channel.id, () => ({
phase: 'resolved',
outcome: outcomeId,
outcomeSummary: summary,
});
}));
writeSummary(session, outcomeId, summary);

View File

@@ -64,9 +64,9 @@ export async function replayHeldMessages(
if (held.length === 0) continue;
// Remove held messages before replaying so a crash doesn't double-replay
await sessionManager.update(threadId, {
heldMessages: session.heldMessages.filter(m => m.discordUserId !== userId),
});
await sessionManager.atomicMutate(threadId, s => ({
heldMessages: s.heldMessages.filter(m => m.discordUserId !== userId),
}));
const thread = await client.channels.fetch(threadId).catch(() => null);
if (!thread?.isThread()) continue;
@@ -116,10 +116,10 @@ async function processEncounterMessage(
timestamp: Date.now(),
};
await sessionManager.update(session.threadId, {
await sessionManager.atomicMutate(session.threadId, () => ({
pendingSkillCheck: undefined,
pendingSkillCheckAttempts: undefined,
});
}));
await sessionManager.addMessage(session.threadId, systemMsg);
scheduleEncounterLLMTurn(session.threadId, thread, client, true);
return;
@@ -128,8 +128,9 @@ async function processEncounterMessage(
// ── Player gate
const player = await playerRegistry.get(guildId, userId);
if (!player) {
const held = [...session.heldMessages, { discordUserId: userId, content, timestamp: Date.now() }];
await sessionManager.update(session.threadId, { heldMessages: held });
await sessionManager.atomicMutate(session.threadId, s => ({
heldMessages: [...s.heldMessages, { discordUserId: userId, content, timestamp: Date.now() }],
}));
const gate = buildPlayerGateEmbed();
const sent = await thread.send({ content: `<@${userId}>`, embeds: [gate] });
@@ -139,7 +140,12 @@ async function processEncounterMessage(
// ── Block messages while a dice roll is pending
if (session.pendingSkillCheck) {
const attempts = (session.pendingSkillCheckAttempts ?? 0) + 1;
// Atomically increment the skip counter and read the new value, so two
// concurrent pending-block messages can't both read the same stale count.
const updated = await sessionManager.atomicMutate(session.threadId, s => ({
pendingSkillCheckAttempts: (s.pendingSkillCheckAttempts ?? 0) + 1,
}));
const attempts = updated.pendingSkillCheckAttempts ?? 0;
if (attempts >= PENDING_ROLL_LIMIT) {
// Auto-cancel: disable the embed buttons and inject a FAIL result
@@ -155,16 +161,15 @@ async function processEncounterMessage(
timestamp: Date.now(),
};
await sessionManager.update(session.threadId, {
await sessionManager.atomicMutate(session.threadId, () => ({
pendingSkillCheck: undefined,
pendingSkillCheckAttempts: undefined,
});
}));
await sessionManager.addMessage(session.threadId, failMsg);
scheduleEncounterLLMTurn(session.threadId, thread, client, true);
return;
}
await sessionManager.update(session.threadId, { pendingSkillCheckAttempts: attempts });
const remaining = PENDING_ROLL_LIMIT - attempts;
await thread.send(
`*A roll is still pending! Use the buttons above to roll. (${remaining} message${remaining === 1 ? '' : 's'} left before auto-fail.)*`,
@@ -191,16 +196,16 @@ async function processEncounterMessage(
const playerEntry = charProfile?.pronouns
? { ...player, pronouns: charProfile.pronouns }
: player;
const updatedPlayers = { ...session.players, [userId]: playerEntry };
const joinMsg: ChatMessage = {
role: 'system',
content: `[SESSION] ${player.dndName} has entered the encounter.`,
timestamp: Date.now(),
};
await sessionManager.update(session.threadId, {
players: updatedPlayers,
phase: 'active',
});
await sessionManager.atomicMutate(session.threadId, s => (
s.players[userId]
? { phase: 'active' }
: { players: { ...s.players, [userId]: playerEntry }, phase: 'active' }
));
await sessionManager.addMessage(session.threadId, joinMsg);
}
@@ -393,11 +398,12 @@ export async function runLLMTurn(
}
if (result.resolved) {
await sessionManager.update(session.threadId, {
const { outcomeId, summary: resolvedSummary } = result.resolved;
await sessionManager.atomicMutate(session.threadId, () => ({
phase: 'resolved',
outcome: result.resolved.outcomeId,
outcomeSummary: result.resolved.summary,
});
outcome: outcomeId,
outcomeSummary: resolvedSummary,
}));
setTimeout(async () => {
await (thread as ThreadChannel).setArchived?.(true).catch(() => null);
}, 5_000);

View File

@@ -78,10 +78,10 @@ async function submitResult(
timestamp: Date.now(),
};
await sessionManager.update(session.threadId, {
await sessionManager.atomicMutate(session.threadId, () => ({
pendingSkillCheck: undefined,
pendingSkillCheckAttempts: undefined,
});
}));
await sessionManager.addMessage(session.threadId, systemMsg);
scheduleEncounterLLMTurn(session.threadId, channel, client, true);
}

31
src/db/keys.ts Normal file
View File

@@ -0,0 +1,31 @@
// Redis key registry — the single source for key shapes. Every key the engine
// writes should be built here so the keyspace is documented (prefix, owner, TTL,
// boot-sweep behaviour) and the restart sweep can enumerate via SCAN (never
// KEYS). New group-check / lobby / status stories MUST build their keys via this
// module so the registry stays complete and unowned keys don't proliferate.
//
// Key Owner TTL Boot sweep
// ──────────────────────────────────────── ───────────── ───────── ──────────────────────────────
// session:{threadId} session SESSION_TTL pending-check finalize
// guild_threads:{guildId} session SESSION_TTL —
// characters:{guildId} character — —
// players:{guildId} player — —
// groupcheck:{threadId} group check ~10m finalize (successRule on partial)
// lobby:{threadId} lobby ~30m idle close
// encounter:{threadId}:active encounter ~4h* republish "still active"
// character_status:{guildId}:{discordId} story status ~24h expire (silent drop)
// campaign:{campaignId} campaign — —
//
// * encounter:active TTL is refreshed on activity.
export const KEYS = {
session: (threadId: string) => `session:${threadId}`,
guildThreads: (guildId: string) => `guild_threads:${guildId}`,
characters: (guildId: string) => `characters:${guildId}`,
players: (guildId: string) => `players:${guildId}`,
groupcheck: (threadId: string) => `groupcheck:${threadId}`,
lobby: (threadId: string) => `lobby:${threadId}`,
encounterActive: (threadId: string) => `encounter:${threadId}:active`,
characterStatus: (guildId: string, discordId: string) => `character_status:${guildId}:${discordId}`,
campaign: (campaignId: string) => `campaign:${campaignId}`,
} as const;

View File

@@ -78,27 +78,20 @@ const goalRegister: ToolPlugin = {
};
}
// Append to appropriate list
const updatedGoals = {
...goals,
primary: [...goals.primary],
secondary: [...goals.secondary],
};
const newGoal = { id: finalId, label };
if (isPrimary) {
updatedGoals.primary.push(newGoal);
} else {
updatedGoals.secondary.push(newGoal);
}
const updatedSpec = {
...session.spec,
goals: updatedGoals,
};
// Save spec update to Redis session
await sessionManager.update(session.threadId, { spec: updatedSpec });
// Append to the appropriate goal list and persist atomically — read the
// current spec inside the mutator so a concurrent spec change can't be lost.
await sessionManager.atomicMutate(session.threadId, s => {
const currentGoals = s.spec.goals;
const ug = {
...currentGoals,
primary: [...currentGoals.primary],
secondary: [...currentGoals.secondary],
};
const newGoal = { id: finalId, label };
if (isPrimary) ug.primary.push(newGoal);
else ug.secondary.push(newGoal);
return { spec: { ...s.spec, goals: ug } };
});
return {
systemMessage: `[TOOL] New hidden goal registered on the fly: "${finalId}" (Primary: ${isPrimary}). Label: "${label}"`,

View File

@@ -7,6 +7,14 @@ const SESSION_TTL = 60 * 60 * config.SESSION_TTL_HOURS;
const sessionKey = (threadId: string) => `session:${threadId}`;
const guildThreadsKey = (guildId: string) => `guild_threads:${guildId}`;
// Per-threadId in-process mutex chain. Single-process Node: the only
// concurrency is across `await` boundaries in one event loop, so an
// in-process per-key promise chain serializes read-modify-write per thread
// and prevents lost updates (two interleaved mutations can't both write a
// stale snapshot). Multi-instance future: swap this chain for a Redis Lua
// EVAL / Redlock — `atomicMutate` call sites stay unchanged.
const mutexChains = new Map<string, Promise<void>>();
export const sessionManager = {
async create(threadId: string, state: SessionState): Promise<void> {
const pipe = redis.pipeline();
@@ -22,6 +30,11 @@ export const sessionManager = {
return JSON.parse(raw) as SessionState;
},
// NOTE: `update` is a non-atomic read-modify-write (get → spread → set). It is
// safe only when no `await` separates the read from the write AND no second
// caller can mutate the same session concurrently. For any mutation derived
// from current session state (pendingSkillCheck, players, attempts,
// heldMessages, history), prefer `atomicMutate` — it serializes per thread.
async update(threadId: string, patch: Partial<SessionState>): Promise<void> {
const current = await this.get(threadId);
if (!current) throw new Error(`Session not found: ${threadId}`);
@@ -29,21 +42,42 @@ export const sessionManager = {
await redis.set(sessionKey(threadId), JSON.stringify(updated), 'EX', SESSION_TTL);
},
// Atomic read-modify-write, serialized per threadId. The mutator receives the
// current SessionState and returns a patch (merged shallow, like `update`).
// Returns the persisted state. Use this for any mutation derived from current
// session state (pending checks, player roster, attempt counters, history).
async atomicMutate(
threadId: string,
mutator: (current: SessionState) => (Partial<SessionState> | Promise<Partial<SessionState>>),
): Promise<SessionState> {
const prev = mutexChains.get(threadId) ?? Promise.resolve();
const next = prev.then(async () => {
const current = await this.get(threadId);
if (!current) throw new Error(`Session not found: ${threadId}`);
const patch = await mutator(current);
const updated: SessionState = { ...current, ...patch, updatedAt: Date.now() };
await redis.set(sessionKey(threadId), JSON.stringify(updated), 'EX', SESSION_TTL);
return updated;
});
// Keep the chain alive without surfacing a failure to the next waiter — a
// failed mutate must not block later mutates; each gets its own try.
mutexChains.set(threadId, next.then(() => undefined, () => undefined));
return next;
},
async delete(threadId: string, guildId: string): Promise<void> {
await redis.del(sessionKey(threadId));
await redis.srem(guildThreadsKey(guildId), threadId);
},
async addMessage(threadId: string, msg: ChatMessage): Promise<void> {
const session = await this.get(threadId);
if (!session) throw new Error(`Session not found: ${threadId}`);
const pinned = session.history.filter(m => m.pinned);
const sliding = session.history.filter(m => !m.pinned);
sliding.push(msg);
const trimmed = trimHistory(sliding);
await this.update(threadId, { history: [...pinned, ...trimmed] });
await this.atomicMutate(threadId, s => {
const pinned = s.history.filter(m => m.pinned);
const sliding = s.history.filter(m => !m.pinned);
sliding.push(msg);
const trimmed = trimHistory(sliding);
return { history: [...pinned, ...trimmed] };
});
},
// Returns thread IDs for a guild — used by /dndname set to find held messages.

View File

@@ -1,11 +1,12 @@
import { vi, describe, it, expect, beforeEach } from 'vitest';
const { mockSessionUpdate } = vi.hoisted(() => ({
const { mockSessionUpdate, mockAtomicMutate } = vi.hoisted(() => ({
mockSessionUpdate: vi.fn(),
mockAtomicMutate: vi.fn(),
}));
vi.mock('../../src/session/sessionManager.js', () => ({
sessionManager: { update: mockSessionUpdate },
sessionManager: { update: mockSessionUpdate, atomicMutate: mockAtomicMutate },
}));
import { dispatchTool } from '../../src/harness/toolDispatcher.js';
@@ -19,6 +20,12 @@ function makeThread() {
beforeEach(() => {
vi.clearAllMocks();
// Faithful atomicMutate: run the mutator against the mock session and return
// the merged state, so mutator logic (e.g. goal_register's spec update) runs.
mockAtomicMutate.mockImplementation(async (_tid: string, mutator: (s: any) => any) => {
const patch = await mutator(mockSession);
return { ...mockSession, ...patch };
});
});
describe('dispatchTool — goal_register', () => {
@@ -38,17 +45,13 @@ describe('dispatchTool — goal_register', () => {
);
expect(result.systemMessage).toContain('New hidden goal registered on the fly: "dynamic_bribe_and_recruit" (Primary: true)');
expect(mockSessionUpdate).toHaveBeenCalledWith(
mockSession.threadId,
expect.objectContaining({
spec: expect.objectContaining({
goals: expect.objectContaining({
primary: expect.arrayContaining([
expect.objectContaining({ id: 'dynamic_bribe_and_recruit', label: 'Players bribe and recruit the NPC.' }),
]),
}),
}),
}),
expect(mockAtomicMutate).toHaveBeenCalledWith(mockSession.threadId, expect.any(Function));
const primaryMutator = mockAtomicMutate.mock.calls[0][1] as (s: any) => any;
const primaryPatch = primaryMutator(mockSession) as { spec: { goals: { primary: any[] } } };
expect(primaryPatch.spec.goals.primary).toEqual(
expect.arrayContaining([
expect.objectContaining({ id: 'dynamic_bribe_and_recruit', label: 'Players bribe and recruit the NPC.' }),
]),
);
});
@@ -68,17 +71,13 @@ describe('dispatchTool — goal_register', () => {
);
expect(result.systemMessage).toContain('New hidden goal registered on the fly: "dynamic_escape_via_sewers" (Primary: false)');
expect(mockSessionUpdate).toHaveBeenCalledWith(
mockSession.threadId,
expect.objectContaining({
spec: expect.objectContaining({
goals: expect.objectContaining({
secondary: expect.arrayContaining([
expect.objectContaining({ id: 'dynamic_escape_via_sewers', label: 'Players escape via sewers.' }),
]),
}),
}),
}),
expect(mockAtomicMutate).toHaveBeenCalledWith(mockSession.threadId, expect.any(Function));
const secondaryMutator = mockAtomicMutate.mock.calls[0][1] as (s: any) => any;
const secondaryPatch = secondaryMutator(mockSession) as { spec: { goals: { secondary: any[] } } };
expect(secondaryPatch.spec.goals.secondary).toEqual(
expect.arrayContaining([
expect.objectContaining({ id: 'dynamic_escape_via_sewers', label: 'Players escape via sewers.' }),
]),
);
});
@@ -98,7 +97,7 @@ describe('dispatchTool — goal_register', () => {
);
expect(result.systemMessage).toContain('[TOOL ERROR] Invalid goal ID format: "Bribe NPC!"');
expect(mockSessionUpdate).not.toHaveBeenCalled();
expect(mockAtomicMutate).not.toHaveBeenCalled();
});
it('rejects duplicate goal IDs', async () => {
@@ -130,7 +129,7 @@ describe('dispatchTool — goal_register', () => {
);
expect(result.systemMessage).toContain('[TOOL ERROR] A goal with ID "dynamic_bribe_and_recruit" already exists');
expect(mockSessionUpdate).not.toHaveBeenCalled();
expect(mockAtomicMutate).not.toHaveBeenCalled();
});
it('rejects if the encounter has gone on for too long (history > 20 messages)', async () => {
@@ -153,7 +152,7 @@ describe('dispatchTool — goal_register', () => {
);
expect(result.systemMessage).toContain('[TOOL ERROR] The encounter has gone on for too long');
expect(mockSessionUpdate).not.toHaveBeenCalled();
expect(mockAtomicMutate).not.toHaveBeenCalled();
});
it('rejects if maximum limit of 2 dynamic goals is reached', async () => {
@@ -186,6 +185,6 @@ describe('dispatchTool — goal_register', () => {
);
expect(result.systemMessage).toContain('[TOOL ERROR] Maximum limit of 2 dynamic goals reached');
expect(mockSessionUpdate).not.toHaveBeenCalled();
expect(mockAtomicMutate).not.toHaveBeenCalled();
});
});

18
tests/unit/keys.test.ts Normal file
View File

@@ -0,0 +1,18 @@
import { describe, it, expect } from 'vitest';
import { KEYS } from '../../src/db/keys.js';
// The key registry is the single source for Redis key shapes. Lock the shapes
// so a refactor can't silently drift the keyspace the boot sweep enumerates.
describe('Redis key registry (KEYS)', () => {
it('builds the expected key shapes', () => {
expect(KEYS.session('t1')).toBe('session:t1');
expect(KEYS.guildThreads('g1')).toBe('guild_threads:g1');
expect(KEYS.characters('g1')).toBe('characters:g1');
expect(KEYS.players('g1')).toBe('players:g1');
expect(KEYS.groupcheck('t1')).toBe('groupcheck:t1');
expect(KEYS.lobby('t1')).toBe('lobby:t1');
expect(KEYS.encounterActive('t1')).toBe('encounter:t1:active');
expect(KEYS.characterStatus('g1', 'u1')).toBe('character_status:g1:u1');
expect(KEYS.campaign('c1')).toBe('campaign:c1');
});
});

View File

@@ -28,10 +28,11 @@ vi.mock('../../src/harness/toolDispatcher.js', () => ({
}));
// ── sessionManager mock ──────────────────────────────────────────────────────
const { mockAddMessage, mockUpdate, mockGet } = vi.hoisted(() => ({
const { mockAddMessage, mockUpdate, mockGet, mockAtomicMutate } = vi.hoisted(() => ({
mockAddMessage: vi.fn(),
mockUpdate: vi.fn(),
mockGet: vi.fn(),
mockAtomicMutate: vi.fn(),
}));
vi.mock('../../src/session/sessionManager.js', () => ({
@@ -39,6 +40,7 @@ vi.mock('../../src/session/sessionManager.js', () => ({
addMessage: mockAddMessage,
update: mockUpdate,
get: mockGet,
atomicMutate: mockAtomicMutate,
},
}));
@@ -299,7 +301,9 @@ describe('runLLMTurn — tool call dispatch', () => {
await runLLMTurn(sessionWith([]), thread, {} as any);
expect(mockUpdate).toHaveBeenCalledWith(mockSession.threadId, {
expect(mockAtomicMutate).toHaveBeenCalledWith(mockSession.threadId, expect.any(Function));
const resolveMutator = mockAtomicMutate.mock.calls[0][1] as () => any;
expect(resolveMutator()).toEqual({
phase: 'resolved',
outcome: 'catch',
outcomeSummary: 'got him',

View File

@@ -105,3 +105,51 @@ describe('sessionManager.delete', () => {
expect(ids).not.toContain('thread-1');
});
});
describe('sessionManager.atomicMutate', () => {
it('merges a patch and updates updatedAt', async () => {
await sessionManager.create('thread-1', mockSession);
const before = (await sessionManager.get('thread-1'))!.updatedAt;
await new Promise(r => setTimeout(r, 5));
await sessionManager.atomicMutate('thread-1', () => ({ phase: 'resolved' }));
const after = (await sessionManager.get('thread-1'))!;
expect(after.phase).toBe('resolved');
expect(after.updatedAt).toBeGreaterThan(before);
});
it('returns the persisted state', async () => {
await sessionManager.create('thread-1', mockSession);
const updated = await sessionManager.atomicMutate('thread-1', () => ({ phase: 'resolved' }));
expect(updated.phase).toBe('resolved');
});
it('throws when session does not exist', async () => {
await expect(sessionManager.atomicMutate('ghost', () => ({ phase: 'resolved' }))).rejects.toThrow();
});
it('serializes concurrent mutations on the same thread (no lost update)', async () => {
await sessionManager.create('thread-1', { ...mockSession, heldMessages: [] });
const delay = (ms: number) => new Promise(r => setTimeout(r, ms));
const mk = (id: string) => ({ discordUserId: id, content: `msg-${id}`, timestamp: Date.now() });
// Two mutators both append to heldMessages, with an await inside each to
// force interleaving. Without per-thread serialization, one append is lost.
await Promise.all([
sessionManager.atomicMutate('thread-1', async s => { await delay(5); return { heldMessages: [...s.heldMessages, mk('a')] }; }),
sessionManager.atomicMutate('thread-1', async s => { await delay(5); return { heldMessages: [...s.heldMessages, mk('b')] }; }),
]);
const s = await sessionManager.get('thread-1');
expect(s?.heldMessages).toHaveLength(2);
expect(s!.heldMessages.map(m => m.discordUserId).sort()).toEqual(['a', 'b']);
});
it('does not serialize mutations across different threads', async () => {
await sessionManager.create('t1', { ...mockSession, threadId: 't1' });
await sessionManager.create('t2', { ...mockSession, encounterId: 'enc-2', threadId: 't2' });
await Promise.all([
sessionManager.atomicMutate('t1', () => ({ phase: 'resolved' })),
sessionManager.atomicMutate('t2', () => ({ phase: 'resolved' })),
]);
expect((await sessionManager.get('t1'))?.phase).toBe('resolved');
expect((await sessionManager.get('t2'))?.phase).toBe('resolved');
});
});