Skip to content

Execution Evidence

Execution evidence provides cryptographic proof of workflow execution in Adaptive Sentience. Every tool invocation generates tamper-proof evidence including trace IDs, execution paths, signatures, and timestamps - enabling audit compliance, debugging, and incident investigation.


Overview

Problem: In distributed systems, it's difficult to prove what happened, when, and by whom. Compliance requirements demand audit trails, but traditional logging is vulnerable to tampering and doesn't provide cryptographic guarantees.

Solution: Adaptive Sentience generates cryptographic execution evidence for every tool invocation:

  • Trace IDs: Unique identifier for request tracking
  • Execution Path: Which nodes handled the request
  • Signatures: Cryptographic proof from executing nodes
  • Timestamps: When execution occurred
  • Result Hash: Tamper detection for outputs
  • Verification Status: Whether all signatures validated

Core Principles

1. Immutable Evidence

Execution evidence is cryptographically signed and cannot be tampered with:

# Evidence includes signature from executing node
evidence = {
    "trace_id": "trace:a1b2c3d4",
    "execution_path": ["local:abc123"],
    "node_signature": "base64_signature...",
    "result_hash": "sha256:e5f6g7h8",
    "timestamp": "2024-01-15T10:00:00Z"
}

# Any tampering invalidates the signature

2. Complete Audit Trail

Every request generates evidence, even failures:

# Successful execution
{
  "ok": true,
  "verified": true,
  "trace_id": "trace:success123"
}

# Failed execution (still has evidence)
{
  "ok": false,
  "verified": true,
  "trace_id": "trace:failed456",
  "error": "Tool execution failed"
}

3. Privacy-Preserving

Evidence proves execution occurred without revealing sensitive data:

# Evidence contains result hash, not actual result
evidence = {
    "result_hash": "sha256:a1b2c3d4",  # Hash of result
    # Actual result not in evidence (may contain PII)
}

Evidence Structure

Basic Evidence

{
  "trace_id": "trace:a1b2c3d4e5f6",
  "execution_path": ["local:abc123"],
  "verified": true,
  "degraded": false,
  "timestamp": "2024-01-15T10:00:00Z"
}

Full Evidence with Signatures

{
  "trace_id": "trace:a1b2c3d4e5f6",
  "execution_path": ["local:abc123", "local:def456"],
  "verified": true,
  "degraded": false,
  "signatures": [
    {
      "node_id": "local:abc123",
      "signature": "base64_ed25519_signature...",
      "timestamp": "2024-01-15T10:00:00Z",
      "tool_hash": "sha256:tool123",
      "tool_version": "1.0.0"
    },
    {
      "node_id": "local:def456",
      "signature": "base64_ed25519_signature...",
      "timestamp": "2024-01-15T10:00:05Z",
      "tool_hash": "sha256:tool456",
      "tool_version": "1.1.0"
    }
  ],
  "result_hash": "sha256:result789",
  "latency_ms": 145,
  "metadata": {
    "client_id": "client:web_app",
    "workflow_name": "pii_redaction_workflow"
  }
}

Evidence Fields

Field Type Description
trace_id string Unique identifier (format: trace:<uuid>)
execution_path array Node IDs that handled request
verified boolean Whether all signatures validated
degraded boolean Whether fallback nodes were used
signatures array Cryptographic signatures from nodes
result_hash string SHA256 hash of result
timestamp string ISO 8601 timestamp
latency_ms integer Total execution time
metadata object Application-specific metadata

Trace IDs

What is a Trace ID?

A trace ID uniquely identifies a request through its entire lifecycle:

Client Request → trace:a1b2c3d4 → Gateway → Edge Node → Result

All logs, evidence, and audit entries reference the same trace ID.

Trace ID Format

trace:<12-character-hex>

Examples:
trace:a1b2c3d4e5f6
trace:1a2b3c4d5e6f

Generating Trace IDs

import uuid

def generate_trace_id():
    """Generate unique trace ID."""
    return f"trace:{uuid.uuid4().hex[:12]}"

# Example
trace_id = generate_trace_id()
print(trace_id)  # trace:a1b2c3d4e5f6

Using Trace IDs

In tool call request:

{
  "tool_name": "pii_redact",
  "tool_args": {...},
  "trace_id": "trace:a1b2c3d4e5f6"  // Optional - gateway generates if not provided
}

In response:

{
  "ok": true,
  "trace_id": "trace:a1b2c3d4e5f6",
  "result": {...}
}

