
Audit and Observability

Every contract evaluation in Edictum produces an `AuditEvent`.

AI Assistance

  • Right page if: you need to configure where audit events go -- built-in sinks (Stdout, File, Collecting, Server), custom sinks, RedactionPolicy, or CompositeSink fan-out.
  • Wrong page if: you need OTel span attributes and dashboard queries -- see https://docs.edictum.ai/docs/reference/telemetry.
  • For the observability setup walkthrough, see https://docs.edictum.ai/docs/guides/observability.
  • Gotcha: if no audit_sink is provided, only the in-memory CollectingAuditSink is active (accessible via `guard.local_sink`). RedactionPolicy auto-detects secrets by value pattern (sk-*, AKIA*, eyJ*) regardless of key name.

Audit sinks consume these events and route them to local destinations such as stdout, files, or an in-memory buffer, while the OpenTelemetry integration emits enforcement spans to any observability backend.


The AuditSink Protocol

Any class that implements the AuditSink protocol can receive audit events. The protocol requires a single async method:

from edictum.audit import AuditEvent, AuditSink

class MyCustomSink:
    async def emit(self, event: AuditEvent) -> None:
        # process the event
        ...

Edictum checks conformance at runtime via @runtime_checkable, so there is no need to inherit from a base class. Implement emit and you are done.

Register a sink when constructing your Edictum instance:

from edictum import Edictum
from edictum.audit import FileAuditSink

guard = Edictum(
    audit_sink=FileAuditSink("/var/log/edictum/events.jsonl"),
)

You can also pass a list of sinks — they are automatically wrapped in a CompositeSink:

from edictum.audit import StdoutAuditSink, FileAuditSink

guard = Edictum(
    audit_sink=[StdoutAuditSink(), FileAuditSink("audit.jsonl")],
)

If no audit_sink is provided, only the built-in CollectingAuditSink is used (accessible via guard.local_sink). To also print events to stdout, pass audit_sink=StdoutAuditSink() or set observability.stdout: true in YAML.


AuditEvent Fields

Every audit event contains the following fields:

Identity

| Field | Type | Description |
| --- | --- | --- |
| schema_version | str | Event schema version (currently "0.3.0") |
| timestamp | datetime | UTC timestamp of the event |
| run_id | str | Unique ID for the agent run |
| call_id | str | Unique ID for this specific tool call |
| call_index | int | Sequential call number within the run |
| parent_call_id | str \| None | Parent call ID for nested invocations |

Tool

| Field | Type | Description |
| --- | --- | --- |
| tool_name | str | Name of the tool being called |
| tool_args | dict | Arguments passed to the tool |
| side_effect | str | Side-effect classification: pure, read, write, irreversible |
| environment | str | Deployment environment (e.g. production, staging) |

Principal

| Field | Type | Description |
| --- | --- | --- |
| principal | dict \| None | Identity context: user_id, service_id, org_id, role, ticket_ref, claims |

Enforcement Decision

| Field | Type | Description |
| --- | --- | --- |
| action | AuditAction | One of: call_denied, call_would_deny, call_allowed, call_executed, call_failed, call_approval_requested, call_approval_granted, call_approval_denied, call_approval_timeout. The enum also defines postcondition_warning, but the pipeline does not currently emit it. |
| decision_source | str \| None | What produced the decision: hook, precondition, session_contract, attempt_limit, operation_limit |
| decision_name | str \| None | Name of the specific hook or contract |
| reason | str \| None | Human-readable denial reason |
| hooks_evaluated | list[dict] | Each hook with its name, result, and reason |
| contracts_evaluated | list[dict] | Each contract with name, type, passed, and message |

Execution

| Field | Type | Description |
| --- | --- | --- |
| tool_success | bool \| None | Whether the tool call succeeded (set after execution) |
| postconditions_passed | bool \| None | Whether all postconditions passed |
| duration_ms | int | Tool execution time in milliseconds. Reserved -- always 0 in the current pipeline. |
| error | str \| None | Error message if the tool failed |
| result_summary | str \| None | Truncated summary of the tool result. Reserved -- always None in the current pipeline. |

Counters

| Field | Type | Description |
| --- | --- | --- |
| session_attempt_count | int | Total attempts in this session (including denials) |
| session_execution_count | int | Total executions in this session |

Policy

| Field | Type | Description |
| --- | --- | --- |
| policy_version | str \| None | SHA-256 hash of the YAML contract file |
| policy_error | bool | True if there was an error loading contracts |
| mode | str | enforce or observe |

Built-in Sinks

StdoutAuditSink

Prints each event as a single JSON line to stdout. Useful for development and for piping into log aggregators.

from edictum.audit import StdoutAuditSink, RedactionPolicy

sink = StdoutAuditSink(redaction=RedactionPolicy())

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| redaction | RedactionPolicy \| None | None | Redaction policy. When None, a default RedactionPolicy() is created internally. |

FileAuditSink

Appends each event as a JSON line to a file. Creates the file if it does not exist. Suitable for local audit logs and offline analysis.

from edictum.audit import FileAuditSink, RedactionPolicy

sink = FileAuditSink(
    path="/var/log/edictum/events.jsonl",
    redaction=RedactionPolicy(),
)

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| path | str \| Path | (required) | File path for the JSONL output |
| redaction | RedactionPolicy \| None | None | Redaction policy. When None, a default RedactionPolicy() is created internally. |

CompositeSink

Fan-out sink that emits every event to multiple sinks sequentially. Useful when you need both terminal output and a persistent log file, or any combination of sinks.

from edictum.audit import CompositeSink, StdoutAuditSink, FileAuditSink

sink = CompositeSink([
    StdoutAuditSink(),
    FileAuditSink("/var/log/edictum/events.jsonl"),
])

The Edictum constructor also accepts a list of sinks directly — it auto-wraps them in a CompositeSink:

from edictum import Edictum
from edictum.audit import StdoutAuditSink, FileAuditSink

guard = Edictum(
    audit_sink=[
        StdoutAuditSink(),
        FileAuditSink("audit.jsonl"),
    ],
)

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| sinks | list[AuditSink] | (required) | One or more sinks to emit to, in order |

Sinks are called in order. All sinks are always attempted — a failure in one sink does not prevent the others from receiving the event. If any sinks raise, their exceptions are collected and re-raised together as an ExceptionGroup after all sinks have been tried. ExceptionGroup requires Python 3.11+.

To inspect individual sink errors, use except* syntax:

try:
    await sink.emit(event)
except* Exception as eg:
    for exc in eg.exceptions:
        print(f"Sink error: {exc}")

When to use CompositeSink

| Scenario | Sinks | Who benefits |
| --- | --- | --- |
| Dev debugging + persistent audit trail | StdoutAuditSink + FileAuditSink | Developer debugging locally -- real-time terminal output plus a .jsonl file for later analysis |
| Multi-destination compliance | FileAuditSink + custom sink | Platform team -- file for regulatory retention plus a custom sink pushing to an internal dashboard |
| Gradual migration | StdoutAuditSink + new sink | Anyone migrating -- keep existing stdout while adding a new destination, without changing constructor code |
| Custom sink stacking | FileAuditSink + KafkaAuditSink | Compliance -- redundant audit trails from a one-liner; each sink independently processes the same events |

CompositeSink is about the structured event log, not observability traces. OTel spans operate independently and are complementary — use both in production.

CollectingAuditSink

In-memory sink for programmatic inspection of governance decisions. Stores events in a bounded ring buffer with mark-based windowed queries.

from edictum import Edictum
from edictum.audit import AuditAction

guard = Edictum.from_yaml("contracts.yaml")

# Take a mark before the tool call
mark = guard.local_sink.mark()

# ... tool call happens via adapter or guard.run() ...

# Inspect what the pipeline decided
events = guard.local_sink.since_mark(mark)
for ev in events:
    if ev.action == AuditAction.CALL_DENIED:
        print(f"Denied: {ev.tool_name}")

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| max_events | int | 50_000 | Maximum events in the ring buffer. Oldest events are evicted when full. |

When to use CollectingAuditSink

| Scenario | Who benefits |
| --- | --- |
| Adapter demos -- classify each tool call's outcome (denied, redacted, allowed) for display | Developers building demo scripts that show governance in action |
| Programmatic post-hoc inspection -- react to governance decisions in application code (logging, conditional logic, routing) | Application developers integrating Edictum into agent workflows |
| Server mode debugging -- inspect what the pipeline decided locally without querying the server | Developers troubleshooting contract behavior when connected to edictum-console |
| Testing -- assert that specific audit events were emitted during a test scenario | Test authors verifying contract behavior end-to-end |

Every Edictum instance has a local_sink property that returns the always-present CollectingAuditSink. It works identically regardless of construction method (__init__, from_yaml(), from_server()).

Mark-Based Queries

Marks are absolute positions in the event stream. Use mark() before a tool call and since_mark() after to get exactly the events from that window:

mark = guard.local_sink.mark()
# ... tool calls ...
events = guard.local_sink.since_mark(mark)

If events referenced by a mark have been evicted from the ring buffer (because more than max_events events were emitted since the mark was taken), since_mark() raises MarkEvictedError:

from edictum.audit import MarkEvictedError

try:
    events = guard.local_sink.since_mark(old_mark)
except MarkEvictedError:
    # Events were evicted — reset the mark
    old_mark = guard.local_sink.mark()

Other Methods

| Method | Description |
| --- | --- |
| events | All buffered events (returns a defensive copy) |
| last() | Most recent event (raises IndexError if empty) |
| filter(action) | Events matching a specific AuditAction |
| clear() | Discard all events. Marks taken before the clear raise MarkEvictedError. |
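
The mark semantics can be sketched with a plain list. This is an illustrative stand-in, not the library's code; the real CollectingAuditSink uses a bounded ring buffer and raises MarkEvictedError when marked events have been evicted:

```python
class TinyCollector:
    """Illustrative stand-in for CollectingAuditSink's mark/query API."""

    def __init__(self):
        self._events = []

    async def emit(self, event):
        self._events.append(event)

    def mark(self):
        # A mark is an absolute position in the event stream
        return len(self._events)

    def since_mark(self, mark):
        # Defensive copy of the window since the mark
        return list(self._events[mark:])

    def last(self):
        return self._events[-1]  # IndexError if empty, like the real sink
```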

ServerAuditSink (edictum[server])

Batches audit events and sends them to edictum-console via HTTP. Events are buffered in memory and flushed when the batch is full or after a timer interval.

pip install edictum[server]

from edictum.server import EdictumServerClient, ServerAuditSink

client = EdictumServerClient(
    "https://edictum.example.com",
    api_key="...",
    agent_id="my-agent",
    env="production",
    bundle_name="devops-agent",
)
sink = ServerAuditSink(client, batch_size=50, flush_interval=5.0)

guard = Edictum(audit_sink=sink)

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| client | EdictumServerClient | (required) | Configured server client |
| batch_size | int | 50 | Flush when this many events are buffered |
| flush_interval | float | 5.0 | Seconds between automatic flushes |
| max_buffer_size | int | 10_000 | Maximum events held in memory. New events are dropped when the buffer is full. |

Events are mapped to the server's ingest format (POST /api/v1/events) with call_id, agent_id, tool_name, verdict, mode, timestamp, and a payload dict containing the full enforcement context, including bundle_name and environment (which falls back to the client's env when not set on the event). If a flush fails, events are retained in the buffer for the next attempt.

Call await sink.close() to flush remaining events and stop the background timer.


OpenTelemetry Span Emission

For production observability, Edictum emits edictum.* spans for every enforcement decision via OpenTelemetry. These spans can be routed to any OTel-compatible backend -- Datadog, Splunk, Grafana, Jaeger, or any service that accepts OTLP.

pip install edictum[otel]

Programmatic Configuration

from edictum.otel import configure_otel

configure_otel(
    service_name="my-agent",
    endpoint="http://localhost:4317",
    protocol="grpc",  # or "http"
    resource_attributes={"deployment.environment": "production"},
)

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| service_name | str | "edictum-agent" | OTel service name resource attribute |
| endpoint | str | "http://localhost:4317" | OTLP collector endpoint |
| protocol | str | "grpc" | Transport protocol: "grpc" or "http" |
| resource_attributes | dict \| None | None | Additional OTel resource attributes |
| edictum_version | str \| None | None | Edictum version tag |
| force | bool | False | Force re-initialization even if already configured |
| insecure | bool | True | Use an insecure (non-TLS) connection to the collector |

Standard OTel environment variables (OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_PROTOCOL, OTEL_SERVICE_NAME, OTEL_RESOURCE_ATTRIBUTES) override the programmatic values when set.

YAML Configuration

The observability block in a contract bundle configures OTel alongside the local sinks:

apiVersion: edictum/v1
kind: ContractBundle

metadata:
  name: my-policy

observability:
  otel:
    enabled: true
    endpoint: "http://localhost:4317"
    protocol: grpc
    service_name: my-agent
    resource_attributes:
      deployment.environment: production
  stdout: true
  file: /var/log/edictum/events.jsonl

defaults:
  mode: enforce

contracts:
  - id: block-sensitive-reads
    type: pre
    tool: read_file
    when:
      args.path:
        contains_any: [".env", ".secret", "credentials"]
    then:
      effect: deny
      message: "Sensitive file '{args.path}' denied."
      tags: [secrets]

Routing to Specific Backends

Edictum emits standard OTLP spans. Use an OTel Collector to route them to any backend:

Datadog: Point the OTel Collector at the Datadog Agent or use the Datadog exporter in the collector config. Enforcement spans appear in Datadog APM traces.

Splunk: Use the Splunk HEC exporter in the OTel Collector config. Spans arrive in Splunk Observability Cloud with all edictum.* attributes intact.

Grafana / Tempo: Send OTLP directly to Grafana Tempo or via the OTel Collector. Enforcement spans appear alongside application traces.

Jaeger: Point the OTLP endpoint at a Jaeger collector. No additional configuration needed.

Graceful Degradation

If opentelemetry is not installed, all OTel instrumentation degrades to a silent no-op. No exceptions are raised and there is no performance cost beyond a single boolean check per call. The local sinks (StdoutAuditSink, FileAuditSink) continue to work independently of OTel availability.


Redaction Policy

All sinks support automatic redaction of sensitive data via RedactionPolicy. If no explicit policy is provided, StdoutAuditSink and FileAuditSink create a default policy automatically. OTel span attributes are emitted after redaction is applied to the underlying AuditEvent.

Sensitive Key Detection

Keys are normalized to lowercase and matched against a built-in set:

password, secret, token, api_key, apikey, api-key, authorization, auth, credentials, private_key, privatekey, access_token, refresh_token, client_secret, connection_string, database_url, db_password, ssh_key, passphrase

Additionally, any key containing token, key, secret, password, or credential as a substring is treated as sensitive.
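
The substring rule can be expressed as a one-line check. This is a sketch of the documented behavior, not the library's code:

```python
SENSITIVE_SUBSTRINGS = ("token", "key", "secret", "password", "credential")

def is_sensitive_key(key: str) -> bool:
    # Keys are lowercased before matching, per the documented behavior
    k = key.lower()
    return any(s in k for s in SENSITIVE_SUBSTRINGS)
```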

Secret Value Pattern Detection

Values are checked against patterns for common secret formats, regardless of the key name:

| Pattern | Matches |
| --- | --- |
| sk-* | OpenAI API keys |
| AKIA* | AWS access key IDs |
| eyJ* | JWT tokens |
| ghp_* | GitHub personal access tokens |
| xox[bpas]-* | Slack tokens |
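
A sketch of the prefix matching described in the table. The prefixes come from the table itself; the library's actual regexes may be stricter or broader:

```python
import re

SECRET_VALUE_PATTERNS = [
    re.compile(r"^sk-"),         # OpenAI API keys
    re.compile(r"^AKIA"),        # AWS access key IDs
    re.compile(r"^eyJ"),         # JWTs (base64url encoding of '{"')
    re.compile(r"^ghp_"),        # GitHub personal access tokens
    re.compile(r"^xox[bpas]-"),  # Slack tokens
]

def looks_like_secret(value: str) -> bool:
    return any(p.match(value) for p in SECRET_VALUE_PATTERNS)
```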

Bash Command Redaction

Bash commands in tool_args are scrubbed for inline secrets:

  • export SECRET_KEY=abc123 becomes export SECRET_KEY=[REDACTED]
  • -p mypassword becomes -p [REDACTED]
  • https://user:pass@host becomes https://user:[REDACTED]@host

Payload Size Cap

Payloads exceeding 32 KB are truncated. The tool_args and result_summary fields are replaced with a marker indicating the cap was hit. This prevents audit sinks from dropping events due to oversized payloads.
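
A rough sketch of the cap. The real logic lives on RedactionPolicy (the custom-sink example below calls cap_payload) and may measure size or word the marker differently:

```python
import json

MAX_PAYLOAD_BYTES = 32 * 1024  # the documented 32 KB cap

def cap_payload(data: dict) -> dict:
    """Sketch of the documented cap, not the library implementation."""
    if len(json.dumps(data, default=str).encode()) <= MAX_PAYLOAD_BYTES:
        return data
    capped = dict(data)
    for field in ("tool_args", "result_summary"):
        if field in capped:
            capped[field] = "[TRUNCATED: payload exceeded cap]"
    return capped
```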

Custom Redaction

from edictum.audit import RedactionPolicy

policy = RedactionPolicy(
    sensitive_keys={"my_custom_key", "internal_token"},  # merged with defaults (union)
    custom_patterns=[
        (r"(MY_PREFIX_)\S+", r"\1[REDACTED]"),           # custom regex substitutions
    ],
    detect_secret_values=True,                            # enable/disable value pattern detection
)

Custom Sinks

Implement the AuditSink protocol to route events to any destination:

import json
from dataclasses import asdict

from edictum.audit import AuditEvent, RedactionPolicy

class KafkaAuditSink:
    """Send audit events to a Kafka topic."""

    def __init__(self, producer, topic: str, redaction: RedactionPolicy | None = None):
        self._producer = producer
        self._topic = topic
        self._redaction = redaction or RedactionPolicy()

    async def emit(self, event: AuditEvent) -> None:
        data = asdict(event)
        data["timestamp"] = event.timestamp.isoformat()
        data["action"] = event.action.value
        data = self._redaction.cap_payload(data)
        await self._producer.send(
            self._topic,
            json.dumps(data, default=str).encode(),
        )

Then register it:

guard = Edictum(
    audit_sink=KafkaAuditSink(producer, "edictum-events"),
)

The AuditSink protocol is @runtime_checkable, so Edictum validates your sink at registration time. If emit is missing, you get an immediate TypeError rather than a silent failure at event time. (Python's runtime-checkable isinstance checks verify that the method exists, not that its signature matches.)
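
The registration check relies on Python's @runtime_checkable protocols, which can be illustrated with a stand-in. AuditSinkLike here is hypothetical; the real protocol is edictum.audit.AuditSink:

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class AuditSinkLike(Protocol):  # hypothetical stand-in for edictum.audit.AuditSink
    async def emit(self, event) -> None: ...

class GoodSink:
    async def emit(self, event) -> None:
        ...

class NotASink:
    pass

# isinstance() against a runtime-checkable protocol checks method presence
assert isinstance(GoodSink(), AuditSinkLike)
assert not isinstance(NotASink(), AuditSinkLike)
```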
