Execution Evidence¶
Execution evidence provides cryptographic proof of workflow execution in Adaptive Sentience. Every tool invocation generates tamper-evident evidence, including trace IDs, execution paths, signatures, and timestamps, to support audit compliance, debugging, and incident investigation.
Overview¶
Problem: In distributed systems, it's difficult to prove what happened, when, and by whom. Compliance requirements demand audit trails, but traditional logging is vulnerable to tampering and doesn't provide cryptographic guarantees.
Solution: Adaptive Sentience generates cryptographic execution evidence for every tool invocation:
- Trace IDs: Unique identifier for request tracking
- Execution Path: Which nodes handled the request
- Signatures: Cryptographic proof from executing nodes
- Timestamps: When execution occurred
- Result Hash: Tamper detection for outputs
- Verification Status: Whether all signatures validated
Core Principles¶
1. Immutable Evidence¶
Execution evidence is cryptographically signed, so any change made after signing is detectable:
# Evidence includes signature from executing node
evidence = {
"trace_id": "trace:a1b2c3d4",
"execution_path": ["local:abc123"],
"node_signature": "base64_signature...",
"result_hash": "sha256:e5f6g7h8",
"timestamp": "2024-01-15T10:00:00Z"
}
# Any tampering invalidates the signature
2. Complete Audit Trail¶
Every request generates evidence, even failures:
# Successful execution
{
"ok": true,
"verified": true,
"trace_id": "trace:success123"
}
# Failed execution (still has evidence)
{
"ok": false,
"verified": true,
"trace_id": "trace:failed456",
"error": "Tool execution failed"
}
3. Privacy-Preserving¶
Evidence proves execution occurred without revealing sensitive data:
# Evidence contains result hash, not actual result
evidence = {
"result_hash": "sha256:a1b2c3d4", # Hash of result
# Actual result not in evidence (may contain PII)
}
Evidence Structure¶
Basic Evidence¶
{
"trace_id": "trace:a1b2c3d4e5f6",
"execution_path": ["local:abc123"],
"verified": true,
"degraded": false,
"timestamp": "2024-01-15T10:00:00Z"
}
Full Evidence with Signatures¶
{
"trace_id": "trace:a1b2c3d4e5f6",
"execution_path": ["local:abc123", "local:def456"],
"verified": true,
"degraded": false,
"signatures": [
{
"node_id": "local:abc123",
"signature": "base64_ed25519_signature...",
"timestamp": "2024-01-15T10:00:00Z",
"tool_hash": "sha256:tool123",
"tool_version": "1.0.0"
},
{
"node_id": "local:def456",
"signature": "base64_ed25519_signature...",
"timestamp": "2024-01-15T10:00:05Z",
"tool_hash": "sha256:tool456",
"tool_version": "1.1.0"
}
],
"result_hash": "sha256:result789",
"latency_ms": 145,
"metadata": {
"client_id": "client:web_app",
"workflow_name": "pii_redaction_workflow"
}
}
Evidence Fields¶
| Field | Type | Description |
|---|---|---|
| trace_id | string | Unique identifier (format: trace:<uuid>) |
| execution_path | array | Node IDs that handled request |
| verified | boolean | Whether all signatures validated |
| degraded | boolean | Whether fallback nodes were used |
| signatures | array | Cryptographic signatures from nodes |
| result_hash | string | SHA256 hash of result |
| timestamp | string | ISO 8601 timestamp |
| latency_ms | integer | Total execution time |
| metadata | object | Application-specific metadata |
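As a rough illustration of the table above, the following sketch checks that an evidence dict carries the documented fields with the documented types. The `validate_evidence` helper and the split into required and optional fields are assumptions made for illustration (the required set is inferred from the Basic Evidence example); they are not part of the Adaptive Sentience API.

```python
# Hypothetical validator. Field names and types come from the table above;
# which fields are required is an assumption based on the Basic Evidence example.
REQUIRED_FIELDS = {
    "trace_id": str,
    "execution_path": list,
    "verified": bool,
    "degraded": bool,
    "timestamp": str,
}
OPTIONAL_FIELDS = {
    "signatures": list,
    "result_hash": str,
    "latency_ms": int,
    "metadata": dict,
}


def validate_evidence(evidence: dict) -> list:
    """Return a list of problems; an empty list means the shape looks valid."""
    problems = []
    for name, expected in REQUIRED_FIELDS.items():
        if name not in evidence:
            problems.append(f"missing field: {name}")
        elif not isinstance(evidence[name], expected):
            problems.append(f"wrong type for {name}: expected {expected.__name__}")
    for name, expected in OPTIONAL_FIELDS.items():
        if name in evidence and not isinstance(evidence[name], expected):
            problems.append(f"wrong type for {name}: expected {expected.__name__}")
    return problems
```

A shape check like this only catches malformed evidence; it says nothing about whether the signatures are valid.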
Trace IDs¶
What is a Trace ID?¶
A trace ID uniquely identifies a request through its entire lifecycle.
All logs, evidence, and audit entries for that request reference the same trace ID.
Trace ID Format¶
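The examples in this document write trace IDs as the literal prefix `trace:` followed by an identifier. The sketch below checks that shape. The exact rule (one or more ASCII letters or digits after the prefix) is an assumption, since the document does not state the permitted characters or length, and `is_valid_trace_id` is a hypothetical helper.

```python
import re

# Assumed pattern: the prefix "trace:" followed by one or more ASCII
# letters or digits. The document does not specify the exact rule.
TRACE_ID_PATTERN = re.compile(r"trace:[0-9A-Za-z]+")


def is_valid_trace_id(value) -> bool:
    """Return True if value looks like a trace ID under the assumed pattern."""
    return isinstance(value, str) and TRACE_ID_PATTERN.fullmatch(value) is not None
```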
Generating Trace IDs¶
import uuid

def generate_trace_id():
    """Generate a unique trace ID."""
    # Keep the first 12 hex characters of a random UUID
    return f"trace:{uuid.uuid4().hex[:12]}"

# Example
trace_id = generate_trace_id()
print(trace_id)  # e.g. trace:a1b2c3d4e5f6
Using Trace IDs¶
In tool call request:
{
"tool_name": "pii_redact",
"tool_args": {...},
"trace_id": "trace:a1b2c3d4e5f6" // Optional - gateway generates if not provided
}
In response:
In audit logs:
{"timestamp": "2024-01-15T10:00:00Z", "trace_id": "trace:a1b2c3d4", "event": "plan"}
{"timestamp": "2024-01-15T10:00:01Z", "trace_id": "trace:a1b2c3d4", "event": "execution"}
Execution Path¶
What is an Execution Path?¶
The execution path records, in order, which nodes handled a request.
For a path such as ["gateway:abc123", "local:def456"], this shows:
1. Request received by gateway:abc123
2. Forwarded to local:def456 for execution
Multi-Step Workflows¶
For workflows with multiple steps:
{
"execution_path": [
"gateway:abc123", // Orchestrator
"local:def456", // Step 1: PII redaction
"local:ghi789", // Step 2: Summarization
"local:def456" // Step 3: Validation
]
}
Degraded Execution¶
When primary nodes are unavailable, fallback nodes are used:
{
"execution_path": ["local:fallback001"],
"degraded": true, // Indicates fallback was used
"preferred_node": "local:primary001" // Original target
}
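To make the path and degraded fields easier to read during an investigation, a small helper can summarize them. This is a minimal sketch: `describe_execution` is a hypothetical name, and it assumes only the `execution_path`, `degraded`, and `preferred_node` keys shown in the examples above.

```python
def describe_execution(evidence: dict) -> str:
    """Summarize hops, distinct nodes, and fallback use for one evidence dict."""
    path = evidence.get("execution_path", [])
    # dict.fromkeys removes duplicates while keeping first-seen order
    nodes = list(dict.fromkeys(path))
    summary = f"{len(path)} hop(s) across {len(nodes)} node(s)"
    if evidence.get("degraded"):
        preferred = evidence.get("preferred_node", "unknown")
        summary += f"; degraded (preferred node: {preferred})"
    return summary
```

A node that appears more than once in the path, as in the multi-step example, counts as several hops but one node.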
Cryptographic Signatures¶
Node Signatures¶
Each node signs its execution evidence with its Ed25519 private key:
import base64
import hashlib
import json
from cryptography.hazmat.primitives.asymmetric import ed25519

def generate_execution_signature(private_key, evidence):
    """Sign execution evidence.

    Args:
        private_key: Ed25519PrivateKey
        evidence: Evidence dict (without signature)
    Returns:
        Base64-encoded signature
    """
    # Canonical JSON: sorted keys give a deterministic byte string
    canonical = json.dumps(evidence, sort_keys=True)
    # Sign
    signature = private_key.sign(canonical.encode())
    # Encode
    return base64.b64encode(signature).decode()
Signature Contents¶
Each signature covers:
{
"trace_id": "trace:a1b2c3d4",
"node_id": "local:abc123",
"tool_name": "pii_redact",
"tool_version": "1.0.0",
"tool_hash": "sha256:tool123",
"result_hash": "sha256:result456",
"timestamp": "2024-01-15T10:00:00Z",
"latency_ms": 42
}
The signature does not include:
- Actual input/output (may contain PII)
- Capability tokens (security)
- Intermediate steps
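One way to enforce that boundary is to build the signed payload from an explicit allowlist, so inputs, outputs, and tokens can never leak into it by accident. The sketch below assumes the eight fields listed above are exactly the signed set and reuses the `json.dumps(..., sort_keys=True)` serialization from the signing function; `build_signing_payload` and `SIGNED_FIELDS` are hypothetical names.

```python
import json

# Fields covered by the signature, taken from the list above.
SIGNED_FIELDS = (
    "trace_id", "node_id", "tool_name", "tool_version",
    "tool_hash", "result_hash", "timestamp", "latency_ms",
)


def build_signing_payload(record: dict) -> bytes:
    """Select only the signed fields and serialize them deterministically."""
    missing = [name for name in SIGNED_FIELDS if name not in record]
    if missing:
        raise KeyError(f"missing signed fields: {missing}")
    # Anything not on the allowlist (inputs, outputs, tokens) is dropped here
    payload = {name: record[name] for name in SIGNED_FIELDS}
    return json.dumps(payload, sort_keys=True).encode()
```

Because keys are sorted before serialization, two records with the same values produce the same bytes regardless of key order.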
Signature Verification¶
Verifying signatures:
import base64
import json
from cryptography.exceptions import InvalidSignature

def verify_execution_signature(public_key, evidence, signature):
    """Verify execution signature.

    Args:
        public_key: Ed25519PublicKey
        evidence: Evidence dict (without signature)
        signature: Base64-encoded signature
    Returns:
        True if valid, False otherwise
    """
    # Canonical JSON (must match the serialization used when signing)
    canonical = json.dumps(evidence, sort_keys=True)
    try:
        # Decode signature (malformed base64 raises a ValueError subclass)
        signature_bytes = base64.b64decode(signature)
        # Verify; raises InvalidSignature on mismatch
        public_key.verify(signature_bytes, canonical.encode())
        return True
    except (InvalidSignature, ValueError) as e:
        print(f"Signature verification failed: {e}")
        return False
Result Hashing¶
Why Hash Results?¶
Result hashes provide tamper detection without revealing sensitive data:
# Tool result returned to the caller (the raw result may contain PII)
result = {
"redacted_text": "Contact [REDACTED]",
"redactions": [{"type": "email", "count": 1}]
}
# Evidence contains hash only
result_hash = hashlib.sha256(
json.dumps(result, sort_keys=True).encode()
).hexdigest()
evidence = {
"result_hash": f"sha256:{result_hash}",
# Actual result NOT in evidence
}
Computing Result Hashes¶
import hashlib
import json

def compute_result_hash(result):
    """Compute SHA256 hash of result.

    Args:
        result: Result dict
    Returns:
        Hash string (format: sha256:<hex>)
    """
    canonical = json.dumps(result, sort_keys=True)
    hash_bytes = hashlib.sha256(canonical.encode()).digest()
    return f"sha256:{hash_bytes.hex()}"
Verifying Result Integrity¶
To verify a result matches the evidence:
def verify_result_integrity(result, evidence):
    """Verify result matches evidence hash.

    Args:
        result: Actual result dict
        evidence: Evidence with result_hash
    Returns:
        True if hash matches
    """
    expected_hash = evidence["result_hash"]
    actual_hash = compute_result_hash(result)
    return expected_hash == actual_hash
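A short, self-contained demonstration of tamper detection follows. It repeats the hashing logic under different names (`result_hash`, `matches_evidence`) so that it runs on its own; the sample values are illustrative.

```python
import hashlib
import json


def result_hash(result: dict) -> str:
    """Hash the sorted-key JSON serialization of a result."""
    canonical = json.dumps(result, sort_keys=True)
    return "sha256:" + hashlib.sha256(canonical.encode()).hexdigest()


def matches_evidence(result: dict, evidence: dict) -> bool:
    """Return True if the result hashes to the value recorded in the evidence."""
    return result_hash(result) == evidence.get("result_hash")


original = {
    "redacted_text": "Contact [REDACTED]",
    "redactions": [{"type": "email", "count": 1}],
}
evidence = {"result_hash": result_hash(original)}

# Changing any value after the fact yields a different hash
tampered = dict(original, redacted_text="Contact alice@example.com")
```

Here `matches_evidence(original, evidence)` holds while `matches_evidence(tampered, evidence)` does not. Reordering the keys of `original` does not change the hash, because keys are sorted before hashing.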
Audit Logging¶
Audit Log Structure¶
Audit logs are stored as JSONL (JSON Lines):
{"timestamp": "2024-01-15T10:00:00Z", "trace_id": "trace:abc", "event": "plan", ...}
{"timestamp": "2024-01-15T10:00:01Z", "trace_id": "trace:abc", "event": "execution", ...}
{"timestamp": "2024-01-15T10:00:15Z", "trace_id": "trace:def", "event": "plan", ...}
Each line is a complete JSON object.
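The JSONL layout makes appending and reading straightforward. The following is a minimal sketch of both operations using only the standard library; `append_event` and `read_events` are hypothetical helpers and do not describe how `AuditLogger` is implemented.

```python
import json
from pathlib import Path


def append_event(path: Path, event: dict) -> None:
    """Append one event as a single JSON line, creating the directory if needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(event, sort_keys=True) + "\n")


def read_events(path: Path) -> list:
    """Read every non-empty line back as a dict, in file order."""
    events = []
    with path.open(encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                events.append(json.loads(line))
    return events
```

Opening the file in append mode means existing entries are never rewritten, which fits the append-only intent of an audit log.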
Audit Log Location¶
# Default location
./audit/audit.log
# Custom location
export AUDIT_LOG_PATH=/var/log/adaptive_sentience/audit.log
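In application code, the same lookup might be written as below. This is a sketch under the assumption that `AUDIT_LOG_PATH` overrides the default relative path; how `AuditLogger` itself resolves the location is not stated here, and `resolve_audit_log_path` is a hypothetical helper.

```python
import os
from pathlib import Path

# Default location from the section above
DEFAULT_AUDIT_LOG = Path("audit") / "audit.log"


def resolve_audit_log_path(environ=None) -> Path:
    """Return AUDIT_LOG_PATH if set and non-empty, else the default path."""
    env = os.environ if environ is None else environ
    custom = env.get("AUDIT_LOG_PATH")
    return Path(custom) if custom else DEFAULT_AUDIT_LOG
```

Passing a plain dict as `environ` keeps the function easy to test without touching the real process environment.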
Audit Events¶
Plan Event¶
Logged when a workflow is planned:
{
"timestamp": "2024-01-15T10:00:00Z",
"trace_id": "trace:a1b2c3d4",
"event": "plan",
"workflow_name": "pii_redaction_workflow",
"user_text": "Redact PII from customer report",
"public_reasoning": "Selected PII redaction workflow",
"internal_reasoning": "Chain-of-thought reasoning...",
"policy": {
"privacy_strict": true,
"schema_strict": true
}
}
Execution Event¶
Logged when a workflow executes:
{
"timestamp": "2024-01-15T10:00:01Z",
"trace_id": "trace:a1b2c3d4",
"event": "execution",
"workflow_name": "pii_redaction_workflow",
"ok": true,
"degraded": false,
"verified": true,
"steps_count": 3,
"errors": []
}
Custom Events¶
Log application-specific events:
from audit.audit_log import AuditLogger
audit_logger = AuditLogger()
audit_logger.log_custom("token_issued", {
"token_id": "token:abc123",
"capabilities": ["tool:pii_redact"],
"issued_to": "client:web_app"
})
Querying Audit Logs¶
By trace ID:
audit_logger = AuditLogger()
# Get all events for a trace ID
events = audit_logger.read_logs(trace_id="trace:a1b2c3d4")
for event in events:
print(f"{event['timestamp']} - {event['event']}")
By event type:
# Get all execution events
executions = audit_logger.read_logs(event_type="execution")
# Get last 10 execution events
recent = audit_logger.read_logs(event_type="execution", limit=10)
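The filtering behaviour described above can be sketched over an in-memory list of events. This is an illustration of the semantics only (match on trace ID and event type, then keep the last `limit` entries, as the "last 10" comment suggests); `filter_events` is a hypothetical function, not the `read_logs` implementation.

```python
from typing import Iterable, Optional


def filter_events(
    events: Iterable[dict],
    trace_id: Optional[str] = None,
    event_type: Optional[str] = None,
    limit: Optional[int] = None,
) -> list:
    """Filter events by trace ID and type, then keep the most recent `limit`."""
    matched = [
        e for e in events
        if (trace_id is None or e.get("trace_id") == trace_id)
        and (event_type is None or e.get("event") == event_type)
    ]
    if limit is not None:
        # Events are assumed to be in chronological order, so the tail is the newest
        matched = matched[-limit:] if limit > 0 else []
    return matched
```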
Compliance Use Cases¶
1. HIPAA Compliance¶
Requirement: Audit trail of all PHI access.
Solution:
# Log PHI access
audit_logger.log_custom("phi_access", {
"trace_id": trace_id,
"patient_id": patient_id,
"accessed_by": user_id,
"tool_name": tool_name,
"result_hash": result_hash # Hash, not actual PHI
})
Evidence:
- Who accessed PHI (user_id)
- When (timestamp)
- What operation (tool_name)
- Proof of execution (signatures)
2. SOC 2 Compliance¶
Requirement: Demonstrate security controls.
Solution:
# Log capability token validation
audit_logger.log_custom("token_validation", {
"trace_id": trace_id,
"token_id": token_id,
"capabilities_required": ["tool:pii_redact"],
"capabilities_provided": token["capabilities"],
"authorized": True
})
Evidence:
- Capability-based access control
- Least privilege enforcement
- Audit trail of authorizations
3. GDPR Compliance¶
Requirement: Demonstrate data processing activities.
Solution:
# Log PII processing
audit_logger.log_custom("pii_processing", {
"trace_id": trace_id,
"tool_name": "pii_redact",
"purpose": "Customer support ticket redaction",
"legal_basis": "Legitimate interest",
"data_subject_consent": True
})
Evidence:
- What PII was processed (via tool_name)
- Why (purpose, legal_basis)
- When (timestamp)
- Consent status
4. Financial Services (PCI DSS)¶
Requirement: Audit trail of payment data access.
Solution:
# Log payment data access
audit_logger.log_custom("payment_data_access", {
"trace_id": trace_id,
"transaction_id": txn_id,
"tool_name": "validate_payment",
"result_hash": result_hash,
"pci_scope": True
})
Evidence:
- Payment data access logged
- Cryptographic proof (signatures)
- Tamper-proof (Ed25519 signatures)
Evidence Retention¶
Retention Policies¶
Configure evidence retention based on compliance requirements:
# retention_policy.py
RETENTION_POLICIES = {
    "hipaa": {
        "audit_logs": "6 years",
        "execution_evidence": "6 years"
    },
    "gdpr": {
        # GDPR sets no fixed retention period; these values are illustrative
        "audit_logs": "7 years",
        "execution_evidence": "30 days"  # Unless required longer
    },
    "default": {
        "audit_logs": "1 year",
        "execution_evidence": "90 days"
    }
}
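Retention strings such as "6 years" need to be turned into durations before they can be enforced. The sketch below shows one way to do that. The `parse_retention` and `is_expired` helpers are hypothetical, and treating a year as 365 days is a simplifying assumption (it matches the 2190-day figure used for six years in the deletion example).

```python
from datetime import datetime, timedelta

# Assumed unit table: a year is approximated as 365 days
_UNIT_DAYS = {"day": 1, "days": 1, "year": 365, "years": 365}


def parse_retention(text: str) -> timedelta:
    """Convert a string such as '6 years' or '90 days' into a timedelta."""
    count, unit = text.split()
    return timedelta(days=int(count) * _UNIT_DAYS[unit])


def is_expired(timestamp: str, retention: str, now: datetime) -> bool:
    """Return True if an ISO 8601 UTC timestamp is older than the retention period."""
    recorded = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
    return now - recorded > parse_retention(retention)
```

The `now` argument must be timezone-aware (for example `datetime.now(timezone.utc)`), because the parsed timestamp carries a UTC offset.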
Archiving Evidence¶
Archive old evidence to cold storage:
import gzip
import json
import os
from datetime import datetime, timedelta, timezone

def archive_old_logs(days_old=90):
    """Archive audit logs older than X days.

    Args:
        days_old: Archive logs older than this
    """
    cutoff = datetime.now(timezone.utc) - timedelta(days=days_old)
    # Read audit log
    with open("audit/audit.log") as f:
        lines = f.readlines()
    # Separate old and recent
    old_lines = []
    recent_lines = []
    for line in lines:
        try:
            event = json.loads(line)
            timestamp = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
        except (ValueError, KeyError, TypeError, AttributeError):
            # Keep unparseable lines in the live log instead of silently dropping them
            recent_lines.append(line)
            continue
        if timestamp.tzinfo is None:
            # Treat timestamps without an offset as UTC
            timestamp = timestamp.replace(tzinfo=timezone.utc)
        if timestamp < cutoff:
            old_lines.append(line)
        else:
            recent_lines.append(line)
    # Archive old logs
    if old_lines:
        os.makedirs("audit/archive", exist_ok=True)
        archive_path = f"audit/archive/audit_{cutoff.date()}.log.gz"
        # Append so a second run on the same day does not overwrite the archive
        with gzip.open(archive_path, "at") as f:
            f.writelines(old_lines)
        print(f"Archived {len(old_lines)} old log entries to {archive_path}")
    # Write recent logs back
    with open("audit/audit.log", "w") as f:
        f.writelines(recent_lines)
Evidence Deletion¶
Delete evidence after retention period:
# Cron job: every Sunday at 03:00, delete archive files older than 2190 days (6 years)
0 3 * * 0 find /var/log/audit/archive -type f -name '*.log.gz' -mtime +2190 -delete
Deletion Policy
Ensure deletion policies comply with legal requirements. Some regulations require indefinite retention.
Debugging with Evidence¶
Tracing Request Flow¶
Use trace IDs to debug request flow:
# Find all events for a trace ID
grep "trace:a1b2c3d4" audit/audit.log
# Output:
# {"timestamp": "...", "trace_id": "trace:a1b2c3d4", "event": "plan", ...}
# {"timestamp": "...", "trace_id": "trace:a1b2c3d4", "event": "execution", ...}
Analyzing Failures¶
When a request fails, examine evidence:
# Get execution event
events = audit_logger.read_logs(trace_id="trace:failed123")
execution = next((e for e in events if e["event"] == "execution"), None)
if execution is None:
    print("No execution event recorded for this trace ID")
else:
    print(f"OK: {execution['ok']}")
    print(f"Errors: {execution['errors']}")
    print(f"Steps: {execution['steps_count']}")
Performance Analysis¶
Use evidence for latency analysis:
# Analyze latency
events = audit_logger.read_logs(event_type="execution", limit=1000)
# Keep zero-latency entries; skip only events that lack the field
latencies = [e["latency_ms"] for e in events if e.get("latency_ms") is not None]
if latencies:
    avg_latency = sum(latencies) / len(latencies)
    max_latency = max(latencies)
    print(f"Average latency: {avg_latency:.1f}ms")
    print(f"Max latency: {max_latency}ms")
else:
    print("No latency data available")
Advanced Topics¶
Chain-of-Custody¶
For high-security scenarios, maintain chain-of-custody:
evidence = {
"trace_id": "trace:a1b2c3d4",
"chain_of_custody": [
{
"node_id": "gateway:abc123",
"timestamp": "2024-01-15T10:00:00Z",
"action": "received_request",
"signature": "..."
},
{
"node_id": "local:def456",
"timestamp": "2024-01-15T10:00:01Z",
"action": "executed_tool",
"signature": "..."
},
{
"node_id": "gateway:abc123",
"timestamp": "2024-01-15T10:00:02Z",
"action": "returned_result",
"signature": "..."
}
]
}
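A chain like this can be sanity-checked before its signatures are verified. The sketch below tests only structure: the first and last actions and the chronological order of timestamps. The expected action names are taken from the example above, and `check_chain_of_custody` is a hypothetical helper; it does not validate the `signature` fields.

```python
from datetime import datetime


def check_chain_of_custody(chain: list) -> list:
    """Return a list of structural problems; an empty list means none were found."""
    if not chain:
        return ["chain is empty"]
    problems = []
    if chain[0].get("action") != "received_request":
        problems.append("chain does not start with received_request")
    if chain[-1].get("action") != "returned_result":
        problems.append("chain does not end with returned_result")
    # Parse ISO 8601 timestamps; "Z" is rewritten as an explicit UTC offset
    times = [
        datetime.fromisoformat(entry["timestamp"].replace("Z", "+00:00"))
        for entry in chain
    ]
    for earlier, later in zip(times, times[1:]):
        if later < earlier:
            problems.append("timestamps are not in chronological order")
            break
    return problems
```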
Merkle Trees for Batch Verification¶
For efficient verification of large evidence batches:
import hashlib
import json

def build_merkle_tree(evidences):
    """Build Merkle tree of execution evidences.

    Args:
        evidences: List of evidence dicts
    Returns:
        Merkle root hash (hex string), or None if the list is empty
    """
    if not evidences:
        return None
    # Hash each evidence
    leaves = [
        hashlib.sha256(json.dumps(e, sort_keys=True).encode()).digest()
        for e in evidences
    ]
    # Build tree
    while len(leaves) > 1:
        new_level = []
        for i in range(0, len(leaves), 2):
            if i + 1 < len(leaves):
                combined = leaves[i] + leaves[i + 1]
            else:
                # Unpaired node at the end of a level is hashed on its own
                combined = leaves[i]
            new_level.append(hashlib.sha256(combined).digest())
        leaves = new_level
    return leaves[0].hex()
Evidence Aggregation¶
Aggregate evidence across time periods:
def aggregate_evidence(start_date, end_date):
    """Aggregate evidence for reporting.

    Args:
        start_date: Start date (ISO 8601)
        end_date: End date (ISO 8601)
    Returns:
        Aggregated statistics
    """
    events = audit_logger.read_logs()
    # Filter by date range (string comparison is valid only when all
    # timestamps share the same ISO 8601 UTC format)
    filtered = [
        e for e in events
        if start_date <= e["timestamp"] <= end_date
    ]
    # Aggregate over execution events only, so other event types
    # carrying an "ok" field are not counted as successes
    executions = [e for e in filtered if e["event"] == "execution"]
    total_executions = len(executions)
    successful = len([e for e in executions if e.get("ok")])
    failed = total_executions - successful
    return {
        "period": {"start": start_date, "end": end_date},
        "total_executions": total_executions,
        "successful": successful,
        "failed": failed,
        "success_rate": successful / total_executions if total_executions > 0 else 0
    }
Security Considerations¶
1. Evidence Integrity¶
Protect evidence from tampering:
- Cryptographic signatures: Ed25519 signatures on all evidence
- Append-only logs: Use append-only file systems
- Write-once storage: Consider WORM (Write Once Read Many) storage
2. Evidence Confidentiality¶
Evidence may contain sensitive metadata:
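# Encrypt evidence at rest
import json
from cryptography.fernet import Fernet
# Generate the key once and store it securely; without it the evidence cannot be decrypted
key = Fernet.generate_key()
fernet = Fernet(key)
# Encrypt evidence before storage
encrypted = fernet.encrypt(json.dumps(evidence).encode())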
3. Access Control¶
Restrict evidence access:
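One possible approach is a role check in front of every evidence read. The sketch below is purely illustrative: the role names, the permission names, and the `view_evidence` helper are all hypothetical, since this document does not define an access model.

```python
# Hypothetical roles and permissions; adapt to your own access model.
ROLE_PERMISSIONS = {
    "auditor": {"read_evidence", "read_metadata"},
    "operator": {"read_evidence"},
}


def view_evidence(evidence: dict, role: str) -> dict:
    """Return the evidence a role may see, or raise PermissionError."""
    allowed = ROLE_PERMISSIONS.get(role, set())
    if "read_evidence" not in allowed:
        raise PermissionError(f"role {role!r} may not read evidence")
    if "read_metadata" in allowed:
        return dict(evidence)
    # Strip application metadata, which may identify clients or users
    return {k: v for k, v in evidence.items() if k != "metadata"}
```

Unknown roles get no permissions, so the check denies by default.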
4. Evidence Transport¶
When transmitting evidence:
- Use TLS/HTTPS
- Verify signatures before accepting
- Reject evidence from untrusted nodes
Best Practices¶
1. Always Generate Trace IDs¶
Every request should have a trace ID:
# Good
trace_id = generate_trace_id()
response = call_tool(..., trace_id=trace_id)
# Bad
response = call_tool(...) # No trace ID
2. Include Metadata¶
Add context to evidence:
evidence = {
"trace_id": trace_id,
"metadata": {
"client_id": "web_app",
"user_id": "user:alice",
"request_source": "customer_portal",
"environment": "production"
}
}
3. Verify Signatures¶
Always verify signatures when receiving evidence:
if not verify_execution_signature(public_key, evidence, signature):
raise ValueError("Invalid signature")
4. Archive Regularly¶
Archive old evidence to prevent log bloat:
5. Monitor Evidence Generation¶
Alert on missing evidence:
# Check that all executions generate evidence
if execution_count != evidence_count:
alert("Evidence generation failure")
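Comparing counts shows that something is missing but not which requests are affected. A set difference over trace IDs gives the specific gaps. This is a minimal sketch; `find_missing_evidence` is a hypothetical helper, and how the two ID lists are collected depends on your deployment.

```python
def find_missing_evidence(execution_trace_ids, evidence_trace_ids) -> list:
    """Return the sorted trace IDs that executed but have no evidence."""
    return sorted(set(execution_trace_ids) - set(evidence_trace_ids))
```

An empty result means every execution has matching evidence; any returned ID is a starting point for investigation.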
Troubleshooting¶
Missing Trace ID¶
Symptom: Evidence has no trace ID.
Solution:
- Check request: Ensure trace ID is generated
- Check gateway: Verify gateway generates trace IDs if not provided
Signature Verification Failed¶
Symptom: verified: false in evidence.
Solutions:
- Check trust store: Ensure node is trusted
- Verify public key: Correct public key in trust store
- Check clock sync: Nodes must have synchronized clocks
Large Audit Logs¶
Symptom: Audit log grows too large.
Solutions:
- Archive old logs: Move to compressed archive
- Rotate logs: Use logrotate
- Reduce verbosity: Only log important events
Evidence Missing¶
Symptom: No evidence for a trace ID.
Solutions:
- Check audit log: Search for trace ID
- Check timestamp: Evidence may be in archived logs
- Check retention policy: Evidence may have been deleted
Reference¶
Evidence Response Structure¶
{
"ok": boolean,
"verified": boolean,
"result": object | null,
"error": string | null,
"trace_id": "trace:<uuid>",
"execution_path": ["node_id", ...],
"degraded": boolean,
"signatures": [
{
"node_id": "string",
"signature": "base64",
"timestamp": "ISO8601",
"tool_hash": "sha256:<hex>",
"tool_version": "semver"
}
],
"result_hash": "sha256:<hex>",
"latency_ms": integer,
"metadata": object
}
AuditLogger Python Class¶
from pathlib import Path
from typing import Any, Dict, Optional

class AuditLogger:
    """JSONL audit logger for workflow execution."""

    def __init__(self, log_path: Optional[Path] = None): ...

    def log_plan(
        self,
        trace_id: str,
        workflow_name: str,
        internal_reasoning: str,
        public_reasoning: str,
        policy: Dict[str, bool],
        user_text: str
    ): ...

    def log_execution(
        self,
        trace_id: str,
        workflow_name: str,
        ok: bool,
        degraded: bool,
        verified: bool,
        steps_count: int,
        errors: list
    ): ...

    def log_custom(
        self,
        event_type: str,
        data: Dict[str, Any]
    ): ...

    def read_logs(
        self,
        trace_id: Optional[str] = None,
        event_type: Optional[str] = None,
        limit: Optional[int] = None
    ) -> list: ...
Next Steps¶
- Trust Pairing - Cryptographic node pairing
- Capability Tokens - Authorization with tokens
- Offline Operation - Evidence in offline scenarios
- Tool Contracts - Define tools for execution