Offline Operation¶
Offline operation is a core capability of Adaptive Sentience, enabling reliable workflow execution in environments with unreliable connectivity. Using store-and-forward messaging and per-node mailboxes, requests are queued when nodes are offline and automatically delivered when connectivity returns.
Overview¶
Problem: Field operations often occur in environments with intermittent or no network connectivity. Traditional cloud-based systems fail immediately when offline, resulting in lost data, failed workflows, and frustrated users.
Solution: Adaptive Sentience uses a store-and-forward architecture where:
- Requests are queued locally when target nodes are offline
- Messages are automatically delivered when nodes come back online
- No data is lost during network outages
- Workflows continue executing as connectivity permits
Core Principles¶
1. Offline-First Architecture¶
The system assumes connectivity is unreliable and designs accordingly:
Client → Gateway → [Queue if offline] → Edge Node
                          ↓
                  Store-and-Forward
                          ↓
                 [Retry with backoff]
                          ↓
                  Deliver when online
2. Per-Node Mailboxes¶
Each node has its own persistent mailbox, stored as a SQLite database. Messages persist across restarts and network outages.
3. Automatic Retry with Exponential Backoff¶
Failed deliveries are automatically retried:
Attempt 1: Immediate
Attempt 2: 5 seconds later
Attempt 3: 10 seconds later
Attempt 4: 20 seconds later
Attempt 5: 40 seconds later
Attempt 6: 80 seconds later
Attempt 7: 160 seconds later
Attempt 8+: 300 seconds later (5-minute cap)
4. No Data Loss¶
Messages persist until they are either:
- Successfully delivered, OR
- Expired (based on TTL)
A network outage alone never causes data loss; a message is dropped only if it remains undelivered past its TTL.
Store-and-Forward Mechanism¶
How It Works¶
1. Message Queuing:
import requests

# Client sends request to gateway
response = requests.post(
    "http://gateway:8787/v1/tool_call",
    json={
        "target": {"node_id": "local:abc123"},
        "tool_name": "pii_redact",
        "tool_args": {"text": "..."}
    }
)
# If node offline:
# - Gateway queues message in mailbox
# - Returns queued status to client
2. Background Delivery:
import time

# Gateway runs background delivery loop
while True:
    # Get pending deliveries
    pending = mailbox.get_pending_deliveries(limit=10)
    for message in pending:
        try:
            # Attempt delivery
            deliver_message(message)
            mailbox.mark_delivered(message["message_id"])
        except ConnectionError:
            # Failed - update retry backoff
            mailbox.mark_failed(message["message_id"])
    time.sleep(5)  # Check every 5 seconds
3. Receipt Acknowledgment:
# Recipient node fetches unacknowledged messages (returns a list)
for inbox_message in mailbox.get_inbox_unacked():
    # Process message
    result = process_task(inbox_message["payload"])
    # Acknowledge receipt only after processing succeeds
    mailbox.ack_inbox(inbox_message["message_id"])
Message Lifecycle¶
Created → Queued in Outbox → Delivery Attempt → Failed?
                                                 ├─ Yes → Retry (backoff)
                                                 └─ No → Delivered
                                                            ↓
                                                     Stored in Inbox
                                                            ↓
                                                        Processed
                                                            ↓
                                                       Acknowledged
                                                            ↓
                                                    Removed from Inbox
Mailbox System¶
Mailbox Structure¶
Each node has a SQLite database:
-- Outbox: Messages queued for delivery
CREATE TABLE outbox (
    message_id TEXT PRIMARY KEY,
    recipient_node_id TEXT NOT NULL,
    recipient_http_url TEXT NOT NULL,
    envelope_json TEXT NOT NULL,
    attempts INTEGER DEFAULT 0,
    next_attempt_at TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    created_at TEXT NOT NULL
);
-- Inbox: Received messages
CREATE TABLE inbox (
    message_id TEXT PRIMARY KEY,  -- Enforces replay protection
    sender_node_id TEXT NOT NULL,
    received_at TEXT NOT NULL,
    payload_type TEXT NOT NULL,
    decrypted_payload_json TEXT NOT NULL,
    acknowledged INTEGER DEFAULT 0
);
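To see how the PRIMARY KEY on inbox.message_id rejects a message that arrives twice, here is a minimal sketch against an in-memory SQLite database using the inbox schema above. The `store_once` helper is hypothetical and is not part of the Mailbox API.

```python
import sqlite3

# In-memory database with the inbox schema from this section.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE inbox (
        message_id TEXT PRIMARY KEY,
        sender_node_id TEXT NOT NULL,
        received_at TEXT NOT NULL,
        payload_type TEXT NOT NULL,
        decrypted_payload_json TEXT NOT NULL,
        acknowledged INTEGER DEFAULT 0
    )
    """
)


def store_once(conn, message_id, sender_node_id, payload_json):
    """Hypothetical helper: True if the row was stored, False if it is a duplicate."""
    try:
        with conn:  # Commits on success, rolls back on error
            conn.execute(
                "INSERT INTO inbox (message_id, sender_node_id, received_at, "
                "payload_type, decrypted_payload_json) VALUES (?, ?, ?, ?, ?)",
                (message_id, sender_node_id, "2024-01-15T10:00:00Z",
                 "TASK_REQUEST", payload_json),
            )
        return True
    except sqlite3.IntegrityError:
        # Same message_id already present: the PRIMARY KEY rejected it
        return False


first = store_once(conn, "msg:a1b2c3d4", "gateway:def456", "{}")
second = store_once(conn, "msg:a1b2c3d4", "gateway:def456", "{}")
stored_rows = conn.execute("SELECT COUNT(*) FROM inbox").fetchone()[0]
```

The second insert is rejected by the database itself, so the check holds even if two deliveries race each other.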
Outbox Operations¶
Queue Message:
from gateway.mesh_transport.mailbox import Mailbox

mailbox = Mailbox(db_path="data/mailbox.db")
mailbox.queue_outbox(
    envelope={
        "message_id": "msg:a1b2c3d4",
        "recipient_node_id": "local:abc123",
        "sender_node_id": "gateway:def456",
        "payload_type": "TASK_REQUEST",
        "encrypted_payload": "...",
        "signature": "..."
    },
    recipient_http_url="http://192.168.1.100:8000",
    ttl_seconds=86400  # 24 hours
)
Get Pending Deliveries:
# Get next 10 messages ready for delivery
pending = mailbox.get_pending_deliveries(limit=10)
for msg in pending:
    print(f"Message {msg['message_id']} to {msg['recipient_node_id']}")
    print(f"Attempts: {msg['attempts']}")
    print(f"Envelope: {msg['envelope']}")
Mark Delivered:
# Remove message from outbox after successful delivery
mailbox.mark_delivered("msg:a1b2c3d4")
Mark Failed:
# Update retry backoff after failed delivery
mailbox.mark_failed("msg:a1b2c3d4")
# Next attempt scheduled based on exponential backoff
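How `mark_failed` computes the next attempt time is not shown in this document. The following is a rough sketch of one way it could work, assuming the backoff is derived from the number of failures recorded before the current one and that timestamps are stored as UTC ISO-8601 strings. `schedule_retry` is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone


def calculate_backoff(attempts):
    """Backoff in seconds: 5s doubling per attempt, capped at 300s."""
    return min(2 ** attempts * 5, 300)


def schedule_retry(previous_failures, now):
    """Hypothetical helper: return (new attempt count, next_attempt_at string)."""
    delay = calculate_backoff(previous_failures)
    next_attempt = now + timedelta(seconds=delay)
    return previous_failures + 1, next_attempt.strftime("%Y-%m-%dT%H:%M:%SZ")


now = datetime(2024, 1, 15, 10, 0, 0, tzinfo=timezone.utc)
after_first_failure = schedule_retry(0, now)
after_fourth_failure = schedule_retry(3, now)
after_many_failures = schedule_retry(10, now)
```

The returned pair maps onto the `attempts` and `next_attempt_at` columns of the outbox table.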
Inbox Operations¶
Store Received Message:
import sqlite3

# Store incoming message (replay protection via PRIMARY KEY)
try:
    mailbox.store_inbox(
        message_id="msg:a1b2c3d4",
        sender_node_id="gateway:def456",
        payload_type="TASK_REQUEST",
        payload={
            "tool_name": "pii_redact",
            "tool_args": {"text": "..."},
            "token": {...}
        }
    )
except sqlite3.IntegrityError:
    # Duplicate message_id - a replay attack, or a redelivery of a
    # message that was already received
    print("Duplicate message rejected")
Get Unacknowledged Messages:
# Get all unprocessed messages
unacked = mailbox.get_inbox_unacked()
for msg in unacked:
    print(f"Message {msg['message_id']} from {msg['sender_node_id']}")
    print(f"Payload: {msg['payload']}")
Acknowledge Message:
# Acknowledge a message once it has been processed
mailbox.ack_inbox("msg:a1b2c3d4")
Request Queuing¶
Client-Side Queuing¶
When the gateway is unreachable, clients should queue requests locally:
import json
import sqlite3
import uuid
from datetime import datetime, timezone


class ClientQueue:
    """Local request queue for offline clients."""

    def __init__(self, db_path="client_queue.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS queued_requests (
                request_id TEXT PRIMARY KEY,
                gateway_url TEXT NOT NULL,
                request_json TEXT NOT NULL,
                created_at TEXT NOT NULL,
                attempts INTEGER DEFAULT 0
            )
            """
        )
        conn.commit()
        conn.close()

    def queue_request(self, gateway_url, request_data):
        """Queue a request for delivery."""
        request_id = f"req:{uuid.uuid4().hex[:12]}"
        # Timezone-aware UTC timestamp with a "Z" suffix
        created_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            """
            INSERT INTO queued_requests (request_id, gateway_url, request_json, created_at)
            VALUES (?, ?, ?, ?)
            """,
            (request_id, gateway_url, json.dumps(request_data), created_at)
        )
        conn.commit()
        conn.close()
        print(f"Queued request {request_id}")

    def get_pending_requests(self):
        """Get all pending requests."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute(
            "SELECT request_id, gateway_url, request_json, attempts FROM queued_requests"
        )
        # Named "pending" so it does not shadow the requests library
        pending = [
            {
                "request_id": row[0],
                "gateway_url": row[1],
                "request_data": json.loads(row[2]),
                "attempts": row[3]
            }
            for row in cursor.fetchall()
        ]
        conn.close()
        return pending

    def mark_delivered(self, request_id):
        """Remove delivered request."""
        conn = sqlite3.connect(self.db_path)
        conn.execute("DELETE FROM queued_requests WHERE request_id = ?", (request_id,))
        conn.commit()
        conn.close()

    def mark_failed(self, request_id):
        """Increment attempt counter."""
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            "UPDATE queued_requests SET attempts = attempts + 1 WHERE request_id = ?",
            (request_id,)
        )
        conn.commit()
        conn.close()
Usage:
import time
import requests

queue = ClientQueue()
try:
    # Try direct delivery (timeout so an unreachable gateway cannot hang the client)
    response = requests.post(gateway_url, json=request_data, timeout=5.0)
except (requests.ConnectionError, requests.Timeout):
    # Gateway offline - queue for later
    queue.queue_request(gateway_url, request_data)
    print("Gateway offline - request queued")
# Background delivery loop
while True:
    pending = queue.get_pending_requests()
    for req in pending:
        try:
            response = requests.post(req["gateway_url"], json=req["request_data"], timeout=5.0)
            queue.mark_delivered(req["request_id"])
            print(f"Delivered {req['request_id']}")
        except (requests.ConnectionError, requests.Timeout):
            queue.mark_failed(req["request_id"])
    time.sleep(10)
Gateway-Side Queuing¶
The gateway queues messages for offline edge nodes:
# In gateway router
def route_to_node(node_id, request):
    """Route request to node, queue if offline."""
    node = node_registry.get(node_id)
    if not node:
        raise ValueError(f"Node {node_id} not found")
    try:
        # Try direct delivery
        response = requests.post(
            node["http_url"],
            json=request,
            timeout=5.0
        )
        return response.json()
    except requests.RequestException:
        # Node offline - queue in mailbox
        envelope = create_envelope(request, node_id)
        mailbox.queue_outbox(
            envelope=envelope,
            recipient_http_url=node["http_url"]
        )
        return {
            "ok": False,
            "queued": True,
            "message": f"Node {node_id} offline - request queued",
            "message_id": envelope["message_id"]
        }
Delivery and Retry Logic¶
Exponential Backoff¶
Failed deliveries use exponential backoff to avoid overwhelming offline nodes:
def calculate_backoff(attempts):
    """Calculate retry backoff.

    Args:
        attempts: Number of previous attempts

    Returns:
        Seconds to wait before next attempt
    """
    # 2^attempts * 5 seconds, capped at 300 seconds (5 minutes)
    return min(2 ** attempts * 5, 300)

# Examples:
# Attempt 0: 5s
# Attempt 1: 10s
# Attempt 2: 20s
# Attempt 3: 40s
# Attempt 4: 80s
# Attempt 5: 160s
# Attempt 6+: 300s (capped)
Delivery Loop¶
import time
import requests

def delivery_loop(mailbox):
    """Background delivery loop."""
    while True:
        try:
            # Get pending deliveries
            pending = mailbox.get_pending_deliveries(limit=10)
            for msg in pending:
                try:
                    # Attempt delivery
                    response = requests.post(
                        msg["recipient_http_url"],
                        json=msg["envelope"],
                        timeout=30.0
                    )
                    if response.status_code == 200:
                        # Success - remove from outbox
                        mailbox.mark_delivered(msg["message_id"])
                        print(f"Delivered {msg['message_id']}")
                    else:
                        # Failed - retry later
                        mailbox.mark_failed(msg["message_id"])
                        print(f"Delivery failed (HTTP {response.status_code})")
                except requests.RequestException as e:
                    # Connection error - retry later
                    mailbox.mark_failed(msg["message_id"])
                    print(f"Delivery failed: {e}")
            # Clean up expired messages
            mailbox.cleanup_expired()
        except Exception as e:
            print(f"Delivery loop error: {e}")
        # Check for new messages every 5 seconds
        time.sleep(5)
Message Expiration¶
Messages have a TTL (time-to-live) and are removed when expired:
# Queue with 24 hour TTL
mailbox.queue_outbox(
    envelope=envelope,
    recipient_http_url=url,
    ttl_seconds=86400  # 24 hours
)
# Cleanup expired messages
deleted = mailbox.cleanup_expired()
print(f"Removed {deleted} expired messages")
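The internals of `cleanup_expired` are not shown here. As a sketch of the idea, the version below deletes outbox rows whose `expires_at` has passed, assuming timestamps are stored as fixed-format UTC ISO-8601 strings (which sort correctly as text). It is a standalone function over a trimmed table, not the actual Mailbox method.

```python
import sqlite3


def cleanup_expired(conn, now_iso):
    """Delete outbox rows that expired at or before now_iso; return the count."""
    with conn:  # Commit the delete as one transaction
        cursor = conn.execute(
            "DELETE FROM outbox WHERE expires_at <= ?", (now_iso,)
        )
    return cursor.rowcount


# Trimmed outbox table: only the columns the cleanup needs
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox (message_id TEXT PRIMARY KEY, expires_at TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO outbox VALUES (?, ?)",
    [
        ("msg:old", "2024-01-15T09:00:00Z"),  # Already expired
        ("msg:new", "2024-01-16T10:00:00Z"),  # Still valid
    ],
)

deleted = cleanup_expired(conn, "2024-01-15T10:00:00Z")
remaining = [row[0] for row in conn.execute("SELECT message_id FROM outbox")]
```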
Offline Scenarios¶
Scenario 1: Edge Node Temporarily Offline¶
Setup:
- Gateway online
- Edge node loses connectivity for 30 minutes
Behavior:
- Client sends request to gateway
- Gateway attempts to forward to edge node
- Connection fails - gateway queues message
- Gateway returns {"queued": true} to client
- Edge node comes back online
- Gateway delivery loop detects node is online
- Message delivered automatically
- Result returned to client (if client still connected)
Timeline:
T+0:00 Client → Gateway (success)
T+0:01 Gateway → Edge Node (FAIL - offline)
T+0:01 Gateway queues message
T+0:06 Retry 1 (5s backoff) - FAIL
T+0:16 Retry 2 (10s backoff) - FAIL
T+0:36 Retry 3 (20s backoff) - FAIL
T+1:16 Retry 4 (40s backoff) - FAIL
...
T+30:00 Edge node comes online
T+30:05 Retry N - SUCCESS
T+30:05 Message delivered
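The retry times in this timeline follow from the backoff formula in "Exponential Backoff". The sketch below recomputes them, assuming the first failure happens one second in and each retry fires exactly when its backoff elapses. `retry_offsets` and `fmt` are illustrative helpers.

```python
def calculate_backoff(attempts):
    """Backoff in seconds: 5s doubling per attempt, capped at 300s."""
    return min(2 ** attempts * 5, 300)


def retry_offsets(first_failure_s, count):
    """Seconds after T+0 at which each of the first `count` retries fires."""
    offsets = []
    elapsed = first_failure_s
    for attempt in range(count):
        elapsed += calculate_backoff(attempt)
        offsets.append(elapsed)
    return offsets


def fmt(seconds):
    """Format seconds as T+M:SS, matching the timeline above."""
    return f"T+{seconds // 60}:{seconds % 60:02d}"


timeline = [fmt(s) for s in retry_offsets(first_failure_s=1, count=6)]
```

In practice the delivery loop polls every 5 seconds, so a retry can fire up to one polling interval later than these computed times.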
Scenario 2: Gateway Temporarily Offline¶
Setup:
- Edge nodes online
- Gateway loses connectivity for 1 hour
Behavior:
- Client sends request to gateway
- Connection fails - client queues locally
- Client retries with exponential backoff
- Gateway comes back online
- Client delivers queued request
- Workflow executes normally
Timeline:
T+0:00 Client → Gateway (FAIL - offline)
T+0:00 Client queues request
T+0:05 Retry 1 - FAIL
T+0:15 Retry 2 - FAIL
T+0:35 Retry 3 - FAIL
...
T+60:00 Gateway comes online
T+60:05 Retry N - SUCCESS
T+60:06 Workflow executes
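The ClientQueue usage example retries on a fixed 10-second interval, while this scenario describes exponential backoff. A minimal sketch of a backoff check that a client loop could apply per request follows. It assumes the client records the time of the last failed attempt, which the `queued_requests` table shown earlier does not store; `client_backoff` and `is_due` are hypothetical names.

```python
def client_backoff(attempts, base=5, cap=300):
    """Seconds to wait after `attempts` failed retries (5s doubling, capped)."""
    return min(base * 2 ** attempts, cap)


def is_due(last_attempt_ts, attempts, now):
    """True once the backoff since the last failed attempt has elapsed.

    Timestamps are plain seconds (for example, from time.time()).
    """
    return now - last_attempt_ts >= client_backoff(attempts)


# Initial failure at t=0 with no retries yet: first retry is due at t=5
too_early = is_due(last_attempt_ts=0, attempts=0, now=4)
first_retry = is_due(last_attempt_ts=0, attempts=0, now=5)
# Retry 1 failed at t=5: second retry is due 10s later, at t=15
second_retry = is_due(last_attempt_ts=5, attempts=1, now=15)
```

The delivery loop would then skip any pending request for which `is_due` returns False.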
Scenario 3: Mesh Partition (Split-Brain)¶
Setup:
- Gateway can reach Node A
- Gateway cannot reach Node B
- Nodes A and B can reach each other
Behavior:
- Gateway routes requests for B-only tools to B
- Connection fails - queued in gateway mailbox
- Requests for A-only tools succeed
- Network partition heals
- Queued messages for B delivered
Mitigation:
- Use mesh-aware routing to find alternate paths
- Enable peer-to-peer message forwarding
- Configure fallback nodes for critical tools
Scenario 4: Fully Offline Operation¶
Setup:
- Edge node operates completely offline
- No gateway connectivity
Behavior:
- Client connects directly to edge node HTTP endpoint
- Tools execute locally without gateway
- Results returned immediately
- When connectivity returns, sync audit logs to gateway
Example:
# Direct edge node connection (offline)
response = requests.post(
    "http://192.168.1.100:8000/v1/tool_call",
    json={
        "tool_name": "pii_redact",
        "tool_args": {"text": "..."},
        "token": {"capabilities": ["tool:pii_redact"]}
    }
)
# Works offline - no gateway required
Failure Scenarios and Recovery¶
Network Partition¶
Failure: Network partition separates gateway from edge nodes.
Detection:
- Delivery attempts fail
- Nodes show as offline in mesh scan
Recovery:
- Messages queue in gateway mailbox
- Automatic delivery when partition heals
- No manual intervention required
Prevention:
- Use multiple gateways in different network segments
- Enable peer-to-peer message forwarding
Node Crash¶
Failure: Edge node crashes while processing message.
Detection:
- Message delivery succeeds but no response
- Timeout in gateway
Recovery:
- Idempotency keys prevent duplicate execution
- Retry logic with idempotency check
- Message re-queued if no acknowledgment
Prevention:
- Use job IDs for idempotency
- Persist job state before execution
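One way to combine job IDs with persisted job state is sketched below. The `jobs` table and the `run_once` helper are assumptions made for illustration; the document does not describe how the node stores job state.

```python
import json
import sqlite3


def run_once(conn, job_id, task):
    """Run task() at most once per completed job_id; replay the stored result after."""
    row = conn.execute(
        "SELECT status, result_json FROM jobs WHERE job_id = ?", (job_id,)
    ).fetchone()
    if row and row[0] == "done":
        # Already executed: return the stored result instead of re-running
        return json.loads(row[1])
    with conn:
        # Persist job state before execution
        conn.execute(
            "INSERT OR REPLACE INTO jobs (job_id, status) VALUES (?, 'running')",
            (job_id,),
        )
    result = task()
    with conn:
        conn.execute(
            "UPDATE jobs SET status = 'done', result_json = ? WHERE job_id = ?",
            (json.dumps(result), job_id),
        )
    return result


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE jobs (job_id TEXT PRIMARY KEY, status TEXT NOT NULL, result_json TEXT)"
)

calls = []


def task():
    calls.append(1)  # Count how many times the tool actually runs
    return {"redacted": True}


first = run_once(conn, "job:unique123", task)
second = run_once(conn, "job:unique123", task)  # Redelivered message
```

A job left in the `running` state by a crash is executed again on redelivery in this sketch, so the tool itself still needs to tolerate a repeated run in that case.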
Database Corruption¶
Failure: Mailbox SQLite database corrupted.
Detection:
- SQLite errors when accessing mailbox
- Delivery loop crashes
Recovery:
- Stop the node
- Back up the corrupted database
- Recover from the WAL
- If recovery fails, restore from backup
- Restart the node
Prevention:
- Use SQLite WAL mode (enabled by default)
- Regular backups
- Database integrity checks
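A database integrity check can be run with SQLite's built-in `PRAGMA integrity_check`. The sketch below wraps it in a hypothetical `mailbox_is_healthy` helper and exercises it against a valid database and a file of junk bytes, both created in a temporary directory.

```python
import os
import sqlite3
import tempfile


def mailbox_is_healthy(db_path):
    """Return True if SQLite's integrity check reports no problems."""
    try:
        conn = sqlite3.connect(db_path)
        try:
            rows = conn.execute("PRAGMA integrity_check").fetchall()
        finally:
            conn.close()
    except sqlite3.DatabaseError:
        # Raised when the file cannot be read as a database at all
        return False
    # A healthy database yields a single row containing "ok"
    return rows == [("ok",)]


with tempfile.TemporaryDirectory() as tmp:
    good_path = os.path.join(tmp, "good.db")
    conn = sqlite3.connect(good_path)
    conn.execute("CREATE TABLE outbox (message_id TEXT PRIMARY KEY)")
    conn.commit()
    conn.close()

    bad_path = os.path.join(tmp, "bad.db")
    with open(bad_path, "wb") as f:
        f.write(b"this is not a sqlite database" * 200)

    healthy = mailbox_is_healthy(good_path)
    corruption_detected = not mailbox_is_healthy(bad_path)
```

Running a check like this on a schedule, or before each backup, surfaces corruption before the delivery loop trips over it.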
Clock Skew¶
Failure: Node clocks are out of sync.
Detection:
- Timestamps in messages don't match expectations
- Signature verification may fail
Recovery:
- Sync clocks using NTP
Prevention:
- Run NTP daemon on all nodes
- Monitor clock drift
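To illustrate detecting skew from message timestamps, the sketch below compares an envelope's `timestamp` field against the local clock. The 5-minute tolerance and the `skew_ok` helper are assumptions; the document does not state what tolerance the system applies.

```python
from datetime import datetime, timedelta, timezone

MAX_SKEW = timedelta(minutes=5)  # Assumed tolerance, not a documented value


def parse_ts(ts):
    """Parse a UTC timestamp in the envelope format, e.g. 2024-01-15T10:00:00Z."""
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)


def skew_ok(envelope_ts, now, max_skew=MAX_SKEW):
    """True if the envelope timestamp is within max_skew of the local clock."""
    return abs(now - parse_ts(envelope_ts)) <= max_skew


now = datetime(2024, 1, 15, 10, 3, 0, tzinfo=timezone.utc)
within_tolerance = skew_ok("2024-01-15T10:00:00Z", now)   # 3 minutes apart
outside_tolerance = skew_ok("2024-01-15T09:50:00Z", now)  # 13 minutes apart
```

Note that a message queued during a long outage legitimately carries an old timestamp, so a check like this is better suited to monitoring drift than to rejecting stored messages.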
Offline Testing¶
Simulating Network Outages¶
Disconnect node from network:
# Linux - block outbound traffic
sudo iptables -A OUTPUT -p tcp --dport 8787 -j DROP
# macOS - disable interface
sudo ifconfig en0 down
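Restore connectivity:
# Linux - remove the blocking rule
sudo iptables -D OUTPUT -p tcp --dport 8787 -j DROP
# macOS - re-enable interface
sudo ifconfig en0 up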
Testing Queuing Behavior¶
import subprocess
import time
import requests
# 1. Send request while node online
response = requests.post("http://localhost:8787/v1/tool_call", json={...})
assert response.json()["ok"]
# 2. Disconnect node
subprocess.run(["sudo", "iptables", "-A", "OUTPUT", "-p", "tcp", "--dport", "8000", "-j", "DROP"])
# 3. Send request while node offline
response = requests.post("http://localhost:8787/v1/tool_call", json={...})
assert response.json()["queued"] # Queued for delivery
# 4. Reconnect node
subprocess.run(["sudo", "iptables", "-D", "OUTPUT", "-p", "tcp", "--dport", "8000", "-j", "DROP"])
# 5. Wait for automatic delivery
time.sleep(10)
# 6. Verify message was delivered
# (Check audit logs or query mailbox)
Testing Retry Logic¶
# Monitor retry attempts
def monitor_retries(mailbox, message_id):
"""Watch retry behavior for a message."""
while True:
conn = sqlite3.connect(mailbox.db_path)
cursor = conn.execute(
"SELECT attempts, next_attempt_at FROM outbox WHERE message_id = ?",
(message_id,)
)
row = cursor.fetchone()
conn.close()
if not row:
print(f"{message_id} delivered!")
break
attempts, next_attempt = row
print(f"Attempts: {attempts}, Next retry: {next_attempt}")
time.sleep(5)
# Monitor a queued message
monitor_retries(mailbox, "msg:abc123")
Performance Considerations¶
Mailbox Database Size¶
Monitor mailbox database growth:
import os

def get_mailbox_size(db_path):
    """Get mailbox database size in MB."""
    size_bytes = os.path.getsize(db_path)
    return size_bytes / (1024 * 1024)

size_mb = get_mailbox_size("data/mailbox.db")
if size_mb > 100:
    print(f"Warning: Mailbox database is {size_mb:.1f} MB")
Message Throughput¶
Tune delivery loop for throughput:
from concurrent.futures import ThreadPoolExecutor

# Process more messages per iteration
pending = mailbox.get_pending_deliveries(limit=100)  # Increased from 10
# Parallel delivery
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(deliver_message, msg) for msg in pending]
    for future in futures:
        future.result()  # Re-raises any exception from deliver_message
Database Optimization¶
Optimize SQLite performance:
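import sqlite3

conn = sqlite3.connect("data/mailbox.db")
# Enable WAL mode for better concurrency (this setting persists in the database file)
conn.execute("PRAGMA journal_mode=WAL")
# Increase cache size (applies to this connection only, so set it on the
# connection the delivery loop actually uses)
conn.execute("PRAGMA cache_size=-64000")  # Negative value is in KiB: roughly 64 MB
# Refresh the statistics the query planner uses to choose indexes
conn.execute("ANALYZE")
conn.close()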
Best Practices¶
1. Set Reasonable TTLs¶
Avoid very long or effectively infinite TTLs; undeliverable messages stay in the outbox until they expire:
# Good - 24 hour TTL
mailbox.queue_outbox(envelope, url, ttl_seconds=86400)
# Bad - 1 year TTL
mailbox.queue_outbox(envelope, url, ttl_seconds=31536000)
2. Monitor Queue Depth¶
Alert on growing queues:
def check_queue_depth(mailbox):
    """Alert if queue depth exceeds threshold."""
    conn = sqlite3.connect(mailbox.db_path)
    cursor = conn.execute("SELECT COUNT(*) FROM outbox")
    count = cursor.fetchone()[0]
    conn.close()
    if count > 1000:
        # alert() stands in for your own alerting hook
        alert(f"Queue depth critical: {count} messages")
3. Regular Cleanup¶
Clean up old messages:
# Remove messages whose TTL has elapsed
deleted = mailbox.cleanup_expired()
4. Backup Mailboxes¶
Backup mailbox databases:
# Backup while node is running (using WAL mode)
sqlite3 data/mailbox.db ".backup /backup/mailbox_$(date +\%Y\%m\%d).db"
5. Use Idempotency¶
Always use job IDs for idempotency:
response = requests.post("http://gateway/v1/tool_call", json={
    "job_id": "job:unique123",  # Prevents duplicate execution
    "tool_name": "pii_redact",
    "tool_args": {...}
})
Advanced Topics¶
Peer-to-Peer Message Forwarding¶
Allow edge nodes to forward messages to each other:
# Node A receives message for Node B
# Node A can reach B, but gateway cannot
# Node A forwards to B on gateway's behalf
def forward_message(envelope, recipient_url):
    """Forward message to peer node."""
    try:
        response = requests.post(recipient_url, json=envelope)
        return response.json()
    except requests.RequestException:
        # Peer also offline - don't queue (gateway will retry)
        return None
Message Priority¶
Prioritize urgent messages:
-- Add priority field to outbox
ALTER TABLE outbox ADD COLUMN priority INTEGER DEFAULT 0;
-- Get pending with priority
SELECT * FROM outbox
WHERE next_attempt_at <= ?
ORDER BY priority DESC, next_attempt_at ASC
LIMIT ?
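As a quick check of the ordering this query produces, the sketch below runs it from Python against an in-memory table trimmed to the relevant columns. The `pending_by_priority` helper and the sample rows are illustrative only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox ("
    "message_id TEXT PRIMARY KEY, "
    "next_attempt_at TEXT NOT NULL, "
    "priority INTEGER DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO outbox VALUES (?, ?, ?)",
    [
        ("msg:routine", "2024-01-15T09:00:00Z", 0),
        ("msg:urgent", "2024-01-15T09:30:00Z", 10),
        ("msg:future", "2024-01-15T11:00:00Z", 10),  # Not due yet
    ],
)


def pending_by_priority(conn, now_iso, limit):
    """Return due message IDs, highest priority first, then oldest first."""
    rows = conn.execute(
        "SELECT message_id FROM outbox "
        "WHERE next_attempt_at <= ? "
        "ORDER BY priority DESC, next_attempt_at ASC "
        "LIMIT ?",
        (now_iso, limit),
    ).fetchall()
    return [row[0] for row in rows]


order = pending_by_priority(conn, "2024-01-15T10:00:00Z", 10)
```

The urgent message is delivered before the older routine one, and the message whose `next_attempt_at` lies in the future is skipped regardless of priority.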
Compression¶
Compress large payloads:
import base64
import gzip
import json

def compress_payload(payload):
    """Compress payload using gzip."""
    json_bytes = json.dumps(payload).encode()
    compressed = gzip.compress(json_bytes)
    # Base64 so the result can be embedded in a JSON envelope
    return base64.b64encode(compressed).decode()

def decompress_payload(compressed):
    """Decompress payload."""
    compressed_bytes = base64.b64decode(compressed)
    json_bytes = gzip.decompress(compressed_bytes)
    return json.loads(json_bytes)
Troubleshooting¶
Messages Not Delivering¶
Symptom: Messages stuck in outbox.
Solutions:
- Check network connectivity
- Check that the delivery loop is running
- Check the outbox
- Attempt manual delivery
Database Locked¶
Symptom: "database is locked" error.
Solutions:
- Enable WAL mode
- Increase timeout
- Reduce concurrent access:
  - Limit delivery loop workers
  - Use connection pooling
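The first two solutions can be applied when the connection is opened. A minimal sketch follows, assuming a 30-second busy timeout is acceptable for your workload; `open_mailbox` is a hypothetical helper rather than part of the Mailbox class.

```python
import os
import sqlite3
import tempfile


def open_mailbox(db_path, busy_timeout_s=30.0):
    """Open a connection that waits on locks and uses WAL journaling."""
    # timeout: seconds to wait for a lock before raising "database is locked"
    conn = sqlite3.connect(db_path, timeout=busy_timeout_s)
    # The PRAGMA returns the journal mode now in effect
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    return conn, mode


with tempfile.TemporaryDirectory() as tmp:
    conn, journal_mode = open_mailbox(os.path.join(tmp, "mailbox.db"))
    conn.close()
```

WAL lets readers proceed while a write is in progress, but there is still only one writer at a time, so the timeout remains useful when several workers write to the same mailbox.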
Replay Attacks¶
Symptom: Duplicate messages being processed.
Solutions:
- Verify PRIMARY KEY constraint
- Check for inbox acknowledgment
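Both checks can be done directly against the mailbox database. The sketch below uses SQLite's `PRAGMA table_info` to list primary-key columns and counts unacknowledged rows; the helper names are hypothetical and the table is trimmed to the columns involved.

```python
import sqlite3


def primary_key_columns(conn, table):
    """Return the primary-key column names of a trusted table name."""
    # PRAGMA arguments cannot be bound as parameters, so only pass
    # table names you control.
    rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
    # Each row is (cid, name, type, notnull, dflt_value, pk); pk > 0 marks key columns
    return [row[1] for row in rows if row[5] > 0]


def unacked_count(conn):
    """Count inbox messages that have not been acknowledged."""
    return conn.execute(
        "SELECT COUNT(*) FROM inbox WHERE acknowledged = 0"
    ).fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE inbox ("
    "message_id TEXT PRIMARY KEY, "
    "sender_node_id TEXT NOT NULL, "
    "acknowledged INTEGER DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO inbox VALUES (?, ?, ?)",
    [("msg:1", "gateway:def456", 1), ("msg:2", "gateway:def456", 0)],
)

pk_columns = primary_key_columns(conn, "inbox")
pending_acks = unacked_count(conn)
```

If `message_id` is missing from the primary key, duplicate inserts will succeed silently; if the unacknowledged count keeps growing, messages are being processed without a matching `ack_inbox` call.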
Reference¶
Mailbox Python Class¶
from typing import List, Optional

class Mailbox:
    """SQLite mailbox for store-and-forward messaging."""

    def __init__(self, db_path: str = "data/mailbox.db"): ...

    def queue_outbox(
        self,
        envelope: dict,
        recipient_http_url: str,
        ttl_seconds: int = 86400
    ): ...

    def get_pending_deliveries(self, limit: int = 10) -> List[dict]: ...

    def mark_delivered(self, message_id: str): ...

    def mark_failed(self, message_id: str): ...

    def store_inbox(
        self,
        message_id: str,
        sender_node_id: str,
        payload_type: str,
        payload: dict
    ): ...

    def get_inbox_unacked(
        self,
        payload_type: Optional[str] = None
    ) -> List[dict]: ...

    def ack_inbox(self, message_id: str): ...

    def cleanup_expired(self) -> int: ...
Message Envelope Structure¶
{
    "message_id": "msg:<uuid>",
    "sender_node_id": "gateway:abc123",
    "recipient_node_id": "local:def456",
    "timestamp": "2024-01-15T10:00:00Z",
    "payload_type": "TASK_REQUEST",
    "encrypted_payload": "<base64>",
    "signature": "<base64>"
}
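For illustration, an envelope with this shape could be assembled as follows. `build_envelope` is a hypothetical helper (it is not the `create_envelope` function used by the gateway router, whose signature is not documented here), and encryption and signing are out of scope, so the caller passes those fields in already encoded.

```python
import uuid
from datetime import datetime, timezone


def build_envelope(sender_node_id, recipient_node_id, payload_type,
                   encrypted_payload, signature):
    """Hypothetical helper: assemble an envelope dict in the structure above."""
    return {
        "message_id": f"msg:{uuid.uuid4()}",
        "sender_node_id": sender_node_id,
        "recipient_node_id": recipient_node_id,
        # UTC timestamp with second precision and a "Z" suffix
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "payload_type": payload_type,
        "encrypted_payload": encrypted_payload,  # Base64 string, produced elsewhere
        "signature": signature,                  # Base64 string, produced elsewhere
    }


envelope = build_envelope(
    sender_node_id="gateway:abc123",
    recipient_node_id="local:def456",
    payload_type="TASK_REQUEST",
    encrypted_payload="<base64>",
    signature="<base64>",
)
```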
Next Steps¶
- Trust Pairing - Secure offline node pairing
- Execution Evidence - Audit trails in offline scenarios
- Capability Tokens - Offline authorization
- Tool Contracts - Define tools for edge execution