Skip to content

Event Store

PostgreSQL-based event sourcing using Marten v8. Every action in the Engineering Rig emits an event. The event store is the source of truth for all coordination state.

Event Types

All event types are mapped in the SubmitEvent use case. Every type has unit test coverage.

Lifecycle Events

Event Description Key Fields
ISSUE_APPROVED Issue enters the rig repo, issueNumber, title, priority, dependsOn
ISSUE_ASSIGNED Conductor assigns to agent agentId, attempt
ISSUE_UNASSIGNED Agent removed (reassignment) agentId, reason
WORK_STARTED Agent begins implementation agentId, branch
BRANCH_CREATED Feature branch created agentId, branch
PR_CREATED Pull request opened agentId, prNumber, prUrl, branch
CI_PASSED All CI checks green prNumber
CI_FAILED CI check failed prNumber, failedChecks, logs
REVIEW_PASSED Review-E approved prNumber
REVIEW_DISPUTED Agent disagrees with review prNumber, attempt (iteration)
HUMAN_GATE_TRIGGERED Sensitive code detected prNumber, reason
MERGED PR merged to main prNumber, mergeSha
DEPLOYED_STAGING Deployed to staging reason (environment)
SMOKE_PASSED Staging smoke tests passed
SMOKE_FAILED Smoke test failed reason, attempt (retryCount), maxRetries=3
E2E_PASSED Tier-2 Playwright e2e all green passedCount, totalCount, durationMs, runUrl, browser (default chromium)
E2E_FAILED Tier-2 Playwright e2e regression passedCount, failedCount, totalCount, durationMs, firstFailure, runUrl, browser
DEPLOYED_PRODUCTION Deployed to production reason (environment)
VERIFIED Production verification passed
ISSUE_DONE Issue complete durationMinutes

Health Events

Event Description Key Fields
HEARTBEAT Agent alive signal and capability snapshot agentId, status, currentIssue, currentRepo, activeProvider, availableProviders, providers[], integrations[]
AGENT_STUCK Agent unresponsive or SLA exceeded agentId, reason, attempt

Escalation Events

Event Description Key Fields
ESCALATED Issue escalated to human reason, attempts
HUMAN_GATE_REMINDER 30-min reminder prNumber, attempt (waitMinutes)
MILESTONE_COMPLETE All issues in milestone done milestone, issueNumber (issueCount)

Test Run Events

Event Description Key Fields
TESTS_RUN Dev-E ran the project test suite pre-push repo, issueNumber, agentId, stack, command, passed, durationMs, output (first 500 chars on failure)

TESTS_RUN is emitted by all Dev-E variants immediately before pushing to the remote branch. It records whether the suite passed, which command was run, and how long it took. Dashboard activity feed surfaces these events with a 🧪 icon. Failures include the first 500 characters of test output in the output field.

