Mesh Transport API Reference¶
The Mesh Transport layer provides secure, encrypted communication between nodes in the Adaptive Sentience network. It implements UDP multicast discovery for node announcement, HTTP-based task delegation, and store-and-forward messaging for offline-tolerant operation.
Overview¶
Mesh Transport consists of four key components:
- UDP Multicast Discovery: Nodes broadcast announcements on the local network
- HTTP Communication: Tool calls and task delegation over HTTP
- E2EE Task Envelopes: Encrypted, signed messages for agent-to-agent communication
- Store-and-Forward Mailbox: Retry logic for offline/intermittent connectivity
Architecture¶
┌─────────────────────────────────────────────────────────────┐
│  Discovery Layer (UDP Multicast)                            │
│  ┌─────────────┐        ┌─────────────┐                     │
│  │  DISCOVER   │───────>│  ADVERTISE  │                     │
│  │ (broadcast) │        │ (response)  │                     │
│  └─────────────┘        └─────────────┘                     │
│  Port: 37020            Signed with Ed25519                 │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│  HTTP Communication Layer                                   │
│  ┌───────────────┐      ┌───────────────┐                   │
│  │    Gateway    │─────>│   Edge Node   │                   │
│  │ /v1/tool_call │      │ /capabilities │                   │
│  └───────────────┘      └───────────────┘                   │
│  Timeout: 10s           Verified: Ed25519 signature         │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│  E2EE Mesh Transport (Agent-to-Agent)                       │
│  ┌────────────────┐    ┌────────────────┐                   │
│  │  TaskEnvelope  │───>│  X25519 ECDH   │                   │
│  │    (signed)    │    │  + ChaCha20    │                   │
│  └────────────────┘    └────────────────┘                   │
│  TTL: 24h              Replay Protection                    │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│  Store-and-Forward Mailbox                                  │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                   │
│  │  Queue   │─>│  Retry   │─>│ Deliver  │                   │
│  │ Messages │  │ (exp BO) │  │  (HTTP)  │                   │
│  └──────────┘  └──────────┘  └──────────┘                   │
│  SQLite DB   Max: 10 retries   Interval: 10s                │
└─────────────────────────────────────────────────────────────┘
UDP Multicast Discovery¶
Discovery Protocol¶
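Nodes discover each other by UDP broadcast on the local network. Although this layer is named multicast discovery, the parameters below use the limited broadcast address rather than a multicast group.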
Discovery Port: 37020
Broadcast Address: 255.255.255.255
Default Scan Duration: 6 seconds
Socket Timeout: 0.5 seconds
Message Format¶
DISCOVER Message¶
Scanner broadcasts a DISCOVER message to find active nodes.
Message Type: DISCOVER
Format:
{
"type": "DISCOVER",
"nonce": "unique_16char_id",
"reply_to": 37020,
"timestamp": "2026-01-24T12:00:00Z"
}
Fields:
| Field | Type | Description |
|---|---|---|
| `type` | string | Always "DISCOVER" |
| `nonce` | string | 16-character unique identifier (UUID prefix) |
| `reply_to` | integer | Port for replies (0 = same as broadcast port) |
| `timestamp` | string | ISO 8601 timestamp (UTC) |
Python Example:
import json
import socket
import uuid
from datetime import datetime, timezone

# Create DISCOVER message
message = {
    "type": "DISCOVER",
    "nonce": str(uuid.uuid4())[:16],
    "reply_to": 37020,
    # Timezone-aware UTC time with a "Z" suffix (datetime.utcnow() is deprecated since Python 3.12)
    "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
}

# Broadcast
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.sendto(
    json.dumps(message).encode(),
    ("255.255.255.255", 37020),
)
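The broadcast above only sends the probe. The listening half might look like the following minimal sketch, which assumes replies arrive as JSON datagrams on the same socket. `collect_advertisements` is a hypothetical helper, not part of the documented API; its defaults mirror the 6-second scan duration and 0.5-second socket timeout listed under Discovery Protocol, and signature checks are left to the caller.

```python
import json
import socket
import time


def collect_advertisements(sock, duration=6.0, timeout=0.5):
    """Collect ADVERTISE datagrams from sock until duration seconds elapse.

    Hypothetical helper: returns (sender_address, message) pairs and does
    not verify signatures.
    """
    sock.settimeout(timeout)
    deadline = time.monotonic() + duration
    found = []
    while time.monotonic() < deadline:
        try:
            data, addr = sock.recvfrom(65535)
        except socket.timeout:
            continue  # nothing arrived in this window; keep scanning
        try:
            message = json.loads(data.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            continue  # ignore datagrams that are not valid JSON
        if isinstance(message, dict) and message.get("type") == "ADVERTISE":
            found.append((addr, message))
    return found
```

Calling `collect_advertisements(sock)` right after `sock.sendto(...)` would gather whatever replies reach that socket during the scan window.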
ADVERTISE Message¶
Nodes respond with signed ADVERTISE messages.
Message Type: ADVERTISE
Format:
{
"type": "ADVERTISE",
"payload": {
"node_id": "node_abc123",
"node_type": "edge",
"http_url": "http://192.168.1.100:8000",
"tools_hash": "sha256:abc123def456",
"location": {
"lat": 37.7749,
"lon": -122.4194,
"accuracy_m": 50,
"timestamp": "2026-01-24T12:00:00Z"
}
},
"signature": "base64_ed25519_signature",
"public_key": "base64_ed25519_pubkey"
}
Fields:
| Field | Type | Description |
|---|---|---|
| `type` | string | Always "ADVERTISE" |
| `payload` | object | Node advertisement data |
| `payload.node_id` | string | Node identifier |
| `payload.node_type` | string | Platform type (android, macos, popos, etc.) |
| `payload.http_url` | string | HTTP endpoint URL |
| `payload.tools_hash` | string | SHA256 hash of available tools |
| `payload.location` | object | Geographic location (optional) |
| `signature` | string | Base64-encoded Ed25519 signature |
| `public_key` | string | Base64-encoded Ed25519 public key |
Signature Verification:
from edge_node.verify import verify_signed_dict

# Verify ADVERTISE message
valid = verify_signed_dict(
    data=message["payload"],
    signature_b64=message["signature"],
    public_key_b64=message["public_key"]
)
if not valid:
    print("WARNING: Signature verification failed!")
Discovery Scan Flow¶
Scanner                      Node A                      Node B
|                               |                           |
|-- DISCOVER (broadcast) ------>|                           |
|                               |                           |
|                               |<-- ADVERTISE (signed) ----|
|<-- ADVERTISE (signed) --------|                           |
|                               |                           |
|-- GET /capabilities --------->|                           |
|<-- {tools, location} ---------|                           |
|                               |                           |
Step-by-step:
- Broadcast DISCOVER: Scanner sends UDP broadcast
- Receive ADVERTISE: Nodes respond with signed advertisements
- Verify Signatures: Scanner verifies Ed25519 signatures
- Enrich (optional): Scanner fetches /capabilities via HTTP
- Build Snapshot: Aggregated mesh snapshot with all discovered nodes
Python Example: Complete Discovery Scan¶
from discovery import scan_nodes, MeshSnapshotBuilder

# Scan for nodes (6 seconds)
scan_results = scan_nodes(duration=6.0, verbose=True)

# Build mesh snapshot
snapshot = MeshSnapshotBuilder.build(scan_results)
print(f"Discovered {len(snapshot.nodes)} nodes:")
for node in snapshot.nodes:
    print(f" {node.node_id}: {node.node_type} at {node.http_url}")
    print(f" Verified: {node.verified}")
    if node.location:
        print(f" Location: {node.location['lat']}, {node.location['lon']}")
HTTP Communication Between Nodes¶
Node-to-Node HTTP Endpoints¶
Edge nodes expose HTTP endpoints for tool execution and capabilities.
GET /capabilities¶
Retrieve node capabilities, tools, and location.
Request: an HTTP GET to /capabilities on the node's http_url.
Response:
{
"node_id": "node_abc123",
"node_type": "edge",
"tools": [
{
"name": "pii_redact",
"version": "1.0.0",
"description": "Redact PII from text",
"required_capability": "tool:pii_redact"
},
{
"name": "unit_convert",
"version": "1.0.0",
"description": "Convert between units",
"required_capability": "tool:unit_convert"
}
],
"location": {
"lat": 37.7749,
"lon": -122.4194,
"accuracy_m": 50
}
}
POST /execute_tool¶
Execute a tool on the edge node.
Request:
curl -X POST http://192.168.1.100:8000/execute_tool \
-H "Content-Type: application/json" \
-d '{
"tool_name": "unit_convert",
"tool_args": {
"value": 10,
"from_unit": "miles",
"to_unit": "km"
},
"trace_id": "trace_abc123",
"capability_token": null,
"job_id": "job_xyz789"
}'
Request Fields:
| Field | Type | Required | Description |
|---|---|---|---|
| `tool_name` | string | Yes | Tool to execute |
| `tool_args` | object | Yes | Tool-specific arguments |
| `trace_id` | string | No | Trace ID for tracking |
| `capability_token` | object | No | Capability token (required in production) |
| `job_id` | string | No | Job ID for idempotency |
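As a rough illustration of how the request fields fit together with the /capabilities response, the sketch below builds a request body only for tools the node advertises. `build_execute_request` is a hypothetical helper, and leaving out optional fields that were not supplied is an assumption about what the endpoint accepts.

```python
def build_execute_request(capabilities, tool_name, tool_args,
                          trace_id=None, capability_token=None, job_id=None):
    """Build a POST /execute_tool body after checking the tool is advertised.

    Hypothetical helper: `capabilities` is the parsed /capabilities response.
    """
    advertised = {tool["name"] for tool in capabilities.get("tools", [])}
    if tool_name not in advertised:
        raise ValueError(f"tool {tool_name!r} is not advertised by this node")

    body = {"tool_name": tool_name, "tool_args": tool_args}
    # Optional fields are only included when the caller supplies them.
    optional = {
        "trace_id": trace_id,
        "capability_token": capability_token,
        "job_id": job_id,
    }
    body.update({key: value for key, value in optional.items() if value is not None})
    return body


capabilities = {"node_id": "node_abc123", "tools": [{"name": "unit_convert"}]}
request_body = build_execute_request(
    capabilities,
    "unit_convert",
    {"value": 10, "from_unit": "miles", "to_unit": "km"},
    trace_id="trace_abc123",
)
```

Checking the advertised tool list first avoids a round trip that the node would reject anyway.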
Response:
{
"ok": true,
"result": {
"value": 16.0934,
"from_unit": "miles",
"to_unit": "km"
},
"error": null,
"trace_id": "trace_abc123",
"tool_name": "unit_convert",
"tool_version": "1.0.0",
"node_id": "node_abc123",
"verified": true,
"signature": "base64_ed25519_signature",
"public_key": "base64_ed25519_pubkey",
"execution_time_ms": 5
}
Response Fields:
| Field | Type | Description |
|---|---|---|
| `ok` | boolean | Whether execution succeeded |
| `result` | object | Tool execution result (null on error) |
| `error` | string | Error message (null on success) |
| `trace_id` | string | Trace ID from request |
| `tool_name` | string | Tool name executed |
| `tool_version` | string | Tool version |
| `node_id` | string | Node that executed the tool |
| `verified` | boolean | Whether response is signed |
| `signature` | string | Ed25519 signature (base64) |
| `public_key` | string | Ed25519 public key (base64) |
| `execution_time_ms` | integer | Execution latency in milliseconds |
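A caller will usually want to turn this response into either a result or an exception. The following is a minimal sketch under that assumption; `ToolExecutionError` and `unwrap_tool_response` are hypothetical names, and the trace ID comparison is an optional sanity check rather than documented behavior.

```python
class ToolExecutionError(RuntimeError):
    """Raised when a node reports ok=false (hypothetical error type)."""


def unwrap_tool_response(response, expected_trace_id=None):
    """Return response["result"], or raise if the node reported a failure."""
    if not response.get("ok"):
        raise ToolExecutionError(response.get("error") or "unknown error")
    if expected_trace_id is not None and response.get("trace_id") != expected_trace_id:
        raise ToolExecutionError("trace_id in response does not match the request")
    return response["result"]


response = {
    "ok": True,
    "result": {"value": 16.0934, "from_unit": "miles", "to_unit": "km"},
    "error": None,
    "trace_id": "trace_abc123",
}
result = unwrap_tool_response(response, expected_trace_id="trace_abc123")
```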
Timeout and Retry Behavior¶
Timeouts:
| Operation | Default Timeout | Configurable |
|---|---|---|
| Tool Execution | 10 seconds | Yes (via environment) |
| Capabilities Fetch | 5 seconds | Yes |
| Discovery Scan | 6 seconds | Yes (via parameter) |
Retry Logic:
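import time

import requests

# Gateway implements exponential backoff for remote tool calls.
# Wrapped in a function (the name is illustrative) so that return/raise are valid.
def call_tool_with_retry(http_url, request, max_retries=3, base_delay=1.0):
    for attempt in range(max_retries):
        try:
            response = requests.post(
                f"{http_url}/execute_tool",
                json=request,
                timeout=10,
            )
            return response.json()
        except requests.Timeout:
            if attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt)  # 1s, then 2s
                time.sleep(delay)
            else:
                raise  # retries exhausted: surface the timeout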
Failover:
If the primary target fails, the gateway can fail over to backup nodes:
from gateway.router import ToolRouter
router = ToolRouter(unified_client, mesh_cache, trust_store)
result = router.route_tool_call(
tool_name="pii_redact",
tool_args={"text": "Email: admin@example.com"},
failover_enabled=True
)
if result["degraded"]:
    print(f"Primary error: {result['primary_error']}")
    print(f"Final target: {result['final_target']}")
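The router's internals are not shown in this reference. As a generic illustration of the failover idea, the sketch below tries targets in order and reports the same `degraded`, `primary_error`, and `final_target` keys used above. `call_with_failover`, its `call` callback, and the `result` key are assumptions made for the example, not the ToolRouter implementation.

```python
def call_with_failover(targets, call):
    """Try each target in order and report whether a backup had to be used.

    targets: node identifiers, primary first.
    call: function(target) -> result, raising an exception on failure.
    """
    if not targets:
        raise ValueError("at least one target is required")
    primary_error = None
    last_error = None
    for index, target in enumerate(targets):
        try:
            result = call(target)
        except Exception as exc:  # broad on purpose: any failure triggers failover
            last_error = exc
            if index == 0:
                primary_error = str(exc)
            continue
        return {
            "result": result,
            "degraded": index > 0,
            "primary_error": primary_error,
            "final_target": target,
        }
    raise RuntimeError(f"all {len(targets)} targets failed") from last_error


def flaky_call(target):
    # Stand-in for a remote tool call: the primary node is down.
    if target == "node_primary":
        raise ConnectionError("primary unreachable")
    return {"redacted": True}


outcome = call_with_failover(["node_primary", "node_backup"], flaky_call)
```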
E2EE Task Envelopes¶
Task envelopes provide end-to-end encryption for agent-to-agent task delegation.
Cryptographic Design¶
Key Agreement: X25519 Elliptic Curve Diffie-Hellman (ECDH)
Encryption: ChaCha20-Poly1305 AEAD (Authenticated Encryption with Associated Data)
Authentication: Ed25519 Digital Signatures
Replay Protection: Message ID deduplication
TTL Enforcement: Timestamp-based expiry
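The last two items can be pictured with a small standard-library sketch. It assumes that expiry means `created_at + ttl_s` has passed and that replay protection is a set of already-seen message IDs; `is_expired` and `ReplayGuard` are hypothetical names, and a real implementation would also need to bound or persist the set.

```python
from datetime import datetime, timedelta, timezone


def is_expired(created_at, ttl_s, now=None):
    """Return True once created_at + ttl_s is at or before now.

    created_at must be an ISO 8601 string with a UTC offset.
    """
    created = datetime.fromisoformat(created_at)
    if now is None:
        now = datetime.now(timezone.utc)
    return now >= created + timedelta(seconds=ttl_s)


class ReplayGuard:
    """Remember message IDs so a second copy of an envelope is rejected."""

    def __init__(self):
        self._seen = set()

    def accept(self, message_id):
        """Return True the first time an ID is seen, False afterwards."""
        if message_id in self._seen:
            return False
        self._seen.add(message_id)
        return True
```

Because expiry depends on the receiver's clock, a node whose clock runs ahead will reject messages earlier than the sender expects.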
Message Types¶
from enum import Enum

class MessageType(str, Enum):
    TASK_REQUEST = "TASK_REQUEST"  # Request task execution
    TASK_RESULT = "TASK_RESULT"    # Return task result
Envelope Structure¶
{
"message_id": "unique_uuid",
"type": "TASK_REQUEST",
"sender_node_id": "node_sender",
"recipient_node_id": "node_recipient",
"created_at": "2026-01-24T12:00:00+00:00",
"ttl_s": 86400,
"enc": {
"nonce": "base64_nonce",
"ciphertext": "base64_encrypted_payload"
},
"signature": "base64_ed25519_signature"
}
Fields:
| Field | Type | Description |
|---|---|---|
| `message_id` | string | Unique UUID for replay protection |
| `type` | string | TASK_REQUEST or TASK_RESULT |
| `sender_node_id` | string | Sender's node ID |
| `recipient_node_id` | string | Recipient's node ID |
| `created_at` | string | ISO 8601 timestamp (UTC) |
| `ttl_s` | integer | Time-to-live in seconds |
| `enc` | object | Encrypted payload |
| `enc.nonce` | string | ChaCha20 nonce (base64) |
| `enc.ciphertext` | string | Encrypted payload (base64) |
| `signature` | string | Ed25519 signature over canonical JSON |
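The signature is described as covering canonical JSON, but the exact canonicalization rules are not given in this reference. One common choice, shown here purely as an assumption, is to drop the `signature` field, sort keys, and use compact separators so that sender and receiver serialize identical bytes. `signing_bytes` is a hypothetical helper.

```python
import json


def signing_bytes(envelope):
    """Serialize every field except "signature" in a deterministic form.

    Assumed canonical form: sorted keys, no extra whitespace, UTF-8.
    """
    unsigned = {key: value for key, value in envelope.items() if key != "signature"}
    return json.dumps(
        unsigned,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    ).encode("utf-8")


first = signing_bytes({"type": "TASK_REQUEST", "message_id": "m1", "signature": "abc"})
second = signing_bytes({"message_id": "m1", "type": "TASK_REQUEST"})
```

Both calls produce the same bytes, which is the property a signer and a verifier need regardless of key order in the received JSON.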
Creating an Envelope¶
from gateway.mesh_transport import TaskEnvelope, MessageType, MeshCrypto
from edge_node.identity import NodeIdentity
# Initialize
identity = NodeIdentity("gateway_identity.pem")
mesh_crypto = MeshCrypto("keys/")
envelope_handler = TaskEnvelope(identity, mesh_crypto)
# Create task request
payload = {
"tool_name": "pii_redact",
"tool_args": {
"text": "Sensitive data: admin@example.com"
},
"trace_id": "trace_abc123"
}
envelope = envelope_handler.create(
message_type=MessageType.TASK_REQUEST,
payload_dict=payload,
recipient_node_id="node_edge_1",
recipient_x25519_pubkey="base64_x25519_pubkey",
ttl_seconds=86400 # 24 hours
)
print(envelope)
Verifying and Decrypting an Envelope¶
# Receive envelope from network
received_envelope = {
"message_id": "...",
"type": "TASK_REQUEST",
# ... full envelope
}
# Verify and decrypt
metadata, payload = envelope_handler.verify_and_decrypt(
envelope=received_envelope,
sender_ed25519_pubkey_b64="sender_ed25519_pubkey",
sender_x25519_pubkey_b64="sender_x25519_pubkey"
)
print(f"Message ID: {metadata['message_id']}")
print(f"Sender: {metadata['sender_node_id']}")
print(f"Type: {metadata['type']}")
print(f"Payload: {payload}")
Verification Failures:
try:
    metadata, payload = envelope_handler.verify_and_decrypt(
        envelope=envelope,
        sender_ed25519_pubkey_b64=pubkey,
        sender_x25519_pubkey_b64=x25519_pubkey
    )
except ValueError as e:
    if "Signature verification failed" in str(e):
        print("Invalid signature - possible tampering")
    elif "expired" in str(e):
        print("Message TTL expired")
    elif "decryption" in str(e):
        print("Decryption failed - wrong keys or corrupted")
    else:
        raise  # unrecognized failure: do not swallow it
Packet Format Example¶
Encrypted TASK_REQUEST:
{
"message_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"type": "TASK_REQUEST",
"sender_node_id": "node_gateway_abc",
"recipient_node_id": "node_edge_xyz",
"created_at": "2026-01-24T12:00:00+00:00",
"ttl_s": 86400,
"enc": {
"nonce": "AAAAAAAAAAAAAAAAAAAAAA==",
"ciphertext": "ZW5jcnlwdGVkX3BheWxvYWRfaGVyZQ=="
},
"signature": "c2lnbmF0dXJlX2Jhc2U2NF9lbmNvZGVk"
}
Decrypted Payload:
{
"tool_name": "pii_redact",
"tool_args": {
"text": "Contact admin@example.com for support"
},
"trace_id": "trace_abc123"
}
Store-and-Forward Mailbox¶
The mailbox provides offline-tolerant messaging with retry logic.
Mailbox Database Schema¶
SQLite Database: data/mailbox.db
Table: messages
CREATE TABLE messages (
message_id TEXT PRIMARY KEY,
recipient_node_id TEXT NOT NULL,
envelope TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
expires_at TIMESTAMP NOT NULL,
delivered BOOLEAN DEFAULT 0,
delivered_at TIMESTAMP,
retry_count INTEGER DEFAULT 0,
last_retry TIMESTAMP
);
CREATE INDEX idx_recipient ON messages(recipient_node_id);
CREATE INDEX idx_delivered ON messages(delivered);
CREATE INDEX idx_expires ON messages(expires_at);
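To see how this schema supports queueing and pending lookups, here is a minimal sketch against an in-memory SQLite database. It assumes timestamps are stored as ISO 8601 UTC strings and that `expires_at` is derived from `created_at + ttl_s`; `queue_envelope` and `pending_for` are hypothetical helpers, not the Mailbox class itself.

```python
import json
import sqlite3
from datetime import datetime, timedelta, timezone

SCHEMA = """
CREATE TABLE messages (
    message_id TEXT PRIMARY KEY,
    recipient_node_id TEXT NOT NULL,
    envelope TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL,
    expires_at TIMESTAMP NOT NULL,
    delivered BOOLEAN DEFAULT 0,
    delivered_at TIMESTAMP,
    retry_count INTEGER DEFAULT 0,
    last_retry TIMESTAMP
);
"""


def queue_envelope(conn, envelope):
    """Insert an envelope, deriving expires_at from created_at + ttl_s."""
    created = datetime.fromisoformat(envelope["created_at"])
    expires = created + timedelta(seconds=envelope["ttl_s"])
    conn.execute(
        "INSERT INTO messages (message_id, recipient_node_id, envelope,"
        " created_at, expires_at) VALUES (?, ?, ?, ?, ?)",
        (envelope["message_id"], envelope["recipient_node_id"],
         json.dumps(envelope), created.isoformat(), expires.isoformat()),
    )


def pending_for(conn, recipient_node_id, now):
    """Return IDs of undelivered, unexpired messages for one recipient."""
    rows = conn.execute(
        "SELECT message_id FROM messages"
        " WHERE recipient_node_id = ? AND delivered = 0 AND expires_at > ?"
        " ORDER BY created_at",
        (recipient_node_id, now.isoformat()),
    )
    return [row[0] for row in rows]


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
queue_envelope(conn, {
    "message_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "recipient_node_id": "node_edge_xyz",
    "created_at": "2026-01-24T12:00:00+00:00",
    "ttl_s": 86400,
})
one_hour_later = datetime(2026, 1, 24, 13, 0, tzinfo=timezone.utc)
print(pending_for(conn, "node_edge_xyz", one_hour_later))
```

Comparing ISO strings works here only because every timestamp uses the same format and UTC offset.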
Mailbox Operations¶
Queue Message¶
from gateway.mesh_transport import Mailbox
mailbox = Mailbox("data/mailbox.db")
# Queue encrypted envelope for delivery
mailbox.queue(
message_id=envelope["message_id"],
recipient_node_id=envelope["recipient_node_id"],
envelope=envelope
)
print("Message queued for delivery")
Mark Delivered¶
# After successful delivery
mailbox.mark_delivered(message_id="a1b2c3d4-...")
print("Message marked as delivered")
Get Pending Messages¶
# Get all undelivered messages for a recipient
pending = mailbox.get_pending(recipient_node_id="node_edge_xyz")
for msg in pending:
    print(f"Message: {msg['message_id']}")
    print(f" Created: {msg['created_at']}")
    print(f" Retries: {msg['retry_count']}")
    print(f" Last retry: {msg['last_retry']}")
Update Retry¶
Cleanup Expired¶
# Remove expired messages
expired_count = mailbox.cleanup_expired()
print(f"Removed {expired_count} expired messages")
Delivery Worker¶
The delivery worker runs in the background and attempts to deliver pending messages.
Retry Strategy: Exponential backoff with max retries
Retry Schedule:
| Attempt | Delay | Backoff Factor |
|---|---|---|
| 1 | 0s | - |
| 2 | 10s | 2x |
| 3 | 20s | 2x |
| 4 | 40s | 2x |
| 5 | 80s | 2x |
| 6-10 | 160s | 2x (capped) |
Max Retries: 10
Worker Interval: 10 seconds (configurable via DWO_DELIVERY_INTERVAL)
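The schedule in the table can be reproduced with a short formula: no delay before the first attempt, then 10 seconds doubling per attempt up to a 160-second cap. `retry_delay` is a hypothetical helper written to match the table, and treating "Max Retries: 10" as ten attempts in total is an assumption.

```python
def retry_delay(attempt, base_s=10.0, cap_s=160.0, max_attempts=10):
    """Return the wait in seconds before the given 1-based attempt."""
    if not 1 <= attempt <= max_attempts:
        raise ValueError(f"attempt must be between 1 and {max_attempts}")
    if attempt == 1:
        return 0.0  # the first attempt is immediate
    # Attempt 2 waits base_s, and each later attempt doubles it up to cap_s.
    return min(base_s * 2 ** (attempt - 2), cap_s)


schedule = [retry_delay(n) for n in range(1, 11)]
print(schedule)
# [0.0, 10.0, 20.0, 40.0, 80.0, 160.0, 160.0, 160.0, 160.0, 160.0]
```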
Starting the Delivery Worker¶
from gateway.mesh_transport import DeliveryWorker, Mailbox
mailbox = Mailbox("data/mailbox.db")
worker = DeliveryWorker(mailbox, interval_seconds=10)
# Start background worker
worker.start()
# Worker runs indefinitely, retrying failed deliveries
# Stop worker (graceful shutdown)
worker.stop()
Delivery Flow¶
Mailbox                 DeliveryWorker                 Recipient Node
|                              |                              |
|-- queue(envelope) ---------->|                              |
|                              |                              |
|                              |-- POST /mesh/receive ------->|
|                              |                              |
|                              |<-- 200 OK -------------------|
|<-- mark_delivered() ---------|                              |
|                              |                              |
|                         (if failed)                         |
|                              |                              |
|<-- update_retry() -----------|                              |
|                              |                              |
|                       (wait backoff)                        |
|                              |                              |
|                              |-- POST /mesh/receive ------->|
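One pass of the flow above could be sketched as follows. This is not the DeliveryWorker implementation: the `send` callback, the `envelope` key on pending rows, and the signature of `update_retry` are all assumptions, and the backoff wait between passes is left out.

```python
def deliver_pending(mailbox, send, max_retries=10):
    """Run one delivery pass and return the IDs that were delivered.

    mailbox: object with get_pending(), mark_delivered(message_id) and
             update_retry(message_id) (update_retry's signature is assumed).
    send: function(envelope) -> bool, True when the recipient answered 200 OK.
    """
    delivered = []
    for msg in mailbox.get_pending():
        if msg["retry_count"] >= max_retries:
            continue  # retry budget exhausted; skip this message
        try:
            ok = send(msg["envelope"])
        except OSError:
            ok = False  # network errors count as a failed attempt
        if ok:
            mailbox.mark_delivered(msg["message_id"])
            delivered.append(msg["message_id"])
        else:
            mailbox.update_retry(msg["message_id"])
    return delivered
```

Passing `send` in as a callback keeps the HTTP client out of the loop, which makes the retry bookkeeping easy to exercise with a fake mailbox.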
HTTP Endpoint for Receiving Messages¶
Edge nodes expose /mesh/receive to accept task envelopes:
curl -X POST http://192.168.1.100:8000/mesh/receive \
-H "Content-Type: application/json" \
-d '{
"message_id": "a1b2c3d4-...",
"type": "TASK_REQUEST",
"sender_node_id": "node_gateway",
"recipient_node_id": "node_edge_1",
"created_at": "2026-01-24T12:00:00+00:00",
"ttl_s": 86400,
"enc": {
"nonce": "...",
"ciphertext": "..."
},
"signature": "..."
}'
Response (Success):
Response (Error):
Security Considerations¶
Cryptographic Guarantees¶
- Confidentiality: X25519 ECDH + ChaCha20-Poly1305 ensures only the recipient can decrypt
- Authenticity: Ed25519 signatures prove sender identity
- Integrity: AEAD guarantees the message has not been tampered with
- Replay Protection: Message ID deduplication prevents replay attacks
- Forward Secrecy: Provided only if ephemeral X25519 keys are used (if implemented); with static keys, a compromised private key exposes previously captured messages
Trust Model¶
- Trust on First Use (TOFU): Initial trust established via QR code pairing
- Trust Store: Maintains list of trusted/blocked nodes
- Signature Verification: All messages verified before processing
- TTL Enforcement: Old messages automatically rejected
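The pinning idea behind TOFU can be illustrated in a few lines. This is an in-memory sketch only: `TofuStore` is a hypothetical class, not the project's TrustStore, and hashing the base64 string (rather than the decoded key bytes) to form a fingerprint is a simplification chosen for the example.

```python
import hashlib


class TofuStore:
    """Pin the first public key seen for each node ID (in-memory sketch)."""

    def __init__(self):
        self._pins = {}

    @staticmethod
    def fingerprint(public_key_b64):
        """Short fingerprint of the base64 key text (simplified)."""
        return hashlib.sha256(public_key_b64.encode("ascii")).hexdigest()[:16]

    def check(self, node_id, public_key_b64):
        """Return "new", "match" or "mismatch" for this node/key pair."""
        fp = self.fingerprint(public_key_b64)
        pinned = self._pins.get(node_id)
        if pinned is None:
            self._pins[node_id] = fp  # first contact: remember this key
            return "new"
        return "match" if pinned == fp else "mismatch"


store = TofuStore()
first_contact = store.check("node_edge_1", "a2V5X29uZQ==")
same_key = store.check("node_edge_1", "a2V5X29uZQ==")
other_key = store.check("node_edge_1", "a2V5X3R3bw==")
```

A "mismatch" result is the case that matters: a known node ID presenting a different key should be treated as untrusted until it is paired again.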
Attack Resistance¶
| Attack | Mitigation |
|---|---|
| Man-in-the-Middle | Ed25519 signatures + E2EE |
| Replay Attacks | Message ID deduplication |
| Eavesdropping | ChaCha20-Poly1305 encryption |
| Tampering | AEAD authentication tag |
| Denial of Service | Rate limiting (future), TTL expiry |
| Sybil Attacks | Trust store + signature verification |
Performance Metrics¶
Discovery Performance¶
| Metric | Value | Notes |
|---|---|---|
| Scan Duration | 6 seconds | Configurable |
| Max Nodes Discovered | 100+ | Limited by network bandwidth |
| Packet Size | ~500 bytes | ADVERTISE message |
| Network Bandwidth | ~1 KB/s | During discovery |
E2EE Performance¶
| Operation | Latency | Notes |
|---|---|---|
| Envelope Creation | 1-3ms | Includes encryption + signing |
| Envelope Verification | 2-5ms | Includes signature verification + decryption |
| Key Generation (X25519) | 5-10ms | One-time per session |
| Key Generation (Ed25519) | 5-10ms | One-time per node |
Mailbox Performance¶
| Operation | Latency | Notes |
|---|---|---|
| Queue Message | 1-2ms | SQLite INSERT |
| Mark Delivered | 1-2ms | SQLite UPDATE |
| Get Pending | 5-10ms | SQLite SELECT |
| Cleanup Expired | 10-50ms | Depends on message count |
Troubleshooting¶
Discovery Issues¶
Problem: No nodes discovered
Solutions:
# Check UDP port is open
sudo lsof -i UDP:37020
# Check firewall
sudo ufw status
# Test manual broadcast
python -c "
import socket, json, uuid
from datetime import datetime, timezone
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
ts = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
msg = {'type': 'DISCOVER', 'nonce': str(uuid.uuid4())[:16], 'reply_to': 37020, 'timestamp': ts}
sock.sendto(json.dumps(msg).encode(), ('255.255.255.255', 37020))
print('Sent DISCOVER')
"
Problem: Signature verification failing
Solution: Check public key format and trust store
from trust.store import TrustStore
trust = TrustStore()
nodes = trust.list_all()
for node in nodes["trusted"]:
    print(f"{node['node_id']}: {node['public_key_fingerprint']}")
E2EE Issues¶
Problem: Decryption failing
Solution: Verify keys match
# Check X25519 public key
from gateway.mesh_transport import MeshCrypto
crypto = MeshCrypto("keys/")
pubkey = crypto.get_public_key()
print(f"My X25519 public key: {pubkey}")
# Verify recipient has correct sender key
Problem: "Message expired" error
Solution: Check system time sync
Mailbox Issues¶
Problem: Messages not being delivered
Solution: Check delivery worker status
from gateway.mesh_transport import DeliveryWorker, Mailbox
mailbox = Mailbox("data/mailbox.db")
# Check pending messages
pending = mailbox.get_pending()
print(f"Pending messages: {len(pending)}")
for msg in pending:
    print(f" {msg['message_id']}: retries={msg['retry_count']}")
# Check if worker is running
# (should see periodic delivery attempts in logs)
Complete Example: Sending Encrypted Task¶
from gateway.mesh_transport import (
TaskEnvelope, MessageType, MeshCrypto, Mailbox, DeliveryWorker
)
from edge_node.identity import NodeIdentity
from trust.store import TrustStore
# Initialize
gateway_identity = NodeIdentity("gateway_identity.pem")
mesh_crypto = MeshCrypto("keys/")
mailbox = Mailbox("data/mailbox.db")
trust_store = TrustStore()
# Get recipient info from trust store
recipient_node_id = "node_edge_1"
recipient_info = trust_store.get_node(recipient_node_id)
if not recipient_info:
    raise ValueError(f"Node {recipient_node_id} not in trust store")
recipient_x25519_pubkey = recipient_info["x25519_pubkey"]
recipient_http_url = recipient_info["http_url"]
# Create envelope handler
envelope_handler = TaskEnvelope(gateway_identity, mesh_crypto)
# Create task request
payload = {
"tool_name": "pii_redact",
"tool_args": {
"text": "Contact admin@company.com for support"
},
"trace_id": "trace_abc123"
}
# Create signed + encrypted envelope
envelope = envelope_handler.create(
message_type=MessageType.TASK_REQUEST,
payload_dict=payload,
recipient_node_id=recipient_node_id,
recipient_x25519_pubkey=recipient_x25519_pubkey,
ttl_seconds=86400
)
print(f"Created envelope: {envelope['message_id']}")
# Queue for delivery
mailbox.queue(
message_id=envelope["message_id"],
recipient_node_id=recipient_node_id,
envelope=envelope
)
print("Envelope queued for delivery")
# Start delivery worker
worker = DeliveryWorker(mailbox, interval_seconds=10)
worker.start()
print("Delivery worker started - will retry until delivered")
Changelog¶
- v1.0.0 (2026-01-24): Initial mesh transport implementation
  - UDP multicast discovery with Ed25519 signatures
  - HTTP communication for tool execution
  - E2EE task envelopes with X25519 + ChaCha20-Poly1305
  - Store-and-forward mailbox with exponential backoff
  - Replay protection and TTL enforcement