
Stream-side reaper — closes rc#959 gap 3

Tracking: rig-conductor#959.

The gap

When a Redis-streams consumer pod is alive enough to XREADGROUP a message off assignments:<agentId> but its agent has gone silent (no heartbeat for >10 min), the message stays stuck in that consumer's pending entries list (PEL) forever. No other pod in the group can pick it up: XREADGROUP with > only hands out entries that were never delivered to any consumer, so the dead consumer "owns" the entry until something explicitly XCLAIMs it.
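The ownership rule can be illustrated with a toy in-memory model of one consumer group. This is not Redis and not a Redis client; the class and method names (ToyConsumerGroup, read_new, claim) are hypothetical. It models only three behaviours: XREADGROUP with > delivers never-delivered entries and records the reader as owner in the pending entries list (PEL), XACK removes an entry, and XCLAIM with min-idle-time 0 moves ownership of a still-pending entry.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ToyConsumerGroup:
    """Toy model of one consumer group's delivery rules (not a Redis client)."""

    entries: List[str]                # stream entry ids, oldest first
    next_index: int = 0               # first entry never delivered to the group
    pel: Dict[str, str] = field(default_factory=dict)  # entry id -> owning consumer

    def read_new(self, consumer: str) -> Optional[str]:
        """Like XREADGROUP ... STREAMS key >: deliver the next never-delivered entry."""
        if self.next_index >= len(self.entries):
            return None  # nothing new; pending entries are NOT redelivered here
        entry_id = self.entries[self.next_index]
        self.next_index += 1
        self.pel[entry_id] = consumer  # the reader now owns the pending entry
        return entry_id

    def ack(self, entry_id: str) -> None:
        """Like XACK: the entry leaves the PEL."""
        self.pel.pop(entry_id, None)

    def claim(self, new_owner: str, entry_id: str) -> bool:
        """Like XCLAIM with min-idle-time 0: move ownership of a still-pending entry."""
        if entry_id not in self.pel:
            return False
        self.pel[entry_id] = new_owner
        return True


group = ToyConsumerGroup(entries=["1-0"])
group.read_new("review-e-codex-rig-agent-runtime-0")   # codex pod reads, then never acks
print(group.read_new("review-e-rig-agent-runtime-0"))   # None: the healthy pod sees nothing new
print(group.claim("review-e-rig-agent-runtime-0", "1-0"), group.pel)
# True {'1-0': 'review-e-rig-agent-runtime-0'}
```

The healthy pod can only get the entry through an explicit claim; reading alone never surfaces it.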

Original 2026-05-16 incident: review-e-codex-rig-agent-runtime-0 consumed reviews on PRs #955 + #957, hit codex 429 usage_limit_reached, retried 3×, then XACKed without delivering a review. The claude variant of review-e was alive and would have done the work, but the consumer-group routing preferred the codex consumer name. Manual XCLAIM was the only recovery.

What today's chain looks like

| # | Mechanism | What it does | Where |
|---|---|---|---|
| 1 | Codex fail-fast | `_quotaExhaustedUntil` short-circuits `process()` in <10 ms; throws `AgentProviderError` with `userMessage` for chain fallback | rar#469 |
| 2 | Provider chain iteration | `processWithProviders` walks the configured chain on fallback-eligible errors | rar#475 |
| 3 | Degraded heartbeat | `summarizeDegradationState` adds `degraded: true, degradedReasons: ['codex_quota_exhausted_until_<iso>']` to the heartbeat payload | rar#484 |
| 4 | Quota recovery reconciliation | `ReconciliationService.ReconcileQuotaRecoveryAsync` re-dispatches stuck PRs after quota recovers | rc#1094 / rc#944 |
| 5 | Stream-side reaper | This doc | rc#959 gap 3 |

Steps 1–4 handle the quota path: known-cause failures where the rar pod can detect saturation and exit cleanly. Step 5 handles the silent-failure path: the pod is alive enough to claim a message but never XACKs it (crashed mid-processing, hung in a slow CLI, OOM-killed between read and ack, etc.).

Design

Pure policy in ConductorE.Core/Domain/StreamReclaimPolicy.cs — two decisions:

  1. ShouldReclaim(entry, assignedConsumer, now) — given a pending entry's idle time and the assigned consumer's agent-level heartbeat health, should the entry be moved to a different consumer? Returns true iff the entry is past DefaultEntryStaleMs (5 min) AND the agent owning the assigned consumer has no heartbeat within DefaultAgentDownMs (10 min, mirrors AgentHeartbeatWatchdog).
  2. PickReclaimTarget(candidates, excludeConsumer, now) — given the universe of consumers in the same group, which one should receive the reclaim? Excludes the originally-assigned consumer, filters out down agents, prefers the freshest heartbeat.
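A Python sketch of the two decisions, plus the IsAgentDown helper the tests mention, assuming millisecond idle times as reported by XPENDING and a nullable last-heartbeat timestamp per consumer. The real policy is C# in StreamReclaimPolicy.cs; the snake_case names, the PendingEntry and ConsumerHealth field names, treating a missing heartbeat as down, and the exact >= boundaries are all assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, Sequence

DEFAULT_ENTRY_STALE_MS = 5 * 60 * 1000   # mirrors DefaultEntryStaleMs (5 min)
DEFAULT_AGENT_DOWN_MS = 10 * 60 * 1000   # mirrors DefaultAgentDownMs (10 min)


@dataclass(frozen=True)
class PendingEntry:
    entry_id: str
    consumer: str
    idle_ms: int  # idle time as reported by XPENDING


@dataclass(frozen=True)
class ConsumerHealth:
    consumer: str
    agent_id: Optional[str]             # resolved from the consumer name
    last_heartbeat: Optional[datetime]  # None if no heartbeat is on record


def is_agent_down(health: ConsumerHealth, now: datetime,
                  threshold_ms: int = DEFAULT_AGENT_DOWN_MS) -> bool:
    # Assumption: no heartbeat on record counts as down.
    if health.last_heartbeat is None:
        return True
    return now - health.last_heartbeat >= timedelta(milliseconds=threshold_ms)


def should_reclaim(entry: PendingEntry, assigned: ConsumerHealth, now: datetime) -> bool:
    # Both must hold: the entry is stale AND the owning consumer's agent is down.
    return entry.idle_ms >= DEFAULT_ENTRY_STALE_MS and is_agent_down(assigned, now)


def pick_reclaim_target(candidates: Sequence[ConsumerHealth], exclude_consumer: str,
                        now: datetime) -> Optional[str]:
    live = [c for c in candidates
            if c.consumer != exclude_consumer and not is_agent_down(c, now)]
    if not live:
        return None  # nobody healthy to hand the entry to; leave it pending
    # Every live candidate has a heartbeat, so max() never compares None.
    return max(live, key=lambda c: c.last_heartbeat).consumer
```

Keeping both functions free of I/O is what lets the 14 pure-policy cases run without Redis.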

Thin I/O adapter in ConductorE.Api/Services/StreamReclaimService.cs — a BackgroundService that scans every 60 s. For each known agent stream:

  1. XINFO CONSUMERS assignments:<agentId> agents → build the ConsumerHealth view (consumer name + idle ms + resolved agent id + last heartbeat).
  2. XPENDING assignments:<agentId> agents - + 100 → list pending entries across all consumers.
  3. For each entry: ask the policy. If reclaim → XCLAIM to the target consumer with min-idle-time = 0 (force-move).
  4. Log [StreamReclaim] Reclaimed <entry> on <stream>: <from> -> <to> for observability.
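The four steps can be sketched against a redis-py client created with decode_responses=True, whose xinfo_consumers, xpending_range, and xclaim methods issue XINFO CONSUMERS, the extended form of XPENDING, and XCLAIM. The real adapter is a C# BackgroundService; scan_stream, the resolve_agent callback, and the heartbeats mapping (agent id to last heartbeat time) are hypothetical stand-ins, and the policy is inlined in simplified form so the sketch stands alone.

```python
from datetime import datetime, timedelta
from typing import Callable, Dict, Mapping, Optional

GROUP = "agents"
ENTRY_STALE_MS = 5 * 60 * 1000
AGENT_DOWN = timedelta(minutes=10)


def _is_down(last_heartbeat: Optional[datetime], now: datetime) -> bool:
    return last_heartbeat is None or now - last_heartbeat >= AGENT_DOWN


def scan_stream(client, agent_id: str,
                resolve_agent: Callable[[str], Optional[str]],
                heartbeats: Mapping[str, datetime],
                now: datetime,
                log: Callable[[str], None] = print) -> int:
    """Reclaim stuck entries on one assignments stream; returns how many moved."""
    stream = f"assignments:{agent_id}"

    # 1. XINFO CONSUMERS: consumer name -> last heartbeat of the owning agent.
    health: Dict[str, Optional[datetime]] = {}
    for info in client.xinfo_consumers(stream, GROUP):
        owner_agent = resolve_agent(info["name"])
        health[info["name"]] = heartbeats.get(owner_agent) if owner_agent else None

    # 2. XPENDING <stream> agents - + 100
    pending = client.xpending_range(stream, GROUP, "-", "+", 100)

    reclaimed = 0
    for entry in pending:
        owner = entry["consumer"]
        # 3a. Policy: only stale entries owned by a down agent qualify.
        if entry["time_since_delivered"] < ENTRY_STALE_MS or not _is_down(health.get(owner), now):
            continue
        live = [(hb, name) for name, hb in health.items()
                if name != owner and not _is_down(hb, now)]
        if not live:
            continue
        target = max(live)[1]  # freshest heartbeat wins
        # 3b. XCLAIM with min-idle-time 0 (force-move); justid skips the payload.
        if client.xclaim(stream, GROUP, target, 0, [entry["message_id"]], justid=True):
            reclaimed += 1
            # 4. Observability line.
            log(f"[StreamReclaim] Reclaimed {entry['message_id']} on {stream}: {owner} -> {target}")
    return reclaimed
```

Against a live server this would be called as scan_stream(redis.Redis(decode_responses=True), agent_id, ...) once per agent id on every scan tick.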

Consumer-name → agent-id resolution: rar uses ${agentId}-rig-agent-runtime-${N} consumer names. The resolver walks the AgentIds list longest-first so review-e-codex wins over review-e. This is independently tested — a regression in the ordering would silently misroute codex consumers onto the non-codex agent's heartbeat data.
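A Python sketch of the longest-prefix resolution, assuming the resolver matches an agent id followed by "-". The real method is ResolveOwningAgentId in C#; resolve_owning_agent_id and the three-entry AGENT_IDS list are hypothetical (the real list has 13 ids).

```python
from typing import Optional, Sequence

# Hypothetical subset of AgentIds; the real list has 13 entries.
AGENT_IDS = ["review-e", "review-e-codex", "other-agent"]


def resolve_owning_agent_id(consumer_name: str,
                            agent_ids: Sequence[str] = AGENT_IDS) -> Optional[str]:
    """Map '<agentId>-rig-agent-runtime-<N>' back to <agentId> by longest prefix."""
    # Longest first: "review-e-codex-..." also starts with "review-e-", so the
    # shorter id must not be tried before the longer one.
    for agent_id in sorted(agent_ids, key=len, reverse=True):
        if consumer_name.startswith(agent_id + "-"):
            return agent_id
    return None  # unknown consumer; its heartbeat cannot be looked up
```

With the list in declaration order, a naive first-match would return "review-e" for review-e-codex-rig-agent-runtime-0, which is exactly the misattribution the ordering test guards against.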

Why an actuator + detector

The repo already has StreamConsumerWithoutHeartbeatWatcher — a SelfImprovementService watcher that observes the same pattern and auto-files a gap-analysis issue. The watcher is the detector; this service is the actuator. Both stay active in steady state:

  • Detector catches new manifestations of the pattern that the actuator's rules don't yet cover (e.g. a future stream where the consumer name format diverges, or a new agent class that hasn't been added to AgentIds).
  • Actuator closes the loop on the cases it does cover.

If the actuator ever silently regresses (XCLAIM stops firing), the detector keeps emitting SignatureOccurrences — and the rc#947 framework auto-files an issue.

What's tested

| Tier | Cases | Where |
|---|---|---|
| Pure policy | 14: ShouldReclaim (fresh / stale + agent-up / agent-down combos), PickReclaimTarget (exclude assigned, skip down, prefer freshest, empty list), IsAgentDown (null heartbeat, custom threshold) | tests/ConductorE.Core.Tests/Domain/StreamReclaimPolicyTests.cs |
| Adapter unit | 10: ResolveOwningAgentId longest-prefix match across all 13 agent ids, ScanAsync defensive guard (no Redis → returns 0 silently), AgentIds parity with StreamConsumerWithoutHeartbeatWatcher | tests/ConductorE.Api.Tests/StreamReclaimServiceTests.cs |
| End-to-end | None in CI | n/a |

There is no end-to-end tier because Redis testcontainer infrastructure doesn't exist in tests/ConductorE.Api.Tests yet (PostgreSQL is the only testcontainer wired). Production verification: watch for the [StreamReclaim] Reclaimed … log line on the next codex-saturation event. A follow-up to wire a Redis testcontainer is tracked separately.

Tuning knobs

  • StreamReclaimPolicy.DefaultEntryStaleMs — 5 min. Long enough that a healthy consumer in the middle of a slow review can finish; short enough that a dead pod doesn't hold work for hours.
  • StreamReclaimPolicy.DefaultAgentDownMs — 10 min. Matches AgentHeartbeatWatchdog.DefaultThresholdSeconds so the two services agree on liveness.
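  • StreamReclaimService.DefaultScanInterval — 60 s. An entry is reclaimed on the first scan after it is both stale and owned by a down agent, so latency is roughly max(entryStaleMs, time until the owning agent crosses agentDownMs) + scanInterval: about 6 min if the agent had already been silent for 5+ min when it read the entry, up to about 11 min if it died right after reading.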
  • StreamReclaimService startup delay — 60 s. Gives the conductor's other services time to initialise before the first scan.

These can be plumbed to env vars if operational tuning becomes necessary; for now they're constants.
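The latency arithmetic, as a sketch: it ignores the 60 s startup delay and assumes we know how long the owning agent had already been heartbeat-silent when its pod read the entry. Because reclaim needs the entry to be stale and the agent to be down, the 10 min agent-down threshold dominates when the agent dies right after reading. The function name is hypothetical.

```python
ENTRY_STALE_MIN = 5.0    # DefaultEntryStaleMs
AGENT_DOWN_MIN = 10.0    # DefaultAgentDownMs
SCAN_INTERVAL_MIN = 1.0  # DefaultScanInterval


def worst_case_reclaim_latency_min(silent_before_read_min: float) -> float:
    """Upper bound, in minutes after the read, on when a scan reclaims the entry.

    silent_before_read_min: how long the owning agent had already gone without a
    heartbeat when its pod read the entry (0 means it died right after reading).
    """
    entry_ready = ENTRY_STALE_MIN
    agent_ready = max(0.0, AGENT_DOWN_MIN - silent_before_read_min)
    # Both conditions must hold; then the next scan may be up to one interval away.
    return max(entry_ready, agent_ready) + SCAN_INTERVAL_MIN


for silent in (0.0, 3.0, 5.0, 12.0):
    print(silent, worst_case_reclaim_latency_min(silent))
```

This prints 11.0, 8.0, 6.0, and 6.0 minutes respectively: the 6 min figure is the floor set by entryStaleMs, reached only once the agent had been silent for at least 5 min before the read.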

Multi-tenancy (PR-4)

StreamReclaimService is control-plane, not a per-tenant fan-out. The agent fleet and the Redis assignments:<agentId> streams are global rig infrastructure, and the agent-fleet snapshot (IAgentQuery / AgentStatus) lands in the control/default tenant (invotek), because the /api/events HEARTBEAT path does no per-repo tenant resolution. Each scan pass therefore binds ITenantContext to the default tenant before resolving IAgentQuery, runs the global Redis reclaim loop once, and returns a single count. Iterating per active tenant would instead issue N redundant XCLAIMs against the same shared streams and reclaim live entries off healthy consumers. Binding the tenant also keeps the session-binding IAgentQuery resolution fail-closed-safe after the RequireTenant flip (PR-4M). The change is behaviour-neutral today.

Out of scope

  • Per-entry retry tracking. A reclaimed entry that the new consumer ALSO fails to process stays in that consumer's PEL; if that consumer's agent then goes down too, the reaper re-XCLAIMs the entry on a later scan. There's no per-entry attempt counter: if the same entry bounces between consumers indefinitely, something deeper is broken (e.g. malformed payload) and the existing agent_stuck machinery should fire.
  • Event emission. No StreamEntryReclaimed event type added in this PR. The log line is the observability surface; if persistent tracking is needed later, the event can be added incrementally.
  • rar#484 degraded flag consumption. The reaper currently uses heartbeat-age only, matching AgentHeartbeatWatchdog. Once the rar#484 degraded: true payload field is projected onto AgentStatus (separate follow-up), the policy can be extended to also reclaim from "alive-but-degraded" pods — catching the case where the codex pod IS heartbeating but its provider is saturated.
  • Hung-pod self-fence. Option A in the rc#959 issue (pod self-deletes its consumer when its provider reports exhausted, rejoins on recovery). Not strictly necessary now that the actuator runs server-side, but worth filing if the stream-side approach proves too noisy.

Pointers

  • Issue: rc#959
  • Companion detector: src/ConductorE.Api/Services/SelfImprovement/StreamConsumerWithoutHeartbeatWatcher.cs
  • Companion policies (same file pattern): Domain/QuotaRecoveryPolicy.cs (rc#944) and earlier reconciler-recovery paths (rc#608, rc#765)