← Back to Research
POC Objectives
- Demonstrate end-to-end flow: key generation → signing → verification
- Measure performance impact on mail throughput
- Validate cryptographic approach
- Test DNS publication and lookup
- Prove backward compatibility
Quick Start Prototype
Minimal Implementation (1-2 days)
Components: (1) a simple key generator CLI; (2) a message signer script; (3) a Postfix policy service; (4) a DNS mock for testing.
Step 1: Key Generation Tool
File: prototype/keygen.py
#!/usr/bin/env python3
"""Generate authenticated routing keys"""
import nacl.signing
import nacl.encoding
import json
import sys
from datetime import datetime, timedelta
def generate_keypair(email, key_type='subkey'):
    """Create a new Ed25519 keypair record for *email*.

    Args:
        email: Address the key is issued for; stored verbatim in the record.
        key_type: ``'subkey'`` (default) gets a 30-day expiry; any other
            value (e.g. ``'primary'``) gets ``expires_at`` set to ``None``.

    Returns:
        dict with the keys ``email``, ``key_type``, ``key_id``,
        ``private_key``, ``public_key``, ``created_at`` and ``expires_at``.
        Both keys are Base64 text; timestamps are ISO-8601 strings taken
        from the local clock (naive, no timezone).
    """
    # Imported locally: the module-level imports do not include hashlib, so
    # the key_id computation below previously raised NameError.
    import hashlib

    signing_key = nacl.signing.SigningKey.generate()
    verify_key = signing_key.verify_key

    # key_id is the first 16 hex characters of SHA-256 over the encoded
    # public key; it is an identifier, not a secret.
    key_id = hashlib.sha256(verify_key.encode()).hexdigest()[:16]

    # Sample the clock once so created_at and expires_at are exactly
    # 30 days apart.
    created = datetime.now()
    expires = created + timedelta(days=30) if key_type == 'subkey' else None

    return {
        'email': email,
        'key_type': key_type,
        'key_id': key_id,
        'private_key': signing_key.encode(encoder=nacl.encoding.Base64Encoder).decode(),
        'public_key': verify_key.encode(encoder=nacl.encoding.Base64Encoder).decode(),
        'created_at': created.isoformat(),
        'expires_at': expires.isoformat() if expires is not None else None,
    }
def main():
    """CLI entry point: generate a keypair and print its DNS TXT record.

    Usage: ``keygen.py user@domain [primary|subkey]``.

    Writes ``<key_id>.private.json`` (the full keypair record, including the
    private key) to the current directory and prints the TXT record to
    publish. Exits with status 1 on missing or malformed arguments.
    """
    # Local import: only this function needs os, and the module-level
    # imports do not include it.
    import os

    if len(sys.argv) < 2:
        print("Usage: keygen.py user@msgs.global [primary|subkey]")
        sys.exit(1)

    email = sys.argv[1]
    key_type = sys.argv[2] if len(sys.argv) > 2 else 'subkey'

    # Validate before generating anything: the DNS record name is built from
    # both halves of the address, and failing after the private key file has
    # been written would leave an orphaned secret on disk.
    local_part, _, domain = email.partition('@')
    if email.count('@') != 1 or not local_part or not domain:
        print(f"Invalid email address: {email!r}")
        print("Usage: keygen.py user@msgs.global [primary|subkey]")
        sys.exit(1)

    keypair = generate_keypair(email, key_type)
    key_path = f"{keypair['key_id']}.private.json"

    # Create the file owner-read/write only (0600): it contains the private
    # key, and a plain open() would apply the default umask.
    fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        json.dump(keypair, f, indent=2)

    # Print public key for DNS
    print("Public Key for DNS:")
    print(f"_authroute.{local_part}._domainkey.{domain}. IN TXT (")
    print(f' "v=AUTHROUTE1; k=ed25519; p={keypair["public_key"]};"')
    print(")")
    print(f"\nKey ID: {keypair['key_id']}")
    print(f"Private key saved to: {key_path}")


if __name__ == '__main__':
    main()
