realtime-systems

Building real-time applications with WebSocket, Server-Sent Events, and event-driven architectures.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "realtime-systems" with this command: npx skills add miles990/claude-software-skills/miles990-claude-software-skills-realtime-systems

Real-time Systems

Overview

Building real-time applications with WebSocket, Server-Sent Events, and event-driven architectures.

WebSocket

Server Implementation (Node.js)

import { WebSocketServer, WebSocket } from 'ws'; import { createServer } from 'http'; import { v4 as uuid } from 'uuid';

/** HTTP server that the WebSocket server is attached to. */
const server = createServer();
const wss = new WebSocketServer({ server });

/** Per-connection state tracked by the server. */
interface Client {
  /** Server-generated connection id (UUID v4). */
  id: string;
  ws: WebSocket;
  /** Set when the client sends an `authenticate` message. */
  userId?: string;
  /** Names of the rooms this connection has joined. */
  rooms: Set<string>;
}

/** Connected clients keyed by client id. */
const clients = new Map<string, Client>();
/** Room name -> ids of the clients currently in that room. */
const rooms = new Map<string, Set<string>>();

wss.on('connection', (ws) => {
  const clientId = uuid();
  const client: Client = { id: clientId, ws, rooms: new Set() };

  clients.set(clientId, client);
  console.log(`Client connected: ${clientId}`);

  // Handle messages
  ws.on('message', (data) => {
    try {
      const message = JSON.parse(data.toString());
      handleMessage(client, message);
    } catch (error) {
      console.error('Invalid message:', error);
    }
  });

  // An 'error' event with no listener is thrown by the emitter and would
  // take the whole process down; log it and let 'close' do the cleanup.
  ws.on('error', (error) => {
    console.error(`Client error (${clientId}):`, error);
  });

  // Handle disconnection
  ws.on('close', () => {
    // Leave all rooms. Iterate over a copy because leaveRoom mutates client.rooms.
    for (const room of [...client.rooms]) {
      leaveRoom(client, room);
    }
    clients.delete(clientId);
    console.log(`Client disconnected: ${clientId}`);
  });

  // Send connection confirmation
  send(ws, { type: 'connected', clientId });
});

/**
 * Dispatches one parsed client message to the matching action.
 *
 * Supported `message.type` values: `authenticate`, `join`, `leave`,
 * `message` and `ping`. Malformed payloads and unknown types are logged
 * and ignored.
 *
 * @param client  The connection the message arrived on.
 * @param message The JSON-decoded payload (untrusted client input).
 */
function handleMessage(client: Client, message: any) {
  // JSON.parse can legally produce null, numbers, strings or arrays.
  if (typeof message !== 'object' || message === null) {
    console.error('Invalid message:', message);
    return;
  }

  switch (message.type) {
    case 'authenticate':
      // NOTE(review): the user id is taken from the client as-is, so it can
      // be spoofed. Verify a signed token here before trusting it.
      if (typeof message.userId === 'string') {
        client.userId = message.userId;
      }
      break;

    case 'join':
      if (typeof message.room === 'string') {
        joinRoom(client, message.room);
      }
      break;

    case 'leave':
      if (typeof message.room === 'string') {
        leaveRoom(client, message.room);
      }
      break;

    case 'message':
      // Only members may post; otherwise any connection could broadcast
      // into a room it never joined.
      if (typeof message.room === 'string' && client.rooms.has(message.room)) {
        broadcastToRoom(message.room, {
          type: 'message',
          from: client.userId,
          content: message.content,
          timestamp: Date.now(),
        }, client.id);
      }
      break;

    case 'ping':
      send(client.ws, { type: 'pong' });
      break;

    default:
      console.warn('Unknown message type:', message.type);
  }
}

/**
 * Adds `client` to `room` (creating the room on first join) and tells the
 * other members about it.
 *
 * Joining a room the client is already in is a no-op, so repeated `join`
 * messages do not re-announce the user.
 */
