Audit and Observability
Every contract evaluation in Edictum produces an `AuditEvent`. Audit sinks consume these events and route them to local or remote destinations, while the OpenTelemetry integration routes enforcement spans to any observability backend.

Right page if you need to configure where audit events go: built-in sinks (Stdout, File, Collecting, Server), custom sinks, RedactionPolicy, or CompositeSink fan-out. Wrong page if you need OTel span attributes and dashboard queries: see https://docs.edictum.ai/docs/reference/telemetry. For the observability setup walkthrough, see https://docs.edictum.ai/docs/guides/observability.

Gotcha: if no audit_sink is provided, only the in-memory CollectingAuditSink is active (accessible via `guard.local_sink`). RedactionPolicy auto-detects secrets by value pattern (sk-*, AKIA*, eyJ*) regardless of key name.
The AuditSink Protocol
Any class that implements the AuditSink protocol can receive audit events. The
protocol requires a single async method:
```python
from edictum.audit import AuditEvent, AuditSink

class MyCustomSink:
    async def emit(self, event: AuditEvent) -> None:
        # process the event
        ...
```

Edictum checks conformance at runtime via @runtime_checkable, so there is no need
to inherit from a base class. Implement emit and you are done.
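To see the structural check in isolation, here is a minimal sketch using only the standard library's `typing.Protocol`; the `Sink`, `GoodSink`, and `BadSink` names are illustrative stand-ins, not part of Edictum:

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class Sink(Protocol):
    """Illustrative stand-in for the AuditSink protocol."""
    async def emit(self, event) -> None: ...

class GoodSink:
    # No inheritance needed: having an emit method is enough
    async def emit(self, event) -> None:
        pass

class BadSink:
    pass  # missing emit

print(isinstance(GoodSink(), Sink))  # True
print(isinstance(BadSink(), Sink))   # False
```

Note that `runtime_checkable` isinstance checks on their own verify only that the method exists, which is why protocol conformance is a structural rather than an inheritance check.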
Register a sink when constructing your Edictum instance:
```python
from edictum import Edictum
from edictum.audit import FileAuditSink

guard = Edictum(
    audit_sink=FileAuditSink("/var/log/edictum/events.jsonl"),
)
```

You can also pass a list of sinks; they are automatically wrapped in a CompositeSink:

```python
from edictum.audit import StdoutAuditSink, FileAuditSink

guard = Edictum(
    audit_sink=[StdoutAuditSink(), FileAuditSink("audit.jsonl")],
)
```

If no audit_sink is provided, only the built-in CollectingAuditSink is used
(accessible via guard.local_sink). To also print events to stdout, pass
audit_sink=StdoutAuditSink() or set observability.stdout: true in YAML.
AuditEvent Fields
Every audit event contains the following fields:
Identity
| Field | Type | Description |
|---|---|---|
| schema_version | str | Event schema version (currently "0.3.0") |
| timestamp | datetime | UTC timestamp of the event |
| run_id | str | Unique ID for the agent run |
| call_id | str | Unique ID for this specific tool call |
| call_index | int | Sequential call number within the run |
| parent_call_id | str \| None | Parent call ID for nested invocations |
Tool
| Field | Type | Description |
|---|---|---|
| tool_name | str | Name of the tool being called |
| tool_args | dict | Arguments passed to the tool |
| side_effect | str | Side-effect classification: pure, read, write, irreversible |
| environment | str | Deployment environment (e.g. production, staging) |
Principal
| Field | Type | Description |
|---|---|---|
| principal | dict \| None | Identity context: user_id, service_id, org_id, role, ticket_ref, claims |
Enforcement Decision
| Field | Type | Description |
|---|---|---|
| action | AuditAction | One of: call_denied, call_would_deny, call_allowed, call_executed, call_failed, call_approval_requested, call_approval_granted, call_approval_denied, call_approval_timeout. The enum also defines postcondition_warning, but the pipeline does not currently emit it. |
| decision_source | str \| None | What produced the decision: hook, precondition, session_contract, attempt_limit, operation_limit |
| decision_name | str \| None | Name of the specific hook or contract |
| reason | str \| None | Human-readable denial reason |
| hooks_evaluated | list[dict] | Each hook with its name, result, and reason |
| contracts_evaluated | list[dict] | Each contract with name, type, passed, and message |
Execution
| Field | Type | Description |
|---|---|---|
| tool_success | bool \| None | Whether the tool call succeeded (set after execution) |
| postconditions_passed | bool \| None | Whether all postconditions passed |
| duration_ms | int | Tool execution time in milliseconds. Reserved: always 0 in the current pipeline. |
| error | str \| None | Error message if the tool failed |
| result_summary | str \| None | Truncated summary of the tool result. Reserved: always None in the current pipeline. |
Counters
| Field | Type | Description |
|---|---|---|
| session_attempt_count | int | Total attempts in this session (including denials) |
| session_execution_count | int | Total executions in this session |
Policy
| Field | Type | Description |
|---|---|---|
| policy_version | str \| None | SHA-256 hash of the YAML contract file |
| policy_error | bool | True if there was an error loading contracts |
| mode | str | enforce or observe |
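The policy_version field can be reproduced locally for comparison against audit events. This is a hedged sketch: the docs state it is a SHA-256 hash of the YAML contract file, but the exact canonicalization (encoding, whitespace handling) is an assumption here, and `policy_version` is a hypothetical helper name:

```python
import hashlib

def policy_version(yaml_text: str) -> str:
    # Assumed: plain SHA-256 over the raw file bytes
    return hashlib.sha256(yaml_text.encode("utf-8")).hexdigest()

v1 = policy_version("contracts: []")
v2 = policy_version("contracts: [x]")
print(len(v1))   # 64 hex characters
print(v1 == v2)  # False: any edit to the file changes the version
```

Because the hash is deterministic, two agents loading byte-identical bundles report the same policy_version, which makes drift between deployments easy to spot in the audit stream.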
Built-in Sinks
StdoutAuditSink
Prints each event as a single JSON line to stdout. Useful for development and for piping into log aggregators.
```python
from edictum.audit import StdoutAuditSink, RedactionPolicy

sink = StdoutAuditSink(redaction=RedactionPolicy())
```

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| redaction | RedactionPolicy \| None | None | Redaction policy. When None, a default RedactionPolicy() is created internally. |
FileAuditSink
Appends each event as a JSON line to a file. Creates the file if it does not exist. Suitable for local audit logs and offline analysis.
```python
from edictum.audit import FileAuditSink, RedactionPolicy

sink = FileAuditSink(
    path="/var/log/edictum/events.jsonl",
    redaction=RedactionPolicy(),
)
```

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| path | str \| Path | (required) | File path for the JSONL output |
| redaction | RedactionPolicy \| None | None | Redaction policy. When None, a default RedactionPolicy() is created internally. |
CompositeSink
Fan-out sink that emits every event to multiple sinks sequentially. Useful when you need both terminal output and a persistent log file, or any combination of sinks.
```python
from edictum.audit import CompositeSink, StdoutAuditSink, FileAuditSink

sink = CompositeSink([
    StdoutAuditSink(),
    FileAuditSink("/var/log/edictum/events.jsonl"),
])
```

The Edictum constructor also accepts a list of sinks directly and auto-wraps them in a CompositeSink:

```python
from edictum import Edictum
from edictum.audit import StdoutAuditSink, FileAuditSink

guard = Edictum(
    audit_sink=[
        StdoutAuditSink(),
        FileAuditSink("audit.jsonl"),
    ],
)
```

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| sinks | list[AuditSink] | (required) | One or more sinks to emit to, in order |
Sinks are called in order. All sinks are always attempted — a failure in one
sink does not prevent the others from receiving the event. If any sinks raise,
their exceptions are collected and re-raised together as an ExceptionGroup
after all sinks have been tried. ExceptionGroup requires Python 3.11+.
To inspect individual sink errors, use except* syntax:
```python
try:
    await sink.emit(event)
except* Exception as eg:
    for exc in eg.exceptions:
        print(f"Sink error: {exc}")
```

When to use CompositeSink
| Scenario | Sinks | Who benefits |
|---|---|---|
| Dev debugging + persistent audit trail | StdoutAuditSink + FileAuditSink | Developer debugging locally — real-time terminal output plus a .jsonl file for later analysis |
| Multi-destination compliance | FileAuditSink + custom sink | Platform team — file for regulatory retention plus a custom sink pushing to an internal dashboard |
| Gradual migration | StdoutAuditSink + new sink | Anyone migrating — keep existing stdout while adding a new destination, without changing constructor code |
| Custom sink stacking | FileAuditSink + KafkaAuditSink | Compliance — redundant audit trails from a one-liner, each sink independently processes the same events |
CompositeSink is about the structured event log, not observability traces. OTel spans operate independently and are complementary — use both in production.
CollectingAuditSink
In-memory sink for programmatic inspection of governance decisions. Stores events in a bounded ring buffer with mark-based windowed queries.
```python
from edictum import Edictum
from edictum.audit import AuditAction

guard = Edictum.from_yaml("contracts.yaml")

# Take a mark before the tool call
mark = guard.local_sink.mark()

# ... tool call happens via adapter or guard.run() ...

# Inspect what the pipeline decided
events = guard.local_sink.since_mark(mark)
for ev in events:
    if ev.action == AuditAction.CALL_DENIED:
        print(f"Denied: {ev.tool_name}")
```

Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| max_events | int | 50_000 | Maximum events in the ring buffer. Oldest events are evicted when full. |
When to use CollectingAuditSink
| Scenario | Who benefits |
|---|---|
| Adapter demos — classify each tool call's outcome (denied, redacted, allowed) for display | Developers building demo scripts that show governance in action |
| Programmatic post-hoc inspection — react to governance decisions in application code (logging, conditional logic, routing) | Application developers integrating Edictum into agent workflows |
| Server mode debugging — inspect what the pipeline decided locally without querying the server | Developers troubleshooting contract behavior when connected to edictum-console |
| Testing — assert that specific audit events were emitted during a test scenario | Test authors verifying contract behavior end-to-end |
Every Edictum instance has a local_sink property that returns the always-present
CollectingAuditSink. It works identically regardless of construction method
(__init__, from_yaml(), from_server()).
Mark-Based Queries
Marks are absolute positions in the event stream. Use mark() before a tool call
and since_mark() after to get exactly the events from that window:
```python
mark = guard.local_sink.mark()
# ... tool calls ...
events = guard.local_sink.since_mark(mark)
```

If events referenced by a mark have been evicted from the ring buffer (because
more than max_events events were emitted since the mark was taken),
since_mark() raises MarkEvictedError:

```python
from edictum.audit import MarkEvictedError

try:
    events = guard.local_sink.since_mark(old_mark)
except MarkEvictedError:
    # Events were evicted; reset the mark
    old_mark = guard.local_sink.mark()
```

Other Methods
| Method | Description |
|---|---|
| events | All buffered events (returns a defensive copy) |
| last() | Most recent event (raises IndexError if empty) |
| filter(action) | Events matching a specific AuditAction |
| clear() | Discard all events. Pre-clear marks raise MarkEvictedError. |
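The mark semantics can be sketched with a deque plus an absolute event counter. This is an illustrative model of the behavior, not Edictum's implementation; `MiniBuffer` and its `MarkEvicted` error are hypothetical names:

```python
from collections import deque

class MarkEvicted(Exception):
    pass

class MiniBuffer:
    def __init__(self, max_events=4):
        self._buf = deque(maxlen=max_events)
        self._total = 0  # absolute position: events ever emitted

    def emit(self, event):
        self._buf.append(event)
        self._total += 1

    def mark(self):
        return self._total  # marks are absolute stream positions

    def since_mark(self, mark):
        evicted = self._total - len(self._buf)  # events no longer buffered
        if mark < evicted:
            # the window's start has fallen off the ring buffer
            raise MarkEvicted(mark)
        return list(self._buf)[mark - evicted:]

buf = MiniBuffer(max_events=4)
m = buf.mark()
buf.emit("a")
buf.emit("b")
print(buf.since_mark(m))  # ['a', 'b']
for e in "cdef":
    buf.emit(e)           # overflow evicts 'a' and 'b'
try:
    buf.since_mark(m)
except MarkEvicted:
    print("mark evicted")
```

Because marks are absolute positions rather than buffer indices, eviction is detectable rather than silently returning the wrong window, which is exactly why since_mark() raises instead of guessing.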
ServerAuditSink (edictum[server])
Batches audit events and sends them to edictum-console via HTTP. Events are buffered in memory and flushed when the batch is full or after a timer interval.
```bash
pip install edictum[server]
```

```python
from edictum import Edictum
from edictum.server import EdictumServerClient, ServerAuditSink

client = EdictumServerClient(
    "https://edictum.example.com",
    api_key="...",
    agent_id="my-agent",
    env="production",
    bundle_name="devops-agent",
)
sink = ServerAuditSink(client, batch_size=50, flush_interval=5.0)
guard = Edictum(audit_sink=sink)
```

Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| client | EdictumServerClient | (required) | Configured server client |
| batch_size | int | 50 | Flush when this many events are buffered |
| flush_interval | float | 5.0 | Seconds between automatic flushes |
| max_buffer_size | int | 10_000 | Maximum events held in memory. New events are dropped when the buffer is full. |
Events are mapped to the server's ingest format (POST /api/v1/events) with
call_id, agent_id, tool_name, verdict, mode, timestamp, and a
payload dict containing the full enforcement context including bundle_name
and environment (falls back to the client's env if not set on the event).
If a flush fails, events are retained in the buffer for the next attempt.
Call await sink.close() to flush remaining events and stop the background timer.
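The batching contract (flush on size, retain on failure, drop when the buffer is full) can be sketched synchronously. `BatchingSink` and `send` are illustrative names, and the real sink's timer-based flushing is omitted here:

```python
class BatchingSink:
    """Illustrative batching model, not the edictum.server implementation."""
    def __init__(self, send, batch_size=3, max_buffer_size=5):
        self._send = send              # callable taking a list of events
        self._batch_size = batch_size
        self._max = max_buffer_size
        self._buffer = []

    def emit(self, event):
        if len(self._buffer) >= self._max:
            return                     # buffer full: new events are dropped
        self._buffer.append(event)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self):
        if not self._buffer:
            return
        try:
            self._send(list(self._buffer))
        except Exception:
            return                     # send failed: retain for next attempt
        self._buffer.clear()

sent = []
def flaky_send(batch, state={"fail": True}):
    if state["fail"]:
        state["fail"] = False
        raise ConnectionError("console unreachable")
    sent.extend(batch)

sink = BatchingSink(flaky_send, batch_size=2)
sink.emit("e1")
sink.emit("e2")   # batch is full; first flush fails, events retained
sink.flush()      # retry succeeds
print(sent)       # ['e1', 'e2']
```

The retain-on-failure choice trades memory for durability: a transient console outage loses nothing until max_buffer_size is reached, after which the sink sheds load by dropping the newest events.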
OpenTelemetry Span Emission
For production observability, Edictum emits edictum.* spans for every enforcement
decision via OpenTelemetry. These spans can be routed to any OTel-compatible
backend -- Datadog, Splunk, Grafana, Jaeger, or any service that accepts OTLP.
```bash
pip install edictum[otel]
```

Programmatic Configuration

```python
from edictum.otel import configure_otel

configure_otel(
    service_name="my-agent",
    endpoint="http://localhost:4317",
    protocol="grpc",  # or "http"
    resource_attributes={"deployment.environment": "production"},
)
```

Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| service_name | str | "edictum-agent" | OTel service name resource attribute |
| endpoint | str | "http://localhost:4317" | OTLP collector endpoint |
| protocol | str | "grpc" | Transport protocol: "grpc" or "http" |
| resource_attributes | dict \| None | None | Additional OTel resource attributes |
| edictum_version | str \| None | None | Edictum version tag |
| force | bool | False | Force re-initialization even if already configured |
| insecure | bool | True | Use an insecure (non-TLS) connection to the collector |
Standard OTel environment variables (OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_PROTOCOL, OTEL_SERVICE_NAME,
OTEL_RESOURCE_ATTRIBUTES) override the programmatic values when set.
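The precedence rule amounts to "environment wins when set". A sketch of the pattern; `resolve_endpoint` is a hypothetical helper illustrating the lookup order, not an Edictum function:

```python
import os

def resolve_endpoint(programmatic="http://localhost:4317"):
    # Standard OTel env vars override programmatic values when set
    return os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", programmatic)

os.environ.pop("OTEL_EXPORTER_OTLP_ENDPOINT", None)
print(resolve_endpoint("http://collector:4317"))
# http://collector:4317  (no env var: programmatic value used)

os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "https://otel.example.com:4317"
print(resolve_endpoint("http://collector:4317"))
# https://otel.example.com:4317  (env var wins)
```

This ordering lets operators repoint agents at a different collector via deployment configuration without touching application code.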
YAML Configuration
The observability block in a contract bundle configures OTel alongside the
local sinks:
```yaml
apiVersion: edictum/v1
kind: ContractBundle
metadata:
  name: my-policy
observability:
  otel:
    enabled: true
    endpoint: "http://localhost:4317"
    protocol: grpc
    service_name: my-agent
    resource_attributes:
      deployment.environment: production
  stdout: true
  file: /var/log/edictum/events.jsonl
defaults:
  mode: enforce
contracts:
  - id: block-sensitive-reads
    type: pre
    tool: read_file
    when:
      args.path:
        contains_any: [".env", ".secret", "credentials"]
    then:
      effect: deny
      message: "Sensitive file '{args.path}' denied."
    tags: [secrets]
```

Routing to Specific Backends
Edictum emits standard OTLP spans. Use an OTel Collector to route them to any backend:
- Datadog: Point the OTel Collector at the Datadog Agent or use the Datadog exporter in the collector config. Enforcement spans appear in Datadog APM traces.
- Splunk: Use the Splunk HEC exporter in the OTel Collector config. Spans arrive in Splunk Observability Cloud with all edictum.* attributes intact.
- Grafana / Tempo: Send OTLP directly to Grafana Tempo or via the OTel Collector. Enforcement spans appear alongside application traces.
- Jaeger: Point the OTLP endpoint at a Jaeger collector. No additional configuration needed.
Graceful Degradation
If opentelemetry is not installed, all OTel instrumentation degrades to a
silent no-op. No exceptions are raised and there is no performance cost beyond
a single boolean check per call. The local sinks (StdoutAuditSink,
FileAuditSink) continue to work independently of OTel availability.
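The same degradation pattern can be reproduced with a guarded import and a null context manager. A minimal sketch assuming only the standard library plus an optional opentelemetry install; `enforcement_span` is an illustrative name:

```python
import contextlib

try:
    from opentelemetry import trace
    _HAVE_OTEL = True
except ImportError:
    _HAVE_OTEL = False   # the single boolean checked per call

def enforcement_span(name: str):
    if _HAVE_OTEL:
        return trace.get_tracer("edictum").start_as_current_span(name)
    return contextlib.nullcontext()  # silent no-op when OTel is absent

# Works identically whether or not opentelemetry is installed
with enforcement_span("edictum.tool_call"):
    decision = "allowed"
print(decision)  # allowed
```

Paying the import cost once at module load and a boolean per call keeps the hot path free of exception handling.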
Redaction Policy
All sinks support automatic redaction of sensitive data via RedactionPolicy. If
no explicit policy is provided, StdoutAuditSink and FileAuditSink create a
default policy automatically. OTel span attributes are emitted after redaction
is applied to the underlying AuditEvent.
Sensitive Key Detection
Keys are normalized to lowercase and matched against a built-in set:
password, secret, token, api_key, apikey, api-key, authorization,
auth, credentials, private_key, privatekey, access_token,
refresh_token, client_secret, connection_string, database_url,
db_password, ssh_key, passphrase
Additionally, any key containing token, key, secret, password, or
credential as a substring is treated as sensitive.
Secret Value Pattern Detection
Values are checked against patterns for common secret formats, regardless of the key name:
| Pattern | Example |
|---|---|
| sk-* | OpenAI API keys |
| AKIA* | AWS access key IDs |
| eyJ* | JWT tokens |
| ghp_* | GitHub personal access tokens |
| xox[bpas]-* | Slack tokens |
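Combining the two detection rules (sensitive key substrings and secret-shaped values), here is a minimal sketch of the decision logic; the names are illustrative and only the documented patterns are included, so this is a model of the behavior rather than Edictum's code:

```python
import re

SENSITIVE_SUBSTRINGS = ("token", "key", "secret", "password", "credential")
VALUE_PATTERNS = [re.compile(p) for p in
                  (r"^sk-", r"^AKIA", r"^eyJ", r"^ghp_", r"^xox[bpas]-")]

def redact_args(args):
    out = {}
    for key, value in args.items():
        key_hit = any(s in key.lower() for s in SENSITIVE_SUBSTRINGS)
        value_hit = isinstance(value, str) and any(
            p.match(value) for p in VALUE_PATTERNS)
        out[key] = "[REDACTED]" if key_hit or value_hit else value
    return out

print(redact_args({
    "api_key": "abc",           # caught by key-name rule
    "note": "sk-proj-123",      # caught by value pattern, innocuous key
    "path": "/tmp/report.txt",  # untouched
}))
# {'api_key': '[REDACTED]', 'note': '[REDACTED]', 'path': '/tmp/report.txt'}
```

The value-pattern pass is what catches secrets smuggled under harmless-looking keys, which a key-name blocklist alone would miss.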
Bash Command Redaction
Bash commands in tool_args are scrubbed for inline secrets:
| Before | After |
|---|---|
| export SECRET_KEY=abc123 | export SECRET_KEY=[REDACTED] |
| -p mypassword | -p [REDACTED] |
| https://user:pass@host | https://user:[REDACTED]@host |
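These rewrites can be approximated with regex substitutions. A hedged sketch: the rules below mirror the documented examples, but the real scrubber's exact patterns are not published, so treat `scrub` and its `RULES` as illustrative:

```python
import re

# Illustrative substitutions mirroring the documented rewrites
RULES = [
    # export SECRET_KEY=abc123 -> export SECRET_KEY=[REDACTED]
    (re.compile(r"(export\s+\w*(?:SECRET|KEY|TOKEN|PASSWORD)\w*=)\S+"),
     r"\1[REDACTED]"),
    # -p mypassword -> -p [REDACTED]
    (re.compile(r"(-p\s+)\S+"), r"\1[REDACTED]"),
    # https://user:pass@host -> https://user:[REDACTED]@host
    (re.compile(r"(://[^:/\s]+:)[^@/\s]+(@)"), r"\1[REDACTED]\2"),
]

def scrub(cmd: str) -> str:
    for pattern, replacement in RULES:
        cmd = pattern.sub(replacement, cmd)
    return cmd

print(scrub("export SECRET_KEY=abc123"))   # export SECRET_KEY=[REDACTED]
print(scrub("https://user:pass@host"))     # https://user:[REDACTED]@host
print(scrub("ls -la /tmp"))                # ls -la /tmp (untouched)
```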
Payload Size Cap
Payloads exceeding 32 KB are truncated. The tool_args and result_summary fields
are replaced with a marker indicating the cap was hit. This prevents audit sinks from
dropping events due to oversized payloads.
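A sketch of the cap, assuming a straightforward size check over the serialized event; the marker text and measurement details are assumptions, and `cap_payload` here is a model rather than the library's implementation:

```python
import json

MAX_BYTES = 32 * 1024  # 32 KB cap from the docs

def cap_payload(event: dict) -> dict:
    # Measure the serialized event; replace the bulky fields with a
    # marker when it exceeds the cap.
    if len(json.dumps(event, default=str).encode()) <= MAX_BYTES:
        return event
    capped = dict(event)
    for field in ("tool_args", "result_summary"):
        if field in capped:
            capped[field] = "[TRUNCATED: exceeded 32 KB cap]"
    return capped

small = {"tool_name": "read_file", "tool_args": {"path": "/tmp/a"}}
big = {"tool_name": "read_file", "tool_args": {"blob": "x" * 40_000}}
print(cap_payload(small) == small)    # True: under the cap, untouched
print(cap_payload(big)["tool_args"])  # [TRUNCATED: exceeded 32 KB cap]
```

Truncating only the bulky fields preserves the rest of the event (identity, decision, counters), so the audit trail stays queryable even when a tool produced an enormous argument payload.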
Custom Redaction
```python
from edictum.audit import RedactionPolicy

policy = RedactionPolicy(
    sensitive_keys={"my_custom_key", "internal_token"},  # merged with defaults (union)
    custom_patterns=[
        (r"(MY_PREFIX_)\S+", r"\1[REDACTED]"),  # custom regex substitutions
    ],
    detect_secret_values=True,  # enable/disable value pattern detection
)
```

Custom Sinks
Implement the AuditSink protocol to route events to any destination:
```python
import json
from dataclasses import asdict

from edictum.audit import AuditEvent, RedactionPolicy

class KafkaAuditSink:
    """Send audit events to a Kafka topic."""

    def __init__(self, producer, topic: str, redaction: RedactionPolicy | None = None):
        self._producer = producer
        self._topic = topic
        self._redaction = redaction or RedactionPolicy()

    async def emit(self, event: AuditEvent) -> None:
        data = asdict(event)
        data["timestamp"] = event.timestamp.isoformat()
        data["action"] = event.action.value
        data = self._redaction.cap_payload(data)
        await self._producer.send(
            self._topic,
            json.dumps(data, default=str).encode(),
        )
```

Then register it:

```python
guard = Edictum(
    audit_sink=KafkaAuditSink(producer, "edictum-events"),
)
```

The AuditSink protocol is @runtime_checkable, so Edictum validates your
sink at registration time. If emit is missing or has the wrong signature,
you get an immediate TypeError rather than a silent failure at event time.