Step 2: Message Signer
File: prototype/sign_message.py
#!/usr/bin/env python3
"""Sign a message for authenticated routing"""
import nacl.signing
import nacl.encoding
import json
import sys
import time
import base64
import hashlib
def sign_message(private_key_file, sender_email, message_id):
    """Build and sign an authenticated-routing proof for one message.

    Args:
        private_key_file: Path to a keypair JSON file containing
            ``private_key`` (Base64) and ``key_id``. An already-loaded
            keypair dict with the same fields is also accepted.
        sender_email: Envelope sender the proof is issued for.
        message_id: Message-ID the proof is bound to.

    Returns:
        ``(auth_proof, proof)``: the Base64-encoded JSON proof for the SMTP
        parameter, and the proof dict itself (including ``signature``).
    """
    # Accept a keypair dict as well as a path: other callers in this
    # prototype hand over the dict they already have in memory.
    if isinstance(private_key_file, dict):
        keypair = private_key_file
    else:
        with open(private_key_file) as f:
            keypair = json.load(f)

    signing_key = nacl.signing.SigningKey(
        keypair['private_key'],
        encoder=nacl.encoding.Base64Encoder
    )

    # Create proof
    timestamp = int(time.time())
    proof = {
        'version': '1.0',
        'sender': sender_email,
        'timestamp': timestamp,
        'message_id': message_id,
        'key_id': keypair['key_id']
    }

    # The signed string must match what the verifier rebuilds from the proof
    # fields, so the "timestamp:message_id:sender" layout cannot change here
    # alone. NOTE(review): ':' may also occur inside message_id, so the
    # layout is not unambiguous — consider a length-prefixed encoding.
    message_to_sign = f"{timestamp}:{message_id}:{sender_email}"
    signature = signing_key.sign(message_to_sign.encode())
    proof['signature'] = base64.b64encode(signature.signature).decode()

    # Encode for SMTP
    auth_proof = base64.b64encode(json.dumps(proof).encode()).decode()
    return auth_proof, proof
def main():
    """CLI entry point: sign one message and print the resulting proof.

    Expects three arguments (private key file, sender address, message id);
    prints a usage line and exits with status 1 when fewer are given.
    """
    cli_args = sys.argv[1:]
    if len(cli_args) < 3:
        print("Usage: sign_message.py <private_key.json> <sender@msgs.global> <message-id>")
        sys.exit(1)

    key_path, sender, msg_id = cli_args[:3]
    encoded_proof, proof_fields = sign_message(key_path, sender, msg_id)

    # First the SMTP command carrying the proof, then the decoded fields.
    print("Authenticated Routing Proof:")
    print(f"MAIL FROM:<{sender}> AUTH-PROOF={encoded_proof}")
    print()
    print("Proof Details:")
    print(json.dumps(proof_fields, indent=2))


if __name__ == '__main__':
    main()
Step 3: Verification Service
File: prototype/verify_service.py
#!/usr/bin/env python3
"""Simple verification service for testing"""
import socket
import json
import base64
import time
import nacl.signing
import nacl.encoding
import dns.resolver
def fetch_public_key_dns(email, key_id):
    """Look up the sender's public key in its AUTHROUTE1 TXT record.

    Queries ``_authroute.<user>._domainkey.<domain>`` and returns the ``p``
    tag of the first TXT record containing ``v=AUTHROUTE1``. Returns
    ``None`` when no such record exists, the tag is absent, or anything
    fails (the failure is printed). ``key_id`` is accepted but not used.
    """
    try:
        user, domain = email.split('@')
        answers = dns.resolver.resolve(f"_authroute.{user}._domainkey.{domain}", 'TXT')

        for rdata in answers:
            # A TXT record may arrive as several strings; rejoin them.
            pieces = []
            for chunk in rdata.strings:
                pieces.append(chunk.decode() if isinstance(chunk, bytes) else chunk)
            txt = ''.join(pieces)

            if 'v=AUTHROUTE1' not in txt:
                continue

            # Parse the "tag=value; tag=value" list into a dict.
            tags = {}
            for field in txt.split(';'):
                if '=' in field:
                    name, value = field.strip().split('=', 1)
                    tags[name] = value
            return tags.get('p')
    except Exception as e:
        print(f"DNS lookup failed: {e}")
        return None
    return None