function joinRoom(client: Client, room: string) {
  let members = rooms.get(room);
  if (!members) {
    members = new Set();
    rooms.set(room, members);
  }

  if (members.has(client.id)) return;

  members.add(client.id);
  client.rooms.add(room);

  // Notify room members (everyone except the client that just joined)
  broadcastToRoom(room, {
    type: 'user_joined',
    userId: client.userId,
    room,
  }, client.id);
}

/**
 * Removes `client` from `room` and tells the remaining members.
 *
 * Nothing is broadcast if the client was not a member. A room whose last
 * member leaves is deleted so the `rooms` map does not grow without bound.
 */
function leaveRoom(client: Client, room: string) {
  const members = rooms.get(room);
  const wasMember = members?.delete(client.id) ?? false;
  client.rooms.delete(room);

  if (!members || !wasMember) return;

  if (members.size === 0) {
    rooms.delete(room);
    return;
  }

  // Notify room members
  broadcastToRoom(room, {
    type: 'user_left',
    userId: client.userId,
    room,
  });
}

/**
 * Sends `message` to every member of `room` whose socket is open.
 *
 * @param room            Room name; an unknown room is silently ignored.
 * @param message         Payload handed to `send` for each recipient.
 * @param excludeClientId Optional client id to skip (typically the sender).
 */
function broadcastToRoom(room: string, message: any, excludeClientId?: string) {
  const memberIds = rooms.get(room);
  if (memberIds === undefined) return;

  for (const memberId of memberIds) {
    if (memberId === excludeClientId) continue;

    const recipient = clients.get(memberId);
    if (recipient !== undefined && recipient.ws.readyState === WebSocket.OPEN) {
      send(recipient.ws, message);
    }
  }
}

/** Serializes `message` as JSON and writes it to `ws`. */
function send(ws: WebSocket, message: any) {
  const payload = JSON.stringify(message);
  ws.send(payload);
}

// Start the HTTP server (which the WebSocket server is attached to) on port 8080.
server.listen(8080);

Client Implementation

/** Callback invoked with a decoded server message. */
type WebSocketMessageHandler = (message: any) => void;

/**
 * Thin wrapper around the browser `WebSocket` that adds:
 *  - automatic reconnection with exponential backoff,
 *  - queuing of messages sent while the socket is not open,
 *  - per-`type` message handlers (plus `'*'` wildcard handlers).
 */
class WebSocketClient {
  private ws: WebSocket | null = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private reconnectDelay = 1000;
  private messageHandlers = new Map<string, Set<WebSocketMessageHandler>>();
  private messageQueue: any[] = [];
  /** True after disconnect(), so the resulting close event does not reconnect. */
  private closedByUser = false;
  /** Handle of the pending reconnect timer, if any. */
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;

  constructor(private url: string) {}

  /**
   * Opens the connection.
   *
   * @returns A promise that resolves once the socket is open and rejects if
   *          the socket reports an error.
   */
  connect(): Promise<void> {
    this.closedByUser = false;

    return new Promise((resolve, reject) => {
      const socket = new WebSocket(this.url);
      this.ws = socket;

      socket.onopen = () => {
        console.log('WebSocket connected');
        this.reconnectAttempts = 0;
        this.flushMessageQueue();
        resolve();
      };

      socket.onmessage = (event) => {
        let message: any;
        try {
          message = JSON.parse(event.data);
        } catch (error) {
          // A malformed frame must not throw out of the event handler.
          console.error('Invalid message:', error);
          return;
        }
        this.handleMessage(message);
      };

      socket.onclose = (event) => {
        console.log('WebSocket closed:', event.code, event.reason);
        // Only reconnect on unexpected closes, never after disconnect().
        if (!this.closedByUser) {
          this.attemptReconnect();
        }
      };

      socket.onerror = (error) => {
        console.error('WebSocket error:', error);
        reject(error);
      };
    });
  }

