websocket-implementation

安装量: 204
排名: #4243

安装

npx skills add https://github.com/aj-geddes/useful-ai-prompts --skill websocket-implementation

WebSocket Implementation Overview

Build scalable WebSocket systems for real-time communication with proper connection management, message routing, error handling, and horizontal scaling support.

When to Use Building real-time chat and messaging Implementing live notifications Creating collaborative editing tools Broadcasting live data updates Building real-time dashboards Streaming events to clients Live multiplayer games Instructions 1. Node.js WebSocket Server (Socket.IO) const express = require('express'); const http = require('http'); const socketIo = require('socket.io'); const redis = require('redis');

const app = express(); const server = http.createServer(app); const io = socketIo(server, { cors: { origin: '*' }, transports: ['websocket', 'polling'], reconnection: true, reconnectionDelay: 1000, reconnectionDelayMax: 5000, reconnectionAttempts: 5 });

// Redis adapter for horizontal scaling const redisClient = redis.createClient(); const { createAdapter } = require('@socket.io/redis-adapter');

io.adapter(createAdapter(redisClient, redisClient.duplicate()));

// Connection management const connectedUsers = new Map();

io.on('connection', (socket) => { console.log(User connected: ${socket.id});

// Store user connection socket.on('auth', (userData) => { connectedUsers.set(socket.id, { userId: userData.id, username: userData.username, socketId: socket.id, connectedAt: new Date() });

// Join user-specific room
socket.join(`user:${userData.id}`);
socket.join('authenticated_users');

// Notify others user is online
io.to('authenticated_users').emit('user:online', {
  userId: userData.id,
  username: userData.username,
  timestamp: new Date()
});

console.log(`User authenticated: ${userData.username}`);

});

// Chat messaging socket.on('chat:message', (message) => { const user = connectedUsers.get(socket.id);

if (!user) {
  socket.emit('error', { message: 'Not authenticated' });
  return;
}

const chatMessage = {
  id: `msg_${Date.now()}`,
  senderId: user.userId,
  senderName: user.username,
  text: message.text,
  roomId: message.roomId,
  timestamp: new Date(),
  status: 'delivered'
};

// Save to database
Message.create(chatMessage);

// Broadcast to room
io.to(`room:${message.roomId}`).emit('chat:message', chatMessage);

// Update message status
setTimeout(() => {
  socket.emit('chat:message:ack', { messageId: chatMessage.id, status: 'read' });
}, 100);

});

// Room management socket.on('room:join', (roomId) => { socket.join(room:${roomId});

const user = connectedUsers.get(socket.id);
io.to(`room:${roomId}`).emit('room:user:joined', {
  userId: user.userId,
  username: user.username,
  timestamp: new Date()
});

});

socket.on('room:leave', (roomId) => { socket.leave(room:${roomId});

const user = connectedUsers.get(socket.id);
io.to(`room:${roomId}`).emit('room:user:left', {
  userId: user.userId,
  timestamp: new Date()
});

});

// Typing indicator socket.on('typing:start', (roomId) => { const user = connectedUsers.get(socket.id); io.to(room:${roomId}).emit('typing:indicator', { userId: user.userId, username: user.username, isTyping: true }); });

socket.on('typing:stop', (roomId) => { const user = connectedUsers.get(socket.id); io.to(room:${roomId}).emit('typing:indicator', { userId: user.userId, isTyping: false }); });

// Handle disconnection socket.on('disconnect', () => { const user = connectedUsers.get(socket.id);

if (user) {
  connectedUsers.delete(socket.id);
  io.to('authenticated_users').emit('user:offline', {
    userId: user.userId,
    timestamp: new Date()
  });

  console.log(`User disconnected: ${user.username}`);
}

});

// Error handling socket.on('error', (error) => { console.error(Socket error: ${error}); socket.emit('error', { message: 'An error occurred' }); }); });