def verify_auth_proof(sender_email, auth_proof_b64):
    """Verify an authenticated-routing proof for an envelope sender.

    Args:
        sender_email: Envelope sender address the proof must belong to.
        auth_proof_b64: Base64-encoded JSON proof with ``sender``,
            ``timestamp``, ``message_id``, ``key_id`` and ``signature``.

    Returns:
        ``(ok, message)``: ``ok`` is True only when the proof names this
        sender, is within the 5-minute window, and its signature verifies
        against the key published in DNS. ``message`` describes the outcome.
        Never raises; malformed input yields ``(False, "Verification error: ...")``.
    """
    try:
        # Decode proof
        proof_json = base64.b64decode(auth_proof_b64).decode()
        proof = json.loads(proof_json)

        # Cheap structural checks come first so a stale or mismatched proof
        # is rejected without a DNS round trip or signature check.
        #
        # The key is looked up for the envelope sender while the signature
        # covers proof['sender']; without this comparison the two addresses
        # were never tied together.
        if proof['sender'] != sender_email:
            return False, "Sender mismatch between envelope and proof"

        # Check timestamp (5 min window, either direction)
        if abs(time.time() - proof['timestamp']) > 300:
            return False, "Timestamp out of range"

        # Fetch public key
        public_key_b64 = fetch_public_key_dns(sender_email, proof['key_id'])
        if not public_key_b64:
            return False, "Public key not found in DNS"

        # Verify signature over the same string the signer built.
        verify_key = nacl.signing.VerifyKey(
            public_key_b64,
            encoder=nacl.encoding.Base64Encoder
        )
        message = f"{proof['timestamp']}:{proof['message_id']}:{proof['sender']}"
        signature = base64.b64decode(proof['signature'])
        try:
            verify_key.verify(message.encode(), signature)
        except Exception as e:
            return False, f"Signature verification failed: {e}"

        # TODO: Check revocation list
        return True, f"PASS (key_id={proof['key_id']})"
    except Exception as e:
        # Boundary handler: any malformed proof (bad Base64/JSON, missing or
        # mistyped fields) is reported as a failed verification.
        return False, f"Verification error: {e}"
def handle_policy_request(conn):
    """Serve one Postfix policy-delegation request on *conn*.

    Reads ``name=value`` lines up to the terminating empty line, verifies
    the AUTH-PROOF carried in the ``sender`` attribute (if any), sends a
    single ``action=...`` reply followed by an empty line, and closes the
    connection.

    Replies:
        * ``PREPEND Authenticated-Routing: ...`` when verification passes,
        * ``WARN Authenticated-Routing: FAIL (...)`` when it fails,
        * ``DUNNO`` when the sender carries no AUTH-PROOF.
    """
    max_request_bytes = 65536  # bound memory used by a misbehaving client

    try:
        # recv() returns arbitrary chunks, not lines: accumulate until the
        # blank line that ends the request (or EOF / size cap). Treating
        # each chunk as one line mis-parsed requests that arrived in a
        # single chunk and then blocked forever waiting for more data.
        raw = bytearray()
        while b"\n\n" not in raw and len(raw) < max_request_bytes:
            chunk = conn.recv(4096)
            if not chunk:
                break
            raw += chunk

        request = {}
        for line in raw.decode(errors="replace").split("\n"):
            if not line:
                break  # empty line terminates the attribute list
            key, sep, value = line.partition('=')
            if sep:
                request[key] = value

        # Extract AUTH-PROOF from sender
        # Format: <user@domain> AUTH-PROOF=base64...
        # NOTE(review): this prototype assumes the proof arrives inside the
        # policy 'sender' attribute — confirm against the MTA's behaviour.
        sender = request.get('sender', '')
        if 'AUTH-PROOF=' in sender:
            email, auth_proof_part = sender.split(' AUTH-PROOF=', 1)
            email = email.strip('<>')
            auth_proof = auth_proof_part.strip()
            success, message = verify_auth_proof(email, auth_proof)
            if success:
                response = f"action=PREPEND Authenticated-Routing: {message}\n\n"
            else:
                response = f"action=WARN Authenticated-Routing: FAIL ({message})\n\n"
        else:
            response = "action=DUNNO\n\n"

        # sendall: send() may transmit only part of the buffer.
        conn.sendall(response.encode())
    finally:
        # Always release the connection, including when parsing or
        # verification raises.
        conn.close()
