Event Store
PostgreSQL-based event sourcing using Marten v8. Every action in the Engineering Rig emits an event. The event store is the source of truth for all coordination state.
Event Types
All event types are mapped in the SubmitEvent use case. Every type has unit test coverage.
Lifecycle Events
| Event | Description | Key Fields |
|---|---|---|
| ISSUE_APPROVED | Issue enters the rig | repo, issueNumber, title, priority, dependsOn |
| ISSUE_ASSIGNED | Conductor assigns to agent | agentId, attempt |
| ISSUE_UNASSIGNED | Agent removed (reassignment) | agentId, reason |
| WORK_STARTED | Agent begins implementation | agentId, branch |
| BRANCH_CREATED | Feature branch created | agentId, branch |
| PR_CREATED | Pull request opened | agentId, prNumber, prUrl, branch |
| CI_PASSED | All CI checks green | prNumber |
| CI_FAILED | CI check failed | prNumber, failedChecks, logs |
| REVIEW_PASSED | Review-E approved | prNumber |
| REVIEW_DISPUTED | Agent disagrees with review | prNumber, attempt (iteration) |
| HUMAN_GATE_TRIGGERED | Sensitive code detected | prNumber, reason |
| MERGED | PR merged to main | prNumber, mergeSha |
| DEPLOYED_STAGING | Deployed to staging | reason (environment) |
| SMOKE_PASSED | Staging smoke tests passed | (none) |
| SMOKE_FAILED | Smoke test failed | reason, attempt (retryCount), maxRetries=3 |
| E2E_PASSED | Tier-2 Playwright e2e all green | passedCount, totalCount, durationMs, runUrl, browser (default chromium) |
| E2E_FAILED | Tier-2 Playwright e2e regression | passedCount, failedCount, totalCount, durationMs, firstFailure, runUrl, browser |
| DEPLOYED_PRODUCTION | Deployed to production | reason (environment) |
| VERIFIED | Production verification passed | (none) |
| ISSUE_DONE | Issue complete | durationMinutes |
Health Events
| Event | Description | Key Fields |
|---|---|---|
| HEARTBEAT | Agent alive signal and capability snapshot | agentId, status, currentIssue, currentRepo, activeProvider, availableProviders, providers[], integrations[] |
| AGENT_STUCK | Agent unresponsive or SLA exceeded | agentId, reason, attempt |
Escalation Events
| Event | Description | Key Fields |
|---|---|---|
| ESCALATED | Issue escalated to human | reason, attempts |
| HUMAN_GATE_REMINDER | 30-min reminder | prNumber, attempt (waitMinutes) |
| MILESTONE_COMPLETE | All issues in milestone done | milestone, issueNumber (issueCount) |
Test Run Events
| Event | Description | Key Fields |
|---|---|---|
| TESTS_RUN | Dev-E ran the project test suite pre-push | repo, issueNumber, agentId, stack, command, passed, durationMs, output (first 500 chars on failure) |
TESTS_RUN is emitted by all Dev-E variants immediately before pushing to the remote branch. It records whether the suite passed, which command was run, and how long it took. The dashboard activity feed surfaces these events with a 🧪 icon. On failure, the output field carries the first 500 characters of the test output.
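The truncation rule can be illustrated with a minimal sketch. The function name build_tests_run_event is hypothetical, and omitting output entirely on a passing run is an assumption; the text only states what failures include.

```python
MAX_OUTPUT_CHARS = 500  # from the text: first 500 characters on failure


def build_tests_run_event(repo: str, issue_number: int, agent_id: str,
                          stack: str, command: str, passed: bool,
                          duration_ms: int, raw_output: str) -> dict:
    """Assemble a TESTS_RUN payload using the key fields listed in the table."""
    event = {
        "type": "TESTS_RUN",
        "repo": repo,
        "issueNumber": issue_number,
        "agentId": agent_id,
        "stack": stack,
        "command": command,
        "passed": passed,
        "durationMs": duration_ms,
    }
    if not passed:
        # Assumption: output is attached only when the suite failed.
        event["output"] = raw_output[:MAX_OUTPUT_CHARS]
    return event
```

The stack and command values passed in are whatever the Dev-E variant actually ran; the sketch does not validate them.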
Release-Note Events (rig-conductor#1439)
Emitted by ReleaseNoteService (rc#1439b) after a release-note has been posted to the #releases Discord channel and the dashboard's read-model IssueStatus needs to flip its sticky release fields.
| Event | Description | Key Fields |
|---|---|---|
| RELEASE_NOTE_POSTED | Discord #releases post sent; the read model can show this in the Releases tab | repo, issueNumber, releaseSummary, mergeSha, sourcePrNumber, mergeAuthor, issueAuthor, mergedAt, deployedAt |
Stream id: {repo}#{issueNumber} (the standard issue stream — the projection's Apply(ReleaseNotePosted) is sticky, so a replay never re-posts or overwrites the original summary).
Trigger gate (in ReleaseNoteService after consuming MERGED):
1. Skip when IssueStatus.ReleaseNote == true (idempotent fast-path).
2. Fetch the merged PR; gate via ReleaseNotePolicy.ShouldPost(prTitle, hasNoReleaseNoteLabel) — drops image-pin SHA-bumps and any PR carrying the no-release-note label.
3. If the PR body carries a Refs <owner/repo>#<n> trailer (rc#1438), the service traces back to the source PR and uses its body/title/author for the announcement — image-pin merges aren't true releases, but the helper degrades gracefully when the trailer is missing.
4. Attribution string is rendered by ReleaseNotePolicy.FormatAttribution(mergeAuthor, issueAuthor): bot mergers surface "shipped via …", human mergers prefer the issue author.
AI summary fallback: the production path AI-summarises the PR body with a ~10 s hard timeout. On failure (LLM unconfigured, timeout, parse error) the service falls back to the first ~3 sentences of the PR body via ReleaseNoteService.SummariseFallback. Both paths emit the same shape of RELEASE_NOTE_POSTED.
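The fallback behaviour described above can be sketched as follows. This is an illustration only, not the code of ReleaseNoteService.SummariseFallback: the regex-based sentence split, the function names, and the ai_summarise callable are all assumptions, and enforcing the ~10 s timeout is left to that callable.

```python
import re
from typing import Callable, Optional


def summarise_fallback(pr_body: str, max_sentences: int = 3) -> str:
    """Return roughly the first few sentences of a PR body."""
    text = " ".join(pr_body.split())  # collapse newlines and repeated spaces
    sentences = re.split(r"(?<=[.!?])\s+", text)
    return " ".join(s for s in sentences[:max_sentences] if s)


def summarise(pr_body: str,
              ai_summarise: Optional[Callable[[str], str]] = None) -> str:
    """Try the AI path first; on any failure use the sentence fallback."""
    if ai_summarise is not None:
        try:
            return ai_summarise(pr_body)
        except Exception:
            # Covers timeouts and parse errors raised by the callable.
            pass
    return summarise_fallback(pr_body)
```

Either path returns a plain string, which matches the statement that both emit the same shape of RELEASE_NOTE_POSTED.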
Required env: DISCORD_RELEASES_WEBHOOK_URL (the #releases channel webhook URL). When absent the service still consumes events but the post step is a no-op — the projection only flips when the service successfully emits RELEASE_NOTE_POSTED.
Accessibility (a11y) Events
Tier 3 of the test strategy (rig-conductor#543). Emitted by the Tier 2 Playwright workflow's optional axe-core a11y job (gated on a non-empty a11y_pages input). One event per scan run, repo-scoped — stream ID is {repo}#0 (the repo-level sentinel; not tied to any issue).
| Event | Description | Key Fields |
|---|---|---|
| A11Y_PASSED | Scan completed with zero violations at-or-above the configured severity | repo, a11yPages[] (with violations[]), a11ySeverity, commitSha, runUrl |
| A11Y_FAILED | Scan found ≥1 violation at-or-above severity on ≥1 page | repo, a11yPages[], a11ySeverity, a11yTotalViolations, commitSha, runUrl |
Each a11yPages[] entry carries { url, violations: [ { id, impact, description, nodesCount } ] }. violations[] is severity-filtered upstream by the Playwright workflow — it contains only entries with impact >= a11ySeverity. Sub-threshold violations are dropped before the event is posted, so consumers can treat any non-empty violations[] as "this page failed at the configured severity" without re-filtering. The full per-page list is still carried on both A11Y_PASSED and A11Y_FAILED (passing pages just have violations: []) so the projection can record the scan's coverage.
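A minimal sketch of the upstream severity filter, assuming the four axe-core impact levels in their usual order (minor, moderate, serious, critical). The function names are hypothetical, and the sketch deliberately does not compute a11yTotalViolations because the text does not say whether it counts violation entries or affected nodes.

```python
# axe-core impact levels, lowest to highest
IMPACT_ORDER = {"minor": 0, "moderate": 1, "serious": 2, "critical": 3}


def filter_pages(pages: list, severity: str) -> list:
    """Drop sub-threshold violations but keep every scanned page."""
    threshold = IMPACT_ORDER[severity]
    filtered = []
    for page in pages:
        kept = [v for v in page["violations"]
                if IMPACT_ORDER[v["impact"]] >= threshold]
        filtered.append({"url": page["url"], "violations": kept})
    return filtered


def event_type_for(filtered_pages: list) -> str:
    """Any non-empty violations[] after filtering means the scan failed."""
    failed = any(page["violations"] for page in filtered_pages)
    return "A11Y_FAILED" if failed else "A11Y_PASSED"
```

Keeping pages with an empty violations list is what lets a consumer see scan coverage on a passing run.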
The A11yDiscordListener background service watches these events and posts a violation summary to DISCORD_A11Y_WEBHOOK_URL (falling back to DISCORD_ADMIN_WEBHOOK_URL):
- Pass: ♿ a11y 3/3 ✅ — 0 serious violations on dashe-website
- Fail: 🔴 a11y 2/3 on dashe-website — / has 4 serious: color-contrast (3 nodes), label (1 node)
The dashboard renders the latest scan per repo via GET /api/a11y (read model: A11yStatus).
Observability Events
Emitted by agents for analytics. Stored in Marten and queryable via SQL. Discord posting is analytics-only by default; set DISCORD_TOOL_EVENTS=true to also forward TOOL_USED to Discord threads.
Tool Usage
| Event | Description | Key Fields |
|---|---|---|
| TOOL_USED | Every tool call from every agent | repo, issueNumber, agentId, toolName, turn, targetPreview (≤120 chars), durationMs, isMcp |
Memory Tracking
| Event | Description | Key Fields |
|---|---|---|
| MEMORY_WRITE | Agent writes a memory entry | repo, issueNumber, agentId, scope, kind, memoryId, tokens |
| MEMORY_READ | Agent queries the memory store | repo, issueNumber, agentId, query, hits, tokensLoaded |
| MEMORY_HIT_USED | Whether a recalled memory influenced output | memoryId, agentId, usedInOutput |
MEMORY_HIT_USED is a critical quality signal. Aggregate usedInOutput over time to measure whether memory retrieval actually improves agent behaviour.
Stream IDs for observability events:
- TOOL_USED, MEMORY_WRITE, MEMORY_READ → {repo}#{issueNumber} (issue stream)
- MEMORY_HIT_USED → {agentId} (agent stream — no issue context needed)
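The routing rule for these four event types can be written down as a small helper. This is a sketch for illustration; the function name is hypothetical and the real mapping lives in the SubmitEvent use case.

```python
ISSUE_STREAM_EVENTS = {"TOOL_USED", "MEMORY_WRITE", "MEMORY_READ"}


def observability_stream_id(event: dict) -> str:
    """Pick the stream for an observability event, per the list above."""
    event_type = event["type"]
    if event_type in ISSUE_STREAM_EVENTS:
        # Issue stream: {repo}#{issueNumber}
        return f'{event["repo"]}#{event["issueNumber"]}'
    if event_type == "MEMORY_HIT_USED":
        # Agent stream: no issue context needed
        return event["agentId"]
    raise ValueError(f"not an observability event: {event_type}")
```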
Sample Queries
Top 10 (repo, tool) pairs by usage (Marten / PostgreSQL):
SELECT
  data->>'Repo' AS repo,
  data->>'ToolName' AS tool_name,
  COUNT(*) AS uses,
  AVG((data->>'DurationMs')::bigint) AS avg_duration_ms,
  SUM(CASE WHEN data->>'IsMcp' = 'true' THEN 1 ELSE 0 END) AS mcp_calls
FROM mt_events
WHERE type = 'tool_used'
GROUP BY repo, tool_name
ORDER BY uses DESC
LIMIT 10;
Memory quality: recall hit-rate by agent:
SELECT
  data->>'AgentId' AS agent_id,
  COUNT(*) AS total_hits,
  SUM(CASE WHEN data->>'UsedInOutput' = 'true' THEN 1 ELSE 0 END) AS used_count,
  ROUND(
    100.0 * SUM(CASE WHEN data->>'UsedInOutput' = 'true' THEN 1 ELSE 0 END) / COUNT(*), 1
  ) AS used_pct
FROM mt_events
WHERE type = 'memory_hit_used'
GROUP BY agent_id
ORDER BY used_pct DESC;
Submitting Events
curl -X POST http://rig-conductor-api:8080/api/events \
  -H "Content-Type: application/json" \
  -d '{
    "type": "ISSUE_APPROVED",
    "repo": "dashecorp/rig-conductor",
    "issueNumber": 42,
    "title": "feat: Add health check endpoint",
    "priority": "normal",
    "dependsOn": []
  }'
Idempotency key
Add idempotencyKey to any event request to deduplicate retries. When the same key
is submitted within 24 hours the second call returns null (same as "unknown type")
and no event is appended. The key is arbitrary but should encode enough context to
distinguish legitimate duplicate events (e.g. different PRs closing the same issue)
from retries.
Recommended format: {type}:{repo}#{issueNumber}:{discriminator}.
curl -X POST http://rig-conductor-api:8080/api/events \
  -H "Content-Type: application/json" \
  -d '{
    "type": "MERGED",
    "repo": "dashecorp/rig-conductor",
    "issueNumber": 133,
    "prNumber": 141,
    "agentId": "github",
    "idempotencyKey": "MERGED:dashecorp/rig-conductor#133:141:abc-delivery-id"
  }'
Response to the ISSUE_APPROVED request at the top of this section:
{
  "streamId": "dashecorp/rig-conductor#42",
  "type": "ISSUE_APPROVED",
  "timestamp": "2026-04-02T06:56:22Z"
}
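The idempotency-key rules from this section can be sketched in a few lines. The key builder follows the recommended format; the in-memory deduplicator is purely illustrative, since the text does not say how the server stores keys or whether the 24-hour window is measured from the first submission (assumed here).

```python
import time

WINDOW_SECONDS = 24 * 60 * 60  # from the text: same key within 24 hours


def make_idempotency_key(event_type: str, repo: str, issue_number: int,
                         discriminator: str) -> str:
    """Build a key in the recommended {type}:{repo}#{issueNumber}:{discriminator} form."""
    return f"{event_type}:{repo}#{issue_number}:{discriminator}"


class InMemoryDeduplicator:
    """Hypothetical stand-in for the server-side key check."""

    def __init__(self, clock=time.time):
        self._first_seen = {}
        self._clock = clock

    def try_accept(self, key: str) -> bool:
        """Return False for a repeat inside the window, True otherwise."""
        now = self._clock()
        seen_at = self._first_seen.get(key)
        if seen_at is not None and now - seen_at < WINDOW_SECONDS:
            return False  # retry: no event would be appended
        self._first_seen[key] = now
        return True
```

Putting the PR number and a delivery id into the discriminator, as in the curl example, keeps two different PRs that close the same issue from colliding.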
Querying Events
# Get all events for an issue stream (URL-encode the #)
curl "http://rig-conductor-api:8080/api/events/stream?id=dashecorp/rig-conductor%2342"
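When the URL is built in code rather than typed by hand, the # must still be percent-encoded. A minimal sketch using the Python standard library; the helper name stream_url is hypothetical and the host is the one from the curl example.

```python
from urllib.parse import quote

BASE_URL = "http://rig-conductor-api:8080"  # host taken from the curl example


def stream_url(stream_id: str) -> str:
    """Build the stream query URL; quote() encodes '#' as %23 and keeps '/'."""
    return f"{BASE_URL}/api/events/stream?id={quote(stream_id)}"


print(stream_url("dashecorp/rig-conductor#42"))
# http://rig-conductor-api:8080/api/events/stream?id=dashecorp/rig-conductor%2342
```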
Stream Identity
String-based (configured via StreamIdentity.AsString):
- Issue streams: {repo}#{issueNumber} (e.g., dashecorp/rig-conductor#42)
- Agent streams: {agentId} (e.g., dev-e-1), used for heartbeats, provider/integration health snapshots, and direct agent-specific events (TokenUsage, AgentQuotaReported, MemoryQueried).
Agent-status routing (post-rc#1133, shipped in rc#1165)
AgentStatusProjection is a MultiStreamProjection<AgentStatus, string> that derives its
identity from each event's AgentId payload field via explicit
Identity<TEvent>(e => e.AgentId) selectors. Events that carry an AgentId (Heartbeat,
IssueAssigned, WorkStarted, IssueUnassigned, AgentStuck, TokenUsage,
AgentQuotaReported, MemoryQueried) route to the agent's document regardless of which
stream they physically landed on. IssueAssigned / WorkStarted / IssueUnassigned /
AgentStuck are emitted only on the issue stream — AgentStatusProjection picks them
up via the Identity selectors. No dual-write needed.
Pre-rc#1133 the projection was SingleStreamProjection<AgentStatus, string> and the
identity was the stream key, so the agent's document only got updated when the event
physically landed on the agent's stream. That required a dual-write in SubmitEvent.cs
to append assignment/lifecycle events to both streams. The dual-write design accreted
~367 polluted AgentStatus documents keyed by issue-stream IDs (one per
{repo}#{issueNumber} that ever emitted a handled event), broke every codex pod's
heartbeats with optimistic-concurrency batch conflicts, and was removed in rc#1165.
Events that carry no AgentId (ISSUE_DONE, ISSUE_CANCELLED) are appended to the issue
stream only and are NOT routed to AgentStatusProjection. The agent's working state
clears via the next Heartbeat (carrying Status=idle, CurrentIssue=null) or via
IssueUnassigned when the conductor explicitly releases the claim.
See docs/2026-05-19-agent-stream-projection-multistream.md for the full migration design
and prod evidence.
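The difference between the two projection designs can be simulated in plain Python. This is a toy model, not Marten code: events are (stream_id, event) pairs, and each function returns the document keys a projection would touch along with the event types applied to each.

```python
# Event types that carry an AgentId, as listed above
AGENT_EVENT_TYPES = {
    "Heartbeat", "IssueAssigned", "WorkStarted", "IssueUnassigned",
    "AgentStuck", "TokenUsage", "AgentQuotaReported", "MemoryQueried",
}


def project_single_stream(events: list) -> dict:
    """Old behaviour: the document key is the stream the event landed on."""
    docs = {}
    for stream_id, event in events:
        if event["type"] in AGENT_EVENT_TYPES:
            docs.setdefault(stream_id, []).append(event["type"])
    return docs


def project_multi_stream(events: list) -> dict:
    """New behaviour: the document key comes from the event's agentId."""
    docs = {}
    for _stream_id, event in events:
        agent_id = event.get("agentId")
        if event["type"] in AGENT_EVENT_TYPES and agent_id:
            docs.setdefault(agent_id, []).append(event["type"])
    return docs
```

Fed an IssueAssigned on an issue stream plus a Heartbeat on the agent stream, the single-stream model produces two documents (one keyed by the issue stream id, which is the pollution described above), while the multi-stream model produces one document keyed by the agent.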
Heartbeat Capability Snapshot
Heartbeat events can also carry the agent's current runtime capability view:
- activeProvider: currently selected AI service for the agent
- availableProviders: provider names that operators can switch to
- providers[]: provider health entries with:
  - name
  - status, such as ready, authenticated, auth_required, cooldown, missing_auth
  - details
  - active
- integrations[]: integration health entries with:
  - name
  - status
  - details
rig-conductor projects this onto AgentStatus, so /api/agents becomes the canonical live overview of:
- which agents are online
- which provider each agent is actively using
- which alternative providers are available
- which integrations are healthy or missing
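As an illustration of how a consumer might read the snapshot, the sketch below lists the alternative providers an operator could switch to right now. Treating ready and authenticated as the usable statuses is an assumption, as are the function name and the placeholder provider names in the usage note.

```python
# Assumption: these two statuses mean the provider can be used immediately.
USABLE_STATUSES = {"ready", "authenticated"}


def usable_alternatives(heartbeat: dict) -> list:
    """Return available providers, other than the active one, that look usable."""
    status_by_name = {p["name"]: p["status"]
                      for p in heartbeat.get("providers", [])}
    active = heartbeat.get("activeProvider")
    return [name for name in heartbeat.get("availableProviders", [])
            if name != active and status_by_name.get(name) in USABLE_STATUSES]
```

For a heartbeat whose active provider is provider-a and whose other entries are provider-b (authenticated) and provider-c (cooldown), the function returns only provider-b.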
Recovery Paths
CI_FAILED → Agent fixes → new commit → CI re-runs → CI_PASSED
SMOKE_FAILED (code) → Agent fixes → redeploy → retry (max 3)
SMOKE_FAILED (external_dependency) → ESCALATED → human decides
AGENT_STUCK (1st) → ISSUE_UNASSIGNED → ISSUE_ASSIGNED (new agent)
AGENT_STUCK (2nd) → ESCALATED → human decides
Stale in_progress/failed + linked open PR exists on GitHub
  → MissingPrCreatedRecovery (5-min sweep)
  → emit PrCreated
  → in_review
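The event-driven paths above can be read as a small decision function. This is a sketch under stated assumptions: attempt is treated as a 1-based counter, exhausting the three smoke retries is assumed to escalate (the text gives only the maximum), and the returned step names are hypothetical labels rather than event types.

```python
MAX_SMOKE_RETRIES = 3  # from the text: retry (max 3)


def next_step(event: dict) -> str:
    """Map a failure event to the next recovery step."""
    kind = event["type"]
    if kind == "CI_FAILED":
        return "agent_fixes"
    if kind == "SMOKE_FAILED":
        if event["reason"] == "external_dependency":
            return "escalate"
        if event["attempt"] < MAX_SMOKE_RETRIES:
            return "agent_fixes_and_redeploys"
        return "escalate"  # assumption: retries exhausted
    if kind == "AGENT_STUCK":
        # First occurrence reassigns; a later one escalates to a human.
        return "reassign" if event["attempt"] <= 1 else "escalate"
    raise ValueError(f"no recovery path for {kind}")
```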
Database
Marten auto-creates schema on startup (ApplyAllDatabaseChangesOnStartup). Core tables:
| Table | Purpose |
|---|---|
| mt_events | All events (append-only) |
| mt_streams | Event streams (one per issue or agent) |
| mt_doc_issuestatus | IssueStatus projection |
| mt_doc_agentstatus | AgentStatus projection |
| mt_doc_tenant | Tenant allowlist (multi-tenancy, rc#1459) |
| mt_doc_repotenant | Repo → tenant mapping (rc#1459) |
Implementation
Events are pure records in ConductorE.Core.Domain (zero dependencies). The SubmitEvent use case maps API requests to domain events and appends via the IEventStore port. The MartenEventStore adapter implements the port using Marten sessions.
Multi-Tenancy (rc#1459)
Every event carries a server-resolved tenant_id so the rig can serve isolated
customers (tenant #0 = invotek, GitHub org dashecorp). Phase-0 is a zero-behavior-change
retrofit; see docs/2026-06-07-multi-tenancy-tenant-id-keystone.md for the full record.
- Carrier: a Marten event header (Events.MetadataConfig.HeadersEnabled = true; set via session.SetHeader("tenant_id", …)), not a field on any event record and not a wrapping envelope. Read back on EventRecord.TenantId.
- Resolution: ITenantResolver (over the Tenant / RepoTenant allowlist) maps the trusted GitHub installation_id / org / repo to a tenant_id at the ingress boundary. Unknown ⟹ null (rejected, never auto-provisioned). The value lives in a scoped ITenantContext, never on SubmitEventRequest, so it cannot be set from a request body, tool argument, or LLM output.
- Backfill: pre-keystone events have no header and read back as invotek via a read-time coalesce; the immutable event store is never rewritten.
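The resolution and backfill rules above can be sketched as an allowlist lookup plus a read-time default. The class below is a hypothetical Python stand-in for ITenantResolver; checking the repo mapping before the org mapping is an assumption, and installation_id lookup is left out for brevity.

```python
from typing import Optional

DEFAULT_TENANT = "invotek"  # tenant #0 from the text


class TenantResolver:
    """Allowlist-backed resolver; unknown callers resolve to None."""

    def __init__(self, org_to_tenant: dict, repo_to_tenant: dict):
        self._org_to_tenant = dict(org_to_tenant)
        self._repo_to_tenant = dict(repo_to_tenant)

    def resolve(self, org: str, repo: Optional[str] = None) -> Optional[str]:
        # Assumption: an explicit repo mapping wins over the org mapping.
        if repo is not None and repo in self._repo_to_tenant:
            return self._repo_to_tenant[repo]
        # Unknown org returns None: rejected, never auto-provisioned.
        return self._org_to_tenant.get(org)


def tenant_on_read(header_value: Optional[str]) -> str:
    """Read-time coalesce: events without a header belong to invotek."""
    return header_value if header_value is not None else DEFAULT_TENANT
```

Because the coalesce happens on read, stored events are never modified, which matches the note that the immutable event store is never rewritten.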
Phase 1: DB-per-tenant (rc#1477 PR-2)
The data plane is now siloed per tenant: Marten runs master-table tenancy
(MultiTenantedDatabasesWithMasterDatabaseTable), one Postgres database per tenant
(rig_t_<id>_evt), with the registry in the control database rig_control. Master-table
tenancy has no default tenant, so every session names one.
- Routing: tenant-0 invotek is statically registered to the legacy rig_conductor database; other tenants are provisioned at runtime by TenantProvisioner. A request-scoped tenant-bound session factory opens for the resolved ITenantContext.TenantId, defaulting to invotek when unset (single-tenant behavior unchanged).
- The tenant_id header is retained for audit/display, but it is no longer a cost filter: the database is the tenant boundary. Cost/usage reads (MartenCostQuery / MartenUsageQuery) query per-tenant databases: ?tenantId=X reads X's database; unscoped reads aggregate across all registered tenant databases (ITenantDatabaseRegistry).
- No data migration on cutover: master-table separate-DB tenancy treats the database as the boundary and does not filter rows by the tenant_id column, so legacy *DEFAULT* events stay visible under invotek. Verified; see docs/2026-06-08-multi-tenancy-db-per-tenant-pr2-cost-model.md.
- Still deferred (later slices): fail-closed RequireTenant for background consumers, per-tenant least-privilege DB roles, control-plane allowlist relocation to rig_control, per-tenant App installs/dispatch, namespace + pgvector/secret isolation, the cutover hardening (AutoCreate.None).
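The routing and read rules can be sketched as two small functions. They are illustrations, not the Marten configuration: the function names are hypothetical, and using the tenant id string directly in the rig_t_<id>_evt pattern is an assumption.

```python
from typing import Optional

DEFAULT_TENANT = "invotek"
LEGACY_DATABASE = "rig_conductor"  # tenant-0 stays on the legacy database


def database_for(tenant_id: Optional[str] = None) -> str:
    """Name the database a session for this tenant would open."""
    tenant = tenant_id or DEFAULT_TENANT  # unset falls back to invotek
    if tenant == DEFAULT_TENANT:
        return LEGACY_DATABASE
    return f"rig_t_{tenant}_evt"


def databases_to_read(registered_tenants: list,
                      tenant_id: Optional[str] = None) -> list:
    """Scoped reads hit one database; unscoped reads span all registered ones."""
    if tenant_id is not None:
        return [database_for(tenant_id)]
    return [database_for(t) for t in registered_tenants]
```

An unscoped cost read over tenants invotek and acme would therefore touch rig_conductor and rig_t_acme_evt, while ?tenantId=acme touches only the latter.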