Skip to content

Event Store

PostgreSQL-based event sourcing using Marten v8. Every action in the Engineering Rig emits an event. The event store is the source of truth for all coordination state.

Event Types

All event types are mapped in the SubmitEvent use case. Every type has unit test coverage.

Lifecycle Events

Event Description Key Fields
ISSUE_APPROVED Issue enters the rig repo, issueNumber, title, priority, dependsOn
ISSUE_ASSIGNED Conductor assigns to agent agentId, attempt
ISSUE_UNASSIGNED Agent removed (reassignment) agentId, reason
WORK_STARTED Agent begins implementation agentId, branch
BRANCH_CREATED Feature branch created agentId, branch
PR_CREATED Pull request opened agentId, prNumber, prUrl, branch
CI_PASSED All CI checks green prNumber
CI_FAILED CI check failed prNumber, failedChecks, logs
REVIEW_PASSED Review-E approved prNumber
REVIEW_DISPUTED Agent disagrees with review prNumber, attempt (iteration)
HUMAN_GATE_TRIGGERED Sensitive code detected prNumber, reason
MERGED PR merged to main prNumber, mergeSha
DEPLOYED_STAGING Deployed to staging reason (environment)
SMOKE_PASSED Staging smoke tests passed
SMOKE_FAILED Smoke test failed reason, attempt (retryCount), maxRetries=3
DEPLOYED_PRODUCTION Deployed to production reason (environment)
VERIFIED Production verification passed
ISSUE_DONE Issue complete durationMinutes

Health Events

Event Description Key Fields
HEARTBEAT Agent alive signal and capability snapshot agentId, status, currentIssue, currentRepo, activeProvider, availableProviders, providers[], integrations[]
AGENT_STUCK Agent unresponsive or SLA exceeded agentId, reason, attempt

Escalation Events

Event Description Key Fields
ESCALATED Issue escalated to human reason, attempts
HUMAN_GATE_REMINDER 30-min reminder prNumber, attempt (waitMinutes)
MILESTONE_COMPLETE All issues in milestone done milestone, issueNumber (issueCount)

Observability Events

Emitted by agents for analytics. Stored in Marten and queryable via SQL. Discord posting is analytics-only by default; set DISCORD_TOOL_EVENTS=true to also forward TOOL_USED to Discord threads.

Tool Usage

Event Description Key Fields
TOOL_USED Every tool call from every agent repo, issueNumber, agentId, toolName, turn, targetPreview (≤120 chars), durationMs, isMcp

Memory Tracking

Event Description Key Fields
MEMORY_WRITE Agent writes a memory entry repo, issueNumber, agentId, scope, kind, memoryId, tokens
MEMORY_READ Agent queries the memory store repo, issueNumber, agentId, query, hits, tokensLoaded
MEMORY_HIT_USED Whether a recalled memory influenced output memoryId, agentId, usedInOutput

MEMORY_HIT_USED is a critical quality signal. Aggregate usedInOutput over time to measure whether memory retrieval actually improves agent behaviour.

Stream IDs for observability events: - TOOL_USED, MEMORY_WRITE, MEMORY_READ{repo}#{issueNumber} (issue stream) - MEMORY_HIT_USED{agentId} (agent stream — no issue context needed)

Sample Queries

Top 10 tools by usage per repo (Marten / PostgreSQL):

SELECT
    data->>'Repo'      AS repo,
    data->>'ToolName'  AS tool_name,
    COUNT(*)           AS uses,
    AVG((data->>'DurationMs')::bigint) AS avg_duration_ms,
    SUM(CASE WHEN data->>'IsMcp' = 'true' THEN 1 ELSE 0 END) AS mcp_calls
FROM mt_events
WHERE type = 'tool_used'
GROUP BY repo, tool_name
ORDER BY uses DESC
LIMIT 10;

Memory quality: recall hit-rate by agent:

SELECT
    data->>'AgentId'      AS agent_id,
    COUNT(*)              AS total_hits,
    SUM(CASE WHEN data->>'UsedInOutput' = 'true' THEN 1 ELSE 0 END) AS used_count,
    ROUND(
        100.0 * SUM(CASE WHEN data->>'UsedInOutput' = 'true' THEN 1 ELSE 0 END) / COUNT(*), 1
    ) AS used_pct
FROM mt_events
WHERE type = 'memory_hit_used'
GROUP BY agent_id
ORDER BY used_pct DESC;

Submitting Events

curl -X POST http://conductor-e-api:8080/api/events \
  -H "Content-Type: application/json" \
  -d '{
    "type": "ISSUE_APPROVED",
    "repo": "dashecorp/conductor-e",
    "issueNumber": 42,
    "title": "feat: Add health check endpoint",
    "priority": "normal",
    "dependsOn": []
  }'

Response:

{
  "streamId": "dashecorp/conductor-e#42",
  "type": "ISSUE_APPROVED",
  "timestamp": "2026-04-02T06:56:22Z"
}

Querying Events

# Get all events for an issue stream (URL-encode the #)
curl "http://conductor-e-api:8080/api/events/stream?id=dashecorp/conductor-e%2342"

Stream Identity

String-based (configured via StreamIdentity.AsString):

  • Issue streams: {repo}#{issueNumber} (e.g., dashecorp/conductor-e#42)
  • Agent streams: {agentId} (e.g., dev-e-1) — used for heartbeats and the latest provider/integration health snapshot

Heartbeat Capability Snapshot

Heartbeat events can also carry the agent's current runtime capability view:

  • activeProvider — currently selected AI service for the agent
  • availableProviders — provider names that operators can switch to
  • providers[] — provider health entries with:
  • name
  • status such as ready, authenticated, auth_required, cooldown, missing_auth
  • details
  • active
  • integrations[] — integration health entries with:
  • name
  • status
  • details

Conductor-E projects this onto AgentStatus, so /api/agents becomes the canonical live overview of:

  • which agents are online
  • which provider each agent is actively using
  • which alternative providers are available
  • which integrations are healthy or missing

Recovery Paths

CI_FAILED → Agent fixes → new commit → CI re-runs → CI_PASSED

SMOKE_FAILED (code) → Agent fixes → redeploy → retry (max 3)

SMOKE_FAILED (external_dependency) → ESCALATED → human decides

AGENT_STUCK (1st) → ISSUE_UNASSIGNED → ISSUE_ASSIGNED (new agent)

AGENT_STUCK (2nd) → ESCALATED → human decides

Database

Marten auto-creates schema on startup (ApplyAllDatabaseChangesOnStartup). Core tables:

Table Purpose
mt_events All events (append-only)
mt_streams Event streams (one per issue or agent)
mt_doc_issuestatus IssueStatus projection
mt_doc_agentstatus AgentStatus projection

Implementation

Events are pure records in ConductorE.Core.Domain (zero dependencies). The SubmitEvent use case maps API requests to domain events and appends via the IEventStore port. The MartenEventStore adapter implements the port using Marten sessions.