def main():
    """Run the policy server on 127.0.0.1:10040, one request at a time.

    Connections are handled sequentially in the accepting thread. A failure
    while handling one request is reported and the server keeps accepting.
    Runs until interrupted.
    """
    # NOTE(review): the listener is bound to loopback only; if this runs in
    # a container with a published port, confirm other hosts can reach it.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('127.0.0.1', 10040))
        sock.listen(5)
        print("Authenticated Routing Verification Service")
        print("Listening on 127.0.0.1:10040")

        while True:
            conn, addr = sock.accept()
            try:
                handle_policy_request(conn)
            except Exception as e:
                # One bad client must not take the whole service down.
                print(f"Request from {addr} failed: {e}")
            finally:
                # Closing an already-closed socket is a no-op.
                conn.close()


if __name__ == '__main__':
    main()
Step 4: Testing Script
File: prototype/test_flow.sh
#!/bin/bash
# End-to-end test of authenticated routing:
# keygen -> (manual) DNS publication -> signing -> verification service -> SMTP send.
set -e

EMAIL="testuser@msgs.global"
MESSAGE_ID="test-$(date +%s)@msgs.global"
VERIFY_PID=""

# Stop the background verification service however the script exits;
# with `set -e` a failing step would otherwise leave it running.
cleanup() {
    if [ -n "$VERIFY_PID" ]; then
        kill "$VERIFY_PID" 2>/dev/null || true
        VERIFY_PID=""
    fi
}
trap cleanup EXIT

echo "=== Authenticated Routing Prototype Test ==="
echo

# Step 1: Generate keys
echo "1. Generating keypair for $EMAIL..."
python3 keygen.py "$EMAIL" subkey
# Newest key file in the directory is the one just written.
KEY_FILE=$(ls -t *.private.json | head -1)
echo " Generated: $KEY_FILE"
echo

# Step 2: Publish to DNS (manual step in prototype)
echo "2. Publishing to DNS..."
echo " [MANUAL] Add the DNS record shown above to your zone"
echo " Press Enter when ready..."
read -r
echo

# Step 3: Sign a message
echo "3. Signing message $MESSAGE_ID..."
python3 sign_message.py "$KEY_FILE" "$EMAIL" "$MESSAGE_ID"
echo

# Step 4: Verify
echo "4. Starting verification service..."
python3 verify_service.py &
VERIFY_PID=$!
sleep 2

# Step 5: Test with swaks
echo "5. Sending test message via SMTP..."
# -f2- keeps everything after the first '=': the Base64 proof may end in '='
# padding, which a plain -f2 would cut off.
MAIL_FROM="<$EMAIL> AUTH-PROOF=$(python3 sign_message.py "$KEY_FILE" "$EMAIL" "$MESSAGE_ID" | grep AUTH-PROOF | cut -d= -f2-)"
# NOTE(review): MAIL_FROM is computed but not handed to swaks below, so the
# proof is not part of the SMTP transaction — confirm how it should be passed.
swaks --to destination@example.com \
    --from "$EMAIL" \
    --header "Message-ID: $MESSAGE_ID" \
    --server localhost:25 \
    --protocol ESMTP

# Cleanup
cleanup
echo
echo "=== Test Complete ==="
Performance Testing
Benchmark Script
File: prototype/benchmark.py
#!/usr/bin/env python3
"""Benchmark authenticated routing performance"""
import time
import multiprocessing
from sign_message import sign_message
from verify_service import verify_auth_proof
def benchmark_signing(iterations=1000):
    """Benchmark Ed25519 message signing and print rate and latency.

    Args:
        iterations: Number of messages to sign.
    """
    # Generate test key once
    import nacl.signing
    signing_key = nacl.signing.SigningKey.generate()

    # Build the payloads up front so the timed region covers sign() only,
    # not string formatting or clock reads.
    timestamp = int(time.time())
    payloads = [
        f"{timestamp}:msg-{i}:test@msgs.global".encode()
        for i in range(iterations)
    ]

    # perf_counter is monotonic and high-resolution; the wall clock can jump.
    start = time.perf_counter()
    for payload in payloads:
        signing_key.sign(payload)
    elapsed = time.perf_counter() - start

    # Guard the divisions so a zero-iteration or sub-resolution run still
    # prints a report.
    rate = iterations / elapsed if elapsed > 0 else float('inf')
    latency_ms = elapsed / iterations * 1000 if iterations else 0.0

    print(f"Signing: {iterations} signatures in {elapsed:.2f}s")
    print(f" Rate: {rate:.0f} signatures/sec")
    print(f" Latency: {latency_ms:.2f}ms per signature")
