← Back to Research

POC Objectives

  1. Demonstrate end-to-end flow: key generation → signing → verification
  2. Measure performance impact on mail throughput
  3. Validate cryptographic approach
  4. Test DNS publication and lookup
  5. Prove backward compatibility

Quick Start Prototype

Minimal Implementation (1-2 days)

Components: (1) a simple key generator CLI, (2) a message signer script, (3) a Postfix policy service, and (4) a DNS mock for testing.

Step 1: Key Generation Tool

File: prototype/keygen.py

#!/usr/bin/env python3
"""Generate authenticated routing keys"""
import nacl.signing
import nacl.encoding
import json
import sys
from datetime import datetime, timedelta

def generate_keypair(email, key_type='subkey'):
    """Generate an Ed25519 keypair record for authenticated routing.

    Args:
        email: Address the key belongs to; stored verbatim in the record.
        key_type: ``'subkey'`` (the default) produces a record that expires
            30 days after creation; any other value produces a record with
            ``expires_at`` set to ``None``.

    Returns:
        A dict with the keys ``email``, ``key_type``, ``key_id``,
        ``private_key``, ``public_key``, ``created_at`` and ``expires_at``.
        Both keys are Base64 text. ``key_id`` is the first 16 hex characters
        of the SHA-256 digest of ``verify_key.encode()``.
    """
    # hashlib is not among this file's module-level imports, so it is
    # imported here to keep the function self-contained.
    import hashlib

    signing_key = nacl.signing.SigningKey.generate()
    verify_key = signing_key.verify_key

    key_id = hashlib.sha256(verify_key.encode()).hexdigest()[:16]

    # Read the clock once so created_at and expires_at are exactly 30 days
    # apart instead of differing by the time between two now() calls.
    now = datetime.now()
    expires_at = (now + timedelta(days=30)).isoformat() if key_type == 'subkey' else None

    return {
        'email': email,
        'key_type': key_type,
        'key_id': key_id,
        'private_key': signing_key.encode(encoder=nacl.encoding.Base64Encoder).decode(),
        'public_key': verify_key.encode(encoder=nacl.encoding.Base64Encoder).decode(),
        'created_at': now.isoformat(),
        'expires_at': expires_at,
    }

