Event Pipeline

The SCHEMABOUND event pipeline is implemented as a Chain of Responsibility over the global EventBus. Every domain event — query execution, validation failures, session registration, trigger fires — flows through a registered sequence of handlers before any subscribers see it.

Handlers are synchronous and ordered, and any handler can short-circuit the chain by returning HandleOutcome::Stop. The handlers in the default pipeline always return HandleOutcome::Continue, so the full chain runs for every event.

Handler Trait

use schemabound::interceptor::{Event, EventHandler, HandleOutcome};

pub struct MyHandler;

impl EventHandler for MyHandler {
    fn handle(&self, event: &Event) -> HandleOutcome {
        // Log the event type, then let the rest of the chain run.
        println!("[my-handler] {}", event.event_type());
        HandleOutcome::Continue
    }
}

Register with the global bus:

use schemabound::get_event_bus;

get_event_bus().register_handler(Box::new(MyHandler))?;

Handlers are invoked in registration order for every dispatched event.
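To make the ordering and short-circuit rules concrete, here is a minimal, self-contained sketch of a dispatch loop. It does not use the schemabound crate: `Event`, `EventHandler`, and `HandleOutcome` are simplified stand-ins, and `dispatch`, `PassThrough`, `BlockType`, and `sample_chain` are hypothetical names introduced only for illustration. The real bus may differ in detail.

```rust
// Stand-in types for illustration only; the real definitions live in
// schemabound::interceptor and may have different shapes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandleOutcome {
    Continue,
    Stop,
}

pub struct Event {
    pub event_type: &'static str,
}

pub trait EventHandler {
    fn handle(&self, event: &Event) -> HandleOutcome;
}

/// Hypothetical dispatch loop: runs handlers in registration order and
/// stops after the first one that returns `HandleOutcome::Stop`.
/// Returns the number of handlers that were invoked.
pub fn dispatch(handlers: &[Box<dyn EventHandler>], event: &Event) -> usize {
    let mut invoked = 0;
    for handler in handlers {
        invoked += 1;
        if handler.handle(event) == HandleOutcome::Stop {
            break;
        }
    }
    invoked
}

/// Lets every event continue down the chain.
pub struct PassThrough;

impl EventHandler for PassThrough {
    fn handle(&self, _event: &Event) -> HandleOutcome {
        HandleOutcome::Continue
    }
}

/// Halts the chain for one event type (a made-up policy for this sketch).
pub struct BlockType(pub &'static str);

impl EventHandler for BlockType {
    fn handle(&self, event: &Event) -> HandleOutcome {
        if event.event_type == self.0 {
            HandleOutcome::Stop
        } else {
            HandleOutcome::Continue
        }
    }
}

/// Three handlers in registration order; the middle one can stop the chain.
pub fn sample_chain() -> Vec<Box<dyn EventHandler>> {
    vec![
        Box::new(PassThrough) as Box<dyn EventHandler>,
        Box::new(BlockType("TriggerFired")),
        Box::new(PassThrough),
    ]
}
```

With this chain, a QueryExecuted event reaches all three handlers, while a TriggerFired event stops at the second handler and never reaches the third.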

Built-In Handlers (schemabound::handlers)

AuditLogHandler

Writes a one-line JSON audit entry to stderr for every event. This is always the first handler registered in the default gRPC pipeline so that every domain event is recorded before downstream processing.

use schemabound::{get_event_bus, AuditLogHandler};

get_event_bus().register_handler(Box::new(AuditLogHandler))?;
// → [audit] {"event_type":"QueryExecuted","db_identifier":"...","query":"..."}

QueryMetricsHandler

Tracks cumulative counts of QueryExecuted, QueryValidationFailed, and QueryExecutionError events through lock-free atomic counters. The counters can be read from any thread at any time.

use schemabound::{get_event_bus, SharedHandler, handlers::QueryMetricsHandler};
use std::sync::Arc;

let metrics = Arc::new(QueryMetricsHandler::new());
get_event_bus().register_handler(Box::new(SharedHandler(Arc::clone(&metrics))))?;

// Later: read the counters, for example from a health endpoint.
let snap = metrics.snapshot();
println!("executed={} failures={} errors={}",
    snap.queries_executed, snap.validation_failures, snap.execution_errors);
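The lock-free counting described above can be sketched with the standard library's atomics. This is an illustration of the general technique, not the actual QueryMetricsHandler source: `Counters`, `Snapshot`, `record`, and `count_from_threads` are hypothetical names, and events are represented by plain strings for brevity.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical counter set mirroring the three tracked event types.
#[derive(Default)]
pub struct Counters {
    queries_executed: AtomicU64,
    validation_failures: AtomicU64,
    execution_errors: AtomicU64,
}

/// Plain-value copy of the counters at the moment of reading.
#[derive(Debug, PartialEq, Eq)]
pub struct Snapshot {
    pub queries_executed: u64,
    pub validation_failures: u64,
    pub execution_errors: u64,
}

impl Counters {
    /// Increments the counter matching the event type; other types are ignored.
    pub fn record(&self, event_type: &str) {
        let counter = match event_type {
            "QueryExecuted" => &self.queries_executed,
            "QueryValidationFailed" => &self.validation_failures,
            "QueryExecutionError" => &self.execution_errors,
            _ => return,
        };
        // Relaxed ordering is enough for independent monotonic counters.
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Loads each counter separately, so the snapshot is not atomic
    /// across fields, only per field.
    pub fn snapshot(&self) -> Snapshot {
        Snapshot {
            queries_executed: self.queries_executed.load(Ordering::Relaxed),
            validation_failures: self.validation_failures.load(Ordering::Relaxed),
            execution_errors: self.execution_errors.load(Ordering::Relaxed),
        }
    }
}

/// Records `per_thread` QueryExecuted events from each of `threads` workers
/// without taking any lock, then returns the final snapshot.
pub fn count_from_threads(threads: usize, per_thread: u64) -> Snapshot {
    let counters = Arc::new(Counters::default());
    let workers: Vec<_> = (0..threads)
        .map(|_| {
            let shared = Arc::clone(&counters);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    shared.record("QueryExecuted");
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("worker thread panicked");
    }
    counters.snapshot()
}
```

Because every increment is a single atomic `fetch_add`, concurrent writers never lose updates, and readers never block writers.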

SessionActivityHandler

Counts SessionRegistered events so the gRPC layer can expose a live session counter without a database round-trip.

use schemabound::{get_event_bus, SharedHandler, handlers::SessionActivityHandler};
use std::sync::Arc;

let activity = Arc::new(SessionActivityHandler::new());
get_event_bus().register_handler(Box::new(SharedHandler(Arc::clone(&activity))))?;

println!("active sessions: {}", activity.session_count());

SharedHandler<T>

A newtype wrapper that lets an Arc<T> (where T: EventHandler) be registered with the bus without giving up ownership, so the caller can keep its own handle for metrics reads.

use schemabound::{get_event_bus, SharedHandler};
use std::sync::Arc;

// `my_handler` is an existing Arc<T> where T: EventHandler.
// The Arc is retained for reading; a clone is registered with the bus.
get_event_bus().register_handler(Box::new(SharedHandler(Arc::clone(&my_handler))))?;
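The newtype pattern behind SharedHandler can be sketched as follows. This is an assumption about how such a wrapper might be written, not the crate's actual source: the trait is a simplified stand-in that takes a string instead of an Event, and `CountingHandler` and `shared_handler_demo` are hypothetical names.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Simplified stand-ins for the real schemabound types.
pub enum HandleOutcome {
    Continue,
    Stop,
}

pub trait EventHandler {
    fn handle(&self, event_type: &str) -> HandleOutcome;
}

/// Newtype over Arc<T>: implements the trait by delegating to the inner
/// handler, so a boxed clone can be registered while the caller keeps
/// its own Arc.
pub struct SharedHandler<T>(pub Arc<T>);

impl<T: EventHandler> EventHandler for SharedHandler<T> {
    fn handle(&self, event_type: &str) -> HandleOutcome {
        self.0.handle(event_type)
    }
}

/// Hypothetical handler that counts every event it sees.
#[derive(Default)]
pub struct CountingHandler {
    seen: AtomicUsize,
}

impl CountingHandler {
    pub fn seen(&self) -> usize {
        self.seen.load(Ordering::Relaxed)
    }
}

impl EventHandler for CountingHandler {
    fn handle(&self, _event_type: &str) -> HandleOutcome {
        self.seen.fetch_add(1, Ordering::Relaxed);
        HandleOutcome::Continue
    }
}

/// Hands a boxed clone to a stand-in "bus", sends two events through it,
/// and reads the count back through the retained Arc.
pub fn shared_handler_demo() -> usize {
    let counter = Arc::new(CountingHandler::default());
    let registered: Box<dyn EventHandler> = Box::new(SharedHandler(Arc::clone(&counter)));
    registered.handle("QueryExecuted");
    registered.handle("SessionRegistered");
    counter.seen()
}
```

The boxed wrapper and the retained Arc point at the same handler, which is why reads through the retained handle observe events delivered through the registered one.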

Default Handler Chain

DefaultHandlerChain bundles QueryMetricsHandler and SessionActivityHandler into a single struct with pre-made Arc handles — the recommended starting point for gRPC services.

use schemabound::{AuditLogHandler, DefaultHandlerChain, SharedHandler, get_event_bus};
use std::sync::Arc;

let chain = DefaultHandlerChain::new();
let bus = get_event_bus();

// Audit logging goes first so every event is recorded before downstream processing.
bus.register_handler(Box::new(AuditLogHandler))?;
bus.register_handler(Box::new(SharedHandler(Arc::clone(&chain.query_metrics))))?;
bus.register_handler(Box::new(SharedHandler(Arc::clone(&chain.session_activity))))?;

// Retain `chain` for health probes.
let queries = chain.query_metrics.snapshot().queries_executed;
let sessions = chain.session_activity.session_count();

Handler vs. Subscriber

| | Handler | Subscriber |
|---|---|---|
| API | register_handler | register_subscriber |
| Ordering | Explicit (registration order) | Unordered |
| Short-circuit | HandleOutcome::Stop stops the chain | No chain to stop |
| Use cases | audit log, metrics, rate limiting | cache invalidation, notifications |

Use handlers when execution order or short-circuiting matters. Use subscribers for fire-and-forget side effects where order is irrelevant.

Event Reference

All events are variants of the Event enum. The table below shows which events each built-in handler processes:

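| Event variant | AuditLogHandler | QueryMetricsHandler | SessionActivityHandler |
|---|---|---|---|
| QueryExecuted | logged | increments queries_executed | |
| QueryValidationFailed | logged | increments validation_failures | |
| QueryExecutionError | logged | increments execution_errors | |
| SessionRegistered | logged | | increments sessions_registered |
| TriggerFired | logged | | |
| ModelChanged | logged | | |
| RuntimeAugmentationAuditRecorded | logged | | |
| LlmToolCallAuditRecorded | logged | | |
| PromptInjectionSignalRaised | logged | | |
| (all others) | logged | | |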

For full details on LlmToolCallAuditRecorded and PromptInjectionSignalRaised, including OCSF class mapping, transport configuration, and injection policy, see Audit Logging and AI-SPM Integration.

Audit Exporters

In addition to synchronous handlers, SCHEMABOUND provides an async exporter pipeline that runs after every dispatch. Exporters receive an AuditEventEnvelope containing the full event, a tamper-evident SHA-256 hash chain, and distributed trace correlation fields.

Register an exporter alongside handlers:

use schemabound::{get_event_bus, AuditExporter, AuditEventEnvelope};
use async_trait::async_trait;
use std::sync::Arc;

struct MyExporter;

#[async_trait]
impl AuditExporter for MyExporter {
    async fn export(&self, envelope: AuditEventEnvelope) {
        // Forward the envelope to a SIEM, file, or webhook.
    }
}

get_event_bus().register_audit_exporter(Arc::new(MyExporter))?;

The MultiTransportExporter in schemabound-backend is the reference implementation: it fans events to stderr (OCSF NDJSON), an HTTP webhook, and a rotating log file, all driven by environment variables.
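The tamper-evident hash chain carried in each envelope follows a general pattern: every record's hash covers both its own payload and the previous record's hash, so altering any earlier record invalidates everything after it. The sketch below illustrates only that pattern. To stay dependency-free it swaps in the standard library's `DefaultHasher`, which is NOT cryptographic and is not a substitute for SHA-256. `Record`, `link_hash`, `append`, `verify`, and `build_chain` are hypothetical names and do not reflect the layout of AuditEventEnvelope.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical chained record; the real envelope fields are not shown here.
pub struct Record {
    pub payload: String,
    pub prev_hash: u64,
    pub hash: u64,
}

/// Hashes the previous link together with the payload.
/// ASSUMPTION: DefaultHasher stands in for SHA-256 purely for illustration.
fn link_hash(prev_hash: u64, payload: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    prev_hash.hash(&mut hasher);
    payload.hash(&mut hasher);
    hasher.finish()
}

/// Appends a record whose hash depends on the hash of the previous record.
/// The first record links to a fixed starting value of 0.
pub fn append(chain: &mut Vec<Record>, payload: &str) {
    let prev_hash = chain.last().map_or(0, |record| record.hash);
    chain.push(Record {
        payload: payload.to_string(),
        prev_hash,
        hash: link_hash(prev_hash, payload),
    });
}

/// Recomputes every link; returns false if any record was altered
/// or the links no longer line up.
pub fn verify(chain: &[Record]) -> bool {
    let mut expected_prev = 0;
    for record in chain {
        if record.prev_hash != expected_prev
            || record.hash != link_hash(record.prev_hash, &record.payload)
        {
            return false;
        }
        expected_prev = record.hash;
    }
    true
}

/// Convenience helper that builds a chain from a list of payloads.
pub fn build_chain(payloads: &[&str]) -> Vec<Record> {
    let mut chain = Vec::new();
    for payload in payloads {
        append(&mut chain, payload);
    }
    chain
}
```

Verification walks the chain from the first record, so a consumer that stores only the latest hash can still detect a rewrite of any earlier entry once it replays the log.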