In audit logs:

{"timestamp": "2024-01-15T10:00:00Z", "trace_id": "trace:a1b2c3d4", "event": "plan"}
{"timestamp": "2024-01-15T10:00:01Z", "trace_id": "trace:a1b2c3d4", "event": "execution"}

Execution Path

What is an Execution Path?

The execution path tracks which nodes handled a request:

{
  "execution_path": ["gateway:abc123", "local:def456"]
}

This shows: 1. Request received by gateway:abc123 2. Forwarded to local:def456 for execution

Multi-Step Workflows

For workflows with multiple steps:

{
  "execution_path": [
    "gateway:abc123",      // Orchestrator
    "local:def456",        // Step 1: PII redaction
    "local:ghi789",        // Step 2: Summarization
    "local:def456"         // Step 3: Validation
  ]
}

Degraded Execution

When primary nodes are unavailable, fallback nodes are used:

{
  "execution_path": ["local:fallback001"],
  "degraded": true,  // Indicates fallback was used
  "preferred_node": "local:primary001"  // Original target
}

Cryptographic Signatures

Node Signatures

Each node signs its execution evidence with its Ed25519 private key:

import json
import hashlib
from cryptography.hazmat.primitives.asymmetric import ed25519

def generate_execution_signature(private_key, evidence):
    """Sign execution evidence.

    Args:
        private_key: Ed25519PrivateKey
        evidence: Evidence dict (without signature)

    Returns:
        Base64-encoded signature
    """
    # Canonical JSON
    canonical = json.dumps(evidence, sort_keys=True)

    # Sign
    signature = private_key.sign(canonical.encode())

    # Encode
    import base64
    return base64.b64encode(signature).decode()

Signature Contents

Each signature covers:

{
  "trace_id": "trace:a1b2c3d4",
  "node_id": "local:abc123",
  "tool_name": "pii_redact",
  "tool_version": "1.0.0",
  "tool_hash": "sha256:tool123",
  "result_hash": "sha256:result456",
  "timestamp": "2024-01-15T10:00:00Z",
  "latency_ms": 42
}

The signature does not include: - Actual input/output (may contain PII) - Capability tokens (security) - Intermediate steps

Signature Verification

Verifying signatures:

def verify_execution_signature(public_key, evidence, signature):
    """Verify execution signature.

    Args:
        public_key: Ed25519PublicKey
        evidence: Evidence dict (without signature)
        signature: Base64-encoded signature

    Returns:
        True if valid, False otherwise
    """
    try:
        # Canonical JSON
        canonical = json.dumps(evidence, sort_keys=True)

        # Decode signature
        import base64
        signature_bytes = base64.b64decode(signature)

        # Verify
        public_key.verify(signature_bytes, canonical.encode())
        return True

    except Exception as e:
        print(f"Signature verification failed: {e}")
        return False

Result Hashing

Why Hash Results?

Result hashes provide tamper detection without revealing sensitive data:

# PII in result
result = {
    "redacted_text": "Contact [REDACTED]",
    "redactions": [{"type": "email", "count": 1}]
}

# Evidence contains hash only
result_hash = hashlib.sha256(
    json.dumps(result, sort_keys=True).encode()
).hexdigest()

evidence = {
    "result_hash": f"sha256:{result_hash}",
    # Actual result NOT in evidence
}

Computing Result Hashes

import hashlib
import json

def compute_result_hash(result):
    """Compute SHA256 hash of result.

    Args:
        result: Result dict

    Returns:
        Hash string (format: sha256:<hex>)
    """
    canonical = json.dumps(result, sort_keys=True)
    hash_bytes = hashlib.sha256(canonical.encode()).digest()
    return f"sha256:{hash_bytes.hex()}"

Verifying Result Integrity

To verify a result matches the evidence:

def verify_result_integrity(result, evidence):
    """Verify result matches evidence hash.

    Args:
        result: Actual result dict
        evidence: Evidence with result_hash

    Returns:
        True if hash matches
    """
    expected_hash = evidence["result_hash"]
    actual_hash = compute_result_hash(result)
    return expected_hash == actual_hash

Audit Logging

Audit Log Structure

Audit logs are stored as JSONL (JSON Lines):

{"timestamp": "2024-01-15T10:00:00Z", "trace_id": "trace:abc", "event": "plan", ...}
{"timestamp": "2024-01-15T10:00:01Z", "trace_id": "trace:abc", "event": "execution", ...}
{"timestamp": "2024-01-15T10:00:15Z", "trace_id": "trace:def", "event": "plan", ...}