def benchmark_verification(iterations=1000):
    """Benchmark Ed25519 signature verification and print rate and latency.

    Args:
        iterations: Number of pre-signed messages to verify.
    """
    # Generate test data
    import nacl.signing
    signing_key = nacl.signing.SigningKey.generate()
    verify_key = signing_key.verify_key

    # Sign everything before the clock starts; only verify() is timed.
    timestamp = int(time.time())
    signatures = []
    for i in range(iterations):
        message = f"{timestamp}:msg-{i}:test@msgs.global".encode()
        signatures.append((message, signing_key.sign(message).signature))

    # perf_counter is monotonic and high-resolution; the wall clock can jump.
    start = time.perf_counter()
    for message, sig in signatures:
        verify_key.verify(message, sig)
    elapsed = time.perf_counter() - start

    # Guard the divisions so a zero-iteration or sub-resolution run still
    # prints a report.
    rate = iterations / elapsed if elapsed > 0 else float('inf')
    latency_ms = elapsed / iterations * 1000 if iterations else 0.0

    print(f"\nVerification: {iterations} verifications in {elapsed:.2f}s")
    print(f" Rate: {rate:.0f} verifications/sec")
    print(f" Latency: {latency_ms:.2f}ms per verification")
def benchmark_dns_lookup(iterations=100):
    """Benchmark TXT lookups of the test AUTHROUTE record and print results.

    Failed lookups are still timed, because a failure costs a round trip
    too, but they are counted and reported so an unreachable or missing
    record is not mistaken for real throughput.

    Args:
        iterations: Number of lookups to attempt.
    """
    import dns.resolver

    failures = 0
    # perf_counter is monotonic and high-resolution; the wall clock can jump.
    start = time.perf_counter()
    for _ in range(iterations):
        try:
            dns.resolver.resolve('_authroute.test._domainkey.msgs.global', 'TXT')
        except Exception:
            # Was a bare `except: pass`, which also swallowed
            # KeyboardInterrupt and hid every failure.
            failures += 1
    elapsed = time.perf_counter() - start

    # Guard the divisions so a zero-iteration or sub-resolution run still
    # prints a report.
    rate = iterations / elapsed if elapsed > 0 else float('inf')
    latency_ms = elapsed / iterations * 1000 if iterations else 0.0

    print(f"\nDNS Lookup: {iterations} lookups in {elapsed:.2f}s")
    print(f" Rate: {rate:.0f} lookups/sec")
    print(f" Latency: {latency_ms:.2f}ms per lookup")
    if failures:
        print(f" Failed lookups: {failures} of {iterations}")
if __name__ == '__main__':
    # Run the three benchmarks in order, then print the reference figures
    # they are meant to be compared against.
    print("=== Authenticated Routing Performance Benchmark ===\n")
    for run_benchmark in (benchmark_signing, benchmark_verification, benchmark_dns_lookup):
        run_benchmark()

    expected_lines = (
        "\n=== Expected Performance ===",
        "Ed25519 signing: ~20,000 signatures/sec (0.05ms)",
        "Ed25519 verification: ~8,000 verifications/sec (0.125ms)",
        "DNS lookup (cached): ~1,000 lookups/sec (1ms)",
        "DNS lookup (uncached): ~50 lookups/sec (20ms)",
        "\nTotal overhead per message: ~1-2ms (cached), ~20-25ms (uncached)",
    )
    for expected_line in expected_lines:
        print(expected_line)
Docker Test Environment
File: prototype/docker-compose.yml
# Test stack: a Postfix instance under test, the verification policy
# service built from this directory, and a CoreDNS mock serving the test zone.
version: '3.8'
services:
  # MTA under test; SMTP is published on host port 2525.
  postfix-test:
    image: postfix:latest  # NOTE(review): confirm this image name and registry
    ports:
      - "2525:25"
    volumes:
      - ./postfix-config:/etc/postfix/custom
    environment:
      # Where Postfix should reach the policy service (service name + port below).
      - AUTHROUTE_POLICY_HOST=authroute-verify
      - AUTHROUTE_POLICY_PORT=10040
  # Verification policy service (verify_service.py listens on port 10040).
  # NOTE(review): verify_service.py binds 127.0.0.1 — confirm it is reachable
  # from postfix-test and through the published port.
  authroute-verify:
    build: .
    ports:
      - "10040:10040"
    environment:
      # NOTE(review): this is a public resolver, not the dns-mock service
      # below, and verify_service.py as written does not read DNS_SERVER —
      # confirm how the mock zone is meant to be used.
      - DNS_SERVER=8.8.8.8
  # CoreDNS serving test-zone.db; DNS is published on host UDP port 5353.
  dns-mock:
    image: coredns/coredns:latest
    volumes:
      - ./Corefile:/Corefile
      - ./test-zone.db:/test-zone.db
    ports:
      - "5353:53/udp"
