Event Store
PostgreSQL-based event sourcing using Marten v8. Every action in the Engineering Rig emits an event. The event store is the source of truth for all coordination state.
Event Types
All event types are mapped in the SubmitEvent use case. Every type has unit test coverage.
Lifecycle Events
| Event | Description | Key Fields |
|---|---|---|
| ISSUE_APPROVED | Issue enters the rig | repo, issueNumber, title, priority, dependsOn |
| ISSUE_ASSIGNED | Conductor assigns to agent | agentId, attempt |
| ISSUE_UNASSIGNED | Agent removed (reassignment) | agentId, reason |
| WORK_STARTED | Agent begins implementation | agentId, branch |
| BRANCH_CREATED | Feature branch created | agentId, branch |
| PR_CREATED | Pull request opened | agentId, prNumber, prUrl, branch |
| CI_PASSED | All CI checks green | prNumber |
| CI_FAILED | CI check failed | prNumber, failedChecks, logs |
| REVIEW_PASSED | Review-E approved | prNumber |
| REVIEW_DISPUTED | Agent disagrees with review | prNumber, attempt (iteration) |
| HUMAN_GATE_TRIGGERED | Sensitive code detected | prNumber, reason |
| MERGED | PR merged to main | prNumber, mergeSha |
| DEPLOYED_STAGING | Deployed to staging | reason (environment) |
| SMOKE_PASSED | Staging smoke tests passed | (none) |
| SMOKE_FAILED | Smoke test failed | reason, attempt (retryCount), maxRetries=3 |
| DEPLOYED_PRODUCTION | Deployed to production | reason (environment) |
| VERIFIED | Production verification passed | (none) |
| ISSUE_DONE | Issue complete | durationMinutes |
Health Events
| Event | Description | Key Fields |
|---|---|---|
| HEARTBEAT | Agent alive signal and capability snapshot | agentId, status, currentIssue, currentRepo, activeProvider, availableProviders, providers[], integrations[] |
| AGENT_STUCK | Agent unresponsive or SLA exceeded | agentId, reason, attempt |
Escalation Events
| Event | Description | Key Fields |
|---|---|---|
| ESCALATED | Issue escalated to human | reason, attempts |
| HUMAN_GATE_REMINDER | 30-min reminder | prNumber, attempt (waitMinutes) |
| MILESTONE_COMPLETE | All issues in milestone done | milestone, issueNumber (issueCount) |
Observability Events
Emitted by agents for analytics. Stored in Marten and queryable via SQL. By default these events are kept for analytics only and are not posted to Discord; set DISCORD_TOOL_EVENTS=true to also forward TOOL_USED to Discord threads.
Tool Usage
| Event | Description | Key Fields |
|---|---|---|
| TOOL_USED | Every tool call from every agent | repo, issueNumber, agentId, toolName, turn, targetPreview (≤120 chars), durationMs, isMcp |
Memory Tracking
| Event | Description | Key Fields |
|---|---|---|
| MEMORY_WRITE | Agent writes a memory entry | repo, issueNumber, agentId, scope, kind, memoryId, tokens |
| MEMORY_READ | Agent queries the memory store | repo, issueNumber, agentId, query, hits, tokensLoaded |
| MEMORY_HIT_USED | Whether a recalled memory influenced output | memoryId, agentId, usedInOutput |
MEMORY_HIT_USED is a critical quality signal. Aggregate usedInOutput over time to measure whether memory retrieval actually improves agent behaviour.
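As a rough illustration of aggregating usedInOutput over time, the sketch below groups already-exported MEMORY_HIT_USED records by calendar day. It is not part of Conductor-E: the function name `used_rate_by_day` and the record shape (a dict with an ISO 8601 `timestamp` string and a boolean `usedInOutput`) are assumptions made for the example.

```python
from collections import defaultdict


def used_rate_by_day(events: list[dict]) -> dict[str, float]:
    """Return the percentage of recalled memories used in output, per day.

    Assumes each record has an ISO 8601 "timestamp" string and a boolean
    "usedInOutput" (hypothetical export shape, not a documented API).
    """
    totals: dict[str, int] = defaultdict(int)
    used: dict[str, int] = defaultdict(int)
    for event in events:
        day = event["timestamp"][:10]  # "YYYY-MM-DD" prefix of the timestamp
        totals[day] += 1
        if event["usedInOutput"]:
            used[day] += 1
    # One percentage per day, rounded to one decimal place, in date order.
    return {day: round(100.0 * used[day] / totals[day], 1) for day in sorted(totals)}
```

A rising daily percentage would suggest that retrieved memories are increasingly relevant; a flat or falling one suggests retrieval is adding tokens without changing output.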
Stream IDs for observability events:
- TOOL_USED, MEMORY_WRITE, MEMORY_READ → {repo}#{issueNumber} (issue stream)
- MEMORY_HIT_USED → {agentId} (agent stream — no issue context needed)
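The routing rule above can be sketched as a small helper. This is a minimal Python illustration rather than the Conductor-E implementation; the function name `stream_id_for` and the dictionary-shaped event are assumptions.

```python
# Hypothetical helper: pick the stream ID for an observability event.
ISSUE_STREAM_EVENTS = {"TOOL_USED", "MEMORY_WRITE", "MEMORY_READ"}
AGENT_STREAM_EVENTS = {"MEMORY_HIT_USED"}


def stream_id_for(event: dict) -> str:
    """Return the stream ID according to the two rules listed above."""
    event_type = event["type"]
    if event_type in ISSUE_STREAM_EVENTS:
        # Issue stream: {repo}#{issueNumber}
        return f"{event['repo']}#{event['issueNumber']}"
    if event_type in AGENT_STREAM_EVENTS:
        # Agent stream: {agentId}; no issue context is required.
        return event["agentId"]
    raise ValueError(f"not an observability event: {event_type}")
```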
Sample Queries
Top 10 tools by usage per repo (Marten / PostgreSQL):
SELECT
  data->>'Repo' AS repo,
  data->>'ToolName' AS tool_name,
  COUNT(*) AS uses,
  AVG((data->>'DurationMs')::bigint) AS avg_duration_ms,
  SUM(CASE WHEN data->>'IsMcp' = 'true' THEN 1 ELSE 0 END) AS mcp_calls
FROM mt_events
WHERE type = 'tool_used'
GROUP BY repo, tool_name
ORDER BY uses DESC
LIMIT 10;
Memory quality: recall hit-rate by agent:
SELECT
  data->>'AgentId' AS agent_id,
  COUNT(*) AS total_hits,
  SUM(CASE WHEN data->>'UsedInOutput' = 'true' THEN 1 ELSE 0 END) AS used_count,
  ROUND(
    100.0 * SUM(CASE WHEN data->>'UsedInOutput' = 'true' THEN 1 ELSE 0 END) / COUNT(*), 1
  ) AS used_pct
FROM mt_events
WHERE type = 'memory_hit_used'
GROUP BY agent_id
ORDER BY used_pct DESC;
Submitting Events
curl -X POST http://conductor-e-api:8080/api/events \
  -H "Content-Type: application/json" \
  -d '{
    "type": "ISSUE_APPROVED",
    "repo": "dashecorp/conductor-e",
    "issueNumber": 42,
    "title": "feat: Add health check endpoint",
    "priority": "normal",
    "dependsOn": []
  }'
Response:
{
  "streamId": "dashecorp/conductor-e#42",
  "type": "ISSUE_APPROVED",
  "timestamp": "2026-04-02T06:56:22Z"
}
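The same submission can be sketched in Python using only the standard library. The URL and payload fields come from the curl example; the helper names `build_submit_request` and `submit_event` are hypothetical, and the sketch assumes the API returns the JSON body shown above.

```python
import json
import urllib.request

API_URL = "http://conductor-e-api:8080/api/events"  # endpoint from the curl example


def build_submit_request(event: dict, url: str = API_URL) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request for one event."""
    body = json.dumps(event).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit_event(event: dict) -> dict:
    """Send the event and return the decoded JSON response."""
    with urllib.request.urlopen(build_submit_request(event), timeout=10) as response:
        return json.load(response)
```

Keeping request construction separate from sending makes the payload easy to inspect or unit-test without a running API.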
Querying Events
# Get all events for an issue stream (URL-encode the #)
curl "http://conductor-e-api:8080/api/events/stream?id=dashecorp/conductor-e%2342"
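When the stream ID is built programmatically, the `#` has to be percent-encoded before it goes into the query string. A minimal sketch using Python's standard `urllib.parse.quote`; the helper name `stream_query_url` is an assumption, and the base URL is taken from the curl example.

```python
from urllib.parse import quote

BASE_URL = "http://conductor-e-api:8080/api/events/stream"  # from the curl example


def stream_query_url(stream_id: str) -> str:
    """Return the query URL for a stream, percent-encoding the '#'."""
    # safe="/" leaves the slash in "owner/repo" untouched, as in the curl
    # example, while '#' becomes %23.
    return f"{BASE_URL}?id={quote(stream_id, safe='/')}"
```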
Stream Identity
String-based (configured via StreamIdentity.AsString):
- Issue streams: {repo}#{issueNumber} (e.g., dashecorp/conductor-e#42)
- Agent streams: {agentId} (e.g., dev-e-1), used for heartbeats and the latest provider/integration health snapshot
Heartbeat Capability Snapshot
Heartbeat events can also carry the agent's current runtime capability view:
- activeProvider: currently selected AI service for the agent
- availableProviders: provider names that operators can switch to
- providers[]: provider health entries with:
  - name
  - status, such as ready, authenticated, auth_required, cooldown, missing_auth
  - details
  - active
- integrations[]: integration health entries with:
  - name
  - status
  - details
Conductor-E projects this onto AgentStatus, so /api/agents becomes the canonical live overview of:
- which agents are online
- which provider each agent is actively using
- which alternative providers are available
- which integrations are healthy or missing
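The projection idea can be illustrated with a short fold over heartbeat payloads, where the latest heartbeat per agent wins. This is a sketch under assumptions, not the AgentStatus projection itself: the function name `project_agent_status`, the dictionary shapes, and the output layout are all hypothetical.

```python
def project_agent_status(heartbeats: list[dict]) -> dict[str, dict]:
    """Fold heartbeats (oldest first) into one latest snapshot per agent."""
    overview: dict[str, dict] = {}
    for hb in heartbeats:
        # A later heartbeat for the same agent replaces the earlier snapshot.
        overview[hb["agentId"]] = {
            "status": hb.get("status"),
            "activeProvider": hb.get("activeProvider"),
            "availableProviders": hb.get("availableProviders", []),
            # Map each integration name to its reported status.
            "integrations": {
                entry["name"]: entry["status"]
                for entry in hb.get("integrations", [])
            },
        }
    return overview
```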
Recovery Paths
CI_FAILED → Agent fixes → new commit → CI re-runs → CI_PASSED
SMOKE_FAILED (code) → Agent fixes → redeploy → retry (max 3)
SMOKE_FAILED (external_dependency) → ESCALATED → human decides
AGENT_STUCK (1st) → ISSUE_UNASSIGNED → ISSUE_ASSIGNED (new agent)
AGENT_STUCK (2nd) → ESCALATED → human decides
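The recovery paths above can be read as a small decision table. The sketch below encodes them in Python for illustration only; the function name `next_step`, its parameters, and the returned labels are hypothetical. The paths above do not state what happens once the three smoke-test retries are used up, so the sketch assumes escalation in that case.

```python
MAX_SMOKE_RETRIES = 3  # "max 3" from the recovery paths above


def next_step(event_type: str, cause: str = "code",
              retry_count: int = 0, occurrence: int = 1) -> str:
    """Return the next recovery action for a failure event."""
    if event_type == "CI_FAILED":
        # Agent fixes, pushes a new commit, and CI re-runs.
        return "AGENT_FIXES"
    if event_type == "SMOKE_FAILED":
        if cause == "external_dependency":
            return "ESCALATED"
        # Assumption: escalate once the retry budget is exhausted.
        return "RETRY_DEPLOY" if retry_count < MAX_SMOKE_RETRIES else "ESCALATED"
    if event_type == "AGENT_STUCK":
        # First occurrence: unassign so a new agent can be assigned.
        return "ISSUE_UNASSIGNED" if occurrence == 1 else "ESCALATED"
    raise ValueError(f"no recovery path defined for {event_type}")
```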
Database
Marten auto-creates schema on startup (ApplyAllDatabaseChangesOnStartup). Core tables:
| Table | Purpose |
|---|---|
| mt_events | All events (append-only) |
| mt_streams | Event streams (one per issue or agent) |
| mt_doc_issuestatus | IssueStatus projection |
| mt_doc_agentstatus | AgentStatus projection |
Implementation
Events are pure records in ConductorE.Core.Domain (zero dependencies). The SubmitEvent use case maps API requests to domain events and appends via the IEventStore port. The MartenEventStore adapter implements the port using Marten sessions.
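The port-and-adapter split described above can be illustrated with a Python analogue. The actual code is .NET with Marten, so everything here is a stand-in: `EventStore`, `InMemoryEventStore`, `IssueApproved`, and `handle_submit` are hypothetical names, and the in-memory adapter only plays the role that MartenEventStore plays in the real system.

```python
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class IssueApproved:
    """Domain event as a plain immutable record with no dependencies."""
    repo: str
    issue_number: int
    title: str
    priority: str
    depends_on: tuple = ()


class EventStore(Protocol):
    """The port: the only thing the use case knows about storage."""
    def append(self, stream_id: str, event: object) -> None: ...


class InMemoryEventStore:
    """A stand-in adapter that keeps append-only lists per stream."""
    def __init__(self) -> None:
        self.streams: dict[str, list[object]] = {}

    def append(self, stream_id: str, event: object) -> None:
        self.streams.setdefault(stream_id, []).append(event)


def handle_submit(request: dict, store: EventStore) -> str:
    """Map an API request to a domain event and append it via the port."""
    if request["type"] != "ISSUE_APPROVED":
        raise ValueError(f"unmapped event type: {request['type']}")
    event = IssueApproved(
        repo=request["repo"],
        issue_number=request["issueNumber"],
        title=request["title"],
        priority=request["priority"],
        depends_on=tuple(request.get("dependsOn", [])),
    )
    stream_id = f"{event.repo}#{event.issue_number}"
    store.append(stream_id, event)
    return stream_id
```

Because the use case depends only on the port, the storage adapter can be swapped (for example, for tests) without touching the mapping logic.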