Each line is a complete JSON object.

Audit Log Location

# Default location
./audit/audit.log

# Custom location
export AUDIT_LOG_PATH=/var/log/adaptive_sentience/audit.log

Audit Events

Plan Event

Logged when a workflow is planned:

{
  "timestamp": "2024-01-15T10:00:00Z",
  "trace_id": "trace:a1b2c3d4",
  "event": "plan",
  "workflow_name": "pii_redaction_workflow",
  "user_text": "Redact PII from customer report",
  "public_reasoning": "Selected PII redaction workflow",
  "internal_reasoning": "Chain-of-thought reasoning...",
  "policy": {
    "privacy_strict": true,
    "schema_strict": true
  }
}

Execution Event

Logged when a workflow executes:

{
  "timestamp": "2024-01-15T10:00:01Z",
  "trace_id": "trace:a1b2c3d4",
  "event": "execution",
  "workflow_name": "pii_redaction_workflow",
  "ok": true,
  "degraded": false,
  "verified": true,
  "steps_count": 3,
  "errors": []
}

Custom Events

Log application-specific events:

from audit.audit_log import AuditLogger

audit_logger = AuditLogger()

audit_logger.log_custom("token_issued", {
    "token_id": "token:abc123",
    "capabilities": ["tool:pii_redact"],
    "issued_to": "client:web_app"
})

Querying Audit Logs

By trace ID:

audit_logger = AuditLogger()

# Get all events for a trace ID
events = audit_logger.read_logs(trace_id="trace:a1b2c3d4")

for event in events:
    print(f"{event['timestamp']} - {event['event']}")

By event type:

# Get all execution events
executions = audit_logger.read_logs(event_type="execution")

# Get last 10 execution events
recent = audit_logger.read_logs(event_type="execution", limit=10)

Compliance Use Cases

1. HIPAA Compliance

Requirement: Audit trail of all PHI access.

Solution:

# Log PHI access
audit_logger.log_custom("phi_access", {
    "trace_id": trace_id,
    "patient_id": patient_id,
    "accessed_by": user_id,
    "tool_name": tool_name,
    "result_hash": result_hash  # Hash, not actual PHI
})

Evidence: - Who accessed PHI (user_id) - When (timestamp) - What operation (tool_name) - Proof of execution (signatures)

2. SOC 2 Compliance

Requirement: Demonstrate security controls.

Solution:

# Log capability token validation
audit_logger.log_custom("token_validation", {
    "trace_id": trace_id,
    "token_id": token_id,
    "capabilities_required": ["tool:pii_redact"],
    "capabilities_provided": token["capabilities"],
    "authorized": True
})

Evidence: - Capability-based access control - Least privilege enforcement - Audit trail of authorizations

3. GDPR Compliance

Requirement: Demonstrate data processing activities.

Solution:

# Log PII processing
audit_logger.log_custom("pii_processing", {
    "trace_id": trace_id,
    "tool_name": "pii_redact",
    "purpose": "Customer support ticket redaction",
    "legal_basis": "Legitimate interest",
    "data_subject_consent": True
})

Evidence: - What PII was processed (via tool_name) - Why (purpose, legal_basis) - When (timestamp) - Consent status

4. Financial Services (PCI DSS)

Requirement: Audit trail of payment data access.

Solution:

# Log payment data access
audit_logger.log_custom("payment_data_access", {
    "trace_id": trace_id,
    "transaction_id": txn_id,
    "tool_name": "validate_payment",
    "result_hash": result_hash,
    "pci_scope": True
})

Evidence: - Payment data access logged - Cryptographic proof (signatures) - Tamper-proof (Ed25519 signatures)


Evidence Retention

Retention Policies

Configure evidence retention based on compliance requirements:

# retention_policy.py
RETENTION_POLICIES = {
    "hipaa": {
        "audit_logs": "6 years",
        "execution_evidence": "6 years"
    },
    "gdpr": {
        "audit_logs": "7 years",
        "execution_evidence": "30 days"  # Unless required longer
    },
    "default": {
        "audit_logs": "1 year",
        "execution_evidence": "90 days"
    }
}

Archiving Evidence

Archive old evidence to cold storage:

import gzip
import shutil
from datetime import datetime, timedelta