  /** Schedules the next reconnect attempt, doubling the delay each time. */
  private attemptReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('Max reconnection attempts reached');
      return;
    }

    this.reconnectAttempts++;
    const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);

    console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      // A failed attempt also fires onclose, which schedules the next retry,
      // so the rejection itself needs no handling here.
      this.connect().catch(() => {});
    }, delay);
  }

  /** Sends `message` as JSON, or queues it until the socket is open. */
  send(message: any) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    } else {
      // Queue message for when connection is restored
      this.messageQueue.push(message);
    }
  }

  /** Sends everything that was queued while the socket was not open. */
  private flushMessageQueue() {
    // Drain a snapshot: send() re-queues when the socket is not open, so
    // looping on the live queue could spin forever.
    const pending = this.messageQueue;
    this.messageQueue = [];
    for (const message of pending) {
      this.send(message);
    }
  }

  /** Invokes the handlers registered for `message.type`, then wildcard handlers. */
  private handleMessage(message: any) {
    const handlers = this.messageHandlers.get(message?.type);
    handlers?.forEach(handler => handler(message));

    // Also emit to wildcard handlers
    const wildcardHandlers = this.messageHandlers.get('*');
    wildcardHandlers?.forEach(handler => handler(message));
  }

  /**
   * Registers `handler` for messages of the given `type` (`'*'` for all).
   *
   * @returns A function that removes the handler again.
   */
  on(type: string, handler: WebSocketMessageHandler) {
    let handlers = this.messageHandlers.get(type);
    if (!handlers) {
      handlers = new Set();
      this.messageHandlers.set(type, handlers);
    }
    handlers.add(handler);

    // Return unsubscribe function
    return () => {
      this.messageHandlers.get(type)?.delete(handler);
    };
  }

  // Convenience methods
  joinRoom(room: string) {
    this.send({ type: 'join', room });
  }

  leaveRoom(room: string) {
    this.send({ type: 'leave', room });
  }

  sendMessage(room: string, content: string) {
    this.send({ type: 'message', room, content });
  }

  /** Closes the connection for good: no reconnect is attempted afterwards. */
  disconnect() {
    this.closedByUser = true;
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    this.ws?.close();
    this.ws = null;
  }
}

// Usage
const ws = new WebSocketClient('wss://api.example.com/ws');

// Register handlers before connecting so the server's 'connected' message
// cannot arrive before anyone is listening for it.
ws.on('connected', (msg: any) => {
  console.log('Connected with ID:', msg.clientId);
  ws.joinRoom('general');
});

ws.on('message', (msg: any) => {
  console.log(`[${msg.from}]: ${msg.content}`);
});

try {
  await ws.connect();
} catch (error) {
  // connect() rejects when the socket reports an error.
  console.error('Initial connection failed:', error);
}

Socket.IO

Server

import { Server } from 'socket.io'; import { createServer } from 'http'; import { createAdapter } from '@socket.io/redis-adapter'; import { createClient } from 'redis';

const httpServer = createServer();

// Browsers reject `Access-Control-Allow-Origin: *` on credentialed requests,
// so credentials are only enabled when an explicit allow-list is configured.
const allowedOrigins = process.env.ALLOWED_ORIGINS?.split(',');
const io = new Server(httpServer, {
  cors: allowedOrigins
    ? { origin: allowedOrigins, credentials: true }
    : { origin: '*' },
});

// Redis adapter for horizontal scaling
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();

Promise.all([pubClient.connect(), subClient.connect()])
  .then(() => {
    io.adapter(createAdapter(pubClient, subClient));
  })
  .catch((err) => {
    console.error('Redis adapter setup failed:', err);
  });

// Namespace for chat
const chatNamespace = io.of('/chat');

// Authentication middleware.
// It is registered on the chat namespace itself: clients that connect to
// `/chat` do not pass through middleware registered on the main namespace,
// so `io.use(...)` would leave `socket.data.user` unset in the handler below.
chatNamespace.use(async (socket, next) => {
  const token = socket.handshake.auth.token;

  try {
    socket.data.user = await verifyToken(token);
    next();
  } catch (err) {
    next(new Error('Authentication failed'));
  }
});