Release-Note Events (rig-conductor#1439)

Emitted by ReleaseNoteService (rc#1439b) after a release-note has been posted to the #releases Discord channel and the dashboard's read-model IssueStatus needs to flip its sticky release fields.

Event Description Key Fields
RELEASE_NOTE_POSTED Discord #releases post sent + read model can show this in the Releases tab repo, issueNumber, releaseSummary, mergeSha, sourcePrNumber, mergeAuthor, issueAuthor, mergedAt, deployedAt

Stream id: {repo}#{issueNumber} (the standard issue stream — the projection's Apply(ReleaseNotePosted) is sticky, so a replay never re-posts or overwrites the original summary).

Trigger gate (in ReleaseNoteService after consuming MERGED): 1. Skip when IssueStatus.ReleaseNote == true (idempotent fast-path). 2. Fetch the merged PR; gate via ReleaseNotePolicy.ShouldPost(prTitle, hasNoReleaseNoteLabel) — drops image-pin SHA-bumps and any PR carrying the no-release-note label. 3. If the PR body carries a Refs <owner/repo>#<n> trailer (rc#1438), the service traces back to the source PR and uses its body/title/author for the announcement — image-pin merges aren't true releases, but the helper degrades gracefully when the trailer is missing. 4. Attribution string is rendered by ReleaseNotePolicy.FormatAttribution(mergeAuthor, issueAuthor): bot mergers surface "shipped via …", human mergers prefer the issue author.

AI summary fallback: the production path AI-summarises the PR body with a ~10 s hard timeout. On failure (LLM unconfigured, timeout, parse error) the service falls back to the first ~3 sentences of the PR body via ReleaseNoteService.SummariseFallback. Both paths emit the same shape of RELEASE_NOTE_POSTED.

Required env: DISCORD_RELEASES_WEBHOOK_URL (the #releases channel webhook URL). When absent the service still consumes events but the post step is a no-op — the projection only flips when the service successfully emits RELEASE_NOTE_POSTED.

Accessibility (a11y) Events

Tier 3 of the test strategy (rig-conductor#543). Emitted by the Tier 2 Playwright workflow's optional axe-core a11y job (gated on a non-empty a11y_pages input). One event per scan run, repo-scoped — stream ID is {repo}#0 (the repo-level sentinel; not tied to any issue).

Event Description Key Fields
A11Y_PASSED Scan completed with zero violations at-or-above the configured severity repo, a11yPages[] (with violations[]), a11ySeverity, commitSha, runUrl
A11Y_FAILED Scan found ≥1 violation at-or-above severity on ≥1 page repo, a11yPages[], a11ySeverity, a11yTotalViolations, commitSha, runUrl

Each a11yPages[] entry carries { url, violations: [ { id, impact, description, nodesCount } ] }. violations[] is severity-filtered upstream by the Playwright workflow — it contains only entries with impact >= a11ySeverity. Sub-threshold violations are dropped before the event is posted, so consumers can treat any non-empty violations[] as "this page failed at the configured severity" without re-filtering. The full per-page list is still carried on both A11Y_PASSED and A11Y_FAILED (passing pages just have violations: []) so the projection can record the scan's coverage.

The A11yDiscordListener background service watches these events and posts a violation summary to DISCORD_A11Y_WEBHOOK_URL (falling back to DISCORD_ADMIN_WEBHOOK_URL): - Pass: ♿ a11y 3/3 ✅ — 0 serious violations on dashe-website - Fail: 🔴 a11y 2/3 on dashe-website — / has 4 serious: color-contrast (3 nodes), label (1 node)

The dashboard renders the latest scan per repo via GET /api/a11y (read model: A11yStatus).

Observability Events

Emitted by agents for analytics. Stored in Marten and queryable via SQL. Discord posting is analytics-only by default; set DISCORD_TOOL_EVENTS=true to also forward TOOL_USED to Discord threads.

Tool Usage

Event Description Key Fields
TOOL_USED Every tool call from every agent repo, issueNumber, agentId, toolName, turn, targetPreview (≤120 chars), durationMs, isMcp

Memory Tracking

Event Description Key Fields
MEMORY_WRITE Agent writes a memory entry repo, issueNumber, agentId, scope, kind, memoryId, tokens
MEMORY_READ Agent queries the memory store repo, issueNumber, agentId, query, hits, tokensLoaded
MEMORY_HIT_USED Whether a recalled memory influenced output memoryId, agentId, usedInOutput

MEMORY_HIT_USED is a critical quality signal. Aggregate usedInOutput over time to measure whether memory retrieval actually improves agent behaviour.

Stream IDs for observability events: - TOOL_USED, MEMORY_WRITE, MEMORY_READ{repo}#{issueNumber} (issue stream) - MEMORY_HIT_USED{agentId} (agent stream — no issue context needed)

Sample Queries

Top 10 tools by usage per repo (Marten / PostgreSQL):

SELECT
    data->>'Repo'      AS repo,
    data->>'ToolName'  AS tool_name,
    COUNT(*)           AS uses,
    AVG((data->>'DurationMs')::bigint) AS avg_duration_ms,
    SUM(CASE WHEN data->>'IsMcp' = 'true' THEN 1 ELSE 0 END) AS mcp_calls
FROM mt_events
WHERE type = 'tool_used'
GROUP BY repo, tool_name
ORDER BY uses DESC
LIMIT 10;

Memory quality: recall hit-rate by agent:

SELECT
    data->>'AgentId'      AS agent_id,
    COUNT(*)              AS total_hits,
    SUM(CASE WHEN data->>'UsedInOutput' = 'true' THEN 1 ELSE 0 END) AS used_count,
    ROUND(
        100.0 * SUM(CASE WHEN data->>'UsedInOutput' = 'true' THEN 1 ELSE 0 END) / COUNT(*), 1
    ) AS used_pct
FROM mt_events
WHERE type = 'memory_hit_used'
GROUP BY agent_id
ORDER BY used_pct DESC;

Submitting Events

curl -X POST http://rig-conductor-api:8080/api/events \
  -H "Content-Type: application/json" \
  -d '{
    "type": "ISSUE_APPROVED",
    "repo": "dashecorp/rig-conductor",
    "issueNumber": 42,
    "title": "feat: Add health check endpoint",
    "priority": "normal",
    "dependsOn": []
  }'

Idempotency key

Add idempotencyKey to any event request to deduplicate retries. When the same key is submitted within 24 hours the second call returns null (same as "unknown type") and no event is appended. The key is arbitrary but should encode enough context to distinguish legitimate duplicate events (e.g. different PRs closing the same issue) from retries.

Recommended format: {type}:{repo}#{issueNumber}:{discriminator}.

curl -X POST http://rig-conductor-api:8080/api/events \
  -H "Content-Type: application/json" \
  -d '{
    "type": "MERGED",
    "repo": "dashecorp/rig-conductor",
    "issueNumber": 133,
    "prNumber": 141,
    "agentId": "github",
    "idempotencyKey": "MERGED:dashecorp/rig-conductor#133:141:abc-delivery-id"
  }'

Response:

{
  "streamId": "dashecorp/rig-conductor#42",
  "type": "ISSUE_APPROVED",
  "timestamp": "2026-04-02T06:56:22Z"
}

Querying Events

# Get all events for an issue stream (URL-encode the #)
curl "http://rig-conductor-api:8080/api/events/stream?id=dashecorp/rig-conductor%2342"

Stream Identity

String-based (configured via StreamIdentity.AsString):

  • Issue streams: {repo}#{issueNumber} (e.g., dashecorp/rig-conductor#42)
  • Agent streams: {agentId} (e.g., dev-e-1) — used for heartbeats, provider/integration health snapshots, and direct agent-specific events (TokenUsage, AgentQuotaReported, MemoryQueried).

Agent-status routing (post-rc#1133, shipped in rc#1165)

AgentStatusProjection is a MultiStreamProjection<AgentStatus, string> that derives its identity from each event's AgentId payload field via explicit Identity<TEvent>(e => e.AgentId) selectors. Events that carry an AgentId (Heartbeat, IssueAssigned, WorkStarted, IssueUnassigned, AgentStuck, TokenUsage, AgentQuotaReported, MemoryQueried) route to the agent's document regardless of which stream they physically landed on. IssueAssigned / WorkStarted / IssueUnassigned / AgentStuck are emitted only on the issue stream — AgentStatusProjection picks them up via the Identity selectors. No dual-write needed.

Pre-rc#1133 the projection was SingleStreamProjection<AgentStatus, string> and the identity was the stream key, so the agent's document only got updated when the event physically landed on the agent's stream. That required a dual-write in SubmitEvent.cs to append assignment/lifecycle events to both streams. The dual-write design accreted ~367 polluted AgentStatus documents keyed by issue-stream IDs (one per {repo}#{issueNumber} that ever emitted a handled event), broke every codex pod's heartbeats with optimistic-concurrency batch conflicts, and was removed in rc#1165.

Events that carry no AgentId (ISSUE_DONE, ISSUE_CANCELLED) are appended to the issue stream only and are NOT routed to AgentStatusProjection. The agent's working state clears via the next Heartbeat (carrying Status=idle, CurrentIssue=null) or via IssueUnassigned when the conductor explicitly releases the claim.

See docs/2026-05-19-agent-stream-projection-multistream.md for the full migration design and prod evidence.

Heartbeat Capability Snapshot

Heartbeat events can also carry the agent's current runtime capability view:

  • activeProvider — currently selected AI service for the agent
  • availableProviders — provider names that operators can switch to
  • providers[] — provider health entries with:
  • name
  • status such as ready, authenticated, auth_required, cooldown, missing_auth
  • details
  • active
  • integrations[] — integration health entries with:
  • name
  • status
  • details

rig-conductor projects this onto AgentStatus, so /api/agents becomes the canonical live overview of:

  • which agents are online
  • which provider each agent is actively using
  • which alternative providers are available
  • which integrations are healthy or missing

Recovery Paths

CI_FAILED → Agent fixes → new commit → CI re-runs → CI_PASSED

SMOKE_FAILED (code) → Agent fixes → redeploy → retry (max 3)

SMOKE_FAILED (external_dependency) → ESCALATED → human decides

AGENT_STUCK (1st) → ISSUE_UNASSIGNED → ISSUE_ASSIGNED (new agent)

AGENT_STUCK (2nd) → ESCALATED → human decides

Stale in_progress/failed + linked open PR exists on GitHub
  → MissingPrCreatedRecovery (5-min sweep)
  → emit PrCreated
  → in_review

Database

Marten auto-creates schema on startup (ApplyAllDatabaseChangesOnStartup). Core tables:

Table Purpose
mt_events All events (append-only)
mt_streams Event streams (one per issue or agent)
mt_doc_issuestatus IssueStatus projection
mt_doc_agentstatus AgentStatus projection
mt_doc_tenant Tenant allowlist (multi-tenancy, rc#1459)
mt_doc_repotenant Repo → tenant mapping (rc#1459)

Implementation

Events are pure records in ConductorE.Core.Domain (zero dependencies). The SubmitEvent use case maps API requests to domain events and appends via the IEventStore port. The MartenEventStore adapter implements the port using Marten sessions.

Multi-Tenancy (rc#1459)

Every event carries a server-resolved tenant_id so the rig can serve isolated customers (tenant #0 = invotek, GitHub org dashecorp). Phase-0 is a zero-behavior-change retrofit; see docs/2026-06-07-multi-tenancy-tenant-id-keystone.md for the full record.

  • Carrier: a Marten event header (Events.MetadataConfig.HeadersEnabled = true; set via session.SetHeader("tenant_id", …)), not a field on any event record and not a wrapping envelope. Read back on EventRecord.TenantId.
  • Resolution: ITenantResolver (over the Tenant/RepoTenant allowlist) maps the trusted GitHub installation_id / org / repo to a tenant_id at the ingress boundary. Unknown ⟹ null (rejected — never auto-provisioned). The value lives in a scoped ITenantContext, never on SubmitEventRequest, so it cannot be set from a request body, tool argument, or LLM output.
  • Backfill: pre-keystone events have no header and read back as invotek via a read-time coalesce — the immutable event store is never rewritten.

Phase 1 — DB-per-tenant (rc#1477 PR-2)

The data plane is now siloed per tenant: Marten runs master-table tenancy (MultiTenantedDatabasesWithMasterDatabaseTable), one Postgres database per tenant (rig_t_<id>_evt), with the registry in the control database rig_control. Master-table tenancy has no default tenant, so every session names one.

  • Routing: tenant-0 invotek is statically registered to the legacy rig_conductor database; other tenants are provisioned at runtime by TenantProvisioner. A request-scoped tenant-bound session factory opens for the resolved ITenantContext.TenantId, defaulting to invotek when unset (single-tenant behavior unchanged).
  • The tenant_id header is retained for audit/display, but it is no longer a cost filter — the database is the tenant boundary. Cost/usage reads (MartenCostQuery/MartenUsageQuery) query per-tenant databases: ?tenantId=X reads X's database; unscoped reads aggregate across all registered tenant databases (ITenantDatabaseRegistry).
  • No data migration on cutover: master-table separate-DB tenancy treats the database as the boundary and does not filter rows by the tenant_id column, so legacy *DEFAULT* events stay visible under invotek. Verified — see docs/2026-06-08-multi-tenancy-db-per-tenant-pr2-cost-model.md.
  • Still deferred (later slices): fail-closed RequireTenant for background consumers, per-tenant least-privilege DB roles, control-plane allowlist relocation to rig_control, per-tenant App installs/dispatch, namespace + pgvector/secret isolation, the cutover hardening (AutoCreate.None).