
Offline Operation

Offline operation is a core capability of Adaptive Sentience, enabling reliable workflow execution in environments with unreliable connectivity. With store-and-forward messaging and per-node mailboxes, requests to offline nodes are queued and delivered automatically when connectivity returns.


Overview

Problem: Field operations often occur in environments with intermittent or no network connectivity. Traditional cloud-based systems fail immediately when offline, resulting in lost data, failed workflows, and frustrated users.

Solution: Adaptive Sentience uses a store-and-forward architecture where:

  • Requests are queued locally when target nodes are offline
  • Messages are automatically delivered when nodes come back online
  • No data is lost during network outages, as long as messages are delivered before their TTL expires
  • Workflows continue executing as connectivity permits

Core Principles

1. Offline-First Architecture

The system assumes connectivity is unreliable and is designed accordingly:

Client → Gateway → [Queue if offline] → Edge Node
              Store-and-Forward
              [Retry with backoff]
              Deliver when online

2. Per-Node Mailboxes

Each node has its own persistent mailbox (SQLite database):

/data/mailbox.db
├── outbox (messages to send)
└── inbox (messages received)

Messages persist across restarts and network outages.

3. Automatic Retry with Exponential Backoff

Failed deliveries are automatically retried:

Attempt 1: Immediate
Attempt 2: 5 seconds
Attempt 3: 10 seconds
Attempt 4: 20 seconds
Attempt 5: 40 seconds
Attempt 6: 80 seconds
Attempt 7: 160 seconds
Attempt 8+: 5 minutes (capped)

4. No Data Loss

Messages persist until:

  • Successfully delivered, OR
  • Expired (based on TTL)

A network outage on its own never discards a message; undelivered messages leave the outbox only when their TTL expires.


Store-and-Forward Mechanism

How It Works

1. Message Queuing:

import requests

# Client sends request to gateway
response = requests.post(
    "http://gateway:8787/v1/tool_call",
    json={
        "target": {"node_id": "local:abc123"},
        "tool_name": "pii_redact",
        "tool_args": {"text": "..."}
    }
)

# If node offline:
# - Gateway queues message in mailbox
# - Returns queued status to client

2. Background Delivery:

import time

# Gateway runs background delivery loop
while True:
    # Get pending deliveries
    pending = mailbox.get_pending_deliveries(limit=10)

    for message in pending:
        try:
            # Attempt delivery
            deliver_message(message)
            mailbox.mark_delivered(message["message_id"])
        except ConnectionError:
            # Failed - update retry backoff
            mailbox.mark_failed(message["message_id"])

    time.sleep(5)  # Check every 5 seconds

3. Receipt Acknowledgment:

# Recipient node fetches unprocessed messages (get_inbox_unacked returns a list)
for inbox_message in mailbox.get_inbox_unacked():
    # Process message
    result = process_task(inbox_message["payload"])

    # Acknowledge only after processing succeeds
    mailbox.ack_inbox(inbox_message["message_id"])

Message Lifecycle

Created → Queued in Outbox → Delivery Attempt → Failed?
                                                    ├─ Yes → Retry (backoff), back to Delivery Attempt
                                                    └─ No → Delivered
                                                               ↓
                                                          Stored in Inbox
                                                               ↓
                                                            Processed
                                                               ↓
                                                           Acknowledged
                                                               ↓
                                                         Removed from Inbox

Mailbox System

Mailbox Structure

Each node has a SQLite database:

-- Outbox: Messages queued for delivery
CREATE TABLE outbox (
    message_id TEXT PRIMARY KEY,
    recipient_node_id TEXT NOT NULL,
    recipient_http_url TEXT NOT NULL,
    envelope_json TEXT NOT NULL,
    attempts INTEGER DEFAULT 0,
    next_attempt_at TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    created_at TEXT NOT NULL
);

-- Inbox: Received messages
CREATE TABLE inbox (
    message_id TEXT PRIMARY KEY,  -- Enforces replay protection
    sender_node_id TEXT NOT NULL,
    received_at TEXT NOT NULL,
    payload_type TEXT NOT NULL,
    decrypted_payload_json TEXT NOT NULL,
    acknowledged INTEGER DEFAULT 0
);
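The schema above is enough to sketch how the outbox half of a mailbox could behave. The class below is a hypothetical, minimal stand-in (SketchOutbox is not the real gateway.mesh_transport.mailbox.Mailbox), and it assumes timestamps are stored as fixed-width UTC strings such as 2024-01-15T10:00:00Z, so that comparing the TEXT columns in SQL compares times.

```python
import json
import sqlite3
from datetime import datetime, timedelta, timezone

# Assumption: fixed-width UTC timestamps, so string order matches time order
TS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def utc_ts(dt: datetime) -> str:
    return dt.astimezone(timezone.utc).strftime(TS_FORMAT)


class SketchOutbox:
    """Hypothetical minimal outbox; not the real gateway.mesh_transport.mailbox."""

    def __init__(self, db_path: str = ":memory:"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            """CREATE TABLE IF NOT EXISTS outbox (
                   message_id TEXT PRIMARY KEY,
                   recipient_node_id TEXT NOT NULL,
                   recipient_http_url TEXT NOT NULL,
                   envelope_json TEXT NOT NULL,
                   attempts INTEGER DEFAULT 0,
                   next_attempt_at TEXT NOT NULL,
                   expires_at TEXT NOT NULL,
                   created_at TEXT NOT NULL)"""
        )

    def queue_outbox(self, envelope, recipient_http_url, ttl_seconds=86400, now=None):
        now = now or datetime.now(timezone.utc)
        with self.conn:  # commit on success, roll back on error
            self.conn.execute(
                "INSERT INTO outbox VALUES (?, ?, ?, ?, 0, ?, ?, ?)",
                (
                    envelope["message_id"],
                    envelope["recipient_node_id"],
                    recipient_http_url,
                    json.dumps(envelope),
                    utc_ts(now),  # first attempt is due immediately
                    utc_ts(now + timedelta(seconds=ttl_seconds)),
                    utc_ts(now),
                ),
            )

    def get_pending_deliveries(self, limit=10, now=None):
        now_ts = utc_ts(now or datetime.now(timezone.utc))
        rows = self.conn.execute(
            """SELECT message_id, recipient_node_id, recipient_http_url,
                      envelope_json, attempts
               FROM outbox
               WHERE next_attempt_at <= ? AND expires_at > ?
               ORDER BY next_attempt_at
               LIMIT ?""",
            (now_ts, now_ts, limit),
        ).fetchall()
        return [
            {
                "message_id": r[0],
                "recipient_node_id": r[1],
                "recipient_http_url": r[2],
                "envelope": json.loads(r[3]),
                "attempts": r[4],
            }
            for r in rows
        ]

    def mark_delivered(self, message_id):
        with self.conn:  # delivered messages leave the outbox
            self.conn.execute("DELETE FROM outbox WHERE message_id = ?", (message_id,))
```

Storing every timestamp in one fixed format is what makes WHERE next_attempt_at <= ? correct on a TEXT column; mixing formats (for example, with and without fractional seconds) would break the ordering.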

Outbox Operations

Queue Message:

from gateway.mesh_transport.mailbox import Mailbox

mailbox = Mailbox(db_path="data/mailbox.db")

mailbox.queue_outbox(
    envelope={
        "message_id": "msg:a1b2c3d4",
        "recipient_node_id": "local:abc123",
        "sender_node_id": "gateway:def456",
        "payload_type": "TASK_REQUEST",
        "encrypted_payload": "...",
        "signature": "..."
    },
    recipient_http_url="http://192.168.1.100:8000",
    ttl_seconds=86400  # 24 hours
)

Get Pending Deliveries:

# Get next 10 messages ready for delivery
pending = mailbox.get_pending_deliveries(limit=10)

for msg in pending:
    print(f"Message {msg['message_id']} to {msg['recipient_node_id']}")
    print(f"Attempts: {msg['attempts']}")
    print(f"Envelope: {msg['envelope']}")

Mark Delivered:

# Remove from outbox after successful delivery
mailbox.mark_delivered("msg:a1b2c3d4")

Mark Failed:

# Update retry backoff after failed delivery
mailbox.mark_failed("msg:a1b2c3d4")
# Next attempt scheduled based on exponential backoff

Inbox Operations

Store Received Message:

import sqlite3

# Store incoming message (replay protection via PRIMARY KEY)
try:
    mailbox.store_inbox(
        message_id="msg:a1b2c3d4",
        sender_node_id="gateway:def456",
        payload_type="TASK_REQUEST",
        payload={
            "tool_name": "pii_redact",
            "tool_args": {"text": "..."},
            "token": {...}
        }
    )
except sqlite3.IntegrityError:
    # Duplicate message_id: a replay, or a redelivery after a lost response.
    # Either way, do not process it a second time.
    print("Duplicate message_id rejected")
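Because the gateway's delivery loop retries whenever it does not get an HTTP 200 back, the same message_id can legitimately arrive twice (for example, the recipient stored the message but its response was lost). A minimal sketch, using a hypothetical store_once helper written directly on sqlite3 rather than the Mailbox API, shows how the PRIMARY KEY can turn duplicates into a no-op instead of an error path:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE inbox (
           message_id TEXT PRIMARY KEY,  -- a second insert of the same ID fails
           sender_node_id TEXT NOT NULL,
           received_at TEXT NOT NULL,
           payload_type TEXT NOT NULL,
           decrypted_payload_json TEXT NOT NULL,
           acknowledged INTEGER DEFAULT 0)"""
)


def store_once(conn, message_id, sender_node_id, payload_type, payload, received_at):
    """Store a message; return False instead of raising if message_id was seen before."""
    try:
        with conn:  # rolls back and re-raises on error
            conn.execute(
                "INSERT INTO inbox (message_id, sender_node_id, received_at,"
                " payload_type, decrypted_payload_json) VALUES (?, ?, ?, ?, ?)",
                (message_id, sender_node_id, received_at, payload_type, json.dumps(payload)),
            )
        return True
    except sqlite3.IntegrityError:
        # Duplicate: replay or redelivery. Do not process it again.
        return False
```

How the receiver should answer a duplicate is a design decision this page does not specify; answering with an error would keep the message in the sender's outbox until its TTL expires, while answering with success lets the sender stop retrying.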

Get Unacknowledged Messages:

# Get all unprocessed messages
unacked = mailbox.get_inbox_unacked()

for msg in unacked:
    print(f"Message {msg['message_id']} from {msg['sender_node_id']}")
    print(f"Payload: {msg['payload']}")

Acknowledge Message:

# Mark message as processed
mailbox.ack_inbox("msg:a1b2c3d4")

Request Queuing

Client-Side Queuing

When the gateway is unreachable, clients should queue requests locally:

import json
import sqlite3
import uuid
from datetime import datetime

class ClientQueue:
    """Local request queue for offline clients."""

    def __init__(self, db_path="client_queue.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS queued_requests (
                request_id TEXT PRIMARY KEY,
                gateway_url TEXT NOT NULL,
                request_json TEXT NOT NULL,
                created_at TEXT NOT NULL,
                attempts INTEGER DEFAULT 0
            )
            """
        )
        conn.commit()
        conn.close()

    def queue_request(self, gateway_url, request_data):
        """Queue a request for delivery."""
        request_id = f"req:{uuid.uuid4().hex[:12]}"
        created_at = datetime.utcnow().isoformat() + "Z"

        conn = sqlite3.connect(self.db_path)
        conn.execute(
            """
            INSERT INTO queued_requests (request_id, gateway_url, request_json, created_at)
            VALUES (?, ?, ?, ?)
            """,
            (request_id, gateway_url, json.dumps(request_data), created_at)
        )
        conn.commit()
        conn.close()

        print(f"Queued request {request_id}")

    def get_pending_requests(self):
        """Get all pending requests."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute(
            "SELECT request_id, gateway_url, request_json, attempts FROM queued_requests"
        )
        pending = [
            {
                "request_id": row[0],
                "gateway_url": row[1],
                "request_data": json.loads(row[2]),
                "attempts": row[3]
            }
            for row in cursor.fetchall()
        ]
        conn.close()
        return pending

    def mark_delivered(self, request_id):
        """Remove delivered request."""
        conn = sqlite3.connect(self.db_path)
        conn.execute("DELETE FROM queued_requests WHERE request_id = ?", (request_id,))
        conn.commit()
        conn.close()

    def mark_failed(self, request_id):
        """Increment attempt counter."""
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            "UPDATE queued_requests SET attempts = attempts + 1 WHERE request_id = ?",
            (request_id,)
        )
        conn.commit()
        conn.close()

Usage:

import time

import requests

queue = ClientQueue()

try:
    # Try direct delivery
    response = requests.post(gateway_url, json=request_data, timeout=10)
except (requests.ConnectionError, requests.Timeout):
    # Gateway offline or unresponsive - queue for later
    queue.queue_request(gateway_url, request_data)
    print("Gateway offline - request queued")

# Background delivery loop
while True:
    pending = queue.get_pending_requests()

    for req in pending:
        try:
            response = requests.post(
                req["gateway_url"], json=req["request_data"], timeout=10
            )
            response.raise_for_status()  # HTTP errors count as failures too
            queue.mark_delivered(req["request_id"])
            print(f"Delivered {req['request_id']}")
        except requests.RequestException:
            # Covers connection errors, timeouts, and HTTP errors
            queue.mark_failed(req["request_id"])

    time.sleep(10)
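The ClientQueue loop above retries every queued request on each 10-second pass, while Scenario 2 below describes the client retrying with exponential backoff. One way to close that gap, sketched under the assumption that the queue also records a last attempt time per request (a hypothetical last_attempt_at, which the queued_requests table does not have), is to skip requests whose backoff has not yet elapsed:

```python
from datetime import datetime, timedelta, timezone


def backoff_seconds(attempts: int, base: float = 5.0, cap: float = 300.0) -> float:
    """Same schedule as the gateway: 5s, 10s, 20s, ... capped at 300s."""
    return min(base * 2 ** attempts, cap)


def is_due(attempts: int, last_attempt_at: datetime, now: datetime) -> bool:
    """True once a queued request has waited out its backoff.

    attempts is the ClientQueue counter (failed retries so far); last_attempt_at
    is a hypothetical per-request timestamp the queue would need to record.
    """
    return now - last_attempt_at >= timedelta(seconds=backoff_seconds(attempts))


def due_requests(pending, last_attempts, now=None):
    """Filter get_pending_requests() output down to requests worth retrying now."""
    now = now or datetime.now(timezone.utc)
    return [
        req for req in pending
        if is_due(req["attempts"], last_attempts[req["request_id"]], now)
    ]
```

In the delivery loop, iterating over due_requests(...) instead of the full pending list would make the client's retry cadence match the gateway's.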

Gateway-Side Queuing

The gateway queues messages for offline edge nodes:

import requests

# In gateway router
def route_to_node(node_id, request):
    """Route request to node, queue if offline."""
    node = node_registry.get(node_id)

    if not node:
        raise ValueError(f"Node {node_id} not found")

    try:
        # Try direct delivery
        response = requests.post(
            node["http_url"],
            json=request,
            timeout=5.0
        )
        return response.json()

    except requests.RequestException:
        # Node offline - queue in mailbox
        envelope = create_envelope(request, node_id)
        mailbox.queue_outbox(
            envelope=envelope,
            recipient_http_url=node["http_url"]
        )

        return {
            "ok": False,
            "queued": True,
            "message": f"Node {node_id} offline - request queued",
            "message_id": envelope["message_id"]
        }

Delivery and Retry Logic

Exponential Backoff

Failed deliveries use exponential backoff to avoid overwhelming offline nodes:

def calculate_backoff(attempts):
    """Calculate retry backoff.

    Args:
        attempts: Number of previous attempts

    Returns:
        Seconds to wait before next attempt
    """
    # 2^attempts * 5 seconds, capped at 300 seconds (5 minutes)
    return min(2 ** attempts * 5, 300)

# Examples:
# Attempt 0: 5s
# Attempt 1: 10s
# Attempt 2: 20s
# Attempt 3: 40s
# Attempt 4: 80s
# Attempt 5: 160s
# Attempt 6+: 300s (capped)
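To connect calculate_backoff to the outbox table, here is a hypothetical mark_failed written directly against sqlite3 (the real Mailbox.mark_failed may differ). It assumes the backoff is computed from the attempt count before incrementing it, which reproduces the schedule listed under Core Principles: the first retry after 5 seconds, then 10, 20, and so on.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

TS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # assumed fixed-width UTC timestamps


def calculate_backoff(attempts):
    # Same formula as above: 5s, 10s, 20s, ... capped at 300s
    return min(2 ** attempts * 5, 300)


def mark_failed(conn, message_id, now=None):
    """Hypothetical mark_failed: schedule the next retry and bump the counter.

    Returns the next attempt time, or None if the message is no longer queued.
    """
    now = now or datetime.now(timezone.utc)
    row = conn.execute(
        "SELECT attempts FROM outbox WHERE message_id = ?", (message_id,)
    ).fetchone()
    if row is None:
        return None  # already delivered or expired
    next_at = now + timedelta(seconds=calculate_backoff(row[0]))
    with conn:
        conn.execute(
            "UPDATE outbox SET attempts = attempts + 1, next_attempt_at = ?"
            " WHERE message_id = ?",
            (next_at.strftime(TS_FORMAT), message_id),
        )
    return next_at


# Stripped-down outbox with one queued message (illustration only)
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox (message_id TEXT PRIMARY KEY,"
    " attempts INTEGER DEFAULT 0, next_attempt_at TEXT NOT NULL)"
)
conn.execute(
    "INSERT INTO outbox (message_id, next_attempt_at)"
    " VALUES ('msg:a1b2c3d4', '2024-01-15T10:00:00Z')"
)
```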

Delivery Loop

import time

import requests

def delivery_loop(mailbox):
    """Background delivery loop."""
    while True:
        try:
            # Get pending deliveries
            pending = mailbox.get_pending_deliveries(limit=10)

            for msg in pending:
                try:
                    # Attempt delivery
                    response = requests.post(
                        msg["recipient_http_url"],
                        json=msg["envelope"],
                        timeout=30.0
                    )

                    if response.status_code == 200:
                        # Success - remove from outbox
                        mailbox.mark_delivered(msg["message_id"])
                        print(f"Delivered {msg['message_id']}")
                    else:
                        # Failed - retry later
                        mailbox.mark_failed(msg["message_id"])
                        print(f"Delivery failed (HTTP {response.status_code})")

                except requests.RequestException as e:
                    # Connection error - retry later
                    mailbox.mark_failed(msg["message_id"])
                    print(f"Delivery failed: {e}")

            # Clean up expired messages
            mailbox.cleanup_expired()

        except Exception as e:
            print(f"Delivery loop error: {e}")

        # Check for new messages every 5 seconds
        time.sleep(5)

Message Expiration

Messages have a TTL (time-to-live) and are removed when expired:

# Queue with 24 hour TTL
mailbox.queue_outbox(
    envelope=envelope,
    recipient_http_url=url,
    ttl_seconds=86400  # 24 hours
)

# Cleanup expired messages
deleted = mailbox.cleanup_expired()
print(f"Removed {deleted} expired messages")
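Expiry can be as simple as a DELETE on expires_at. The sketch below is a hypothetical cleanup_expired against a stripped-down outbox table (only the columns it needs), assuming the same fixed-width UTC timestamp strings as the rest of the schema; it returns the number of rows removed, matching how the example above uses the return value.

```python
import sqlite3
from datetime import datetime, timezone

TS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # assumed fixed-width UTC timestamps


def cleanup_expired(conn, now=None):
    """Hypothetical cleanup_expired: delete outbox rows past their TTL.

    Returns the number of rows removed.
    """
    now_ts = (now or datetime.now(timezone.utc)).strftime(TS_FORMAT)
    with conn:
        cur = conn.execute("DELETE FROM outbox WHERE expires_at <= ?", (now_ts,))
    return cur.rowcount


# Stripped-down outbox: one message already expired, one still live
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE outbox (message_id TEXT PRIMARY KEY, expires_at TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO outbox VALUES (?, ?)",
    [("msg:old", "2024-01-15T10:00:00Z"), ("msg:new", "2024-01-16T10:00:00Z")],
)
```

Because expires_at is fixed when the message is queued, the TTL counts from queue time, not from the last delivery attempt.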

Offline Scenarios

Scenario 1: Edge Node Temporarily Offline

Setup:

  • Gateway online
  • Edge node loses connectivity for 30 minutes

Behavior:

  1. Client sends request to gateway
  2. Gateway attempts to forward to edge node
  3. Connection fails - gateway queues message
  4. Gateway returns {"queued": true} to client
  5. Edge node comes back online
  6. Gateway delivery loop retries on its backoff schedule and reaches the node
  7. Message delivered automatically
  8. Result returned to client (if client still connected)

Timeline:

T+0:00  Client → Gateway (success)
T+0:01  Gateway → Edge Node (FAIL - offline)
T+0:01  Gateway queues message
T+0:06  Retry 1 (5s backoff) - FAIL
T+0:16  Retry 2 (10s backoff) - FAIL
T+0:36  Retry 3 (20s backoff) - FAIL
T+1:16  Retry 4 (40s backoff) - FAIL
...
T+30:00 Edge node comes online
T+30:05 Retry N - SUCCESS
T+30:05 Message delivered
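The retry times in this timeline follow directly from calculate_backoff. The short calculation below (illustrative; it ignores the delivery loop's 5-second polling granularity) reproduces them and shows what happens after the cap: once retries are 300 seconds apart, a node that returns at T+30:00 is reached at the next scheduled retry, which under this schedule is T+30:16, and in general can be up to 5 minutes after reconnection.

```python
def calculate_backoff(attempts):
    # Same formula as the delivery loop: 5s, 10s, 20s, ... capped at 300s
    return min(2 ** attempts * 5, 300)


def retry_times(queued_at, count):
    """Seconds after T+0 at which the first `count` retries fire."""
    times, t = [], queued_at
    for attempts in range(count):
        t += calculate_backoff(attempts)
        times.append(t)
    return times


def fmt(seconds):
    """Format seconds as T+M:SS, like the timeline."""
    return f"T+{seconds // 60}:{seconds % 60:02d}"


# Message queued at T+0:01, as in the timeline
print([fmt(t) for t in retry_times(queued_at=1, count=6)])
# ['T+0:06', 'T+0:16', 'T+0:36', 'T+1:16', 'T+2:36', 'T+5:16']

# First retry at or after T+30:00
print(fmt(next(t for t in retry_times(1, 20) if t >= 30 * 60)))
# T+30:16
```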

Scenario 2: Gateway Temporarily Offline

Setup:

  • Edge nodes online
  • Gateway loses connectivity for 1 hour

Behavior:

  1. Client sends request to gateway
  2. Connection fails - client queues locally
  3. Client retries with exponential backoff
  4. Gateway comes back online
  5. Client delivers queued request
  6. Workflow executes normally

Timeline:

T+0:00  Client → Gateway (FAIL - offline)
T+0:00  Client queues request
T+0:05  Retry 1 - FAIL
T+0:15  Retry 2 - FAIL
T+0:35  Retry 3 - FAIL
...
T+60:00 Gateway comes online
T+60:05 Retry N - SUCCESS
T+60:06 Workflow executes

Scenario 3: Mesh Partition (Split-Brain)

Setup:

  • Gateway can reach Node A
  • Gateway cannot reach Node B
  • Nodes A and B can reach each other

Behavior:

  1. Gateway routes requests for B-only tools to B
  2. Connection fails - queued in gateway mailbox
  3. Requests for A-only tools succeed
  4. Network partition heals
  5. Queued messages for B delivered

Mitigation:

  • Use mesh-aware routing to find alternate paths
  • Enable peer-to-peer message forwarding
  • Configure fallback nodes for critical tools
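For the last mitigation, a fallback list could be as simple as an ordered preference of nodes per tool. The sketch assumes a hypothetical per-tool list (FALLBACKS) and a reachability predicate, for example one backed by the most recent mesh scan; returning None signals the caller to queue the request in the mailbox as before.

```python
from typing import Callable, Optional, Sequence


def pick_node(
    candidates: Sequence[str], is_reachable: Callable[[str], bool]
) -> Optional[str]:
    """Return the first reachable node, in preference order, or None.

    None tells the caller to queue the request in the mailbox as usual.
    """
    for node_id in candidates:
        if is_reachable(node_id):
            return node_id
    return None


# Hypothetical per-tool preference lists: primary node first, then fallbacks
FALLBACKS = {"pii_redact": ["local:nodeB", "local:nodeA"]}
```

Fallbacks only make sense for tools whose nodes are interchangeable; a tool bound to data on one node still has to wait for that node.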

Scenario 4: Fully Offline Operation

Setup:

  • Edge node operates completely offline
  • No gateway connectivity

Behavior:

  1. Client connects directly to edge node HTTP endpoint
  2. Tools execute locally without gateway
  3. Results returned immediately
  4. When connectivity returns, sync audit logs to gateway

Example:

import requests

# Direct edge node connection (offline)
response = requests.post(
    "http://192.168.1.100:8000/v1/tool_call",
    json={
        "tool_name": "pii_redact",
        "tool_args": {"text": "..."},
        "token": {"capabilities": ["tool:pii_redact"]}
    }
)

# Works offline - no gateway required

Failure Scenarios and Recovery

Network Partition

Failure: Network partition separates gateway from edge nodes.

Detection:

  • Delivery attempts fail
  • Nodes show as offline in mesh scan

Recovery:

  • Messages queue in gateway mailbox
  • Automatic delivery when partition heals
  • No manual intervention required

Prevention:

  • Use multiple gateways in different network segments
  • Enable peer-to-peer message forwarding

Node Crash

Failure: Edge node crashes while processing message.

Detection:

  • Message delivery succeeds but no response
  • Timeout in gateway

Recovery:

  • Idempotency keys prevent duplicate execution
  • Retry logic with idempotency check
  • Message re-queued if no acknowledgment

Prevention:

  • Use job IDs for idempotency
  • Persist job state before execution
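The prevention steps above (job IDs plus persisting job state before execution) can be sketched with a small jobs table. The table, its columns, and run_once are hypothetical; they illustrate the idea rather than the edge node's actual job store.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE jobs (
           job_id TEXT PRIMARY KEY,
           status TEXT NOT NULL,  -- 'running' or 'done'
           result_json TEXT)"""
)


def run_once(conn, job_id, fn, *args):
    """Execute fn(*args) at most once per job_id; repeats return the stored result."""
    row = conn.execute(
        "SELECT status, result_json FROM jobs WHERE job_id = ?", (job_id,)
    ).fetchone()
    if row is not None:
        status, result_json = row
        if status == "done":
            return json.loads(result_json)  # duplicate delivery: reuse result
        raise RuntimeError(f"{job_id} started earlier but never finished")
    # Persist job state BEFORE executing, so a crash leaves a record behind
    with conn:
        conn.execute("INSERT INTO jobs (job_id, status) VALUES (?, 'running')", (job_id,))
    result = fn(*args)
    with conn:
        conn.execute(
            "UPDATE jobs SET status = 'done', result_json = ? WHERE job_id = ?",
            (json.dumps(result), job_id),
        )
    return result
```

A job left in 'running' means the node crashed mid-execution. Whether to re-run it automatically depends on whether the tool is safe to repeat, so the sketch surfaces it as an error instead of guessing.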

Database Corruption

Failure: Mailbox SQLite database corrupted.

Detection:

  • SQLite errors when accessing mailbox
  • Delivery loop crashes

Recovery:

  1. Stop node:

    systemctl stop edge_node
    

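  2. Back up the corrupted DB, including its WAL file if one exists:

    cp data/mailbox.db data/mailbox.db.corrupted
    [ -f data/mailbox.db-wal ] && cp data/mailbox.db-wal data/mailbox.db-wal.corrupted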
    

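  3. Check integrity (integrity_check prints ok if no problems are found), then checkpoint the WAL into the main database file: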

    sqlite3 data/mailbox.db "PRAGMA integrity_check;"
    sqlite3 data/mailbox.db "PRAGMA wal_checkpoint(FULL);"
    

  4. If recovery fails, restore from backup:

    cp /backup/mailbox.db data/mailbox.db
    

  5. Restart node:

    systemctl start edge_node
    

Prevention:

  • Use SQLite WAL mode (confirm with PRAGMA journal_mode; plain SQLite defaults to rollback-journal mode)
  • Regular backups
  • Database integrity checks

Clock Skew

Failure: Node clocks are out of sync.

Detection:

  • Timestamps in messages don't match expectations
  • Signature verification may fail

Recovery: Sync clocks using NTP:

sudo ntpdate -u pool.ntp.org

Prevention:

  • Run NTP daemon on all nodes
  • Monitor clock drift
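In a store-and-forward system, an old envelope timestamp is normal: a message queued for 30 minutes arrives 30 minutes late. A timestamp in the future, however, means the sender's and receiver's clocks disagree. A minimal detection sketch, assuming the envelope's timestamp field uses the format shown in the envelope reference (2024-01-15T10:00:00Z) and a hypothetical 5-minute tolerance (MAX_SKEW):

```python
from datetime import datetime, timedelta, timezone

MAX_SKEW = timedelta(minutes=5)  # hypothetical tolerance


def parse_envelope_ts(ts: str) -> datetime:
    """Parse an envelope timestamp such as '2024-01-15T10:00:00Z' as UTC."""
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)


def check_timestamp(envelope: dict, now: datetime, max_skew: timedelta = MAX_SKEW) -> str:
    """Classify an envelope timestamp relative to the receiver's clock."""
    sent = parse_envelope_ts(envelope["timestamp"])
    if sent - now > max_skew:
        return "future"  # clocks disagree: suspect skew
    return "ok"  # past timestamps are expected for queued messages; TTL handles staleness
```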


Offline Testing

Simulating Network Outages

Disconnect node from network:

# Linux - block outbound traffic
sudo iptables -A OUTPUT -p tcp --dport 8787 -j DROP

# macOS - disable interface
sudo ifconfig en0 down

Restore connectivity:

# Linux
sudo iptables -D OUTPUT -p tcp --dport 8787 -j DROP

# macOS
sudo ifconfig en0 up

Testing Queuing Behavior

import subprocess
import time
import requests

# 1. Send request while node online
response = requests.post("http://localhost:8787/v1/tool_call", json={...})
assert response.json()["ok"]

# 2. Disconnect node
subprocess.run(["sudo", "iptables", "-A", "OUTPUT", "-p", "tcp", "--dport", "8000", "-j", "DROP"])

# 3. Send request while node offline
response = requests.post("http://localhost:8787/v1/tool_call", json={...})
assert response.json()["queued"]  # Queued for delivery

# 4. Reconnect node
subprocess.run(["sudo", "iptables", "-D", "OUTPUT", "-p", "tcp", "--dport", "8000", "-j", "DROP"])

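# 5. Wait for automatic delivery (at least one retry interval plus the
#    delivery loop's 5-second poll; longer if retries have already backed off)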
time.sleep(10)

# 6. Verify message was delivered
# (Check audit logs or query mailbox)

Testing Retry Logic

import sqlite3
import time

# Monitor retry attempts
def monitor_retries(mailbox, message_id):
    """Watch retry behavior for a message."""
    while True:
        conn = sqlite3.connect(mailbox.db_path)
        cursor = conn.execute(
            "SELECT attempts, next_attempt_at FROM outbox WHERE message_id = ?",
            (message_id,)
        )
        row = cursor.fetchone()
        conn.close()

        if not row:
            # Row gone: delivered, or removed by TTL cleanup
            print(f"{message_id} no longer in outbox (delivered or expired)")
            break

        attempts, next_attempt = row
        print(f"Attempts: {attempts}, Next retry: {next_attempt}")
        time.sleep(5)

# Monitor a queued message
monitor_retries(mailbox, "msg:abc123")

Performance Considerations

Mailbox Database Size

Monitor mailbox database growth:

import os

def get_mailbox_size(db_path):
    """Get mailbox database size in MB."""
    size_bytes = os.path.getsize(db_path)
    return size_bytes / (1024 * 1024)

size_mb = get_mailbox_size("data/mailbox.db")
if size_mb > 100:
    print(f"Warning: Mailbox database is {size_mb:.1f} MB")

Message Throughput

Tune delivery loop for throughput:

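# Process more messages per iteration
pending = mailbox.get_pending_deliveries(limit=100)  # Increased from 10

# Parallel delivery: HTTP calls run in worker threads, while outbox updates
# stay on this thread; one failed delivery no longer aborts the whole batch
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {executor.submit(deliver_message, msg): msg for msg in pending}
    for future, msg in futures.items():
        try:
            future.result()  # re-raises any exception from deliver_message
            mailbox.mark_delivered(msg["message_id"])
        except Exception:
            mailbox.mark_failed(msg["message_id"])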

Database Optimization

Optimize SQLite performance:

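import sqlite3

conn = sqlite3.connect("data/mailbox.db")

# Enable WAL mode for better concurrency (persists in the database file)
conn.execute("PRAGMA journal_mode=WAL")

# Increase cache size (applies to this connection only, so set it on the
# connection the mailbox actually uses)
conn.execute("PRAGMA cache_size=-64000")  # negative value = KiB, about 64 MB

# Refresh the statistics the query planner uses to choose indexes
conn.execute("ANALYZE")

conn.close()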
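The delivery loop asks "which messages are due?" every 5 seconds, so an index on next_attempt_at may help once the outbox grows. The index name below is hypothetical and the table is trimmed to a few columns; EXPLAIN QUERY PLAN is the standard SQLite way to confirm the planner actually uses it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE outbox (
           message_id TEXT PRIMARY KEY,
           recipient_node_id TEXT NOT NULL,
           next_attempt_at TEXT NOT NULL,
           expires_at TEXT NOT NULL)"""
)
# Hypothetical index for the "what is due now?" query
conn.execute("CREATE INDEX IF NOT EXISTS idx_outbox_next_attempt ON outbox(next_attempt_at)")

plan = conn.execute(
    "EXPLAIN QUERY PLAN "
    "SELECT message_id FROM outbox WHERE next_attempt_at <= ? "
    "ORDER BY next_attempt_at LIMIT ?",
    ("2024-01-15T10:00:00Z", 10),
).fetchall()
for row in plan:
    print(row[-1])  # the 'detail' column names the index, if one is used
```

An index only pays off for the query shape it matches; ordering by priority first, as in Message Priority below, would need a different index.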

Best Practices

1. Set Reasonable TTLs

Keep TTLs bounded; very long TTLs let undeliverable messages pile up in the outbox:

# Good - 24 hour TTL
mailbox.queue_outbox(envelope, url, ttl_seconds=86400)

# Bad - 1 year TTL
mailbox.queue_outbox(envelope, url, ttl_seconds=31536000)

2. Monitor Queue Depth

Alert on growing queues:

import sqlite3

def check_queue_depth(mailbox):
    """Alert if queue depth exceeds threshold."""
    conn = sqlite3.connect(mailbox.db_path)
    cursor = conn.execute("SELECT COUNT(*) FROM outbox")
    count = cursor.fetchone()[0]
    conn.close()

    if count > 1000:
        alert(f"Queue depth critical: {count} messages")  # alert(): your alerting hook

3. Regular Cleanup

Clean up old messages:

# Daily cleanup of expired messages
0 2 * * * /usr/local/bin/cleanup_mailbox.sh

4. Backup Mailboxes

Backup mailbox databases:

# Backup while node is running (using WAL mode)
sqlite3 data/mailbox.db ".backup /backup/mailbox_$(date +\%Y\%m\%d).db"

5. Use Idempotency

Always use job IDs for idempotency:

response = requests.post("http://gateway/v1/tool_call", json={
    "job_id": "job:unique123",  # Prevents duplicate execution
    "tool_name": "pii_redact",
    "tool_args": {...}
})

Advanced Topics

Peer-to-Peer Message Forwarding

Allow edge nodes to forward messages to each other:

# Node A receives message for Node B
# Node A can reach B, but gateway cannot
# Node A forwards to B on gateway's behalf

import requests

def forward_message(envelope, recipient_url):
    """Forward message to peer node."""
    try:
        response = requests.post(recipient_url, json=envelope, timeout=10.0)
        return response.json()
    except requests.RequestException:
        # Peer also offline - don't queue (gateway will retry)
        return None

Message Priority

Prioritize urgent messages:

-- Add priority field to outbox
ALTER TABLE outbox ADD COLUMN priority INTEGER DEFAULT 0;

-- Get pending with priority
SELECT * FROM outbox
WHERE next_attempt_at <= ?
ORDER BY priority DESC, next_attempt_at ASC
LIMIT ?
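A quick way to see the effect of that ordering is to run it against an in-memory table. The rows below are made up for illustration. Note that priority only reorders messages that are already due: an urgent message still waiting out its backoff is not sent early, and a steady stream of high-priority traffic could delay low-priority messages indefinitely.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox (message_id TEXT PRIMARY KEY, next_attempt_at TEXT NOT NULL)"
)
# Same migration as above: existing rows get priority 0
conn.execute("ALTER TABLE outbox ADD COLUMN priority INTEGER DEFAULT 0")

conn.executemany(
    "INSERT INTO outbox (message_id, next_attempt_at, priority) VALUES (?, ?, ?)",
    [
        ("msg:routine", "2024-01-15T09:00:00Z", 0),
        ("msg:urgent", "2024-01-15T09:59:00Z", 10),
        ("msg:later", "2024-01-15T11:00:00Z", 10),  # not due yet
    ],
)

now = "2024-01-15T10:00:00Z"
due = conn.execute(
    """SELECT message_id FROM outbox
       WHERE next_attempt_at <= ?
       ORDER BY priority DESC, next_attempt_at ASC
       LIMIT ?""",
    (now, 10),
).fetchall()
print([row[0] for row in due])  # ['msg:urgent', 'msg:routine']
```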

Compression

Compress large payloads:

import base64
import gzip
import json

def compress_payload(payload):
    """Compress payload using gzip."""
    json_bytes = json.dumps(payload).encode()
    compressed = gzip.compress(json_bytes)
    return base64.b64encode(compressed).decode()

def decompress_payload(compressed):
    """Decompress payload."""
    compressed_bytes = base64.b64decode(compressed)
    json_bytes = gzip.decompress(compressed_bytes)
    return json.loads(json_bytes)
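Base64 adds roughly a third to the size of whatever it encodes, so for small payloads gzip plus base64 can come out larger than the original JSON. A sketch of conditional compression, assuming a hypothetical size threshold (MIN_COMPRESS_BYTES) and a compressed flag that is not part of the documented envelope:

```python
import base64
import gzip
import json

MIN_COMPRESS_BYTES = 1024  # hypothetical threshold; tune for your payloads


def encode_payload(payload: dict) -> dict:
    """Gzip and base64 the payload only when that actually makes it smaller."""
    raw = json.dumps(payload).encode()
    if len(raw) >= MIN_COMPRESS_BYTES:
        packed = base64.b64encode(gzip.compress(raw)).decode()
        if len(packed) < len(raw):
            return {"compressed": True, "data": packed}
    return {"compressed": False, "data": raw.decode()}


def decode_payload(wrapper: dict) -> dict:
    """Reverse encode_payload, honoring the compressed flag."""
    if wrapper["compressed"]:
        return json.loads(gzip.decompress(base64.b64decode(wrapper["data"])))
    return json.loads(wrapper["data"])
```

If the payload is also encrypted (the envelope carries an encrypted_payload), compress before encrypting; ciphertext does not compress.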

Troubleshooting

Messages Not Delivering

Symptom: Messages stuck in outbox.

Solutions:

  1. Check network connectivity:

    ping 192.168.1.100
    curl http://192.168.1.100:8000/health
    

  2. Check delivery loop is running:

    ps aux | grep delivery_loop
    

  3. Check outbox:

    sqlite3 data/mailbox.db "SELECT COUNT(*) FROM outbox"
    

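  4. Manual delivery:

    # Attempt every message that is currently due; messages still waiting
    # out their backoff may not be returned by get_pending_deliveries
    pending = mailbox.get_pending_deliveries(limit=1000)
    for msg in pending:
        try:
            deliver_message(msg)
            mailbox.mark_delivered(msg["message_id"])
        except Exception as e:
            mailbox.mark_failed(msg["message_id"])
            print(f"{msg['message_id']}: {e}")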
    

Database Locked

Symptom: database is locked error.

Solutions:

  1. Enable WAL mode:

    PRAGMA journal_mode=WAL;
    

  2. Increase timeout:

    conn = sqlite3.connect("data/mailbox.db", timeout=30.0)
    

  3. Reduce concurrent access:

      • Limit delivery loop workers
      • Use connection pooling

Replay Attacks

Symptom: Duplicate messages being processed.

Solutions:

  1. Verify PRIMARY KEY constraint:

    SELECT sql FROM sqlite_master WHERE name='inbox';
    -- Should show: message_id TEXT PRIMARY KEY
    

  2. Check for inbox acknowledgment:

    # Always acknowledge after processing
    mailbox.ack_inbox(message_id)
    


Reference

Mailbox Python Class

from typing import List, Optional


class Mailbox:
    """SQLite mailbox for store-and-forward messaging."""

    def __init__(self, db_path: str = "data/mailbox.db"): ...

    def queue_outbox(
        self,
        envelope: dict,
        recipient_http_url: str,
        ttl_seconds: int = 86400
    ): ...

    def get_pending_deliveries(self, limit: int = 10) -> List[dict]: ...

    def mark_delivered(self, message_id: str): ...

    def mark_failed(self, message_id: str): ...

    def store_inbox(
        self,
        message_id: str,
        sender_node_id: str,
        payload_type: str,
        payload: dict
    ): ...

    def get_inbox_unacked(
        self,
        payload_type: Optional[str] = None
    ) -> List[dict]: ...

    def ack_inbox(self, message_id: str): ...

    def cleanup_expired(self) -> int: ...

Message Envelope Structure

{
  "message_id": "msg:<uuid>",
  "sender_node_id": "gateway:abc123",
  "recipient_node_id": "local:def456",
  "timestamp": "2024-01-15T10:00:00Z",
  "payload_type": "TASK_REQUEST",
  "encrypted_payload": "<base64>",
  "signature": "<base64>"
}