chatNamespace.on('connection', (socket) => {
  const user = socket.data.user;
  console.log(`User connected: ${user.name}`);

  // Join user's personal room
  socket.join(`user:${user.id}`);

  // Join a chat room
  socket.on('join_room', async (roomId: string) => {
    try {
      // Verify access
      const hasAccess = await checkRoomAccess(user.id, roomId);
      if (!hasAccess) {
        socket.emit('error', { message: 'Access denied' });
        return;
      }

      socket.join(roomId);

      // Notify room members
      socket.to(roomId).emit('user_joined', {
        userId: user.id,
        userName: user.name,
      });

      // Send recent messages
      const messages = await getRecentMessages(roomId, 50);
      socket.emit('room_history', { roomId, messages });
    } catch (err) {
      // An async listener's rejection is otherwise unhandled.
      console.error('join_room failed:', err);
      socket.emit('error', { message: 'Failed to join room' });
    }
  });

  // Leave room
  socket.on('leave_room', (roomId: string) => {
    // Ignore rooms the socket never joined so outsiders cannot spam 'user_left'.
    if (!socket.rooms.has(roomId)) return;

    socket.leave(roomId);
    socket.to(roomId).emit('user_left', {
      userId: user.id,
      userName: user.name,
    });
  });

  // Send message
  socket.on('message', async (data: { roomId: string; content: string }) => {
    if (typeof data?.roomId !== 'string' || typeof data.content !== 'string') {
      return;
    }

    // Only sockets that passed the join_room access check may post.
    if (!socket.rooms.has(data.roomId)) {
      socket.emit('error', { message: 'Access denied' });
      return;
    }

    // NOTE: `uuid` is not among this snippet's imports; it needs
    // `import { v4 as uuid } from 'uuid';`.
    const message = {
      id: uuid(),
      roomId: data.roomId,
      userId: user.id,
      userName: user.name,
      content: data.content,
      timestamp: new Date(),
    };

    try {
      // Persist message
      await saveMessage(message);

      // Broadcast to room (including the sender)
      chatNamespace.to(data.roomId).emit('message', message);
    } catch (err) {
      console.error('message handling failed:', err);
      socket.emit('error', { message: 'Message could not be sent' });
    }
  });

  // Typing indicator
  socket.on('typing_start', (roomId: string) => {
    if (!socket.rooms.has(roomId)) return;
    socket.to(roomId).emit('user_typing', {
      userId: user.id,
      userName: user.name,
    });
  });

  socket.on('typing_stop', (roomId: string) => {
    if (!socket.rooms.has(roomId)) return;
    socket.to(roomId).emit('user_stopped_typing', {
      userId: user.id,
    });
  });

  // Disconnect
  socket.on('disconnect', () => {
    console.log(`User disconnected: ${user.name}`);
  });
});

// Send to specific user (from anywhere in the app).
// Every socket joins its own `user:<id>` room on connect, so this reaches
// all of that user's open connections.
function sendToUser(userId: string, event: string, data: any) {
  chatNamespace.to(`user:${userId}`).emit(event, data);
}

httpServer.listen(3000);

Client (React)

import { io, Socket } from 'socket.io-client'; import { createContext, useContext, useEffect, useState } from 'react';

// Socket context
const SocketContext = createContext<Socket | null>(null);

/**
 * Opens one Socket.IO connection to the `/chat` namespace for the current
 * auth token and provides it to descendants through `SocketContext`.
 *
 * The context value is `null` while there is no token, and again after the
 * socket has been closed.
 */
export function SocketProvider({ children }: { children: React.ReactNode }) {
  const [socket, setSocket] = useState<Socket | null>(null);
  const { token } = useAuth();

  useEffect(() => {
    if (!token) return;

    const newSocket = io(`${API_URL}/chat`, {
      auth: { token },
      transports: ['websocket'],
    });

    newSocket.on('connect', () => {
      console.log('Socket connected');
    });

    newSocket.on('connect_error', (error) => {
      console.error('Socket connection error:', error);
    });

    setSocket(newSocket);

    return () => {
      newSocket.close();
      // Drop the closed socket so consumers do not keep using it after the
      // token changes or is cleared.
      setSocket(null);
    };
  }, [token]);

  return (
    <SocketContext.Provider value={socket}>
      {children}
    </SocketContext.Provider>
  );
}