File: prototype/Corefile (DNS mock config)
# DNS mock: answer queries for msgs.global on port 53 from the static zone
# file mounted at /test-zone.db, with logging enabled.
msgs.global:53 {
    file /test-zone.db
    log
}
File: prototype/test-zone.db
; Test zone for the DNS mock: SOA/NS boilerplate plus one AUTHROUTE1 key record.
$ORIGIN msgs.global.
@ IN SOA ns1.msgs.global. admin.msgs.global. 2024030701 3600 1800 604800 86400
  IN NS ns1.msgs.global.
; Key record for testuser@msgs.global. The p= value is a placeholder: replace
; it with the public key printed by keygen.py.
; NOTE(review): the placeholder starts with what looks like a DER
; SubjectPublicKeyInfo prefix, whereas keygen.py prints the Base64 of the
; bare key — confirm which form the verifier expects.
_authroute.testuser._domainkey IN TXT "v=AUTHROUTE1; k=ed25519; p=MCowBQYDK2VwAyEAXXXXXXXXXXXXXXXXXXXXXXXXXXXX;"
Integration Test Suite
File: prototype/test_suite.py
#!/usr/bin/env python3
"""Integration tests for authenticated routing"""
import unittest
import json
import time
from keygen import generate_keypair
from sign_message import sign_message
from verify_service import verify_auth_proof
class TestAuthenticatedRouting(unittest.TestCase):
    """Integration tests for the keygen -> sign -> verify flow.

    Each test gets a fresh keypair written to a temporary key file, and the
    DNS lookup used by the verifier is replaced with a stub that returns the
    matching public key, so no network access or published record is needed.
    """

    def setUp(self):
        """Generate a test keypair, persist it, and stub the DNS lookup."""
        import os
        import tempfile
        from unittest import mock

        self.email = "test@msgs.global"
        self.keypair = generate_keypair(self.email)

        # sign_message() takes the path of a private-key JSON file, so write
        # the generated keypair to a throwaway directory.
        tmp_dir = tempfile.TemporaryDirectory()
        self.addCleanup(tmp_dir.cleanup)
        self.key_file = os.path.join(tmp_dir.name, "test.private.json")
        with open(self.key_file, 'w') as f:
            json.dump(self.keypair, f)

        # Mock DNS to return our public key. verify_auth_proof resolves
        # fetch_public_key_dns in the verify_service namespace, so that is
        # the name to patch.
        dns_patch = mock.patch(
            'verify_service.fetch_public_key_dns',
            return_value=self.keypair['public_key'],
        )
        dns_patch.start()
        self.addCleanup(dns_patch.stop)

    @staticmethod
    def _encode_proof(proof):
        """Re-encode a proof dict the way sign_message encodes it for SMTP."""
        import base64
        return base64.b64encode(json.dumps(proof).encode()).decode()

    def test_key_generation(self):
        """Test key pair generation"""
        self.assertIn('key_id', self.keypair)
        self.assertIn('private_key', self.keypair)
        self.assertIn('public_key', self.keypair)

    def test_message_signing(self):
        """Test message signing"""
        message_id = "test-123@msgs.global"
        auth_proof, proof = sign_message(self.key_file, self.email, message_id)
        self.assertIsNotNone(auth_proof)
        self.assertEqual(proof['sender'], self.email)
        self.assertEqual(proof['message_id'], message_id)

    def test_signature_verification(self):
        """Test signature verification"""
        message_id = "test-456@msgs.global"
        auth_proof, proof = sign_message(self.key_file, self.email, message_id)
        result, message = verify_auth_proof(self.email, auth_proof)
        self.assertTrue(result, f"Verification failed: {message}")

    def test_timestamp_validation(self):
        """Test timestamp window enforcement"""
        from unittest import mock

        message_id = "test-789@msgs.global"
        # Sign with the clock moved back 400 s (6+ minutes) so the proof is
        # stale but its signature is genuine. Editing the timestamp after
        # signing would invalidate the signature and exercise the wrong check.
        stale_now = time.time() - 400
        with mock.patch('time.time', return_value=stale_now):
            auth_proof, proof = sign_message(self.key_file, self.email, message_id)

        result, message = verify_auth_proof(self.email, auth_proof)
        self.assertFalse(result)
        self.assertIn('Timestamp', message)

    def test_invalid_signature(self):
        """Test rejection of invalid signatures"""
        import base64

        message_id = "test-999@msgs.global"
        auth_proof, proof = sign_message(self.key_file, self.email, message_id)

        # Flip one bit of the signature: it keeps its length but is no
        # longer valid for the signed message.
        tampered = bytearray(base64.b64decode(proof['signature']))
        tampered[-1] ^= 0x01
        proof['signature'] = base64.b64encode(bytes(tampered)).decode()

        result, message = verify_auth_proof(self.email, self._encode_proof(proof))
        self.assertFalse(result)
        self.assertIn('Signature', message)