def archive_old_logs(days_old=90):
    """Archive audit logs older than X days.

    Args:
        days_old: Archive logs older than this
    """
    cutoff = datetime.utcnow() - timedelta(days=days_old)

    # Read audit log
    with open("audit/audit.log") as f:
        lines = f.readlines()

    # Separate old and recent
    old_lines = []
    recent_lines = []

    for line in lines:
        try:
            event = json.loads(line)
            timestamp = datetime.fromisoformat(event["timestamp"].replace("Z", ""))

            if timestamp < cutoff:
                old_lines.append(line)
            else:
                recent_lines.append(line)
        except Exception:
            continue

    # Archive old logs
    if old_lines:
        archive_path = f"audit/archive/audit_{cutoff.date()}.log.gz"
        with gzip.open(archive_path, "wt") as f:
            f.writelines(old_lines)

        print(f"Archived {len(old_lines)} old log entries to {archive_path}")

    # Write recent logs back
    with open("audit/audit.log", "w") as f:
        f.writelines(recent_lines)

Evidence Deletion

Delete evidence after retention period:

# Cron job to delete old archives
0 3 * * 0 find /var/log/audit/archive -mtime +2190 -delete  # 6 years

Deletion Policy

Ensure deletion policies comply with legal requirements. Some regulations require indefinite retention.


Debugging with Evidence

Tracing Request Flow

Use trace IDs to debug request flow:

# Find all events for a trace ID
grep "trace:a1b2c3d4" audit/audit.log

# Output:
# {"timestamp": "...", "trace_id": "trace:a1b2c3d4", "event": "plan", ...}
# {"timestamp": "...", "trace_id": "trace:a1b2c3d4", "event": "execution", ...}

Analyzing Failures

When a request fails, examine evidence:

# Get execution event
events = audit_logger.read_logs(trace_id="trace:failed123")
execution = [e for e in events if e["event"] == "execution"][0]

print(f"OK: {execution['ok']}")
print(f"Errors: {execution['errors']}")
print(f"Steps: {execution['steps_count']}")

Performance Analysis

Use evidence for latency analysis:

# Analyze latency
events = audit_logger.read_logs(event_type="execution", limit=1000)
latencies = [e.get("latency_ms") for e in events if e.get("latency_ms")]

avg_latency = sum(latencies) / len(latencies)
max_latency = max(latencies)

print(f"Average latency: {avg_latency}ms")
print(f"Max latency: {max_latency}ms")

Advanced Topics

Chain-of-Custody

For high-security scenarios, maintain chain-of-custody:

evidence = {
    "trace_id": "trace:a1b2c3d4",
    "chain_of_custody": [
        {
            "node_id": "gateway:abc123",
            "timestamp": "2024-01-15T10:00:00Z",
            "action": "received_request",
            "signature": "..."
        },
        {
            "node_id": "local:def456",
            "timestamp": "2024-01-15T10:00:01Z",
            "action": "executed_tool",
            "signature": "..."
        },
        {
            "node_id": "gateway:abc123",
            "timestamp": "2024-01-15T10:00:02Z",
            "action": "returned_result",
            "signature": "..."
        }
    ]
}

Merkle Trees for Batch Verification

For efficient verification of large evidence batches:

import hashlib

def build_merkle_tree(evidences):
    """Build Merkle tree of execution evidences.

    Args:
        evidences: List of evidence dicts

    Returns:
        Merkle root hash
    """
    if not evidences:
        return None

    # Hash each evidence
    leaves = [
        hashlib.sha256(json.dumps(e, sort_keys=True).encode()).digest()
        for e in evidences
    ]

    # Build tree
    while len(leaves) > 1:
        new_level = []
        for i in range(0, len(leaves), 2):
            if i + 1 < len(leaves):
                combined = leaves[i] + leaves[i + 1]
            else:
                combined = leaves[i]
            new_level.append(hashlib.sha256(combined).digest())
        leaves = new_level

    return leaves[0].hex()

Evidence Aggregation

Aggregate evidence across time periods:

def aggregate_evidence(start_date, end_date):
    """Aggregate evidence for reporting.

    Args:
        start_date: Start date (ISO 8601)
        end_date: End date (ISO 8601)

    Returns:
        Aggregated statistics
    """
    events = audit_logger.read_logs()

    # Filter by date range
    filtered = [
        e for e in events
        if start_date <= e["timestamp"] <= end_date
    ]

    # Aggregate
    total_executions = len([e for e in filtered if e["event"] == "execution"])
    successful = len([e for e in filtered if e.get("ok")])
    failed = total_executions - successful

    return {
        "period": {"start": start_date, "end": end_date},
        "total_executions": total_executions,
        "successful": successful,
        "failed": failed,
        "success_rate": successful / total_executions if total_executions > 0 else 0
    }