/** Reads the current value of `SocketContext` (a `Socket` or `null`). */
export function useSocket() {
  const socket = useContext(SocketContext);
  return socket;
}

// Chat room hook
/**
 * Joins `roomId` on the shared socket and tracks its messages and the set
 * of users currently typing.
 *
 * @returns The room's messages, the ids of typing users, and helpers to
 *          send a message and to signal typing start/stop.
 */
function useChatRoom(roomId: string) {
  const socket = useSocket();
  const [messages, setMessages] = useState<Message[]>([]);
  const [typingUsers, setTypingUsers] = useState<Set<string>>(new Set());

  useEffect(() => {
    if (!socket || !roomId) return;

    // Handlers are named so cleanup can remove exactly these listeners.
    // `socket.off(event)` with no handler would also remove listeners that
    // other components registered on the same shared socket.
    const onMessage = (message: Message) => {
      setMessages(prev => [...prev, message]);
    };

    const onRoomHistory = ({ messages }: { messages: Message[] }) => {
      setMessages(messages);
    };

    const onUserTyping = ({ userId }: { userId: string }) => {
      setTypingUsers(prev => new Set(prev).add(userId));
    };

    const onUserStoppedTyping = ({ userId }: { userId: string }) => {
      setTypingUsers(prev => {
        const next = new Set(prev);
        next.delete(userId);
        return next;
      });
    };

    socket.on('message', onMessage);
    socket.on('room_history', onRoomHistory);
    socket.on('user_typing', onUserTyping);
    socket.on('user_stopped_typing', onUserStoppedTyping);

    // Join room (after the listeners are attached)
    socket.emit('join_room', roomId);

    return () => {
      socket.emit('leave_room', roomId);
      socket.off('message', onMessage);
      socket.off('room_history', onRoomHistory);
      socket.off('user_typing', onUserTyping);
      socket.off('user_stopped_typing', onUserStoppedTyping);

      // Do not carry the previous room's state into the next one.
      setMessages([]);
      setTypingUsers(new Set());
    };
  }, [socket, roomId]);

  const sendMessage = (content: string) => {
    socket?.emit('message', { roomId, content });
  };

  const startTyping = () => {
    socket?.emit('typing_start', roomId);
  };

  const stopTyping = () => {
    socket?.emit('typing_stop', roomId);
  };

  return { messages, typingUsers, sendMessage, startTyping, stopTyping };
}

Server-Sent Events (SSE)

Server

import express from 'express';

const app = express();

// SSE endpoint
app.get('/events', (req, res) => {
  // Set SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  // Send initial connection event
  res.write(`event: connected\ndata: ${JSON.stringify({ time: Date.now() })}\n\n`);

  // Keep-alive interval: an SSE comment line every 30s
  const keepAlive = setInterval(() => {
    res.write(`: keep-alive\n\n`);
  }, 30000);

  // Subscribe to events.
  // NOTE(review): assumes `eventEmitter` follows the Node EventEmitter API,
  // where `on()` returns the emitter rather than an unsubscribe function, so
  // the listener is kept in a variable and removed with `off()` — confirm.
  const onUpdate = (data: unknown) => {
    res.write(`event: update\ndata: ${JSON.stringify(data)}\n\n`);
  };
  eventEmitter.on('update', onUpdate);

  // Handle client disconnect
  req.on('close', () => {
    clearInterval(keepAlive);
    eventEmitter.off('update', onUpdate);
  });
});

