websocket-streaming

Real-time bidirectional communication between DAG execution engines and dashboards. Typed event protocols, connection management, and React hook integration.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "websocket-streaming" with this command: npx skills add erichowens/some_claude_skills/erichowens-some-claude-skills-websocket-streaming

WebSocket Streaming

Real-time bidirectional communication between DAG execution engines and dashboards. Typed event protocols, connection management, and React hook integration.

When to Use

✅ Use for:

  • Streaming DAG node state changes to a visualization dashboard

  • Sending human gate decisions from dashboard to execution engine

  • Live cost ticker and progress updates during execution

  • Bi-directional communication (not just server → client)

❌ NOT for:

  • One-way server → client updates (consider SSE, simpler)

  • REST API design (use api-architect )

  • Polling-based status checks (WebSocket replaces polling)

Event Protocol

Server → Client Events

type ServerEvent = | { type: 'node_state'; node_id: string; status: NodeStatus; output?: any; metrics?: NodeMetrics } | { type: 'edge_active'; from: string; to: string } | { type: 'dag_mutated'; mutation: DAGMutation } | { type: 'cost_update'; spent: number; budget: number; remaining: number } | { type: 'execution_complete'; results: Record<string, any> } | { type: 'human_gate_waiting'; node_id: string; presentation: GatePresentation } | { type: 'error'; node_id?: string; message: string };

Client → Server Events

type ClientEvent = | { type: 'human_decision'; node_id: string; decision: 'approve' | 'reject' | 'modify'; feedback?: string } | { type: 'pause_execution' } | { type: 'resume_execution' } | { type: 'cancel_execution' };

React Hook: useDAGStream

import { useEffect, useRef, useCallback } from 'react';

export function useDAGStream(dagId: string, store: DAGStore) { const wsRef = useRef<WebSocket | null>(null); const reconnectAttempt = useRef(0);

const connect = useCallback(() => { const ws = new WebSocket(/api/dags/${dagId}/stream);

ws.onopen = () => { reconnectAttempt.current = 0; };

ws.onmessage = (event) => {
  const msg = JSON.parse(event.data) as ServerEvent;
  switch (msg.type) {
    case 'node_state':
      store.updateNodeData(msg.node_id, {
        status: msg.status, output: msg.output, metrics: msg.metrics,
      });
      break;
    case 'cost_update':
      store.setCostState({ spent: msg.spent, budget: msg.budget });
      break;
    case 'dag_mutated':
      store.applyMutation(msg.mutation);
      break;
    case 'execution_complete':
      store.setExecutionComplete(msg.results);
      break;
  }
};

ws.onclose = () => {
  // Reconnect with exponential backoff (max 30s)
  const delay = Math.min(1000 * 2 ** reconnectAttempt.current, 30000);
  reconnectAttempt.current++;
  setTimeout(connect, delay);
};

wsRef.current = ws;

}, [dagId, store]);

useEffect(() => { connect(); return () => wsRef.current?.close(); }, [connect]);

// Send client events const send = useCallback((event: ClientEvent) => { wsRef.current?.send(JSON.stringify(event)); }, []);

return { send }; }

Server Implementation (Node.js)

import { WebSocketServer } from 'ws';

const wss = new WebSocketServer({ noServer: true });

// Per-DAG rooms const rooms = new Map<string, Set<WebSocket>>();

function broadcast(dagId: string, event: ServerEvent) { const clients = rooms.get(dagId); if (!clients) return; const msg = JSON.stringify(event); for (const ws of clients) { if (ws.readyState === ws.OPEN) ws.send(msg); } }

// Usage in execution engine: function onNodeComplete(dagId: string, nodeId: string, result: any) { broadcast(dagId, { type: 'node_state', node_id: nodeId, status: 'completed', output: result.output, metrics: result.metrics, }); }

Anti-Patterns

No Reconnection Logic

Wrong: WebSocket closes and the dashboard shows stale data forever. Right: Exponential backoff reconnection (1s, 2s, 4s, 8s... max 30s). Resync state on reconnect.

Sending Full State on Every Event

Wrong: Broadcasting the entire DAG state on every node update. Right: Send only the delta: which node changed, to what status. The client applies the update to its local store.

No Typed Protocol

Wrong: Sending untyped JSON objects and parsing with any . Right: Define ServerEvent and ClientEvent union types. Exhaustive switch on msg.type .

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

video-processing-editing

No summary provided by upstream source.

Repository SourceNeeds Review
General

cv-creator

No summary provided by upstream source.

Repository SourceNeeds Review
General

mobile-ux-optimizer

No summary provided by upstream source.

Repository SourceNeeds Review
General

personal-finance-coach

No summary provided by upstream source.

Repository SourceNeeds Review