Stream-side reaper: closes rc#959 gap 3
Tracking: rig-conductor#959.
The gap
When a Redis-streams consumer pod is alive enough to XREADGROUP a message off assignments:<agentId> but its agent has gone silent (no heartbeat for >10 min), the message stays stuck in that consumer's PEL forever. No other pod in the group can pick it up — Redis routes by consumer name, and the dead consumer "owns" the entry until something explicitly XCLAIMs it.
Original 2026-05-16 incident: review-e-codex-rig-agent-runtime-0 consumed reviews on PRs #955 + #957, hit codex 429 usage_limit_reached, retried 3×, then XACKed without delivering a review. The claude variant of review-e was alive and would have done the work, but the consumer-group routing preferred the codex consumer name. Manual XCLAIM was the only recovery.
What today's chain looks like
| # | Mechanism | What it does | Where |
|---|---|---|---|
| 1 | Codex fail-fast | _quotaExhaustedUntil short-circuits process() in <10ms; throws AgentProviderError with userMessage for chain fallback | rar#469 |
| 2 | Provider chain iteration | processWithProviders walks the configured chain on fallback-eligible errors | rar#475 |
| 3 | Degraded heartbeat | summarizeDegradationState adds degraded: true, degradedReasons: ['codex_quota_exhausted_until_<iso>'] to heartbeat payload | rar#484 |
| 4 | Quota recovery reconciliation | ReconciliationService.ReconcileQuotaRecoveryAsync re-dispatches stuck PRs after quota recovers | rc#1094 / rc#944 |
| 5 | Stream-side reaper | This doc | rc#959 gap 3 |
Steps 1–4 handle the quota path: known-cause failures where the rar pod can detect saturation and exit cleanly. Step 5 handles the silent-failure path: pod is alive enough to claim a message but never XACKs (crashed mid-processing, hung in a slow CLI, OOMd between read and ack, etc.).
Design
Pure policy in ConductorE.Core/Domain/StreamReclaimPolicy.cs — two decisions:
- ShouldReclaim(entry, assignedConsumer, now): given a pending entry's idle time and the assigned consumer's agent-level heartbeat health, should the entry be moved to a different consumer? Returns true iff the entry is past DefaultEntryStaleMs (5 min) AND the agent owning the assigned consumer has no heartbeat within DefaultAgentDownMs (10 min, mirrors AgentHeartbeatWatchdog).
- PickReclaimTarget(candidates, excludeConsumer, now): given the universe of consumers in the same group, which one should receive the reclaim? Excludes the originally-assigned consumer, filters out down agents, prefers the freshest heartbeat.
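The two decisions can be sketched as pure functions. This is an illustrative Python sketch, not the C# code in StreamReclaimPolicy.cs: the ConsumerHealth field names, the treatment of a missing heartbeat as "down", and the exact boundary comparisons (`>` rather than `>=`) are assumptions.

```python
from dataclasses import dataclass
from typing import Optional, Sequence

ENTRY_STALE_MS = 5 * 60 * 1000   # mirrors DefaultEntryStaleMs (5 min)
AGENT_DOWN_MS = 10 * 60 * 1000   # mirrors DefaultAgentDownMs (10 min)


@dataclass(frozen=True)
class ConsumerHealth:
    """Hypothetical shape: a consumer plus its owning agent's last heartbeat."""
    name: str
    agent_id: Optional[str]
    last_heartbeat_ms: Optional[int]  # epoch ms; None means no heartbeat seen


def is_agent_down(consumer: ConsumerHealth, now_ms: int,
                  down_ms: int = AGENT_DOWN_MS) -> bool:
    # Assumption: a consumer with no recorded heartbeat counts as down.
    if consumer.last_heartbeat_ms is None:
        return True
    return now_ms - consumer.last_heartbeat_ms > down_ms


def should_reclaim(entry_idle_ms: int, assigned: ConsumerHealth, now_ms: int) -> bool:
    # Both conditions must hold: the entry is stale AND its owner's agent is down.
    return entry_idle_ms > ENTRY_STALE_MS and is_agent_down(assigned, now_ms)


def pick_reclaim_target(candidates: Sequence[ConsumerHealth], exclude_consumer: str,
                        now_ms: int) -> Optional[ConsumerHealth]:
    # Drop the original owner and any down agents, then prefer the freshest heartbeat.
    healthy = [c for c in candidates
               if c.name != exclude_consumer and not is_agent_down(c, now_ms)]
    if not healthy:
        return None
    return max(healthy, key=lambda c: c.last_heartbeat_ms)
```

Keeping both functions free of I/O is what lets the policy tier be tested without Redis: every input, including the current time, is passed in explicitly.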
Thin I/O adapter in ConductorE.Api/Services/StreamReclaimService.cs — a BackgroundService that scans every 60 s. For each known agent stream:
- XINFO CONSUMERS assignments:<agentId> agents → build the ConsumerHealth view (consumer name + idle ms + resolved agent id + last heartbeat).
- XPENDING assignments:<agentId> agents - + 100 → list pending entries across all consumers.
- For each entry: ask the policy. If reclaim → XCLAIM to the target consumer with min-idle-time = 0 (force-move).
- Log [StreamReclaim] Reclaimed <entry> on <stream>: <from> -> <to> for observability.
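One scan pass over a single stream might look like the following Python sketch. It is not the C# StreamReclaimService: the client is assumed to expose redis-py style xinfo_consumers / xpending_range / xclaim methods, the two callbacks stand in for the policy, and FakeClient is a hypothetical in-memory stand-in so the example runs without a Redis server. The stream name and the toy policy lambdas in the demo are illustrative only.

```python
GROUP = "agents"  # consumer group name used in the commands above


def scan_stream(client, stream, should_reclaim, pick_target) -> int:
    """Run one reclaim pass over `stream`; return the number of entries moved."""
    consumers = [c["name"] for c in client.xinfo_consumers(stream, GROUP)]
    pending = client.xpending_range(stream, GROUP, min="-", max="+", count=100)
    reclaimed = 0
    for entry in pending:
        owner = entry["consumer"]
        if not should_reclaim(entry["time_since_delivered"], owner):
            continue
        target = pick_target(consumers, owner)
        if target is None:
            continue  # no healthy consumer right now; retry on the next scan
        # min_idle_time=0 forces the move regardless of the entry's idle time.
        client.xclaim(stream, GROUP, target, min_idle_time=0,
                      message_ids=[entry["message_id"]])
        print(f"[StreamReclaim] Reclaimed {entry['message_id']} on {stream}: "
              f"{owner} -> {target}")
        reclaimed += 1
    return reclaimed


class FakeClient:
    """In-memory stand-in (hypothetical) that records XCLAIM calls."""

    def __init__(self, consumers, pending):
        self.consumers, self.pending, self.claims = consumers, pending, []

    def xinfo_consumers(self, name, groupname):
        return [{"name": c} for c in self.consumers]

    def xpending_range(self, name, groupname, min, max, count):
        return self.pending[:count]

    def xclaim(self, name, groupname, consumername, min_idle_time, message_ids):
        self.claims.append((consumername, list(message_ids)))


client = FakeClient(
    consumers=["review-e-codex-rig-agent-runtime-0", "review-e-rig-agent-runtime-0"],
    pending=[{"message_id": "1-0", "consumer": "review-e-codex-rig-agent-runtime-0",
              "time_since_delivered": 400_000, "times_delivered": 1}],
)
moved = scan_stream(
    client, "assignments:review-e",
    # Toy policy: treat the codex consumer as the one whose agent is down.
    should_reclaim=lambda idle_ms, owner: idle_ms > 300_000 and "codex" in owner,
    pick_target=lambda names, exclude: next((n for n in names if n != exclude), None),
)
```

Skipping an entry when no target is available, rather than failing the pass, matches the defensive posture described for ScanAsync: the next scan simply tries again.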
Consumer-name → agent-id resolution: rar uses ${agentId}-rig-agent-runtime-${N} consumer names. The resolver walks the AgentIds list longest-first so review-e-codex wins over review-e. This is independently tested — a regression in the ordering would silently misroute codex consumers onto the non-codex agent's heartbeat data.
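A minimal Python sketch of the longest-first resolution follows. It assumes the resolver matches on the prefix `<agentId>-`; that matching rule is an assumption made to show why ordering matters, and the real ResolveOwningAgentId may differ.

```python
from typing import Optional, Sequence


def resolve_owning_agent_id(consumer_name: str,
                            agent_ids: Sequence[str]) -> Optional[str]:
    """Map a consumer name to its agent id, trying the longest ids first."""
    # Longest-first so that "review-e-codex" is tried before "review-e";
    # otherwise "review-e-" would also match the codex consumer's name.
    for agent_id in sorted(agent_ids, key=len, reverse=True):
        if consumer_name.startswith(f"{agent_id}-"):
            return agent_id
    return None


agent_ids = ["review-e", "review-e-codex"]
codex_owner = resolve_owning_agent_id("review-e-codex-rig-agent-runtime-0", agent_ids)
plain_owner = resolve_owning_agent_id("review-e-rig-agent-runtime-0", agent_ids)
```

With shortest-first iteration the codex consumer would resolve to review-e, and the reaper would judge it against the wrong agent's heartbeat.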
Why an actuator + detector
The repo already has StreamConsumerWithoutHeartbeatWatcher — a SelfImprovementService watcher that observes the same pattern and auto-files a gap-analysis issue. The watcher is the detector; this service is the actuator. Both stay active in steady state:
- Detector catches new manifestations of the pattern that the actuator's rules don't yet cover (e.g. a future stream where the consumer name format diverges, or a new agent class that hasn't been added to AgentIds).
- Actuator closes the loop on the cases it does cover.
If the actuator ever silently regresses (XCLAIM stops firing), the detector keeps emitting SignatureOccurrences — and the rc#947 framework auto-files an issue.
What's tested
| Tier | Cases | Where |
|---|---|---|
| Pure policy | 14: ShouldReclaim (fresh / stale + agent-up / agent-down combos), PickReclaimTarget (exclude assigned, skip down, prefer freshest, empty list), IsAgentDown (null heartbeat, custom threshold) | tests/ConductorE.Core.Tests/Domain/StreamReclaimPolicyTests.cs |
| Adapter unit | 10: ResolveOwningAgentId longest-prefix-match across all 13 agent ids, ScanAsync defensive guard (no Redis → returns 0 silently), AgentIds parity with StreamConsumerWithoutHeartbeatWatcher | tests/ConductorE.Api.Tests/StreamReclaimServiceTests.cs |
| End-to-end | None in CI. Redis testcontainer infrastructure doesn't exist in tests/ConductorE.Api.Tests yet (PostgreSQL is the only testcontainer wired). Production verification: watch for [StreamReclaim] Reclaimed … log line on the next codex-saturation event. Follow-up to wire a Redis testcontainer is tracked separately. |
Tuning knobs
- StreamReclaimPolicy.DefaultEntryStaleMs: 5 min. Long enough that a healthy consumer in the middle of a slow review can finish; short enough that a dead pod doesn't hold work for hours.
- StreamReclaimPolicy.DefaultAgentDownMs: 10 min. Matches AgentHeartbeatWatchdog.DefaultThresholdSeconds so the two services agree on liveness.
- StreamReclaimService.DefaultScanInterval: 60 s. Worst-case latency for a stuck entry is entryStaleMs + scanInterval ≈ 6 min.
- StreamReclaimService startup delay: 60 s. Gives the conductor's other services time to initialise before the first scan.
These can be plumbed to env vars if operational tuning becomes necessary; for now they're constants.
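If the knobs are ever plumbed to env vars, the shape could resemble this Python sketch. The env var names and the knob_ms helper are hypothetical; nothing like them exists in the service today. The sketch also reproduces the worst-case latency arithmetic from the list above.

```python
import os
from typing import Mapping


def knob_ms(env_name: str, default_ms: int,
            env: Mapping[str, str] = os.environ) -> int:
    """Read a positive millisecond value from env, falling back to the default."""
    raw = env.get(env_name)
    if raw is None:
        return default_ms
    try:
        value = int(raw)
    except ValueError:
        return default_ms  # unparsable override: keep the compiled-in constant
    return value if value > 0 else default_ms


def worst_case_latency_ms(entry_stale_ms: int, scan_interval_ms: int) -> int:
    # An entry can turn stale just after a scan and wait one full interval.
    return entry_stale_ms + scan_interval_ms


# Hypothetical variable names, shown with the documented defaults.
defaults = {
    "STREAM_RECLAIM_ENTRY_STALE_MS": 5 * 60 * 1000,
    "STREAM_RECLAIM_SCAN_INTERVAL_MS": 60 * 1000,
}
latency_ms = worst_case_latency_ms(*defaults.values())  # 360000 ms, i.e. 6 min
```

Falling back to the default on a malformed value keeps a bad deployment setting from disabling the reaper outright.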
Multi-tenancy (PR-4)
StreamReclaimService is control-plane, not a per-tenant fan-out. The agent fleet and the Redis
assignments:{agentId} streams are global rig infrastructure, and the agent-fleet snapshot
(IAgentQuery / AgentStatus) lands in the control/default tenant (invotek) because the /api/events
HEARTBEAT path does no per-repo tenant resolution. So its single scan pass binds ITenantContext to the
default tenant before resolving IAgentQuery, then runs the global Redis reclaim loop once and returns
a single count — iterating per active tenant would issue N redundant XCLAIMs against the same shared
streams and reclaim live entries off healthy consumers. Setting the tenant also keeps the session-binding
IAgentQuery resolution fail-closed-safe after the RequireTenant flip (PR-4M). Behaviour-neutral today.
Out of scope
- Per-entry retry tracking. A reclaim to a healthy consumer that then ALSO fails will land back in the new consumer's PEL. The reaper will re-XCLAIM it on the next scan. There's no per-entry attempt counter; if the same entry bounces between consumers indefinitely, something deeper is broken (e.g. malformed payload) and the existing agent_stuck machinery should fire.
- Event emission. No StreamEntryReclaimed event type added in this PR. The log line is the observability surface; if persistent tracking is needed later, the event can be added incrementally.
- rar#484 degraded flag consumption. The reaper currently uses heartbeat-age only, matching AgentHeartbeatWatchdog. Once the rar#484 degraded: true payload field is projected onto AgentStatus (separate follow-up), the policy can be extended to also reclaim from "alive-but-degraded" pods, catching the case where the codex pod IS heartbeating but its provider is saturated.
- Hung-pod self-fence. Option A in the rc#959 issue (pod self-deletes its consumer when its provider reports exhausted, rejoins on recovery). Not strictly necessary now that the actuator runs server-side, but worth filing if the stream-side approach proves too noisy.
Pointers
- Issue: rc#959
- Companion detector: src/ConductorE.Api/Services/SelfImprovement/StreamConsumerWithoutHeartbeatWatcher.cs
- Companion policies (same file pattern): Domain/QuotaRecoveryPolicy.cs (rc#944) and earlier reconciler-recovery paths (rc#608, rc#765)