streaming-api-patterns

ArieGoldkin
Updated Yesterday
Tags: Development, streaming, sse, websocket, real-time, api, 2025

About

This skill teaches implementation of real-time data streaming using SSE, WebSockets, and ReadableStream APIs. It focuses on critical production concerns like backpressure handling, reconnection strategies, and LLM streaming. Use it for building ChatGPT-style interfaces, live notifications, chat apps, and other 2025+ real-time applications.

Quick Install

Claude Code

Plugin command (recommended):
/plugin add https://github.com/ArieGoldkin/ai-agent-hub
Git clone (alternative):
git clone https://github.com/ArieGoldkin/ai-agent-hub.git ~/.claude/skills/streaming-api-patterns

Copy and paste this command in Claude Code to install this skill

Documentation

Streaming API Patterns

Overview

Modern applications require real-time data delivery. This skill covers Server-Sent Events (SSE) for server-to-client streaming, WebSockets for bidirectional communication, and the Streams API for handling backpressure and efficient data flow.

When to use this skill:

  • Streaming LLM responses (ChatGPT-style interfaces)
  • Real-time notifications and updates
  • Live data feeds (stock prices, analytics)
  • Chat applications
  • Progress updates for long-running tasks
  • Collaborative editing features

Core Technologies

1. Server-Sent Events (SSE)

Best for: Server-to-client streaming (LLM responses, notifications)

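// Next.js Route Handler
export async function GET(req: Request) {
  const encoder = new TextEncoder()
  let interval: ReturnType<typeof setInterval> | undefined

  const stream = new ReadableStream({
    start(controller) {
      // Send data
      controller.enqueue(encoder.encode('data: Hello\n\n'))

      // Keep connection alive (lines starting with ':' are comments that clients ignore)
      interval = setInterval(() => {
        controller.enqueue(encoder.encode(': keepalive\n\n'))
      }, 30000)

      // Cleanup when the client disconnects
      req.signal.addEventListener('abort', () => {
        clearInterval(interval)
        try {
          controller.close()
        } catch {
          // The stream was already closed or cancelled; close() throws in that state
        }
      })
    },
    cancel() {
      // Stop the keepalive timer if the consumer cancels the stream first
      clearInterval(interval)
    }
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    }
  })
}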

// Client
const eventSource = new EventSource('/api/stream')
eventSource.onmessage = (event) => {
  console.log(event.data)
}
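
The handler above sends a bare `data:` line, but the event-stream format also carries optional `event`, `id`, and `retry` fields, and multi-line payloads need one `data:` prefix per line. A minimal sketch of a framing helper follows; `SseEvent` and `formatSseEvent` are hypothetical names introduced here, not part of the skill.

```typescript
// Hypothetical helper: the field names follow the text/event-stream format.
interface SseEvent {
  data: string
  event?: string // custom event name, received via addEventListener(name, ...)
  id?: string    // sent back by the browser as Last-Event-ID on reconnect
  retry?: number // reconnection delay hint in milliseconds
}

function formatSseEvent({ data, event, id, retry }: SseEvent): string {
  const lines: string[] = []
  if (event !== undefined) lines.push(`event: ${event}`)
  if (id !== undefined) lines.push(`id: ${id}`)
  if (retry !== undefined) lines.push(`retry: ${retry}`)
  // Every line of the payload needs its own "data:" prefix
  for (const line of data.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`)
  }
  // A blank line terminates the event
  return lines.join('\n') + '\n\n'
}
```

In the route handler this would be used as `controller.enqueue(encoder.encode(formatSseEvent({ data: 'Hello' })))`.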

2. WebSockets

Best for: Bidirectional real-time communication (chat, collaboration)

// WebSocket Server (ws package, running as a separate Node.js process alongside Next.js)
import { WebSocketServer, WebSocket } from 'ws'

const wss = new WebSocketServer({ port: 8080 })

wss.on('connection', (ws) => {
  ws.on('message', (data, isBinary) => {
    // Broadcast to all clients, preserving the text/binary frame type
    wss.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(data, { binary: isBinary })
      }
    })
  })
})

// Client
const ws = new WebSocket('ws://localhost:8080')
ws.onmessage = (event) => console.log(event.data)
// send() throws while the socket is still connecting, so wait for the open event
ws.onopen = () => {
  ws.send(JSON.stringify({ type: 'message', text: 'Hello' }))
}
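
The server above rebroadcasts whatever it receives. Since the client sends JSON with `type` and `text` fields, one way to validate input before broadcasting is a small parser that rejects anything else. This is a sketch under assumptions: the `ChatMessage` shape, the `parseChatMessage` name, and the 2000-character limit are illustrative choices, not part of the skill.

```typescript
// Hypothetical message shape, matching the { type, text } payload sent by the client.
interface ChatMessage {
  type: 'message'
  text: string
}

const MAX_TEXT_LENGTH = 2000 // assumed limit, tune per application

function parseChatMessage(raw: string): ChatMessage | null {
  let value: unknown
  try {
    value = JSON.parse(raw)
  } catch {
    return null // not valid JSON
  }
  if (typeof value !== 'object' || value === null) return null

  const { type, text } = value as Record<string, unknown>
  if (type !== 'message' || typeof text !== 'string') return null

  const trimmed = text.trim()
  if (trimmed.length === 0 || trimmed.length > MAX_TEXT_LENGTH) return null

  // Rebuild the object so unexpected extra fields are not forwarded
  return { type: 'message', text: trimmed }
}
```

In the server's message handler, the raw frame could be passed through `parseChatMessage(data.toString())` and dropped when the result is `null`.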

3. ReadableStream API

Best for: Processing large data streams with backpressure

async function* generateData() {
  for (let i = 0; i < 1000; i++) {
    await new Promise(resolve => setTimeout(resolve, 100))
    yield `data-${i}`
  }
}

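const encoder = new TextEncoder()
const iterator = generateData()

const stream = new ReadableStream({
  // pull() runs only when the queue has room, so a slow consumer pauses the generator
  async pull(controller) {
    const { done, value } = await iterator.next()
    if (done) return controller.close()
    controller.enqueue(encoder.encode(value + '\n'))
  },
  // Stop the generator if the consumer cancels
  async cancel() {
    await iterator.return(undefined)
  }
})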

LLM Streaming Pattern

// Server
import OpenAI from 'openai'

const openai = new OpenAI()

export async function POST(req: Request) {
  const { messages } = await req.json()

  const stream = await openai.chat.completions.create({
    model: 'gpt-4-turbo-preview',
    messages,
    stream: true
  })

  const encoder = new TextEncoder()

  return new Response(
    new ReadableStream({
      async start(controller) {
        for await (const chunk of stream) {
          const content = chunk.choices[0]?.delta?.content
          if (content) {
            controller.enqueue(encoder.encode(`data: ${JSON.stringify({ content })}\n\n`))
          }
        }
        controller.enqueue(encoder.encode('data: [DONE]\n\n'))
        controller.close()
      }
    }),
    {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache'
      }
    }
  )
}

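// Client
async function streamChat(messages) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages })
  })

  if (!response.ok || !response.body) {
    throw new Error(`Request failed with status ${response.status}`)
  }

  const reader = response.body.getReader()
  const decoder = new TextDecoder()
  let buffer = ''

  while (true) {
    const { done, value } = await reader.read()
    if (done) break

    // stream: true keeps multi-byte characters intact across chunk boundaries
    buffer += decoder.decode(value, { stream: true })

    // A chunk can end mid-line, so keep the trailing partial line for the next read
    const lines = buffer.split('\n')
    buffer = lines.pop() ?? ''

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6)
        if (data === '[DONE]') return

        const json = JSON.parse(data)
        console.log(json.content) // Stream token
      }
    }
  }
}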

Reconnection Strategy

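class ReconnectingEventSource {
  private eventSource: EventSource | null = null
  private reconnectDelay = 1000
  private maxReconnectDelay = 30000
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null
  private closed = false

  constructor(private url: string, private onMessage: (data: string) => void) {
    this.connect()
  }

  private connect() {
    this.eventSource = new EventSource(this.url)

    this.eventSource.onmessage = (event) => {
      this.reconnectDelay = 1000 // Reset on success
      this.onMessage(event.data)
    }

    this.eventSource.onerror = () => {
      // Close to disable the browser's built-in retry and apply our own backoff
      this.eventSource?.close()
      if (this.closed) return

      // Exponential backoff
      this.reconnectTimer = setTimeout(() => this.connect(), this.reconnectDelay)
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay)
    }
  }

  close() {
    // Cancel any pending reconnect so the source does not reopen after close()
    this.closed = true
    if (this.reconnectTimer !== null) clearTimeout(this.reconnectTimer)
    this.eventSource?.close()
  }
}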
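
The class above doubles the delay deterministically, so clients that drop at the same moment also retry at the same moment. A common refinement, not part of the original class, is to randomize each delay ("full jitter"). The sketch below assumes the same 1000 ms base and 30000 ms cap; `backoffDelay` is a hypothetical name, and the `random` parameter exists only to make the function testable.

```typescript
// Full-jitter exponential backoff: pick a delay uniformly in [0, ceiling),
// where the ceiling doubles per attempt up to maxMs.
function backoffDelay(
  attempt: number,
  baseMs = 1000,
  maxMs = 30000,
  random: () => number = Math.random
): number {
  const ceiling = Math.min(maxMs, baseMs * 2 ** attempt)
  return Math.floor(random() * ceiling)
}
```

With this helper, the reconnect timer would use `backoffDelay(attempt)` and increment an attempt counter on each error, resetting it to zero on the first successful message.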

Best Practices

SSE

  • ✅ Use for one-way server-to-client streaming
  • ✅ Implement automatic reconnection
  • ✅ Send keepalive messages every 30s
  • ✅ Handle browser connection limits (6 per domain over HTTP/1.1, shared across tabs)
  • ✅ Use HTTP/2 to multiplex streams over a single connection, which lifts the HTTP/1.1 limit

WebSockets

  • ✅ Use for bidirectional real-time communication
  • ✅ Implement heartbeat/ping-pong
  • ✅ Handle reconnection with exponential backoff
  • ✅ Validate and sanitize messages
  • ✅ Implement message queuing for offline periods
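
As an illustration of message queuing for offline periods, the sketch below buffers outbound messages while the socket is not open and flushes them once it is. The `QueuedSender` class, the `SocketLike` interface, and the drop-oldest policy with a default cap of 100 are assumptions made for this example.

```typescript
// Minimal structural type so the sketch works with a browser WebSocket or a test double
interface SocketLike {
  readyState: number
  send(data: string): void
}

const SOCKET_OPEN = 1 // numeric value of WebSocket.OPEN

class QueuedSender {
  private queue: string[] = []

  constructor(private socket: SocketLike, private maxQueued = 100) {}

  send(message: string): void {
    if (this.socket.readyState === SOCKET_OPEN) {
      this.socket.send(message)
      return
    }
    // Offline: buffer the message, dropping the oldest one when the buffer is full
    if (this.queue.length >= this.maxQueued) this.queue.shift()
    this.queue.push(message)
  }

  // Call from the open handler; pass the new socket after a reconnect
  flush(socket: SocketLike = this.socket): void {
    this.socket = socket
    while (this.queue.length > 0 && this.socket.readyState === SOCKET_OPEN) {
      this.socket.send(this.queue.shift()!)
    }
  }

  pendingCount(): number {
    return this.queue.length
  }
}
```

Dropping the oldest message suits state updates where only recent values matter; a chat client might instead reject new sends or persist the queue.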

Backpressure

  • ✅ Use ReadableStream with proper flow control
  • ✅ Monitor buffer sizes
  • ✅ Pause production when consumer is slow
  • ✅ Implement timeouts for slow consumers
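
To make "pause production when consumer is slow" concrete, the sketch below wraps a push-style producer in a `ReadableStream` and pauses it when `controller.desiredSize` reaches zero. The `PausableSource` interface and `toReadableStream` function are hypothetical; a real producer (a socket, a file reader) would expose its own pause and resume calls.

```typescript
// Hypothetical producer interface: it pushes chunks through onData until paused.
interface PausableSource<T> {
  onData: ((chunk: T) => void) | null
  pause(): void
  resume(): void
}

function toReadableStream<T>(source: PausableSource<T>, highWaterMark = 16): ReadableStream<T> {
  return new ReadableStream<T>(
    {
      start(controller) {
        source.onData = (chunk) => {
          controller.enqueue(chunk)
          // desiredSize <= 0 means the internal queue has reached highWaterMark
          if ((controller.desiredSize ?? 0) <= 0) source.pause()
        }
      },
      pull() {
        // Called when the queue has room again, so production can continue
        source.resume()
      },
      cancel() {
        // The consumer went away: stop the producer and detach the callback
        source.pause()
        source.onData = null
      }
    },
    new CountQueuingStrategy({ highWaterMark })
  )
}
```

The `highWaterMark` counts chunks here; a `ByteLengthQueuingStrategy` could be used instead when chunk sizes vary widely.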

Performance

  • ✅ Compress data (gzip/brotli)
  • ✅ Batch small messages
  • ✅ Use binary formats (MessagePack, Protobuf) for large data
  • ✅ Implement client-side buffering
  • ✅ Monitor connection count and resource usage
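
Batching small messages can be sketched as a buffer that flushes once it reaches a size threshold, with an explicit `flush()` for timers or shutdown. `MessageBatcher` and its parameters are hypothetical names for this example.

```typescript
class MessageBatcher<T> {
  private buffer: T[] = []

  constructor(
    private maxBatchSize: number,
    private sendBatch: (batch: T[]) => void
  ) {}

  add(message: T): void {
    this.buffer.push(message)
    // Flush as soon as the batch is full
    if (this.buffer.length >= this.maxBatchSize) this.flush()
  }

  // Send whatever is buffered; safe to call when the buffer is empty
  flush(): void {
    if (this.buffer.length === 0) return
    const batch = this.buffer
    this.buffer = []
    this.sendBatch(batch)
  }
}
```

Pairing this with a periodic `setInterval(() => batcher.flush(), 50)` bounds the latency added to messages that arrive while a batch is still filling; the 50 ms figure is only an example.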

Resources

GitHub Repository

ArieGoldkin/ai-agent-hub
Path: skills/streaming-api-patterns