// Server methods const broadcastUserUpdate = (userId, data) => { io.to(user:${userId}).emit('user:update', data); };

const notifyRoom = (roomId, event, data) => { io.to(room:${roomId}).emit(event, data); };

const sendDirectMessage = (userId, event, data) => { io.to(user:${userId}).emit(event, data); };

server.listen(3000, () => { console.log('WebSocket server listening on port 3000'); });

  1. Browser WebSocket Client class WebSocketClient { constructor(url, options = {}) { this.url = url; this.socket = null; this.reconnectAttempts = 0; this.maxReconnectAttempts = options.maxReconnectAttempts || 5; this.reconnectDelay = options.reconnectDelay || 1000; this.listeners = new Map(); this.messageQueue = []; this.isAuthenticated = false;

    this.connect(); }

connect() { this.socket = io(this.url, { reconnection: true, reconnectionDelay: this.reconnectDelay, reconnectionAttempts: this.maxReconnectAttempts });

this.socket.on('connect', () => {
  console.log('Connected to server');
  this.reconnectAttempts = 0;
  this.processMessageQueue();
});

this.socket.on('disconnect', () => {
  console.log('Disconnected from server');
});

this.socket.on('error', (error) => {
  console.error('Socket error:', error);
  this.emit('error', error);
});

this.socket.on('connect_error', (error) => {
  console.error('Connection error:', error);
});

}

authenticate(userData) { this.socket.emit('auth', userData, (response) => { if (response.success) { this.isAuthenticated = true; this.emit('authenticated'); } }); }

on(event, callback) { this.socket.on(event, callback);

if (!this.listeners.has(event)) {
  this.listeners.set(event, []);
}
this.listeners.get(event).push(callback);

}

emit(event, data, callback) { if (!this.socket.connected) { this.messageQueue.push({ event, data, callback }); return; }

this.socket.emit(event, data, callback);

}

processMessageQueue() { while (this.messageQueue.length > 0) { const { event, data, callback } = this.messageQueue.shift(); this.socket.emit(event, data, callback); } }

joinRoom(roomId) { this.emit('room:join', roomId); }

leaveRoom(roomId) { this.emit('room:leave', roomId); }

sendMessage(roomId, text) { this.emit('chat:message', { roomId, text }); }

setTypingIndicator(roomId, isTyping) { if (isTyping) { this.emit('typing:start', roomId); } else { this.emit('typing:stop', roomId); } }

disconnect() { this.socket.disconnect(); } }

// Usage const client = new WebSocketClient('http://localhost:3000');

client.on('chat:message', (message) => { console.log('Received message:', message); displayMessage(message); });

client.on('typing:indicator', (data) => { updateTypingIndicator(data); });

client.on('user:online', (user) => { updateUserStatus(user.userId, 'online'); });

client.authenticate({ id: 'user123', username: 'john' }); client.joinRoom('room1'); client.sendMessage('room1', 'Hello everyone!');

  1. Python WebSocket Server (aiohttp) from aiohttp import web import aiohttp import json from datetime import datetime from typing import Set

class WebSocketServer: def init(self): self.app = web.Application() self.rooms = {} self.users = {} self.setup_routes()

def setup_routes(self):
    self.app.router.add_get('/ws', self.websocket_handler)
    self.app.router.add_post('/api/message', self.send_message_api)