Security Considerations

1. Evidence Integrity

Protect evidence from tampering:

  • Cryptographic signatures: Ed25519 signatures on all evidence
  • Append-only logs: Use append-only file systems
  • Write-once storage: Consider WORM (Write Once Read Many) storage

2. Evidence Confidentiality

Evidence may contain sensitive metadata:

# Encrypt evidence at rest
from cryptography.fernet import Fernet

key = Fernet.generate_key()
f = Fernet(key)

# Encrypt evidence before storage
encrypted = f.encrypt(json.dumps(evidence).encode())

3. Access Control

Restrict evidence access:

# File permissions
chmod 600 audit/audit.log
chown audit_service:audit_service audit/audit.log

4. Evidence Transport

When transmitting evidence:

  • Use TLS/HTTPS
  • Verify signatures before accepting
  • Reject evidence from untrusted nodes

Best Practices

1. Always Generate Trace IDs

Every request should have a trace ID:

# Good
trace_id = generate_trace_id()
response = call_tool(..., trace_id=trace_id)

# Bad
response = call_tool(...)  # No trace ID

2. Include Metadata

Add context to evidence:

evidence = {
    "trace_id": trace_id,
    "metadata": {
        "client_id": "web_app",
        "user_id": "user:alice",
        "request_source": "customer_portal",
        "environment": "production"
    }
}

3. Verify Signatures

Always verify signatures when receiving evidence:

if not verify_execution_signature(public_key, evidence, signature):
    raise ValueError("Invalid signature")

4. Archive Regularly

Archive old evidence to prevent log bloat:

# Weekly archival
0 3 * * 0 /usr/local/bin/archive_evidence.sh

5. Monitor Evidence Generation

Alert on missing evidence:

# Check that all executions generate evidence
if execution_count != evidence_count:
    alert("Evidence generation failure")

Troubleshooting

Missing Trace ID

Symptom: Evidence has no trace ID.

Solution:

  1. Check request: Ensure trace ID is generated

    trace_id = generate_trace_id()
    

  2. Check gateway: Verify gateway generates trace IDs if not provided

Signature Verification Failed

Symptom: verified: false in evidence.

Solutions:

  1. Check trust store: Ensure node is trusted
  2. Verify public key: Correct public key in trust store
  3. Check clock sync: Nodes must have synchronized clocks

Large Audit Logs

Symptom: Audit log grows too large.

Solutions:

  1. Archive old logs: Move to compressed archive
  2. Rotate logs: Use logrotate
  3. Reduce verbosity: Only log important events

Evidence Missing

Symptom: No evidence for a trace ID.

Solutions:

  1. Check audit log: Search for trace ID

    grep "trace:abc123" audit/audit.log
    

  2. Check timestamp: Evidence may be in archived logs

  3. Check retention policy: Evidence may have been deleted

Reference

Evidence Response Structure

{
  "ok": boolean,
  "verified": boolean,
  "result": object | null,
  "error": string | null,
  "trace_id": "trace:<uuid>",
  "execution_path": ["node_id", ...],
  "degraded": boolean,
  "signatures": [
    {
      "node_id": "string",
      "signature": "base64",
      "timestamp": "ISO8601",
      "tool_hash": "sha256:<hex>",
      "tool_version": "semver"
    }
  ],
  "result_hash": "sha256:<hex>",
  "latency_ms": integer,
  "metadata": object
}

AuditLogger Python Class

class AuditLogger:
    """JSONL audit logger for workflow execution."""

    def __init__(self, log_path: Optional[Path] = None)

    def log_plan(
        self,
        trace_id: str,
        workflow_name: str,
        internal_reasoning: str,
        public_reasoning: str,
        policy: Dict[str, bool],
        user_text: str
    )

    def log_execution(
        self,
        trace_id: str,
        workflow_name: str,
        ok: bool,
        degraded: bool,
        verified: bool,
        steps_count: int,
        errors: list
    )

    def log_custom(
        self,
        event_type: str,
        data: Dict[str, Any]
    )

    def read_logs(
        self,
        trace_id: Optional[str] = None,
        event_type: Optional[str] = None,
        limit: Optional[int] = None
    ) -> list

Next Steps


Further Reading