// With user-specific events
// NOTE(review): `authenticate` is defined elsewhere. Nothing here checks that
// the authenticated caller is the `:userId` in the path — confirm that the
// middleware enforces this, otherwise any user can read another's stream.
app.get('/events/user/:userId', authenticate, (req, res) => {
  const { userId } = req.params;

  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  // Nothing is written until the first message arrives, so send the headers
  // now; otherwise the client does not see the stream open.
  res.flushHeaders();

  // Keep-alive comment line, same interval as the /events endpoint
  const keepAlive = setInterval(() => {
    res.write(`: keep-alive\n\n`);
  }, 30000);

  // Subscribe to user-specific channel
  const channel = `user:${userId}`;
  const unsubscribe = pubsub.subscribe(channel, (message) => {
    res.write(`event: ${message.type}\ndata: ${JSON.stringify(message.data)}\n\n`);
  });

  req.on('close', () => {
    clearInterval(keepAlive);
    unsubscribe();
  });
});

Client

/** Callback invoked with the JSON-decoded `data` of a server-sent event. */
type SseHandler = (data: any) => void;

/**
 * Small wrapper around the browser `EventSource` that lets callers register
 * handlers per named event, before or after connecting.
 */
class EventSourceClient {
  private eventSource: EventSource | null = null;
  private handlers = new Map<string, Set<SseHandler>>();

  /**
   * Opens the stream at `url` and attaches a listener for every event type
   * that already has handlers. A previously opened stream is closed first.
   */
  connect(url: string) {
    // Without this, a second connect() would leave the old stream open and
    // every event would be delivered once per stream.
    this.eventSource?.close();

    const source = new EventSource(url);
    this.eventSource = source;

    source.onopen = () => {
      console.log('SSE connected');
    };

    source.onerror = (error) => {
      console.error('SSE error:', error);
      // EventSource auto-reconnects
    };

    // Handle named events
    for (const eventType of this.handlers.keys()) {
      this.attach(source, eventType);
    }
  }

  /**
   * Registers `handler` for the named event.
   *
   * @returns A function that removes the handler again.
   */
  on(eventType: string, handler: SseHandler) {
    let registered = this.handlers.get(eventType);
    if (!registered) {
      registered = new Set();
      this.handlers.set(eventType, registered);

      // Add listener if already connected
      if (this.eventSource) {
        this.attach(this.eventSource, eventType);
      }
    }

    registered.add(handler);

    return () => {
      this.handlers.get(eventType)?.delete(handler);
    };
  }

  /** Stops the stream; handlers stay registered for a later connect(). */
  close() {
    this.eventSource?.close();
    this.eventSource = null;
  }

  /** Adds the single DOM listener that fans `eventType` out to its handlers. */
  private attach(source: EventSource, eventType: string) {
    source.addEventListener(eventType, (event: MessageEvent) => {
      let data: unknown;
      try {
        data = JSON.parse(event.data);
      } catch (error) {
        // A malformed payload must not throw out of the event listener.
        console.error(`Invalid SSE payload for "${eventType}":`, error);
        return;
      }
      // Look the set up at dispatch time so later on()/unsubscribe calls apply.
      this.handlers.get(eventType)?.forEach(h => h(data));
    });
  }
}

// Usage
const sse = new EventSourceClient();
sse.on('update', (data: any) => console.log('Update:', data));
sse.on('notification', (data: any) => showNotification(data));
sse.connect('/events');

Pub/Sub with Redis

import Redis from 'ioredis';

// Two connections are created: one used for publishing, one for subscribing.
const publisher = new Redis(process.env.REDIS_URL);
const subscriber = new Redis(process.env.REDIS_URL);

// Publish event
/**
 * Publishes `event`, serialized as JSON, on `channel`.
 *
 * @returns A promise that settles when the publish call completes.
 */
async function publishEvent(channel: string, event: any) {
  await publisher.publish(channel, JSON.stringify(event));
}

// Subscribe to channel

/** Handlers per channel, served by the single 'message' listener below. */
const channelHandlers = new Map<string, Set<(event: any) => void>>();