async def websocket_handler(self, request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    user_id = None
    room_id = None

    async for msg in ws.iter_any():
        if isinstance(msg, aiohttp.WSMessage):
            data = json.loads(msg.data)
            event_type = data.get('type')

            try:
                if event_type == 'auth':
                    user_id = data.get('userId')
                    self.users[user_id] = ws
                    await ws.send_json({
                        'type': 'authenticated',
                        'timestamp': datetime.now().isoformat()
                    })

                elif event_type == 'join_room':
                    room_id = data.get('roomId')
                    if room_id not in self.rooms:
                        self.rooms[room_id] = set()
                    self.rooms[room_id].add(user_id)

                    # Notify others
                    await self.broadcast_to_room(room_id, {
                        'type': 'user_joined',
                        'userId': user_id,
                        'timestamp': datetime.now().isoformat()
                    }, exclude=user_id)

                elif event_type == 'message':
                    message = {
                        'id': f'msg_{datetime.now().timestamp()}',
                        'userId': user_id,
                        'text': data.get('text'),
                        'roomId': room_id,
                        'timestamp': datetime.now().isoformat()
                    }

                    # Save to database
                    await self.save_message(message)

                    # Broadcast to room
                    await self.broadcast_to_room(room_id, message)

                elif event_type == 'leave_room':
                    if room_id in self.rooms:
                        self.rooms[room_id].discard(user_id)

            except Exception as error:
                await ws.send_json({
                    'type': 'error',
                    'message': str(error)
                })

    # Cleanup on disconnect
    if user_id:
        del self.users[user_id]
    if room_id and user_id:
        if room_id in self.rooms:
            self.rooms[room_id].discard(user_id)

    return ws

async def broadcast_to_room(self, room_id, message, exclude=None):
    if room_id not in self.rooms:
        return

    for user_id in self.rooms[room_id]:
        if user_id != exclude and user_id in self.users:
            try:
                await self.users[user_id].send_json(message)
            except Exception as error:
                print(f'Error sending message: {error}')

async def save_message(self, message):
    # Save to database
    pass

async def send_message_api(self, request):
    data = await request.json()
    room_id = data.get('roomId')

    await self.broadcast_to_room(room_id, {
        'type': 'message',
        'text': data.get('text'),
        'timestamp': datetime.now().isoformat()
    })

    return web.json_response({'sent': True})

def create_app(): server = WebSocketServer() return server.app

if name == 'main': app = create_app() web.run_app(app, port=3000)

  1. Message Types and Protocols // Authentication { "type": "auth", "userId": "user123", "token": "jwt_token_here" }

// Chat Message { "type": "message", "roomId": "room123", "text": "Hello everyone!", "timestamp": "2025-01-15T10:30:00Z" }

// Typing Indicator { "type": "typing", "roomId": "room123", "isTyping": true }

// Presence { "type": "presence", "status": "online|away|offline" }

// Notification { "type": "notification", "title": "New message", "body": "You have a new message", "data": {} }

  1. Scaling with Redis const redis = require('redis'); const { createAdapter } = require('@socket.io/redis-adapter'); const { createClient } = require('redis');

const pubClient = createClient({ host: 'redis', port: 6379 }); const subClient = pubClient.duplicate();

io.adapter(createAdapter(pubClient, subClient));

// Publish to multiple servers io.emit('user:action', { userId: 123, action: 'login' });

// Subscribe to events from other servers redisClient.subscribe('notifications', (message) => { const notification = JSON.parse(message); io.to(user:${notification.userId}).emit('notification', notification); });

Best Practices ✅ DO Implement proper authentication Handle reconnection gracefully Manage rooms/channels effectively Persist messages appropriately Monitor active connections Implement presence features Use Redis for scaling Add message acknowledgment Implement rate limiting Handle errors properly ❌ DON'T Send unencrypted sensitive data Keep unlimited message history in memory Allow arbitrary room/channel creation Forget to clean up disconnected connections Send large messages frequently Ignore network failures Store passwords in messages Skip authentication/authorization Create unbounded growth of connections Ignore scalability from day one Monitoring // Track active connections io.engine.on('connection_error', (err) => { console.log(err.req); // the request object console.log(err.code); // the error code, e.g. 1 console.log(err.message); // the error message console.log(err.context); // some additional error context });

app.get('/metrics/websocket', (req, res) => { res.json({ activeConnections: io.engine.clientsCount, connectedSockets: io.sockets.sockets.size, rooms: Object.keys(io.sockets.adapter.rooms) }); });

返回排行榜