Event Pipeline
The SCHEMABOUND event pipeline is implemented as a Chain of Responsibility over the global
EventBus. Every domain event — query execution, validation failures, session registration,
trigger fires — flows through a registered sequence of handlers before any subscribers see it.
Handlers are synchronous, ordered, and can short-circuit the chain by returning
HandleOutcome::Stop. The default pipeline always returns HandleOutcome::Continue so the
full chain runs for every event.
Handler Trait
#![allow(unused)]
fn main() {
use schemabound::interceptor::{Event, EventHandler, HandleOutcome};
pub struct MyHandler;
impl EventHandler for MyHandler {
fn handle(&self, event: &Event) -> HandleOutcome {
println!("[my-handler] {}", event.event_type());
HandleOutcome::Continue
}
}
}
Register with the global bus:
#![allow(unused)]
fn main() {
use schemabound::get_event_bus;
get_event_bus().register_handler(Box::new(MyHandler))?;
}
Handlers are invoked in registration order for every dispatched event.
Built-In Handlers (schemabound::handlers)
AuditLogHandler
Writes a one-line JSON audit entry to stderr for every event. This is always the
first handler registered in the default gRPC pipeline so that every domain event is
recorded before downstream processing.
#![allow(unused)]
fn main() {
use schemabound::AuditLogHandler;
get_event_bus().register_handler(Box::new(AuditLogHandler))?;
// → [audit] {"event_type":"QueryExecuted","db_identifier":"...","query":"..."}
}
QueryMetricsHandler
Tracks cumulative counts of QueryExecuted, QueryValidationFailed, and
QueryExecutionError events through lock-free atomic counters. Readable from any thread
at any point.
#![allow(unused)]
fn main() {
use schemabound::{SharedHandler, handlers::QueryMetricsHandler};
use std::sync::Arc;
let metrics = Arc::new(QueryMetricsHandler::new());
get_event_bus().register_handler(Box::new(SharedHandler(Arc::clone(&metrics))))?;
// later — read from a health endpoint
let snap = metrics.snapshot();
println!("executed={} failures={} errors={}",
snap.queries_executed, snap.validation_failures, snap.execution_errors);
}
SessionActivityHandler
Counts SessionRegistered events so the gRPC layer can expose a live session counter
without a database round-trip.
#![allow(unused)]
fn main() {
use schemabound::{SharedHandler, handlers::SessionActivityHandler};
use std::sync::Arc;
let activity = Arc::new(SessionActivityHandler::new());
get_event_bus().register_handler(Box::new(SharedHandler(Arc::clone(&activity))))?;
println!("active sessions: {}", activity.session_count());
}
SharedHandler<T>
A newtype wrapper that lets an Arc<T: EventHandler> be registered with the bus without
transferring ownership, so the same handle can be retained for metrics reads.
#![allow(unused)]
fn main() {
use schemabound::SharedHandler;
// Arc retained for reading; clone registered with bus
get_event_bus().register_handler(Box::new(SharedHandler(Arc::clone(&my_handler))))?;
}
Default Handler Chain
DefaultHandlerChain bundles QueryMetricsHandler and SessionActivityHandler into a
single struct with pre-made Arc handles — the recommended starting point for gRPC
services.
#![allow(unused)]
fn main() {
use schemabound::{AuditLogHandler, DefaultHandlerChain, SharedHandler, get_event_bus};
let chain = DefaultHandlerChain::new();
let bus = get_event_bus();
bus.register_handler(Box::new(AuditLogHandler))?;
bus.register_handler(Box::new(SharedHandler(Arc::clone(&chain.query_metrics))))?;
bus.register_handler(Box::new(SharedHandler(Arc::clone(&chain.session_activity))))?;
// Retain chain for health probes
let queries = chain.query_metrics.snapshot().queries_executed;
let sessions = chain.session_activity.session_count();
}
Handler vs. Subscriber
| Handler | Subscriber | |
|---|---|---|
| API | register_handler | register_subscriber |
| Ordering | Explicit — registration order | Unordered |
| Short-circuit | HandleOutcome::Stop stops the chain | No chain to stop |
| Use cases | audit log, metrics, rate limiting | cache invalidation, notifications |
Use handlers when execution order or short-circuiting matters. Use subscribers for fire-and-forget side effects where order is irrelevant.
Event Reference
All events are variants of the Event enum. The table below shows which events each
built-in handler processes:
| Event variant | AuditLogHandler | QueryMetricsHandler | SessionActivityHandler |
|---|---|---|---|
QueryExecuted | ✓ | increments queries_executed | — |
QueryValidationFailed | ✓ | increments validation_failures | — |
QueryExecutionError | ✓ | increments execution_errors | — |
SessionRegistered | ✓ | — | increments sessions_registered |
TriggerFired | ✓ | — | — |
ModelChanged | ✓ | — | — |
RuntimeAugmentationAuditRecorded | ✓ | — | — |
LlmToolCallAuditRecorded | ✓ | — | — |
PromptInjectionSignalRaised | ✓ | — | — |
| (all others) | ✓ | — | — |
For full details on
LlmToolCallAuditRecordedandPromptInjectionSignalRaised, including OCSF class mapping, transport configuration, and injection policy, see Audit Logging and AI-SPM Integration.
Audit Exporters
In addition to synchronous handlers, SCHEMABOUND provides an async exporter pipeline that runs
after every dispatch. Exporters receive an AuditEventEnvelope containing the full event,
a tamper-evident SHA-256 hash chain, and distributed trace correlation fields.
Register an exporter alongside handlers:
#![allow(unused)]
fn main() {
use schemabound::{get_event_bus, AuditExporter, AuditEventEnvelope};
use async_trait::async_trait;
use std::sync::Arc;
struct MyExporter;
#[async_trait]
impl AuditExporter for MyExporter {
async fn export(&self, envelope: AuditEventEnvelope) {
// forward to SIEM, file, or webhook
}
}
get_event_bus().register_audit_exporter(Arc::new(MyExporter))?;
}
The MultiTransportExporter in schemabound-backend is the reference implementation: it fans
events to stderr (OCSF NDJSON), an HTTP webhook, and a rotating log file, all driven
by environment variables.