Mesh Transport API Reference

The Mesh Transport layer provides secure, encrypted communication between nodes in the Adaptive Sentience network. It implements UDP multicast discovery for node announcement, HTTP-based task delegation, and store-and-forward messaging for offline-tolerant operation.

Overview

Mesh Transport consists of four key components:

  1. UDP Multicast Discovery: Nodes broadcast announcements on the local network
  2. HTTP Communication: Tool calls and task delegation over HTTP
  3. E2EE Task Envelopes: Encrypted, signed messages for agent-to-agent communication
  4. Store-and-Forward Mailbox: Retry logic for offline/intermittent connectivity

Architecture

┌─────────────────────────────────────────────────────────────┐
│ Discovery Layer (UDP Multicast)                             │
│ ┌─────────────┐        ┌─────────────┐                      │
│ │   DISCOVER  │───────>│  ADVERTISE  │                      │
│ │  (broadcast)│        │  (response) │                      │
│ └─────────────┘        └─────────────┘                      │
│      Port: 37020          Signed with Ed25519               │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ HTTP Communication Layer                                    │
│ ┌──────────────┐      ┌──────────────┐                      │
│ │   Gateway    │─────>│  Edge Node   │                      │
│ │ /v1/tool_call│      │ /capabilities│                      │
│ └──────────────┘      └──────────────┘                      │
│      Timeout: 10s         Verified: Ed25519 signature       │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ E2EE Mesh Transport (Agent-to-Agent)                        │
│ ┌────────────────┐    ┌────────────────┐                   │
│ │ TaskEnvelope   │───>│  X25519 ECDH   │                   │
│ │ (signed)       │    │  + ChaCha20    │                   │
│ └────────────────┘    └────────────────┘                   │
│      TTL: 24h            Replay Protection                  │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Store-and-Forward Mailbox                                   │
│ ┌──────────┐  ┌──────────┐  ┌──────────┐                   │
│ │  Queue   │─>│  Retry   │─>│ Deliver  │                   │
│ │ Messages │  │ (exp BO) │  │  (HTTP)  │                   │
│ └──────────┘  └──────────┘  └──────────┘                   │
│   SQLite DB     Max: 10 retries   Interval: 10s            │
└─────────────────────────────────────────────────────────────┘

UDP Multicast Discovery

Discovery Protocol

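Nodes discover each other via UDP broadcast on the local network. Although this layer is named "UDP Multicast Discovery", the values below describe limited broadcast (255.255.255.255), which routers do not forward beyond the local subnet.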

Discovery Port: 37020

Broadcast Address: 255.255.255.255

Default Scan Duration: 6 seconds

Socket Timeout: 0.5 seconds

Message Format

DISCOVER Message

Scanner broadcasts a DISCOVER message to find active nodes.

Message Type: DISCOVER

Format:

{
  "type": "DISCOVER",
  "nonce": "unique_16char_id",
  "reply_to": 37020,
  "timestamp": "2026-01-24T12:00:00Z"
}

Fields:

Field      Type     Description
type       string   Always "DISCOVER"
nonce      string   16-character unique identifier (UUID prefix)
reply_to   integer  Port for replies (0 = same as broadcast port)
timestamp  string   ISO 8601 timestamp (UTC)

Python Example:

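import json
import socket
import uuid
from datetime import datetime, timezone

# Create DISCOVER message
message = {
    "type": "DISCOVER",
    "nonce": str(uuid.uuid4())[:16],
    "reply_to": 37020,
    # datetime.utcnow() is deprecated since Python 3.12; format an aware
    # UTC time so the value matches the "2026-01-24T12:00:00Z" form above
    "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
}

# Broadcast on the discovery port; the context manager closes the socket
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    sock.sendto(
        json.dumps(message).encode("utf-8"),
        ("255.255.255.255", 37020),
    )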
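On the receiving side, a node has to parse the datagram and decide where to send its ADVERTISE. The following is a minimal sketch, assuming replies are sent back to the sender's IP address and that a reply_to of 0 selects the discovery port. resolve_reply_address is a hypothetical helper, not part of the documented API.

```python
import json

DISCOVERY_PORT = 37020  # discovery port from the protocol description


def resolve_reply_address(datagram: bytes, sender_ip: str):
    """Return (ip, port) for answering a DISCOVER datagram, or None.

    Hypothetical helper: assumes replies are unicast to the sender's IP.
    """
    try:
        msg = json.loads(datagram.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None  # not valid UTF-8 JSON; ignore silently
    if not isinstance(msg, dict) or msg.get("type") != "DISCOVER":
        return None
    reply_to = msg.get("reply_to", 0)
    # bool is a subclass of int in Python, so exclude it explicitly
    if isinstance(reply_to, bool) or not isinstance(reply_to, int):
        return None
    if not 0 <= reply_to <= 65535:
        return None
    # reply_to == 0 means "same as broadcast port"
    return (sender_ip, reply_to or DISCOVERY_PORT)
```

Rejecting malformed datagrams early keeps a node from answering arbitrary traffic that happens to arrive on the discovery port.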

ADVERTISE Message

Nodes respond with signed ADVERTISE messages.

Message Type: ADVERTISE

Format:

{
  "type": "ADVERTISE",
  "payload": {
    "node_id": "node_abc123",
    "node_type": "edge",
    "http_url": "http://192.168.1.100:8000",
    "tools_hash": "sha256:abc123def456",
    "location": {
      "lat": 37.7749,
      "lon": -122.4194,
      "accuracy_m": 50,
      "timestamp": "2026-01-24T12:00:00Z"
    }
  },
  "signature": "base64_ed25519_signature",
  "public_key": "base64_ed25519_pubkey"
}

Fields:

Field               Type    Description
type                string  Always "ADVERTISE"
payload             object  Node advertisement data
payload.node_id     string  Node identifier
payload.node_type   string  Platform type (android, macos, popos, etc.)
payload.http_url    string  HTTP endpoint URL
payload.tools_hash  string  SHA256 hash of available tools
payload.location    object  Geographic location (optional)
signature           string  Base64-encoded Ed25519 signature
public_key          string  Base64-encoded Ed25519 public key
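The reference does not say how tools_hash is derived from the tool list. As a rough sketch, assuming the hash covers only each tool's name and version, serialized as compact JSON in sorted order, it could be computed as follows. compute_tools_hash and its canonicalization are assumptions made for illustration.

```python
import hashlib
import json


def compute_tools_hash(tools):
    """Hash a tool list into the "sha256:<hex>" form used by ADVERTISE.

    Assumption: only name and version feed the hash, and entries are
    sorted so the listing order of tools does not change the result.
    """
    entries = sorted((tool["name"], tool["version"]) for tool in tools)
    canonical = json.dumps(entries, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return "sha256:" + digest


tools = [
    {"name": "unit_convert", "version": "1.0.0"},
    {"name": "pii_redact", "version": "1.0.0"},
]
print(compute_tools_hash(tools))
```

A scanner can compare this value against a cached one to decide whether it needs to fetch /capabilities again.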

Signature Verification:

from edge_node.verify import verify_signed_dict

# Verify ADVERTISE message
valid = verify_signed_dict(
    data=message["payload"],
    signature_b64=message["signature"],
    public_key_b64=message["public_key"]
)

if not valid:
    print("WARNING: Signature verification failed!")

Discovery Scan Flow

Scanner                         Node A                      Node B
   |                               |                           |
   |-- DISCOVER (broadcast) ------>|-------------------------->|
   |                               |                           |
   |<-- ADVERTISE (signed) --------|                           |
   |<-- ADVERTISE (signed) ------------------------------------|
   |                               |                           |
   |-- GET /capabilities --------->|                           |
   |<-- {tools, location} ---------|                           |
   |                               |                           |

Step-by-step:

  1. Broadcast DISCOVER: Scanner sends UDP broadcast
  2. Receive ADVERTISE: Nodes respond with signed advertisements
  3. Verify Signatures: Scanner verifies Ed25519 signatures
  4. Enrich (optional): Scanner fetches /capabilities via HTTP
  5. Build Snapshot: Aggregated mesh snapshot with all discovered nodes
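Step 2 amounts to a receive loop bounded by the scan duration and the socket timeout. The following is a minimal sketch of that loop, using the 6 second and 0.5 second defaults listed earlier. collect_advertisements is a hypothetical name, signature verification (step 3) is left out, and de-duplicating by node_id is an assumption.

```python
import json
import socket
import time


def collect_advertisements(sock, duration=6.0, socket_timeout=0.5):
    """Read ADVERTISE datagrams from sock until duration seconds elapse.

    Returns one message per node_id (the most recent one wins).
    Signatures are NOT checked here; verify them before trusting a node.
    """
    sock.settimeout(socket_timeout)
    deadline = time.monotonic() + duration
    found = {}
    while time.monotonic() < deadline:
        try:
            data = sock.recv(65535)
        except socket.timeout:
            continue  # nothing arrived in this window; re-check the deadline
        try:
            msg = json.loads(data.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            continue  # skip datagrams that are not JSON
        if not isinstance(msg, dict) or msg.get("type") != "ADVERTISE":
            continue
        payload = msg.get("payload")
        if isinstance(payload, dict) and payload.get("node_id"):
            found[payload["node_id"]] = msg
    return list(found.values())
```

The short socket timeout keeps the loop responsive: the scan ends at most one timeout interval after the deadline, even when no node answers.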

Python Example: Complete Discovery Scan

from discovery import scan_nodes, MeshSnapshotBuilder

# Scan for nodes (6 seconds)
scan_results = scan_nodes(duration=6.0, verbose=True)

# Build mesh snapshot
snapshot = MeshSnapshotBuilder.build(scan_results)

print(f"Discovered {len(snapshot.nodes)} nodes:")
for node in snapshot.nodes:
    print(f"  {node.node_id}: {node.node_type} at {node.http_url}")
    print(f"    Verified: {node.verified}")
    if node.location:
        print(f"    Location: {node.location['lat']}, {node.location['lon']}")

HTTP Communication Between Nodes

Node-to-Node HTTP Endpoints

Edge nodes expose HTTP endpoints for tool execution and capabilities.

GET /capabilities

Retrieve node capabilities, tools, and location.

Request:

curl http://192.168.1.100:8000/capabilities

Response:

{
  "node_id": "node_abc123",
  "node_type": "edge",
  "tools": [
    {
      "name": "pii_redact",
      "version": "1.0.0",
      "description": "Redact PII from text",
      "required_capability": "tool:pii_redact"
    },
    {
      "name": "unit_convert",
      "version": "1.0.0",
      "description": "Convert between units",
      "required_capability": "tool:unit_convert"
    }
  ],
  "location": {
    "lat": 37.7749,
    "lon": -122.4194,
    "accuracy_m": 50
  }
}

POST /execute_tool

Execute a tool on the edge node.

Request:

curl -X POST http://192.168.1.100:8000/execute_tool \
  -H "Content-Type: application/json" \
  -d '{
    "tool_name": "unit_convert",
    "tool_args": {
      "value": 10,
      "from_unit": "miles",
      "to_unit": "km"
    },
    "trace_id": "trace_abc123",
    "capability_token": null,
    "job_id": "job_xyz789"
  }'

Request Fields:

Field             Type    Required  Description
tool_name         string  Yes       Tool to execute
tool_args         object  Yes       Tool-specific arguments
trace_id          string  No        Trace ID for tracking
capability_token  object  No        Capability token (required in production)
job_id            string  No        Job ID for idempotency

Response:

{
  "ok": true,
  "result": {
    "value": 16.0934,
    "from_unit": "miles",
    "to_unit": "km"
  },
  "error": null,
  "trace_id": "trace_abc123",
  "tool_name": "unit_convert",
  "tool_version": "1.0.0",
  "node_id": "node_abc123",
  "verified": true,
  "signature": "base64_ed25519_signature",
  "public_key": "base64_ed25519_pubkey",
  "execution_time_ms": 5
}

Response Fields:

Field              Type     Description
ok                 boolean  Whether execution succeeded
result             object   Tool execution result (null on error)
error              string   Error message (null on success)
trace_id           string   Trace ID from request
tool_name          string   Tool name executed
tool_version       string   Tool version
node_id            string   Node that executed the tool
verified           boolean  Whether response is signed
signature          string   Ed25519 signature (base64)
public_key         string   Ed25519 public key (base64)
execution_time_ms  integer  Execution latency in milliseconds
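Because ok, result, and error are linked (one of result or error is null), callers typically collapse them into a return value or an exception. The following is a small sketch of that pattern. ToolCallError and unwrap_tool_response are hypothetical names, and treating an unsigned response as an error is an assumption a caller may choose to relax.

```python
class ToolCallError(Exception):
    """Raised when a node reports a failed or untrusted tool call (hypothetical)."""


def unwrap_tool_response(response: dict, require_signed: bool = True):
    """Return response["result"], or raise ToolCallError.

    Note: "verified" only says the node signed the response; the caller
    should still check the signature against a trusted public key.
    """
    if not response.get("ok"):
        raise ToolCallError(response.get("error") or "unknown error")
    if require_signed and not (response.get("verified") and response.get("signature")):
        raise ToolCallError("response is not signed")
    return response.get("result")


response = {
    "ok": True,
    "result": {"value": 16.0934, "from_unit": "miles", "to_unit": "km"},
    "error": None,
    "verified": True,
    "signature": "base64_ed25519_signature",
}
print(unwrap_tool_response(response)["value"])
```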

Timeout and Retry Behavior

Timeouts:

Operation           Default Timeout  Configurable
Tool Execution      10 seconds       Yes (via environment)
Capabilities Fetch  5 seconds        Yes
Discovery Scan      6 seconds        Yes (via parameter)

Retry Logic:

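import time

import requests

# Gateway implements exponential backoff for remote tool calls
MAX_RETRIES = 3
BASE_DELAY = 1.0  # seconds


def call_tool_with_retry(http_url, request):
    """POST to /execute_tool, waiting 1s and then 2s between timed-out attempts.

    The function name is illustrative; it wraps the loop so that
    `return` and `raise` are valid.
    """
    for attempt in range(MAX_RETRIES):
        try:
            response = requests.post(
                f"{http_url}/execute_tool",
                json=request,
                timeout=10,
            )
            return response.json()
        except requests.Timeout:
            if attempt == MAX_RETRIES - 1:
                raise  # out of attempts; surface the timeout to the caller
            time.sleep(BASE_DELAY * (2 ** attempt))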

Failover:

If primary target fails, gateway can failover to backup nodes:

from gateway.router import ToolRouter

router = ToolRouter(unified_client, mesh_cache, trust_store)

result = router.route_tool_call(
    tool_name="pii_redact",
    tool_args={"text": "Email: admin@example.com"},
    failover_enabled=True
)

if result["degraded"]:
    print(f"Primary error: {result['primary_error']}")
    print(f"Final target: {result['final_target']}")
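The degraded, primary_error, and final_target fields suggest an ordered walk over candidate nodes. The sketch below illustrates that idea under stated assumptions. It is not the ToolRouter implementation, and call_with_failover, its targets list, and its call callback are hypothetical.

```python
def call_with_failover(targets, call):
    """Try each target in order and report whether a backup had to be used.

    targets: node identifiers, primary first.
    call: function(target) -> result; raises an exception on failure.
    """
    if not targets:
        raise ValueError("no targets to try")
    primary_error = None
    last_error = None
    for index, target in enumerate(targets):
        try:
            value = call(target)
        except Exception as exc:  # sketch: any failure moves on to the next node
            if index == 0:
                primary_error = str(exc)
            last_error = exc
            continue
        return {
            "result": value,
            "degraded": index > 0,  # True when the primary did not answer
            "primary_error": primary_error,
            "final_target": target,
        }
    raise RuntimeError(f"all {len(targets)} targets failed") from last_error


def fake_call(target):
    # Stand-in for an HTTP call: the primary node is unreachable.
    if target == "node_primary":
        raise TimeoutError("primary timed out")
    return {"handled_by": target}


print(call_with_failover(["node_primary", "node_backup"], fake_call))
```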

E2EE Task Envelopes

Task envelopes provide end-to-end encryption for agent-to-agent task delegation.

Cryptographic Design

Key Agreement: X25519 Elliptic Curve Diffie-Hellman (ECDH)

Encryption: ChaCha20-Poly1305 AEAD (Authenticated Encryption with Associated Data)

Authentication: Ed25519 Digital Signatures

Replay Protection: Message ID deduplication

TTL Enforcement: Timestamp-based expiry
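Replay protection and TTL enforcement work together: an ID only needs to be remembered until its message would expire anyway. The following is an illustrative in-memory sketch of that interaction. ReplayGuard is a hypothetical class, and it assumes created_at always carries a UTC offset, as in the envelope examples.

```python
from datetime import datetime, timedelta, timezone


class ReplayGuard:
    """Reject expired or previously seen messages (illustrative, in-memory)."""

    def __init__(self):
        self._seen = {}  # message_id -> expiry time

    def check(self, message_id, created_at, ttl_s, now=None):
        """Return None if the message is acceptable, else a reason string."""
        now = now or datetime.now(timezone.utc)
        # Assumes created_at includes an offset, e.g. "...+00:00"
        expires_at = datetime.fromisoformat(created_at) + timedelta(seconds=ttl_s)
        if now >= expires_at:
            return "expired"
        # Forget IDs whose TTL has passed; a replay of those would be
        # rejected as expired, so the table stays bounded.
        self._seen = {m: e for m, e in self._seen.items() if e > now}
        if message_id in self._seen:
            return "replay"
        self._seen[message_id] = expires_at
        return None


guard = ReplayGuard()
now = datetime(2026, 1, 24, 13, 0, tzinfo=timezone.utc)
print(guard.check("m1", "2026-01-24T12:00:00+00:00", 86400, now=now))
print(guard.check("m1", "2026-01-24T12:00:00+00:00", 86400, now=now))
```

A production node would persist the seen IDs so that a restart does not reopen the replay window.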

Message Types

from enum import Enum

class MessageType(str, Enum):
    TASK_REQUEST = "TASK_REQUEST"   # Request task execution
    TASK_RESULT = "TASK_RESULT"     # Return task result

Envelope Structure

{
  "message_id": "unique_uuid",
  "type": "TASK_REQUEST",
  "sender_node_id": "node_sender",
  "recipient_node_id": "node_recipient",
  "created_at": "2026-01-24T12:00:00+00:00",
  "ttl_s": 86400,
  "enc": {
    "nonce": "base64_nonce",
    "ciphertext": "base64_encrypted_payload"
  },
  "signature": "base64_ed25519_signature"
}

Fields:

Field              Type     Description
message_id         string   Unique UUID for replay protection
type               string   TASK_REQUEST or TASK_RESULT
sender_node_id     string   Sender's node ID
recipient_node_id  string   Recipient's node ID
created_at         string   ISO 8601 timestamp (UTC)
ttl_s              integer  Time-to-live in seconds
enc                object   Encrypted payload
enc.nonce          string   ChaCha20 nonce (base64)
enc.ciphertext     string   Encrypted payload (base64)
signature          string   Ed25519 signature over canonical JSON
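The signature is computed over canonical JSON, so sender and recipient must serialize the envelope to identical bytes. The reference does not define the canonical form; the sketch below assumes sorted keys, no insignificant whitespace, UTF-8 encoding, and that every field except signature is covered. signing_input is a hypothetical helper.

```python
import json


def signing_input(envelope: dict) -> bytes:
    """Serialize every envelope field except "signature" deterministically.

    Assumption: canonical JSON means sorted keys, compact separators,
    and UTF-8. The real implementation may differ.
    """
    unsigned = {k: v for k, v in envelope.items() if k != "signature"}
    text = json.dumps(
        unsigned, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )
    return text.encode("utf-8")


envelope = {
    "type": "TASK_REQUEST",
    "message_id": "unique_uuid",
    "enc": {"nonce": "base64_nonce", "ciphertext": "base64_encrypted_payload"},
    "signature": "base64_ed25519_signature",
}
print(signing_input(envelope).decode("utf-8"))
```

Sorting keys matters because two JSON documents with the same fields in a different order are equal as objects but differ as bytes, and Ed25519 signs bytes.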

Creating an Envelope

from gateway.mesh_transport import TaskEnvelope, MessageType, MeshCrypto
from edge_node.identity import NodeIdentity

# Initialize
identity = NodeIdentity("gateway_identity.pem")
mesh_crypto = MeshCrypto("keys/")

envelope_handler = TaskEnvelope(identity, mesh_crypto)

# Create task request
payload = {
    "tool_name": "pii_redact",
    "tool_args": {
        "text": "Sensitive data: admin@example.com"
    },
    "trace_id": "trace_abc123"
}

envelope = envelope_handler.create(
    message_type=MessageType.TASK_REQUEST,
    payload_dict=payload,
    recipient_node_id="node_edge_1",
    recipient_x25519_pubkey="base64_x25519_pubkey",
    ttl_seconds=86400  # 24 hours
)

print(envelope)

Verifying and Decrypting an Envelope

# Receive envelope from network
received_envelope = {
    "message_id": "...",
    "type": "TASK_REQUEST",
    # ... full envelope
}

# Verify and decrypt
metadata, payload = envelope_handler.verify_and_decrypt(
    envelope=received_envelope,
    sender_ed25519_pubkey_b64="sender_ed25519_pubkey",
    sender_x25519_pubkey_b64="sender_x25519_pubkey"
)

print(f"Message ID: {metadata['message_id']}")
print(f"Sender: {metadata['sender_node_id']}")
print(f"Type: {metadata['type']}")
print(f"Payload: {payload}")

Verification Failures:

try:
    metadata, payload = envelope_handler.verify_and_decrypt(
        envelope=envelope,
        sender_ed25519_pubkey_b64=pubkey,
        sender_x25519_pubkey_b64=x25519_pubkey
    )
except ValueError as e:
    reason = str(e).lower()  # match case-insensitively
    if "signature verification failed" in reason:
        print("Invalid signature - possible tampering")
    elif "expired" in reason:
        print("Message TTL expired")
    elif "decryption" in reason:
        print("Decryption failed - wrong keys or corrupted")
    else:
        raise  # unrecognized failure; do not swallow it

Packet Format Example

Encrypted TASK_REQUEST:

{
  "message_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "type": "TASK_REQUEST",
  "sender_node_id": "node_gateway_abc",
  "recipient_node_id": "node_edge_xyz",
  "created_at": "2026-01-24T12:00:00+00:00",
  "ttl_s": 86400,
  "enc": {
    "nonce": "AAAAAAAAAAAAAAAAAAAAAA==",
    "ciphertext": "ZW5jcnlwdGVkX3BheWxvYWRfaGVyZQ=="
  },
  "signature": "c2lnbmF0dXJlX2Jhc2U2NF9lbmNvZGVk"
}

Decrypted Payload:

{
  "tool_name": "pii_redact",
  "tool_args": {
    "text": "Contact admin@example.com for support"
  },
  "trace_id": "trace_abc123"
}

Store-and-Forward Mailbox

The mailbox provides offline-tolerant messaging with retry logic.

Mailbox Database Schema

SQLite Database: data/mailbox.db

Table: messages

CREATE TABLE messages (
    message_id TEXT PRIMARY KEY,
    recipient_node_id TEXT NOT NULL,
    envelope TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL,
    expires_at TIMESTAMP NOT NULL,
    delivered BOOLEAN DEFAULT 0,
    delivered_at TIMESTAMP,
    retry_count INTEGER DEFAULT 0,
    last_retry TIMESTAMP
);

CREATE INDEX idx_recipient ON messages(recipient_node_id);
CREATE INDEX idx_delivered ON messages(delivered);
CREATE INDEX idx_expires ON messages(expires_at);
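To show how the schema supports queueing and lookup, here is a sketch that runs against an in-memory SQLite database. It is not the Mailbox implementation. queue_message and pending_for are hypothetical helpers, and the sketch assumes timestamps are stored as ISO 8601 strings in UTC (so that string comparison orders them correctly) and that expires_at is created_at plus ttl_s.

```python
import json
import sqlite3
from datetime import datetime, timedelta, timezone

SCHEMA = """
CREATE TABLE IF NOT EXISTS messages (
    message_id TEXT PRIMARY KEY,
    recipient_node_id TEXT NOT NULL,
    envelope TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL,
    expires_at TIMESTAMP NOT NULL,
    delivered BOOLEAN DEFAULT 0,
    delivered_at TIMESTAMP,
    retry_count INTEGER DEFAULT 0,
    last_retry TIMESTAMP
);
"""


def queue_message(conn, envelope):
    """Insert an envelope; a duplicate message_id is ignored (returns False)."""
    created = datetime.fromisoformat(envelope["created_at"])
    expires = created + timedelta(seconds=envelope["ttl_s"])
    cur = conn.execute(
        "INSERT OR IGNORE INTO messages "
        "(message_id, recipient_node_id, envelope, created_at, expires_at) "
        "VALUES (?, ?, ?, ?, ?)",
        (envelope["message_id"], envelope["recipient_node_id"],
         json.dumps(envelope), created.isoformat(), expires.isoformat()),
    )
    conn.commit()
    return cur.rowcount == 1


def pending_for(conn, recipient_node_id, now):
    """Return undelivered, unexpired message IDs for a recipient, oldest first."""
    rows = conn.execute(
        "SELECT message_id FROM messages "
        "WHERE recipient_node_id = ? AND delivered = 0 AND expires_at > ? "
        "ORDER BY created_at",
        (recipient_node_id, now.isoformat()),
    )
    return [row[0] for row in rows]


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
env = {
    "message_id": "m1",
    "recipient_node_id": "node_edge_1",
    "created_at": "2026-01-24T12:00:00+00:00",
    "ttl_s": 86400,
}
queue_message(conn, env)
print(pending_for(conn, "node_edge_1", datetime(2026, 1, 24, 13, 0, tzinfo=timezone.utc)))
```

The PRIMARY KEY on message_id makes queueing idempotent, which complements the replay protection on the receiving side.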

Mailbox Operations

Queue Message

from gateway.mesh_transport import Mailbox

mailbox = Mailbox("data/mailbox.db")

# Queue encrypted envelope for delivery
mailbox.queue(
    message_id=envelope["message_id"],
    recipient_node_id=envelope["recipient_node_id"],
    envelope=envelope
)

print("Message queued for delivery")

Mark Delivered

# After successful delivery
mailbox.mark_delivered(message_id="a1b2c3d4-...")

print("Message marked as delivered")

Get Pending Messages

# Get all undelivered messages for a recipient
pending = mailbox.get_pending(recipient_node_id="node_edge_xyz")

for msg in pending:
    print(f"Message: {msg['message_id']}")
    print(f"  Created: {msg['created_at']}")
    print(f"  Retries: {msg['retry_count']}")
    print(f"  Last retry: {msg['last_retry']}")

Update Retry

# Increment retry count
mailbox.update_retry(message_id="a1b2c3d4-...")

Cleanup Expired

# Remove expired messages
expired_count = mailbox.cleanup_expired()

print(f"Removed {expired_count} expired messages")

Delivery Worker

The delivery worker runs in the background and attempts to deliver pending messages.

Retry Strategy: Exponential backoff with max retries

Retry Schedule:

Attempt  Delay  Backoff Factor
1        0s     -
2        10s    2x
3        20s    2x
4        40s    2x
5        80s    2x
6-10     160s   2x (capped)

Max Retries: 10

Worker Interval: 10 seconds (configurable via DWO_DELIVERY_INTERVAL)
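The retry schedule can be expressed as a small function: no wait before the first attempt, a 10 second base delay before the second, doubling afterwards, and a cap at 160 seconds. retry_delay and the constant names are illustrative; only the numbers come from the schedule.

```python
BASE_DELAY_S = 10   # delay before attempt 2
MAX_DELAY_S = 160   # cap reached at attempt 6
MAX_RETRIES = 10


def retry_delay(attempt: int) -> int:
    """Seconds to wait before delivery attempt number `attempt` (1-based)."""
    if not 1 <= attempt <= MAX_RETRIES:
        raise ValueError(f"attempt must be between 1 and {MAX_RETRIES}")
    if attempt == 1:
        return 0  # first attempt is immediate
    return min(BASE_DELAY_S * 2 ** (attempt - 2), MAX_DELAY_S)


print([retry_delay(n) for n in range(1, MAX_RETRIES + 1)])
# [0, 10, 20, 40, 80, 160, 160, 160, 160, 160]
```

Summed over all ten attempts, the delays come to 950 seconds, so a message that is never accepted exhausts its retries in roughly 16 minutes, well within the 24 hour TTL.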

Starting the Delivery Worker

from gateway.mesh_transport import DeliveryWorker, Mailbox

mailbox = Mailbox("data/mailbox.db")
worker = DeliveryWorker(mailbox, interval_seconds=10)

# Start background worker
worker.start()

# Worker runs indefinitely, retrying failed deliveries

# Stop worker (graceful shutdown)
worker.stop()

Delivery Flow

Mailbox                    DeliveryWorker                 Recipient Node
   |                              |                              |
   |-- queue(envelope) ---------->|                              |
   |                              |                              |
   |                              |-- POST /mesh/receive ------->|
   |                              |                              |
   |                              |<-- 200 OK -------------------|
   |<-- mark_delivered() ---------|                              |
   |                              |                              |
   |                         (if failed)                         |
   |                              |                              |
   |<-- update_retry() -----------|                              |
   |                              |                              |
   |                         (wait backoff)                      |
   |                              |                              |
   |                              |-- POST /mesh/receive ------->|

HTTP Endpoint for Receiving Messages

Edge nodes expose /mesh/receive to accept task envelopes:

curl -X POST http://192.168.1.100:8000/mesh/receive \
  -H "Content-Type: application/json" \
  -d '{
    "message_id": "a1b2c3d4-...",
    "type": "TASK_REQUEST",
    "sender_node_id": "node_gateway",
    "recipient_node_id": "node_edge_1",
    "created_at": "2026-01-24T12:00:00+00:00",
    "ttl_s": 86400,
    "enc": {
      "nonce": "...",
      "ciphertext": "..."
    },
    "signature": "..."
  }'

Response (Success):

{
  "status": "received",
  "message_id": "a1b2c3d4-..."
}

Response (Error):

{
  "status": "error",
  "message": "Signature verification failed"
}
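Before spending time on signature verification and decryption, a receiver can reject structurally invalid envelopes. The sketch below shows one way to do that with error bodies in the documented shape. precheck_envelope and its error texts are hypothetical, and a None result only means the envelope is ready for verification, not that it is trusted.

```python
REQUIRED_FIELDS = (
    "message_id", "type", "sender_node_id", "recipient_node_id",
    "created_at", "ttl_s", "enc", "signature",
)
MESSAGE_TYPES = ("TASK_REQUEST", "TASK_RESULT")


def _error(message):
    return {"status": "error", "message": message}


def precheck_envelope(envelope, my_node_id):
    """Return an error body for a malformed envelope, or None if it looks sane.

    Illustrative only: signature and TTL checks must still follow.
    """
    if not isinstance(envelope, dict):
        return _error("Envelope must be a JSON object")
    missing = [f for f in REQUIRED_FIELDS if f not in envelope]
    if missing:
        return _error("Missing fields: " + ", ".join(missing))
    if envelope["type"] not in MESSAGE_TYPES:
        return _error("Unknown message type")
    enc = envelope["enc"]
    if not isinstance(enc, dict) or "nonce" not in enc or "ciphertext" not in enc:
        return _error("Malformed enc object")
    if envelope["recipient_node_id"] != my_node_id:
        return _error("Envelope addressed to a different node")
    return None
```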

Security Considerations

Cryptographic Guarantees

  1. Confidentiality: X25519 ECDH + ChaCha20-Poly1305 ensures that only the recipient can decrypt
  2. Authenticity: Ed25519 signatures prove the sender's identity
  3. Integrity: The AEAD authentication tag detects any tampering with the ciphertext
  4. Replay Protection: Message ID deduplication prevents replay attacks
  5. Forward Secrecy: Not guaranteed by default; it holds only if ephemeral X25519 keys are implemented, because long-lived key pairs allow recorded traffic to be decrypted after a key compromise

Trust Model

  • Trust on First Use (TOFU): Initial trust established via QR code pairing
  • Trust Store: Maintains list of trusted/blocked nodes
  • Signature Verification: All messages verified before processing
  • TTL Enforcement: Old messages automatically rejected

Attack Resistance

Attack             Mitigation
Man-in-the-Middle  Ed25519 signatures + E2EE
Replay Attacks     Message ID deduplication
Eavesdropping      ChaCha20-Poly1305 encryption
Tampering          AEAD authentication tag
Denial of Service  Rate limiting (future), TTL expiry
Sybil Attacks      Trust store + signature verification

Performance Metrics

Discovery Performance

Metric                Value       Notes
Scan Duration         6 seconds   Configurable
Max Nodes Discovered  100+        Limited by network bandwidth
Packet Size           ~500 bytes  ADVERTISE message
Network Bandwidth     ~1 KB/s     During discovery

E2EE Performance

Operation                 Latency  Notes
Envelope Creation         1-3ms    Includes encryption + signing
Envelope Verification     2-5ms    Includes signature verification + decryption
Key Generation (X25519)   5-10ms   One-time per session
Key Generation (Ed25519)  5-10ms   One-time per node

Mailbox Performance

Operation        Latency  Notes
Queue Message    1-2ms    SQLite INSERT
Mark Delivered   1-2ms    SQLite UPDATE
Get Pending      5-10ms   SQLite SELECT
Cleanup Expired  10-50ms  Depends on message count

Troubleshooting

Discovery Issues

Problem: No nodes discovered

Solutions:

# Check UDP port is open
sudo lsof -i UDP:37020

# Check firewall
sudo ufw status

# Test manual broadcast
python -c "
import socket, json, uuid
from datetime import datetime, timezone
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
msg = {'type': 'DISCOVER', 'nonce': str(uuid.uuid4())[:16], 'reply_to': 37020, 'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')}
sock.sendto(json.dumps(msg).encode(), ('255.255.255.255', 37020))
print('Sent DISCOVER')
"

Problem: Signature verification failing

Solution: Check public key format and trust store

from trust.store import TrustStore

trust = TrustStore()
nodes = trust.list_all()

for node in nodes["trusted"]:
    print(f"{node['node_id']}: {node['public_key_fingerprint']}")

E2EE Issues

Problem: Decryption failing

Solution: Verify keys match

# Check X25519 public key
from gateway.mesh_transport import MeshCrypto

crypto = MeshCrypto("keys/")
pubkey = crypto.get_public_key()
print(f"My X25519 public key: {pubkey}")

# Verify recipient has correct sender key

Problem: "Message expired" error

Solution: Check system time sync

# Check time sync
timedatectl status

# Sync time
sudo ntpdate -s time.nist.gov

Mailbox Issues

Problem: Messages not being delivered

Solution: Check delivery worker status

from gateway.mesh_transport import DeliveryWorker, Mailbox

mailbox = Mailbox("data/mailbox.db")

# Check pending messages
pending = mailbox.get_pending()
print(f"Pending messages: {len(pending)}")

for msg in pending:
    print(f"  {msg['message_id']}: retries={msg['retry_count']}")

# Check if worker is running
# (should see periodic delivery attempts in logs)

Complete Example: Sending Encrypted Task

from gateway.mesh_transport import (
    TaskEnvelope, MessageType, MeshCrypto, Mailbox, DeliveryWorker
)
from edge_node.identity import NodeIdentity
from trust.store import TrustStore

# Initialize
gateway_identity = NodeIdentity("gateway_identity.pem")
mesh_crypto = MeshCrypto("keys/")
mailbox = Mailbox("data/mailbox.db")
trust_store = TrustStore()

# Get recipient info from trust store
recipient_node_id = "node_edge_1"
recipient_info = trust_store.get_node(recipient_node_id)

if not recipient_info:
    raise ValueError(f"Node {recipient_node_id} not in trust store")

recipient_x25519_pubkey = recipient_info["x25519_pubkey"]
recipient_http_url = recipient_info["http_url"]

# Create envelope handler
envelope_handler = TaskEnvelope(gateway_identity, mesh_crypto)

# Create task request
payload = {
    "tool_name": "pii_redact",
    "tool_args": {
        "text": "Contact admin@company.com for support"
    },
    "trace_id": "trace_abc123"
}

# Create signed + encrypted envelope
envelope = envelope_handler.create(
    message_type=MessageType.TASK_REQUEST,
    payload_dict=payload,
    recipient_node_id=recipient_node_id,
    recipient_x25519_pubkey=recipient_x25519_pubkey,
    ttl_seconds=86400
)

print(f"Created envelope: {envelope['message_id']}")

# Queue for delivery
mailbox.queue(
    message_id=envelope["message_id"],
    recipient_node_id=recipient_node_id,
    envelope=envelope
)

print("Envelope queued for delivery")

# Start delivery worker
worker = DeliveryWorker(mailbox, interval_seconds=10)
worker.start()

print("Delivery worker started - retries with backoff (max 10) until delivered or expired")

Changelog

  • v1.0.0 (2026-01-24): Initial mesh transport implementation
      • UDP multicast discovery with Ed25519 signatures
      • HTTP communication for tool execution
      • E2EE task envelopes with X25519 + ChaCha20-Poly1305
      • Store-and-forward mailbox with exponential backoff
      • Replay protection and TTL enforcement