WebSocket Security Skill

File Organization

- SKILL.md: Core principles, patterns, essential security (this file)
- references/security-examples.md: CSWSH examples and authentication patterns
- references/advanced-patterns.md: Connection management, scaling patterns
- references/threat-model.md: Attack scenarios including CSWSH

Validation Gates

Gate 0.2: PASSED (5+ vulnerabilities documented) - CVE-2024-23898, CVE-2024-26135, CVE-2023-0957

1. Overview

Risk Level: HIGH
Justification: Browsers do not apply the Same-Origin Policy to WebSocket handshakes and can attach cookies to cross-origin upgrade requests, so an endpoint that trusts cookies alone is exposed to Cross-Site WebSocket Hijacking (CSWSH). Persistent connections also require careful authentication, session management, and input validation.
You are an expert in WebSocket security, understanding the unique vulnerabilities of persistent bidirectional connections.

Core Expertise Areas

- CSWSH (Cross-Site WebSocket Hijacking) prevention
- Origin header validation and token-based authentication
- Message validation and per-message authorization
- Rate limiting and connection lifecycle security

2. Core Responsibilities

Fundamental Principles

- TDD First: Write tests before implementation - test security boundaries, connection lifecycle
- Performance Aware: Optimize for low latency (<50ms), connection pooling, backpressure
- Validate Origin: Always check Origin header against explicit allowlist
- Authenticate First: Verify identity before accepting messages
- Authorize Each Action: Don't assume connection equals unlimited access
- Validate All Messages: Treat WebSocket messages as untrusted input
- Limit Resources: Rate limit messages, timeout idle connections

Security Decision Framework

| Situation | Approach |
| --- | --- |
| New connection | Validate Origin, require authentication token |
| Each message | Validate format, check authorization for action |
| Sensitive operations | Re-verify session, log action |
| Idle connection | Timeout after inactivity period |
| Error condition | Close connection, log details |

3. Technical Foundation

Version Recommendations

| Component | Version | Notes |
| --- | --- | --- |
| FastAPI/Starlette | 0.115+ | WebSocket support |
| websockets | 12.0+ | Python WebSocket library |

Security Configuration

    WEBSOCKET_CONFIG = {
        "max_message_size": 1024 * 1024,  # 1MB
        "max_connections_per_ip": 10,
        "idle_timeout_seconds": 300,
        "messages_per_minute": 60,
    }

    # NEVER use "*" for origins
    ALLOWED_ORIGINS = ["https://app.example.com", "https://admin.example.com"]
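
The allowlist check can be made slightly more robust by normalizing the header before comparing it. The following is a minimal sketch using only the standard library; `normalize_origin` and `is_allowed_origin` are hypothetical helper names, not part of FastAPI or Starlette, and the allowlist simply repeats the example origins above.

```python
from typing import Optional
from urllib.parse import urlsplit

# Same example origins as the configuration above
ALLOWED_ORIGINS = frozenset({"https://app.example.com", "https://admin.example.com"})


def normalize_origin(origin: Optional[str]) -> Optional[str]:
    """Return scheme://host[:port] in lowercase, or None if the value is malformed."""
    if not origin:
        return None
    try:
        parts = urlsplit(origin.strip())
        port = parts.port  # raises ValueError for a non-numeric port
    except ValueError:
        return None
    if parts.scheme not in ("http", "https") or not parts.hostname:
        return None
    # A real Origin header carries no userinfo, path, query, or fragment
    if parts.username or parts.password or parts.path or parts.query or parts.fragment:
        return None
    suffix = f":{port}" if port else ""
    return f"{parts.scheme}://{parts.hostname}{suffix}"


def is_allowed_origin(origin: Optional[str]) -> bool:
    """Exact match against the allowlist; no wildcards, no suffix matching."""
    return normalize_origin(origin) in ALLOWED_ORIGINS
```

Exact matching is deliberate here: suffix or substring checks would accept look-alike hosts such as `https://app.example.com.evil.com`.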

4. Implementation Workflow (TDD)

Step 1: Write Failing Test First

    import pytest
    from fastapi.testclient import TestClient
    from starlette.websockets import WebSocketDisconnect

    # `app` is the FastAPI application under test; import it from your project.
    # Starlette's TestClient provides websocket_connect() and raises
    # WebSocketDisconnect when the server closes during the handshake.

    # Test security boundaries first
    def test_origin_validation_rejects_invalid():
        """CSWSH prevention - must reject invalid origins."""
        with TestClient(app) as client:
            # This should fail until origin validation is implemented
            with pytest.raises(WebSocketDisconnect):
                with client.websocket_connect(
                    "/ws?token=valid", headers={"Origin": "https://evil.com"}
                ):
                    pass

    def test_authentication_required():
        """Must reject connections without valid token."""
        with TestClient(app) as client:
            with pytest.raises(WebSocketDisconnect):
                # Valid origin, so the rejection comes from the missing token
                with client.websocket_connect(
                    "/ws", headers={"Origin": "https://app.example.com"}
                ):
                    pass

    def test_message_authorization():
        """Each message action must be authorized."""
        with TestClient(app) as client:
            with client.websocket_connect(
                "/ws?token=readonly_user",
                headers={"Origin": "https://app.example.com"},
            ) as ws:
                ws.send_json({"action": "delete", "id": "123"})
                response = ws.receive_json()
                assert response.get("error") == "Permission denied"

Step 2: Implement Minimum to Pass

    from fastapi import WebSocket

    # Implement only what's needed to pass the test
    async def validate_origin(websocket: WebSocket) -> bool:
        origin = websocket.headers.get("origin")
        if not origin or origin not in ALLOWED_ORIGINS:
            # Closing before accept() rejects the handshake
            await websocket.close(code=4003, reason="Invalid origin")
            return False
        return True

Step 3: Refactor and Verify

    # Run all WebSocket tests
    pytest tests/websocket/ -v --asyncio-mode=auto

    # Check for security issues
    bandit -r src/websocket/

    # Verify no regressions
    pytest tests/ -v

5. Performance Patterns

Pattern 1: Connection Pooling

    # BAD - Create new connection for each request
    ws = await create_connection(user_id)  # Expensive!

    # GOOD - Reuse connections from pool
    class ConnectionPool:
        def __init__(self, max_size: int = 100):
            self.max_size = max_size
            self.connections: dict[str, WebSocket] = {}

        async def get_or_create(self, user_id: str) -> WebSocket:
            if user_id not in self.connections:
                # Refuse to grow past max_size instead of growing without bound
                if len(self.connections) >= self.max_size:
                    raise RuntimeError("Connection pool is full")
                self.connections[user_id] = await create_connection(user_id)
            return self.connections[user_id]

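Pattern 2: Message Batching

    # BAD - Send messages one at a time
    for item in items:
        await websocket.send_json({"type": "item", "data": item})

    # GOOD - Batch messages to reduce overhead (50 items per frame)
    BATCH_SIZE = 50
    for start in range(0, len(items), BATCH_SIZE):
        batch = items[start:start + BATCH_SIZE]
        await websocket.send_json({"type": "batch", "data": batch})
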
Pattern 3: Binary Protocols

    import struct

    # BAD - JSON for high-frequency data (roughly 40 bytes as text)
    await websocket.send_json({"x": 123.456, "y": 789.012, "z": 456.789})

    # GOOD - Binary format (12 bytes: three 32-bit floats, network byte order)
    await websocket.send_bytes(struct.pack('!3f', 123.456, 789.012, 456.789))
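
The size difference is easy to check with the standard library alone. This sketch packs the same three coordinates both ways. The compact JSON separators are an assumption about how the payload would be serialized, and the variable names are illustrative only.

```python
import json
import struct

point = {"x": 123.456, "y": 789.012, "z": 456.789}

# Text encoding: compact JSON, then UTF-8 bytes
json_payload = json.dumps(point, separators=(",", ":")).encode("utf-8")

# Binary encoding: three big-endian 32-bit floats, 4 bytes each
binary_payload = struct.pack("!3f", point["x"], point["y"], point["z"])

print(len(json_payload), len(binary_payload))

# The receiver unpacks with the same format string
x, y, z = struct.unpack("!3f", binary_payload)
```

The trade-off is precision and flexibility: a 32-bit float keeps about seven significant digits, and both ends must agree on the field order in advance.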

Pattern 4: Heartbeat Optimization

    from time import time

    # BAD - Fixed frequent heartbeats
    HEARTBEAT_INTERVAL = 5  # Every 5 seconds

    # GOOD - Adaptive heartbeats based on activity
    interval = 60 if (time() - last_activity) < 60 else 30
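
One way to read the adaptive rule: recent traffic already shows the peer is alive, so pings can be spaced further apart, while a quiet connection is probed sooner. The sketch below wraps the same expression in a function with an explicit `now` argument so it can be tested. The function name and parameter names are hypothetical, and the thresholds are the ones from the snippet above.

```python
def heartbeat_interval(
    last_activity: float,
    now: float,
    active_window: float = 60.0,
    active_interval: float = 60.0,
    idle_interval: float = 30.0,
) -> float:
    """Seconds to wait before the next ping, based on how recently the peer was active."""
    idle_for = now - last_activity
    # Active within the window: ping less often. Quiet for longer: probe sooner.
    return active_interval if idle_for < active_window else idle_interval
```

Passing `now` in, rather than calling `time()` inside, keeps the function deterministic in tests.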

Pattern 5: Backpressure Handling

    import asyncio
    from collections import deque

    # BAD - Blocks on slow clients
    await ws.send_json(message)

    # GOOD - Timeout and bounded queue
    queue = deque(maxlen=100)  # Drop oldest when full
    try:
        await asyncio.wait_for(ws.send_json(message), timeout=1.0)
    except asyncio.TimeoutError:
        pass  # Client too slow
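
The bounded queue and the send timeout can be combined into a small per-connection buffer. The following is a sketch under stated assumptions: `BoundedOutbox` is a hypothetical class, and `send` stands for any coroutine function such as a WebSocket's `send_json`.

```python
import asyncio
from collections import deque


class BoundedOutbox:
    """Per-connection send buffer that drops the oldest message when full."""

    def __init__(self, maxlen: int = 100):
        self._queue = deque(maxlen=maxlen)
        self.dropped = 0

    def put(self, message) -> None:
        if len(self._queue) == self._queue.maxlen:
            self.dropped += 1  # deque evicts the oldest entry on append
        self._queue.append(message)

    async def drain(self, send, timeout: float = 1.0) -> bool:
        """Send queued messages in order; return False if the client is too slow."""
        while self._queue:
            message = self._queue.popleft()
            try:
                await asyncio.wait_for(send(message), timeout=timeout)
            except asyncio.TimeoutError:
                return False  # caller would typically close the connection
        return True
```

A `False` result is a signal to disconnect the slow client rather than keep buffering for it.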

6. Implementation Patterns

Pattern 1: Origin Validation (Critical for CSWSH Prevention)

    from fastapi import WebSocket

    async def validate_origin(websocket: WebSocket) -> bool:
        """Validate WebSocket origin against allowlist."""
        origin = websocket.headers.get("origin")
        if not origin or origin not in ALLOWED_ORIGINS:
            await websocket.close(code=4003, reason="Invalid origin")
            return False
        return True

    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
        if not await validate_origin(websocket):
            return
        await websocket.accept()

Pattern 2: Token-Based Authentication

    from jose import jwt, JWTError

    async def authenticate_websocket(websocket: WebSocket) -> User | None:
        """Authenticate via token (not cookies - vulnerable to CSWSH)."""
        token = websocket.query_params.get("token")
        if not token:
            await websocket.close(code=4001, reason="Authentication required")
            return None
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
            user = await user_service.get(payload.get("sub"))
            if not user:
                await websocket.close(code=4001, reason="User not found")
                return None
            return user
        except JWTError:
            await websocket.close(code=4001, reason="Invalid token")
            return None

Pattern 3: Per-Message Authorization

    from pydantic import BaseModel, field_validator

    class WebSocketMessage(BaseModel):
        action: str
        data: dict

        @field_validator('action')
        @classmethod
        def validate_action(cls, v):
            if v not in {'subscribe', 'unsubscribe', 'send', 'query'}:
                raise ValueError(f'Invalid action: {v}')
            return v

    async def handle_message(websocket: WebSocket, user: User, raw_data: dict):
        try:
            message = WebSocketMessage(**raw_data)
        except (TypeError, ValueError):
            # ValueError covers pydantic's ValidationError;
            # TypeError covers payloads that are not JSON objects
            await websocket.send_json({"error": "Invalid message format"})
            return

        if not user.has_permission(f"ws:{message.action}"):
            await websocket.send_json({"error": "Permission denied"})
            return

        result = await handlers[message.action](user, message.data)
        await websocket.send_json(result)
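
The `has_permission` check above depends on a user model that this file does not define. One possible shape, shown purely as an illustration, is a role-to-permission mapping. The `User` dataclass, the role names, and `ROLE_PERMISSIONS` below are all assumptions, and only the `ws:<action>` naming comes from the handler above.

```python
from dataclasses import dataclass

# Hypothetical role table; permissions follow the "ws:<action>" naming used above
ROLE_PERMISSIONS = {
    "reader": frozenset({"ws:subscribe", "ws:unsubscribe", "ws:query"}),
    "writer": frozenset({"ws:subscribe", "ws:unsubscribe", "ws:query", "ws:send"}),
}


@dataclass(frozen=True)
class User:
    id: str
    roles: tuple = ()

    def has_permission(self, permission: str) -> bool:
        """True if any of the user's roles grants the permission; unknown roles grant nothing."""
        return any(
            permission in ROLE_PERMISSIONS.get(role, frozenset())
            for role in self.roles
        )
```

Because unknown roles and unknown permissions both fall through to `False`, the check denies by default.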

Pattern 4: Connection Manager with Rate Limiting

    from collections import defaultdict
    from time import time

    class SecureConnectionManager:
        def __init__(self):
            self.connections: dict[str, WebSocket] = {}
            self.message_counts: dict[str, list[float]] = defaultdict(list)
            self.connections_per_ip: dict[str, int] = defaultdict(int)

        async def connect(self, websocket: WebSocket, user_id: str, ip: str) -> bool:
            if self.connections_per_ip[ip] >= WEBSOCKET_CONFIG["max_connections_per_ip"]:
                await websocket.close(code=4029, reason="Too many connections")
                return False
            await websocket.accept()
            self.connections[user_id] = websocket
            self.connections_per_ip[ip] += 1
            return True

        def disconnect(self, user_id: str, ip: str) -> None:
            # Undo connect(): release the slot and forget rate-limit history
            self.connections.pop(user_id, None)
            self.message_counts.pop(user_id, None)
            self.connections_per_ip[ip] = max(0, self.connections_per_ip[ip] - 1)

        def check_rate_limit(self, user_id: str) -> bool:
            now = time()
            # Keep only timestamps from the last 60 seconds
            self.message_counts[user_id] = [
                ts for ts in self.message_counts[user_id] if ts > now - 60
            ]
            if len(self.message_counts[user_id]) >= WEBSOCKET_CONFIG["messages_per_minute"]:
                return False
            self.message_counts[user_id].append(now)
            return True
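
The sliding-window check can also be written as a standalone class with an injectable clock, which makes the window behaviour testable without sleeping. This is a sketch, not the manager's actual implementation: `SlidingWindowLimiter` and its parameters are hypothetical names, and the defaults mirror `messages_per_minute`.

```python
from collections import defaultdict, deque
from time import monotonic


class SlidingWindowLimiter:
    """Allow at most `limit` events per key within the last `window_seconds`."""

    def __init__(self, limit: int = 60, window_seconds: float = 60.0, clock=monotonic):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock  # injectable so tests can control time
        self._events = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = self.clock()
        events = self._events[key]
        # Discard timestamps that have left the window
        while events and events[0] <= now - self.window:
            events.popleft()
        if len(events) >= self.limit:
            return False
        events.append(now)
        return True
```

A monotonic clock is used by default so that wall-clock adjustments cannot reopen or extend the window.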

Pattern 5: Complete Secure Handler

    import asyncio
    from fastapi import WebSocket, WebSocketDisconnect

    manager = SecureConnectionManager()

    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
        if not await validate_origin(websocket):
            return
        user = await authenticate_websocket(websocket)
        if not user:
            return

        ip = websocket.client.host
        if not await manager.connect(websocket, user.id, ip):
            return

        try:
            while True:
                # Raises asyncio.TimeoutError if the client stays idle too long
                raw = await asyncio.wait_for(
                    websocket.receive_json(),
                    timeout=WEBSOCKET_CONFIG["idle_timeout_seconds"],
                )
                if not manager.check_rate_limit(user.id):
                    await websocket.send_json({"error": "Rate limited"})
                    continue
                await handle_message(websocket, user, raw)
        except (WebSocketDisconnect, asyncio.TimeoutError):
            pass
        finally:
            # Always release the per-IP slot, whatever ended the loop
            manager.disconnect(user.id, ip)
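
The idle timeout in the receive loop relies on `asyncio.wait_for` raising when no message arrives in time. The sketch below isolates that behaviour with an `asyncio.Queue` standing in for the socket. `receive_or_idle_timeout` and `demo` are hypothetical names used only for illustration.

```python
import asyncio


async def receive_or_idle_timeout(receive, idle_timeout: float):
    """Return the next message, or None if nothing arrives within idle_timeout seconds."""
    try:
        return await asyncio.wait_for(receive(), timeout=idle_timeout)
    except asyncio.TimeoutError:
        return None


async def demo():
    inbox = asyncio.Queue()  # stands in for websocket.receive_json
    await inbox.put({"action": "query"})
    first = await receive_or_idle_timeout(inbox.get, idle_timeout=0.05)
    second = await receive_or_idle_timeout(inbox.get, idle_timeout=0.05)
    return first, second


first, second = asyncio.run(demo())
```

Here `first` is the queued message and `second` is `None`, which is the point where a real handler would close the connection and release its resources.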

7. Security Standards

Domain Vulnerability Landscape

| CVE ID | Severity | Description | Mitigation |
| --- | --- | --- | --- |
| CVE-2024-23898 | HIGH | Jenkins CSWSH - command execution | Validate Origin |
| CVE-2024-26135 | HIGH | MeshCentral CSWSH - config leak | Origin + SameSite |
| CVE-2023-0957 | CRITICAL | Gitpod CSWSH - account takeover | Origin + token auth |

OWASP Top 10 Mapping

| Category | Mitigations |
| --- | --- |
| A01 Access Control | Origin validation, per-message authz |
| A02 Crypto Failures | TLS/WSS only, signed tokens |
| A03 Injection | Validate all message content |
| A07 Auth Failures | Token auth, session validation |

CSWSH Prevention Summary

    async def secure_websocket_handler(websocket: WebSocket):
        # 1. VALIDATE ORIGIN (Critical)
        if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
            await websocket.close(code=4003)
            return

        # 2. AUTHENTICATE with token (not cookies)
        user = await validate_token(websocket.query_params.get("token"))
        if not user:
            await websocket.close(code=4001)
            return

        # 3. Accept only after validation
        await websocket.accept()

        # 4. AUTHORIZE each message, 5. RATE LIMIT, 6. TIMEOUT idle

8. Common Mistakes & Anti-Patterns

No Origin Validation

    # NEVER - vulnerable to CSWSH
    @app.websocket("/ws")
    async def vulnerable(websocket: WebSocket):
        await websocket.accept()  # Accepts any origin!

    # ALWAYS - validate origin first
    if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
        await websocket.close(code=4003)
        return

Cookie-Only Authentication

    # NEVER - cookies sent automatically in CSWSH attacks
    session = websocket.cookies.get("session")

    # ALWAYS - require explicit token parameter
    token = websocket.query_params.get("token")

No Per-Message Authorization

    # NEVER - assumes connection = full access
    if data["action"] == "delete":
        await delete_resource(data["id"])

    # ALWAYS - check permission for each action
    if not user.has_permission("delete"):
        return {"error": "Permission denied"}

No Input Validation

    # NEVER - trust WebSocket messages
    await db.execute(f"SELECT * FROM {data['table']}")  # SQL injection!

    # ALWAYS - validate with Pydantic
    message = WebSocketMessage(**data)
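
Schema validation covers the message shape, but a client-supplied table name still needs separate handling, because SQL identifiers cannot be bound as query parameters. The sketch below uses `sqlite3` only to stay self-contained. The table, column names, and `fetch_rows` helper are hypothetical, and the idea is an allowlist for identifiers plus placeholders for values.

```python
import sqlite3

# Hypothetical allowlist of tables a WebSocket client may query
QUERYABLE_TABLES = frozenset({"articles", "comments"})


def fetch_rows(conn: sqlite3.Connection, table: str, owner_id: str) -> list:
    """Query an allowlisted table; the owner_id value is bound, never interpolated."""
    if table not in QUERYABLE_TABLES:
        raise ValueError("Unknown table")
    # Only the allowlisted identifier is formatted into the statement
    query = f"SELECT id, title FROM {table} WHERE owner_id = ?"
    return conn.execute(query, (owner_id,)).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE articles (id INTEGER, title TEXT, owner_id TEXT)")
conn.execute("INSERT INTO articles VALUES (1, 'hello', 'u1')")
rows = fetch_rows(conn, "articles", "u1")
```

With this shape, an injected table name is rejected before any SQL runs, and an injected value is treated as plain data by the placeholder.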

9. Pre-Implementation Checklist

Phase 1: Before Writing Code

- Write failing tests for security boundaries (CSWSH, auth, authz)
- Write failing tests for connection lifecycle (connect, disconnect, timeout)
- Write failing tests for message validation and rate limiting
- Review threat model in references/threat-model.md
- Identify performance requirements (latency, throughput, connections)

Phase 2: During Implementation

- Origin validation against explicit allowlist
- Token-based authentication (not cookie-only)
- Per-message authorization checks
- Rate limiting and idle timeout implemented
- All messages validated with Pydantic
- Connection pooling for efficiency
- Backpressure handling for slow clients

Phase 3: Before Committing

- All security tests pass: pytest tests/websocket/ -v
- No security issues: bandit -r src/websocket/
- WSS (TLS) enforced in production config
- CSWSH test coverage verified
- Performance benchmarks meet targets (<50ms latency)
- No regressions: pytest tests/ -v

10. Summary

Security Goals:

- CSWSH-Resistant: Origin validation, token auth
- Properly Authorized: Per-message permission checks
- Rate Limited: Prevent message flooding
- Validated: All messages treated as untrusted

Critical Reminders: ALWAYS validate Origin, use token auth (not cookies), authorize EACH message, use WSS in production.