def main():
    """CLI entry point: generate a keypair, store it, and print the DNS record.

    Usage: keygen.py user@msgs.global [primary|subkey]

    Writes ``<key_id>.private.json`` (the full keypair record, including the
    private key) to the current directory and prints the TXT record to
    publish. Exits with status 1 on missing or malformed arguments.
    """
    # Local import: os is not among this file's module-level imports.
    import os

    if len(sys.argv) < 2:
        print("Usage: keygen.py user@msgs.global [primary|subkey]")
        sys.exit(1)

    email = sys.argv[1]
    key_type = sys.argv[2] if len(sys.argv) > 2 else 'subkey'

    # Validate the address before anything is written to disk; otherwise a
    # malformed address only fails when the DNS record name is built, after
    # the key file already exists.
    user, sep, domain = email.partition('@')
    if not (sep and user and domain) or '@' in domain:
        print(f"Invalid email address: {email!r}")
        sys.exit(1)

    keypair = generate_keypair(email, key_type)
    private_path = f"{keypair['key_id']}.private.json"

    # Create the file with owner-only permissions: it holds the private key,
    # so it must not inherit a world-readable default mode.
    fd = os.open(private_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        json.dump(keypair, f, indent=2)

    # Print public key for DNS
    print("Public Key for DNS:")
    print(f"_authroute.{user}._domainkey.{domain}. IN TXT (")
    print(f'  "v=AUTHROUTE1; k=ed25519; p={keypair["public_key"]};"')
    print(")")

    print(f"\nKey ID: {keypair['key_id']}")
    print(f"Private key saved to: {private_path}")

if __name__ == '__main__':
    main()

Step 2: Message Signer

File: prototype/sign_message.py

#!/usr/bin/env python3
"""Sign a message for authenticated routing"""
import nacl.signing
import nacl.encoding
import json
import sys
import time
import base64
import hashlib

def sign_message(private_key_file, sender_email, message_id):
    """Build a signed authenticated-routing proof for one message.

    Args:
        private_key_file: Path to a keypair JSON file (as written by
            keygen.py), or an already-loaded keypair dict with at least the
            ``private_key`` (Base64) and ``key_id`` entries. Accepting a dict
            matches how the test suite and demo script call this function.
        sender_email: Envelope sender the proof is issued for.
        message_id: Message-ID the proof is bound to.

    Returns:
        ``(auth_proof, proof)``: ``auth_proof`` is the Base64-encoded JSON
        form of ``proof``; ``proof`` is the dict including the Base64
        ``signature``.

    Note:
        The signed payload is ``"<timestamp>:<message_id>:<sender_email>"``;
        the ``version`` and ``key_id`` fields are carried in the proof but
        are not covered by the signature.
    """
    # Load private key (from disk unless the caller already holds the record)
    if isinstance(private_key_file, dict):
        keypair = private_key_file
    else:
        with open(private_key_file) as f:
            keypair = json.load(f)

    signing_key = nacl.signing.SigningKey(
        keypair['private_key'],
        encoder=nacl.encoding.Base64Encoder
    )

    # Create proof
    timestamp = int(time.time())
    proof = {
        'version': '1.0',
        'sender': sender_email,
        'timestamp': timestamp,
        'message_id': message_id,
        'key_id': keypair['key_id']
    }

    # Sign; the verifier rebuilds this exact string from the proof fields.
    message_to_sign = f"{timestamp}:{message_id}:{sender_email}"
    signature = signing_key.sign(message_to_sign.encode())

    proof['signature'] = base64.b64encode(signature.signature).decode()

    # Encode for SMTP
    auth_proof = base64.b64encode(json.dumps(proof).encode()).decode()

    return auth_proof, proof

def main():
    """CLI entry point: sign one message and print the resulting proof.

    Expects three arguments (key file, sender address, message ID); prints
    the usage line and exits with status 1 when fewer are given.
    """
    args = sys.argv[1:]
    if len(args) < 3:
        print("Usage: sign_message.py <private_key.json> <sender@msgs.global> <message-id>")
        sys.exit(1)

    key_path, sender, msg_id = args[:3]
    encoded_proof, details = sign_message(key_path, sender, msg_id)

    # First the envelope line as it would appear in SMTP, then the
    # human-readable proof fields.
    print("Authenticated Routing Proof:")
    print(f"MAIL FROM:<{sender}> AUTH-PROOF={encoded_proof}")
    print()
    print("Proof Details:")
    print(json.dumps(details, indent=2))


if __name__ == '__main__':
    main()

Step 3: Verification Service

File: prototype/verify_service.py

#!/usr/bin/env python3
"""Simple verification service for testing"""
import socket
import json
import base64
import time
import nacl.signing
import nacl.encoding
import dns.resolver

def fetch_public_key_dns(email, key_id):
    """Look up the sender's public key in DNS.

    Queries the TXT records at ``_authroute.<user>._domainkey.<domain>`` and
    returns the ``p`` tag of the first record containing ``v=AUTHROUTE1``.

    Args:
        email: Sender address; must contain exactly one ``@``.
        key_id: Accepted for the caller's convenience but not used to select
            a record.

    Returns:
        The ``p`` tag value as a string, or ``None`` when no matching record
        exists or any error occurs (the error is printed, not raised).
    """
    try:
        local_part, host = email.split('@')
        answers = dns.resolver.resolve(
            f"_authroute.{local_part}._domainkey.{host}", 'TXT'
        )

        for record in answers:
            # A TXT record may be split into several strings; rejoin them.
            pieces = []
            for chunk in record.strings:
                pieces.append(chunk.decode() if isinstance(chunk, bytes) else chunk)
            text = ''.join(pieces)

            if 'v=AUTHROUTE1' not in text:
                continue

            # Parse the "tag=value; tag=value" list.
            tags = {}
            for field in text.split(';'):
                if '=' in field:
                    name, value = field.strip().split('=', 1)
                    tags[name] = value
            return tags.get('p')
    except Exception as exc:
        print(f"DNS lookup failed: {exc}")

    return None

def verify_auth_proof(sender_email, auth_proof_b64):
    """Verify an authenticated-routing proof for an envelope sender.

    Checks run cheapest-first so that a mismatched or stale proof is rejected
    before any DNS lookup or signature work:

    1. decode the Base64/JSON proof,
    2. the proof's ``sender`` must equal ``sender_email``,
    3. the proof's ``timestamp`` must be within 5 minutes of now,
    4. the public key is fetched from DNS for ``sender_email``,
    5. the signature over ``"<timestamp>:<message_id>:<sender>"`` must verify.

    Args:
        sender_email: Envelope sender the proof was presented with.
        auth_proof_b64: Base64-encoded JSON proof as produced by the signer.

    Returns:
        ``(ok, message)``: ``ok`` is True only when every check passes;
        ``message`` describes the result. Never raises; malformed input is
        reported as ``(False, "Verification error: ...")``.
    """
    try:
        # Decode proof
        proof_json = base64.b64decode(auth_proof_b64).decode()
        proof = json.loads(proof_json)

        # Bind the proof to the envelope sender. The key is looked up for
        # sender_email but the signature covers proof['sender'], so the two
        # must agree for the signature to mean anything.
        if proof['sender'] != sender_email:
            return False, "Sender does not match proof"

        # Check timestamp (5 min window)
        if abs(time.time() - proof['timestamp']) > 300:
            return False, "Timestamp out of range"

        # Fetch public key
        public_key_b64 = fetch_public_key_dns(sender_email, proof['key_id'])
        if not public_key_b64:
            return False, "Public key not found in DNS"

        # Verify signature
        verify_key = nacl.signing.VerifyKey(
            public_key_b64,
            encoder=nacl.encoding.Base64Encoder
        )

        message = f"{proof['timestamp']}:{proof['message_id']}:{proof['sender']}"
        signature = base64.b64decode(proof['signature'])

        try:
            verify_key.verify(message.encode(), signature)
        except Exception as e:
            return False, f"Signature verification failed: {e}"

        # TODO: Check revocation list

        return True, f"PASS (key_id={proof['key_id']})"

    except Exception as e:
        # Boundary catch-all: any malformed proof (bad Base64/JSON, missing
        # field, wrong type) is a verification failure, not a crash.
        return False, f"Verification error: {e}"

def handle_policy_request(conn):
    """Handle one Postfix policy protocol request on *conn*.

    Reads ``name=value`` lines up to the blank line that ends the request,
    verifies the AUTH-PROOF carried in the ``sender`` attribute (if any),
    writes a single ``action=...`` response terminated by a blank line, and
    closes the connection.

    Args:
        conn: Connected socket-like object providing ``recv``, ``sendall``
            and ``close``.
    """
    max_request_bytes = 65536  # bound memory if a peer never ends the request

    try:
        # A request ends with an empty line. recv() returns arbitrary chunks
        # (often the whole request at once), so accumulate bytes until the
        # terminator arrives instead of treating each chunk as one line.
        buffer = b""
        while b"\n\n" not in buffer and len(buffer) < max_request_bytes:
            chunk = conn.recv(4096)
            if not chunk:
                break
            buffer += chunk

        request = {}
        head = buffer.split(b"\n\n", 1)[0]
        for line in head.decode(errors='replace').splitlines():
            if '=' in line:
                key, value = line.split('=', 1)
                request[key] = value

        # Extract AUTH-PROOF from sender
        sender = request.get('sender', '')
        # Format: <user@domain> AUTH-PROOF=base64...
        if 'AUTH-PROOF=' in sender:
            email, auth_proof_part = sender.split(' AUTH-PROOF=', 1)
            email = email.strip('<>')
            auth_proof = auth_proof_part.strip()

            success, message = verify_auth_proof(email, auth_proof)
            # The response is line-oriented; keep exception text embedded in
            # the message from breaking it into several lines.
            message = message.replace('\r', ' ').replace('\n', ' ')

            if success:
                response = f"action=PREPEND Authenticated-Routing: {message}\n\n"
            else:
                response = f"action=WARN Authenticated-Routing: FAIL ({message})\n\n"
        else:
            response = "action=DUNNO\n\n"

        # sendall: send() may write only part of the response.
        conn.sendall(response.encode())
    finally:
        conn.close()

def main():
    """Run the policy server on 127.0.0.1:10040.

    Connections are accepted and handled one at a time. An error while
    handling one connection is printed and the server keeps accepting; the
    listening socket is closed when the loop is left (e.g. on Ctrl-C).
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('127.0.0.1', 10040))
        sock.listen(5)

        print("Authenticated Routing Verification Service")
        print("Listening on 127.0.0.1:10040")

        while True:
            conn, addr = sock.accept()
            try:
                handle_policy_request(conn)
            except Exception as e:
                # One bad client must not take the whole service down.
                print(f"Error handling request from {addr}: {e}")
            finally:
                # Closing an already-closed socket is harmless; this covers
                # the case where the handler failed before closing it.
                conn.close()

if __name__ == '__main__':
    main()

Step 4: Testing Script

File: prototype/test_flow.sh

#!/bin/bash
# End-to-end test of authenticated routing
#
# Runs keygen.py, sign_message.py and verify_service.py from the current
# directory, then sends a test message with swaks. The DNS publication step
# is manual: the script pauses until the operator confirms it.

set -e

EMAIL="testuser@msgs.global"
MESSAGE_ID="test-$(date +%s)@msgs.global"
VERIFY_PID=""

# Stop the background verification service however the script exits; with
# `set -e` a failing step would otherwise leave it running.
cleanup() {
    if [ -n "$VERIFY_PID" ]; then
        kill "$VERIFY_PID" 2>/dev/null || true
        VERIFY_PID=""
    fi
}
trap cleanup EXIT

echo "=== Authenticated Routing Prototype Test ==="
echo

# Step 1: Generate keys
echo "1. Generating keypair for $EMAIL..."
python3 keygen.py "$EMAIL" subkey
# The newest *.private.json is the key that was just generated.
KEY_FILE=$(ls -t *.private.json | head -1)
echo "   Generated: $KEY_FILE"
echo

# Step 2: Publish to DNS (manual step in prototype)
echo "2. Publishing to DNS..."
echo "   [MANUAL] Add the DNS record shown above to your zone"
echo "   Press Enter when ready..."
read -r
echo

# Step 3: Sign a message
echo "3. Signing message $MESSAGE_ID..."
python3 sign_message.py "$KEY_FILE" "$EMAIL" "$MESSAGE_ID"
echo

# Step 4: Verify
echo "4. Starting verification service..."
python3 verify_service.py &
VERIFY_PID=$!
sleep 2

# Step 5: Test with swaks
echo "5. Sending test message via SMTP..."
# `cut -f2-` keeps everything after the first '=': the proof is Base64 and
# may itself end in '=' padding, which `-f2` alone would cut off.
MAIL_FROM="<$EMAIL> AUTH-PROOF=$(python3 sign_message.py "$KEY_FILE" "$EMAIL" "$MESSAGE_ID" | grep AUTH-PROOF | cut -d= -f2-)"
# NOTE(review): the swaks call below sends a plain envelope sender; MAIL_FROM
# is not transmitted. It is printed so the envelope can be replayed by hand.
echo "   MAIL FROM:$MAIL_FROM"

swaks --to destination@example.com \
      --from "$EMAIL" \
      --header "Message-ID: $MESSAGE_ID" \
      --server localhost:25 \
      --protocol ESMTP

# Cleanup
cleanup

echo
echo "=== Test Complete ==="

Performance Testing

Benchmark Script

File: prototype/benchmark.py

#!/usr/bin/env python3
"""Benchmark authenticated routing performance"""
import time
import multiprocessing
from sign_message import sign_message
from verify_service import verify_auth_proof

def benchmark_signing(iterations=1000):
    """Benchmark message signing and print rate and per-signature latency.

    Args:
        iterations: Number of messages to sign with one freshly generated key.
    """
    # Generate test key once
    import nacl.signing
    signing_key = nacl.signing.SigningKey.generate()

    # perf_counter is monotonic and high-resolution; time.time() can jump and
    # may be too coarse to time a short loop.
    start = time.perf_counter()
    for i in range(iterations):
        # Building the payload is part of the measured signing path.
        timestamp = int(time.time())
        message = f"{timestamp}:msg-{i}:test@msgs.global"
        signing_key.sign(message.encode())
    elapsed = time.perf_counter() - start

    print(f"Signing: {iterations} signatures in {elapsed:.2f}s")
    if iterations > 0 and elapsed > 0:
        print(f"  Rate: {iterations/elapsed:.0f} signatures/sec")
        print(f"  Latency: {elapsed/iterations*1000:.2f}ms per signature")
    else:
        # Avoid dividing by zero on an empty or unmeasurably short run.
        print("  Rate: n/a (nothing measured)")

def benchmark_verification(iterations=1000):
    """Benchmark signature verification and print rate and latency.

    Args:
        iterations: Number of pre-signed messages to verify.
    """
    # Generate test data
    import nacl.signing
    signing_key = nacl.signing.SigningKey.generate()
    verify_key = signing_key.verify_key

    # Sign everything up front so only verification falls inside the timed
    # region.
    signatures = []
    for i in range(iterations):
        timestamp = int(time.time())
        message = f"{timestamp}:msg-{i}:test@msgs.global".encode()
        signatures.append((message, signing_key.sign(message).signature))

    # perf_counter is monotonic and high-resolution; time.time() can jump and
    # may be too coarse to time a short loop.
    start = time.perf_counter()
    for message, sig in signatures:
        verify_key.verify(message, sig)
    elapsed = time.perf_counter() - start

    print(f"\nVerification: {iterations} verifications in {elapsed:.2f}s")
    if iterations > 0 and elapsed > 0:
        print(f"  Rate: {iterations/elapsed:.0f} verifications/sec")
        print(f"  Latency: {elapsed/iterations*1000:.2f}ms per verification")
    else:
        # Avoid dividing by zero on an empty or unmeasurably short run.
        print("  Rate: n/a (nothing measured)")

def benchmark_dns_lookup(iterations=100):
    """Benchmark TXT lookups of the test record and print rate and latency.

    Failed lookups are counted and reported rather than silently ignored,
    since a run made mostly of failures does not measure real lookup latency.

    Args:
        iterations: Number of lookups to perform.
    """
    import dns.resolver

    failures = 0
    # perf_counter is monotonic and high-resolution, unlike time.time().
    start = time.perf_counter()
    for _ in range(iterations):
        try:
            dns.resolver.resolve('_authroute.test._domainkey.msgs.global', 'TXT')
        except Exception:
            # Best-effort benchmark: keep going, but not on KeyboardInterrupt
            # or SystemExit, which a bare except would also swallow.
            failures += 1
    elapsed = time.perf_counter() - start

    print(f"\nDNS Lookup: {iterations} lookups in {elapsed:.2f}s")
    if iterations > 0 and elapsed > 0:
        print(f"  Rate: {iterations/elapsed:.0f} lookups/sec")
        print(f"  Latency: {elapsed/iterations*1000:.2f}ms per lookup")
    else:
        # Avoid dividing by zero on an empty or unmeasurably short run.
        print("  Rate: n/a (nothing measured)")
    if failures:
        print(f"  Failed lookups: {failures}/{iterations} (timings include failures)")

if __name__ == '__main__':
    print("=== Authenticated Routing Performance Benchmark ===\n")
    for run_benchmark in (benchmark_signing, benchmark_verification, benchmark_dns_lookup):
        run_benchmark()

    # Reference figures printed for comparison with the measured numbers
    # above; they are stated expectations, not values computed by this script.
    expected_lines = (
        "\n=== Expected Performance ===",
        "Ed25519 signing: ~20,000 signatures/sec (0.05ms)",
        "Ed25519 verification: ~8,000 verifications/sec (0.125ms)",
        "DNS lookup (cached): ~1,000 lookups/sec (1ms)",
        "DNS lookup (uncached): ~50 lookups/sec (20ms)",
        "\nTotal overhead per message: ~1-2ms (cached), ~20-25ms (uncached)",
    )
    for line in expected_lines:
        print(line)

Docker Test Environment

File: prototype/docker-compose.yml

# Compose stack for the prototype: a Postfix container, the verification
# service built from this directory, and a CoreDNS mock serving the test zone.
version: '3.8'

services:
  # Mail server under test; SMTP is published on host port 2525.
  postfix-test:
    image: postfix:latest
    ports:
      - "2525:25"
    volumes:
      - ./postfix-config:/etc/postfix/custom
    environment:
      # NOTE(review): nothing in the prototype scripts reads these two values;
      # presumably the Postfix configuration under ./postfix-config is meant
      # to consume them - confirm.
      - AUTHROUTE_POLICY_HOST=authroute-verify
      - AUTHROUTE_POLICY_PORT=10040

  # Policy/verification service (verify_service.py), built from this directory.
  # NOTE(review): verify_service.py binds 127.0.0.1:10040; a loopback bind
  # inside the container would presumably not be reachable from postfix-test
  # or through the published port - confirm the bind address.
  authroute-verify:
    build: .
    ports:
      - "10040:10040"
    environment:
      # NOTE(review): verify_service.py as written does not read DNS_SERVER,
      # and 8.8.8.8 is a public resolver rather than the dns-mock service
      # below - confirm the intended resolver.
      - DNS_SERVER=8.8.8.8

  # CoreDNS serving ./test-zone.db; DNS is published on host UDP port 5353.
  dns-mock:
    image: coredns/coredns:latest
    volumes:
      - ./Corefile:/Corefile
      - ./test-zone.db:/test-zone.db
    ports:
      - "5353:53/udp"

File: prototype/Corefile (DNS mock config)

# Serve the msgs.global zone on port 53 from the mounted zone file
# (/test-zone.db) and log queries.
msgs.global:53 {
    file /test-zone.db
    log
}

File: prototype/test-zone.db

; Test zone for the CoreDNS mock: SOA/NS boilerplate plus one authenticated
; routing key record for testuser@msgs.global.
$ORIGIN msgs.global.
@   IN  SOA ns1.msgs.global. admin.msgs.global. 2024030701 3600 1800 604800 86400
    IN  NS  ns1.msgs.global.

; Key record; verify_service.py looks it up as
; _authroute.<user>._domainkey.<domain> and reads the p= tag.
; NOTE(review): the p= value below is a placeholder. keygen.py prints the
; Base64 of the verify key itself, whereas the "MCowBQYDK2VwAyEA" prefix here
; looks like a DER/SPKI-wrapped key, which verify_service.py (it passes p=
; straight to nacl VerifyKey) would presumably reject - replace it with the
; value printed by keygen.py.
_authroute.testuser._domainkey IN TXT "v=AUTHROUTE1; k=ed25519; p=MCowBQYDK2VwAyEAXXXXXXXXXXXXXXXXXXXXXXXXXXXX;"

Integration Test Suite

File: prototype/test_suite.py

#!/usr/bin/env python3
"""Integration tests for authenticated routing"""
import unittest
import json
import time
from keygen import generate_keypair
from sign_message import sign_message
from verify_service import verify_auth_proof

class TestAuthenticatedRouting(unittest.TestCase):
    """Integration tests for the keygen -> sign -> verify flow.

    The tests run without network access: the keypair is written to a
    temporary file (``sign_message`` loads keys from a JSON file path) and
    the DNS lookup used by ``verify_auth_proof`` is replaced with a stub
    returning the generated public key.
    """

    def setUp(self):
        """Generate a test keypair, store it on disk, and stub the DNS lookup."""
        import os
        import tempfile
        from unittest import mock

        self.email = "test@msgs.global"
        self.keypair = generate_keypair(self.email)

        # sign_message() reads the keypair from a file, so persist it first.
        with tempfile.NamedTemporaryFile(
            'w', suffix='.private.json', delete=False
        ) as f:
            json.dump(self.keypair, f)
        self.key_file = f.name
        self.addCleanup(os.remove, self.key_file)

        # verify_auth_proof() resolves fetch_public_key_dns through the
        # verify_service module globals, so patch it there.
        patcher = mock.patch(
            'verify_service.fetch_public_key_dns',
            return_value=self.keypair['public_key'],
        )
        patcher.start()
        self.addCleanup(patcher.stop)

    def test_key_generation(self):
        """Test key pair generation"""
        self.assertIn('key_id', self.keypair)
        self.assertIn('private_key', self.keypair)
        self.assertIn('public_key', self.keypair)

    def test_message_signing(self):
        """Test message signing"""
        message_id = "test-123@msgs.global"
        auth_proof, proof = sign_message(self.key_file, self.email, message_id)

        self.assertIsNotNone(auth_proof)
        self.assertEqual(proof['sender'], self.email)
        self.assertEqual(proof['message_id'], message_id)

    def test_signature_verification(self):
        """Test signature verification"""
        message_id = "test-456@msgs.global"
        auth_proof, proof = sign_message(self.key_file, self.email, message_id)

        # DNS is stubbed in setUp to return this keypair's public key.
        result, message = verify_auth_proof(self.email, auth_proof)
        self.assertTrue(result, f"Verification failed: {message}")

    def test_timestamp_validation(self):
        """Test timestamp window enforcement"""
        from unittest import mock

        message_id = "test-789@msgs.global"

        # Sign with the clock set 400 s in the past (outside the 5-minute
        # window) so the proof is stale but still validly signed. Editing the
        # timestamp after signing would break the signature instead of
        # exercising the timestamp check.
        stale_time = time.time() - 400
        with mock.patch('time.time', return_value=stale_time):
            auth_proof, proof = sign_message(self.key_file, self.email, message_id)

        result, message = verify_auth_proof(self.email, auth_proof)
        self.assertFalse(result)
        self.assertIn('Timestamp', message)

    def test_invalid_signature(self):
        """Test rejection of invalid signatures"""
        import base64

        message_id = "test-999@msgs.global"
        auth_proof, proof = sign_message(self.key_file, self.email, message_id)

        # Corrupt signature
        proof['signature'] = proof['signature'][:-10] + "CORRUPTED="

        modified_proof = base64.b64encode(json.dumps(proof).encode()).decode()

        result, message = verify_auth_proof(self.email, modified_proof)
        self.assertFalse(result)

if __name__ == '__main__':
    unittest.main()

Demo Script

File: prototype/demo.sh

#!/bin/bash
# Live demo of authenticated routing
#
# Walks through key generation, signing and verification, pausing between
# steps. Run from the directory containing keygen.py, sign_message.py and
# verify_service.py.

clear
echo "╔════════════════════════════════════════════════════════╗"
echo "║   Authenticated Routing Prototype Demo                ║"
echo "╚════════════════════════════════════════════════════════╝"
echo

# Setup
export EMAIL="demo@msgs.global"
export MESSAGE_ID="demo-$(date +%s)@msgs.global"

echo "📧 User: $EMAIL"
echo "🆔 Message ID: $MESSAGE_ID"
echo
read -p "Press Enter to generate keys..."

# Demo Step 1
clear
echo "Step 1: Generate Ed25519 Keypair"
echo "================================"
python3 keygen.py "$EMAIL" subkey
# The newest *.private.json is the key that was just generated.
KEY_FILE=$(ls -t *.private.json | head -1)
echo
read -p "Press Enter to sign a message..."

# Demo Step 2
clear
echo "Step 2: Sign Message with Private Key"
echo "====================================="
python3 sign_message.py "$KEY_FILE" "$EMAIL" "$MESSAGE_ID"
echo
read -p "Press Enter to verify signature..."

# Demo Step 3
clear
echo "Step 3: Verify Signature with Public Key"
echo "========================================"
# Run verification.
# Values are passed as arguments rather than spliced into the Python source,
# so quotes in them cannot break the script, and sign_message() is given the
# key file path it expects.
# NOTE: verify_auth_proof() looks the public key up in DNS, so the record
# printed in step 1 must be resolvable for this step to succeed.
python3 - "$KEY_FILE" "$EMAIL" "$MESSAGE_ID" <<'PY'
import sys

from sign_message import sign_message
from verify_service import verify_auth_proof

key_file, email, message_id = sys.argv[1:4]

auth_proof, _ = sign_message(key_file, email, message_id)
result, msg = verify_auth_proof(email, auth_proof)

if result:
    print('✅ VERIFICATION SUCCESS')
    print(f'   {msg}')
else:
    print('❌ VERIFICATION FAILED')
    print(f'   {msg}')
PY
echo
echo "Demo complete!"

Success Criteria

Prototype Demonstrates:

  • [x] Key generation (<1 second)
  • [x] Message signing (<1ms)
  • [x] Signature verification (<2ms)
  • [x] DNS integration
  • [x] Backward compatibility (no SMTP changes for non-supporting MTAs)
  • [x] Performance acceptable (<100ms overhead)

Next Steps

  1. Week 1: Build prototype components
  2. Week 2: Integration testing
  3. Week 3: Performance optimization
  4. Week 4: User testing with real msgs.global accounts

Prototype Deliverables

  • [ ] Working CLI tools (keygen, sign, verify)
  • [ ] Postfix policy service
  • [ ] DNS integration
  • [ ] Performance benchmarks
  • [ ] Test suite (unit + integration)
  • [ ] Demo video/documentation
  • [ ] Production-readiness assessment (migration path from prototype to production)