Real-Time Systems
Build real-time communication systems with WebSocket, SSE, and pub/sub patterns. This skill covers connection management, scaling, and production deployment.
Purpose
Implement live features that users expect:
-
Real-time messaging and chat
-
Live notifications and updates
-
Collaborative editing
-
Presence detection
-
Live dashboards and metrics
-
Gaming and interactive experiences
Features
- WebSocket Server with Socket.io
import { Server } from 'socket.io'; import { createAdapter } from '@socket.io/redis-adapter'; import { createClient } from 'redis';
// Initialize Socket.io with Redis adapter for scaling async function createSocketServer(httpServer: http.Server) { const io = new Server(httpServer, { cors: { origin: process.env.CLIENT_URL, credentials: true, }, pingTimeout: 60000, pingInterval: 25000, });
// Redis adapter for multi-server deployment const pubClient = createClient({ url: process.env.REDIS_URL }); const subClient = pubClient.duplicate(); await Promise.all([pubClient.connect(), subClient.connect()]); io.adapter(createAdapter(pubClient, subClient));
// Authentication middleware io.use(async (socket, next) => { const token = socket.handshake.auth.token;
if (!token) {
return next(new Error('Authentication required'));
}
try {
const user = await verifyToken(token);
socket.data.user = user;
next();
} catch (error) {
next(new Error('Invalid token'));
}
});
// Connection handling
io.on('connection', (socket) => {
const userId = socket.data.user.id;
console.log(User connected: ${userId});
// Join user's personal room
socket.join(`user:${userId}`);
// Handle joining rooms
socket.on('join:room', async (roomId: string) => {
// Verify access
const hasAccess = await checkRoomAccess(userId, roomId);
if (!hasAccess) {
socket.emit('error', { message: 'Access denied' });
return;
}
socket.join(`room:${roomId}`);
socket.to(`room:${roomId}`).emit('user:joined', {
userId,
username: socket.data.user.name,
});
});
// Handle messages
socket.on('message:send', async (data: { roomId: string; content: string }) => {
const message = await saveMessage({
roomId: data.roomId,
userId,
content: data.content,
});
io.to(`room:${data.roomId}`).emit('message:new', message);
});
// Typing indicators
socket.on('typing:start', (roomId: string) => {
socket.to(`room:${roomId}`).emit('typing:user', {
userId,
username: socket.data.user.name,
typing: true,
});
});
socket.on('typing:stop', (roomId: string) => {
socket.to(`room:${roomId}`).emit('typing:user', {
userId,
typing: false,
});
});
// Presence
socket.on('presence:update', async (status: 'online' | 'away' | 'busy') => {
await updatePresence(userId, status);
io.emit('presence:changed', { userId, status });
});
// Disconnect handling
socket.on('disconnect', async (reason) => {
console.log(`User disconnected: ${userId}, reason: ${reason}`);
await updatePresence(userId, 'offline');
io.emit('presence:changed', { userId, status: 'offline' });
});
});
return io; }
- Server-Sent Events (SSE)
import { Router } from 'express';
const router = Router();
// SSE endpoint for notifications router.get('/events/notifications', authenticate, (req, res) => { const userId = req.user.id;
// Set SSE headers res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); res.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering
// Send initial connection event
res.write(event: connected\ndata: ${JSON.stringify({ userId })}\n\n);
// Keep-alive ping
const pingInterval = setInterval(() => {
res.write(: ping\n\n);
}, 30000);
// Subscribe to user's notifications
const subscription = pubsub.subscribe(notifications:${userId}, (message) => {
res.write(event: notification\ndata: ${JSON.stringify(message)}\n\n);
});
// Cleanup on disconnect
req.on('close', () => {
clearInterval(pingInterval);
subscription.unsubscribe();
console.log(SSE connection closed for user ${userId});
});
});
// SSE for live updates (e.g., stock prices, metrics) router.get('/events/stream/:channel', authenticate, async (req, res) => { const { channel } = req.params;
res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive');
// Send initial data
const initialData = await getChannelData(channel);
res.write(event: initial\ndata: ${JSON.stringify(initialData)}\n\n);
// Stream updates
const unsubscribe = subscribeToChannel(channel, (update) => {
res.write(event: update\ndata: ${JSON.stringify(update)}\n\n);
});
// Handle retry on reconnection
res.write(retry: 3000\n\n);
req.on('close', () => { unsubscribe(); }); });
// Client-side SSE handling const EventSourceComponent = () => { useEffect(() => { const eventSource = new EventSource('/api/events/notifications', { withCredentials: true, });
eventSource.onopen = () => {
console.log('SSE connected');
};
eventSource.addEventListener('notification', (event) => {
const notification = JSON.parse(event.data);
showNotification(notification);
});
eventSource.onerror = (error) => {
console.error('SSE error:', error);
// EventSource auto-reconnects
};
return () => {
eventSource.close();
};
}, []);
return null; };
- Pub/Sub with Redis
import { createClient } from 'redis';
class PubSubService { private publisher: ReturnType<typeof createClient>; private subscriber: ReturnType<typeof createClient>; private handlers: Map<string, Set<(message: any) => void>> = new Map();
async connect() { this.publisher = createClient({ url: process.env.REDIS_URL }); this.subscriber = this.publisher.duplicate();
await Promise.all([
this.publisher.connect(),
this.subscriber.connect(),
]);
// Handle incoming messages
this.subscriber.on('message', (channel, message) => {
const handlers = this.handlers.get(channel);
if (handlers) {
const parsed = JSON.parse(message);
handlers.forEach(handler => handler(parsed));
}
});
}
async publish(channel: string, message: any): Promise<void> { await this.publisher.publish(channel, JSON.stringify(message)); }
subscribe(channel: string, handler: (message: any) => void): () => void { if (!this.handlers.has(channel)) { this.handlers.set(channel, new Set()); this.subscriber.subscribe(channel); }
this.handlers.get(channel)!.add(handler);
// Return unsubscribe function
return () => {
const handlers = this.handlers.get(channel);
if (handlers) {
handlers.delete(handler);
if (handlers.size === 0) {
this.handlers.delete(channel);
this.subscriber.unsubscribe(channel);
}
}
};
}
// Pattern subscription async psubscribe(pattern: string, handler: (channel: string, message: any) => void): Promise<() => void> { await this.subscriber.pSubscribe(pattern, (message, channel) => { handler(channel, JSON.parse(message)); });
return () => {
this.subscriber.pUnsubscribe(pattern);
};
} }
const pubsub = new PubSubService();
// Usage in services class NotificationService { async sendNotification(userId: string, notification: Notification): Promise<void> { // Save to database await db.notification.create({ data: { ...notification, userId } });
// Publish to real-time channel
await pubsub.publish(`notifications:${userId}`, notification);
}
async broadcastToRoom(roomId: string, event: string, data: any): Promise<void> {
await pubsub.publish(room:${roomId}, { event, data });
}
}
- Presence System
interface PresenceData { status: 'online' | 'away' | 'busy' | 'offline'; lastSeen: Date; socketIds: string[]; }
class PresenceService { private redis: ReturnType<typeof createClient>; private readonly PRESENCE_TTL = 300; // 5 minutes
async setPresence(userId: string, socketId: string, status: string): Promise<void> {
const key = presence:${userId};
// Use MULTI for atomic operations
await this.redis.multi()
.hSet(key, {
status,
lastSeen: Date.now().toString(),
})
.sAdd(`${key}:sockets`, socketId)
.expire(key, this.PRESENCE_TTL)
.exec();
// Publish presence change
await pubsub.publish('presence:updates', {
userId,
status,
lastSeen: new Date(),
});
}
async removeSocket(userId: string, socketId: string): Promise<void> {
const key = presence:${userId};
await this.redis.sRem(`${key}:sockets`, socketId);
const remaining = await this.redis.sCard(`${key}:sockets`);
if (remaining === 0) {
await this.redis.hSet(key, 'status', 'offline');
await pubsub.publish('presence:updates', {
userId,
status: 'offline',
lastSeen: new Date(),
});
}
}
async getPresence(userId: string): Promise<PresenceData | null> {
const key = presence:${userId};
const data = await this.redis.hGetAll(key);
if (!data.status) return null;
return {
status: data.status as PresenceData['status'],
lastSeen: new Date(parseInt(data.lastSeen)),
socketIds: await this.redis.sMembers(`${key}:sockets`),
};
}
async getMultiplePresence(userIds: string[]): Promise<Map<string, PresenceData>> { const pipeline = this.redis.multi();
userIds.forEach(id => {
pipeline.hGetAll(`presence:${id}`);
});
const results = await pipeline.exec();
const presenceMap = new Map<string, PresenceData>();
userIds.forEach((id, index) => {
const data = results[index] as Record<string, string>;
if (data?.status) {
presenceMap.set(id, {
status: data.status as PresenceData['status'],
lastSeen: new Date(parseInt(data.lastSeen)),
socketIds: [],
});
}
});
return presenceMap;
} }
- Connection Recovery
// Client-side reconnection logic class ReconnectingWebSocket { private ws: WebSocket | null = null; private reconnectAttempts = 0; private maxReconnectAttempts = 10; private reconnectInterval = 1000; private messageQueue: any[] = [];
constructor( private url: string, private options: { onMessage: (data: any) => void; onConnect: () => void; onDisconnect: () => void; } ) { this.connect(); }
private connect(): void { this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('WebSocket connected');
this.reconnectAttempts = 0;
this.options.onConnect();
// Flush queued messages
while (this.messageQueue.length > 0) {
const msg = this.messageQueue.shift();
this.send(msg);
}
};
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
this.options.onMessage(data);
};
this.ws.onclose = (event) => {
console.log('WebSocket closed:', event.code, event.reason);
this.options.onDisconnect();
this.scheduleReconnect();
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
}
private scheduleReconnect(): void { if (this.reconnectAttempts >= this.maxReconnectAttempts) { console.error('Max reconnection attempts reached'); return; }
const delay = Math.min(
this.reconnectInterval * Math.pow(2, this.reconnectAttempts),
30000 // Max 30 seconds
);
console.log(`Reconnecting in ${delay}ms...`);
setTimeout(() => {
this.reconnectAttempts++;
this.connect();
}, delay);
}
send(data: any): void { if (this.ws?.readyState === WebSocket.OPEN) { this.ws.send(JSON.stringify(data)); } else { // Queue message for when connection is restored this.messageQueue.push(data); } }
close(): void { this.maxReconnectAttempts = 0; // Prevent reconnection this.ws?.close(); } }
// Server-side missed message recovery class MessageRecovery { async getMessagesSince(roomId: string, lastMessageId: string): Promise<Message[]> { // Fetch messages after the last seen message return db.message.findMany({ where: { roomId, id: { gt: lastMessageId }, }, orderBy: { createdAt: 'asc' }, take: 100, // Limit recovery batch }); }
async recoverClientState(userId: string, lastSyncTimestamp: number): Promise<{ messages: Message[]; notifications: Notification[]; presenceUpdates: PresenceUpdate[]; }> { const since = new Date(lastSyncTimestamp);
return {
messages: await this.getUnreadMessages(userId, since),
notifications: await this.getUnreadNotifications(userId, since),
presenceUpdates: await this.getPresenceChanges(since),
};
} }
- Scaling WebSockets
// Horizontal scaling with sticky sessions // nginx.conf upstream websocket_servers { ip_hash; // Sticky sessions server ws1.example.com:3000; server ws2.example.com:3000; server ws3.example.com:3000; }
server { location /socket.io/ { proxy_pass http://websocket_servers; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_read_timeout 86400; } }
// Broadcasting across servers
class ScaledBroadcaster {
async broadcastToRoom(roomId: string, event: string, data: any): Promise<void> {
// Publish to Redis - all servers will receive
await pubsub.publish(broadcast:room:${roomId}, {
event,
data,
timestamp: Date.now(),
});
}
// Each server subscribes and emits locally setupBroadcastListener(io: Server): void { pubsub.psubscribe('broadcast:*', (channel, message) => { const [, type, id] = channel.split(':');
if (type === 'room') {
io.to(`room:${id}`).emit(message.event, message.data);
} else if (type === 'user') {
io.to(`user:${id}`).emit(message.event, message.data);
}
});
} }
Use Cases
- Chat Application
// Real-time chat with typing indicators and read receipts
socket.on('chat:message', async (data) => {
const message = await createMessage(data);
io.to(room:${data.roomId}).emit('chat:message', message);
});
socket.on('chat:read', async ({ roomId, messageId }) => {
await markAsRead(socket.data.user.id, roomId, messageId);
socket.to(room:${roomId}).emit('chat:read', {
userId: socket.data.user.id,
messageId,
});
});
- Live Dashboard
// Real-time metrics with SSE setInterval(async () => { const metrics = await gatherMetrics(); await pubsub.publish('dashboard:metrics', metrics); }, 5000);
Best Practices
Do's
-
Implement heartbeat/ping - Detect dead connections
-
Handle reconnection gracefully - Queue messages, recover state
-
Use rooms for scaling - Don't broadcast to all
-
Implement backpressure - Handle slow clients
-
Plan for offline scenarios - Message queuing
-
Monitor connection metrics - Track active connections
Don'ts
-
Don't trust client data without validation
-
Don't skip authentication
-
Don't broadcast sensitive data
-
Don't ignore connection limits
-
Don't forget cleanup on disconnect
-
Don't use WebSocket for everything
Related Skills
-
redis - Pub/sub and state management
-
backend-development - Server architecture
-
api-architecture - REST fallbacks
Reference Resources
-
Socket.io Documentation
-
WebSocket MDN
-
SSE MDN