if __name__ == '__main__':
    unittest.main()
Demo Script
File: prototype/demo.sh
#!/bin/bash
# Live demo of authenticated routing: generate a key, sign a message, then
# verify the proof, pausing for Enter between steps.
clear
echo "╔════════════════════════════════════════════════════════╗"
echo "║ Authenticated Routing Prototype Demo ║"
echo "╚════════════════════════════════════════════════════════╝"
echo

# Setup
export EMAIL="demo@msgs.global"
export MESSAGE_ID="demo-$(date +%s)@msgs.global"
echo "📧 User: $EMAIL"
echo "🆔 Message ID: $MESSAGE_ID"
echo
read -r -p "Press Enter to generate keys..."

# Demo Step 1
clear
echo "Step 1: Generate Ed25519 Keypair"
echo "================================"
python3 keygen.py "$EMAIL" subkey
# Newest key file in the directory is the one just written.
KEY_FILE=$(ls -t *.private.json | head -1)
echo
read -r -p "Press Enter to sign a message..."

# Demo Step 2
clear
echo "Step 2: Sign Message with Private Key"
echo "====================================="
python3 sign_message.py "$KEY_FILE" "$EMAIL" "$MESSAGE_ID"
echo
read -r -p "Press Enter to verify signature..."

# Demo Step 3
clear
echo "Step 3: Verify Signature with Public Key"
echo "========================================"
# Run verification.
# Values are passed as arguments rather than spliced into the Python source,
# so quotes or other special characters in them cannot break the snippet.
# sign_message() is given the key file path, which is what it expects.
# NOTE(review): verification looks the public key up in DNS, so the record
# printed in Step 1 must already be published for this step to pass.
python3 - "$KEY_FILE" "$EMAIL" "$MESSAGE_ID" <<'PY'
import sys

from sign_message import sign_message
from verify_service import verify_auth_proof

key_file, email, message_id = sys.argv[1:4]
auth_proof, _ = sign_message(key_file, email, message_id)
result, msg = verify_auth_proof(email, auth_proof)
if result:
    print('✅ VERIFICATION SUCCESS')
    print(f' {msg}')
else:
    print('❌ VERIFICATION FAILED')
    print(f' {msg}')
PY
echo
echo "Demo complete!"
Success Criteria
Prototype Demonstrates:
- [x] Key generation (<1 second)
- [x] Message signing (<1ms)
- [x] Signature verification (<2ms)
- [x] DNS integration
- [x] Backward compatibility (no SMTP changes for non-supporting MTAs)
- [x] Performance acceptable (<100ms overhead)
Next Steps
- Week 1: Build prototype components
- Week 2: Integration testing
- Week 3: Performance optimization
- Week 4: User testing with real msgs.global accounts
Prototype Deliverables
- [ ] Working CLI tools (keygen, sign, verify)
- [ ] Postfix policy service
- [ ] DNS integration
- [ ] Performance benchmarks
- [ ] Test suite (unit + integration)
- [ ] Demo video/documentation
- [ ] Production-readiness assessment and migration plan