// One listener for all channels. Registering a new listener on every
// subscribe() call would accumulate listeners on the shared connection.
subscriber.on('message', (ch, message) => {
  const handlers = channelHandlers.get(ch);
  if (!handlers) return;

  let event: any;
  try {
    event = JSON.parse(message);
  } catch (error) {
    console.error(`Invalid JSON on channel "${ch}":`, error);
    return;
  }
  handlers.forEach(handler => handler(event));
});

/**
 * Calls `handler` with the JSON-decoded payload of every message published
 * on `channel`.
 *
 * @returns A function that removes the handler; the Redis subscription is
 *          dropped when the channel's last handler is removed.
 */
function subscribe(channel: string, handler: (event: any) => void) {
  let handlers = channelHandlers.get(channel);
  if (!handlers) {
    handlers = new Set();
    channelHandlers.set(channel, handlers);
    subscriber.subscribe(channel).catch((error) => {
      console.error(`Failed to subscribe to "${channel}":`, error);
    });
  }
  handlers.add(handler);

  return () => {
    const current = channelHandlers.get(channel);
    if (!current) return;

    current.delete(handler);
    if (current.size === 0) {
      channelHandlers.delete(channel);
      subscriber.unsubscribe(channel).catch((error) => {
        console.error(`Failed to unsubscribe from "${channel}":`, error);
      });
    }
  };
}

// Pattern subscription

/** Handlers per pattern, served by the single 'pmessage' listener below. */
const patternHandlers = new Map<string, Set<(channel: string, event: any) => void>>();

// One listener for all patterns, so listeners do not pile up per call.
subscriber.on('pmessage', (pat, channel, message) => {
  const handlers = patternHandlers.get(pat);
  if (!handlers) return;

  let event: any;
  try {
    event = JSON.parse(message);
  } catch (error) {
    console.error(`Invalid JSON on channel "${channel}":`, error);
    return;
  }
  handlers.forEach(handler => handler(channel, event));
});

/**
 * Calls `handler` with the channel name and JSON-decoded payload of every
 * message whose channel matches `pattern`.
 *
 * @returns A function that removes the handler; the Redis pattern
 *          subscription is dropped when the last handler is removed.
 */
function subscribePattern(pattern: string, handler: (channel: string, event: any) => void) {
  let handlers = patternHandlers.get(pattern);
  if (!handlers) {
    handlers = new Set();
    patternHandlers.set(pattern, handlers);
    subscriber.psubscribe(pattern).catch((error) => {
      console.error(`Failed to subscribe to pattern "${pattern}":`, error);
    });
  }
  handlers.add(handler);

  return () => {
    const current = patternHandlers.get(pattern);
    if (!current) return;

    current.delete(handler);
    if (current.size === 0) {
      patternHandlers.delete(pattern);
      subscriber.punsubscribe(pattern).catch((error) => {
        console.error(`Failed to unsubscribe from pattern "${pattern}":`, error);
      });
    }
  };
}

// Usage
subscribe('notifications', (event) => {
  console.log('Notification:', event);
});

subscribePattern('room:*', (channel, event) => {
  // Channels look like `room:<id>`; take the part after the first colon.
  const roomId = channel.split(':')[1];
  console.log(`Room ${roomId}:`, event);
});

// publishEvent returns a promise; handle failures instead of leaving the
// rejections unhandled.
Promise.all([
  publishEvent('notifications', { type: 'alert', message: 'New message' }),
  publishEvent('room:123', { type: 'message', content: 'Hello!' }),
]).catch((error) => {
  console.error('Publish failed:', error);
});

Related Skills

  • [[backend]] - Server implementation

  • [[system-design]] - Event-driven architecture

  • [[cloud-platforms]] - Managed pub/sub services

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

saas-platforms

No summary provided by upstream source.

Repository Source · Needs Review
General

architecture-patterns

No summary provided by upstream source.

Repository Source · Needs Review
General

frontend

No summary provided by upstream source.

Repository Source · Needs Review
General

project-management

No summary provided by upstream source.

Repository Source · Needs Review