
Why Stream?

Streaming delivers tokens to the user as the model generates them, instead of waiting for the complete response. This immediate feedback makes your application feel faster and more responsive, especially for long outputs.

Python Streaming

Basic Streaming

from openai import OpenAI

client = OpenAI(
    api_key="sk-savegate-xxxxxxxxxxxxx",
    base_url="https://api.savegate.ai/v1"
)

def stream_response(message, model="gpt-4"):
    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": message}],
        stream=True
    )

    print("Response: ", end="")

    for chunk in stream:
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)

    print()  # New line at end

# Usage
stream_response("Tell me a story about a robot")
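
Each chunk's delta carries only the newly generated tokens, and the final chunk sets finish_reason ("stop" for a normal end, "length" if the token limit was hit). A minimal sketch that collects the text and warns on truncation, assuming the gateway passes OpenAI's finish_reason field through unchanged:

def stream_checked(message, model="gpt-4"):
    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": message}],
        stream=True
    )

    parts = []
    finish_reason = None

    for chunk in stream:
        choice = chunk.choices[0]
        if choice.delta.content:
            print(choice.delta.content, end="", flush=True)
            parts.append(choice.delta.content)
        if choice.finish_reason is not None:
            finish_reason = choice.finish_reason

    print()
    if finish_reason == "length":
        print("Warning: response was truncated by the token limit")
    return "".join(parts)

# Usage
text = stream_checked("Tell me a story about a robot")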

Streaming with Callback

def stream_with_callback(message, callback, model="gpt-4"):
    """Stream response and call callback for each chunk"""
    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": message}],
        stream=True
    )

    full_response = ""

    for chunk in stream:
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            full_response += content
            callback(content)

    return full_response

# Usage with custom callback
def print_colored(text):
    print(f"\033[92m{text}\033[0m", end="", flush=True)

response = stream_with_callback("Write a poem", callback=print_colored)
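
If you would rather not thread callbacks through your code, the same pattern works as a generator. A sketch using the client defined above:

def stream_chunks(message, model="gpt-4"):
    """Yield each content delta as it arrives."""
    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": message}],
        stream=True
    )
    for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

# Usage: the generator composes with any consumer
for piece in stream_chunks("Write a poem"):
    print(piece, end="", flush=True)
print()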

Async Streaming

import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI(
    api_key="sk-savegate-xxxxxxxxxxxxx",
    base_url="https://api.savegate.ai/v1"
)

async def async_stream(message, model="gpt-4"):
    stream = await async_client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": message}],
        stream=True
    )

    full_response = ""

    async for chunk in stream:
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            full_response += content

    return full_response

# Usage
asyncio.run(async_stream("Explain quantum computing"))
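
The async client pays off when you run several streams at once. A sketch that drives the function above concurrently with asyncio.gather; note that the printed output of concurrent streams will interleave on stdout:

async def stream_many(prompts, model="gpt-4"):
    # Streams run concurrently; their printed output interleaves
    return await asyncio.gather(
        *(async_stream(p, model=model) for p in prompts)
    )

# Usage
results = asyncio.run(stream_many([
    "Explain quantum computing",
    "Explain general relativity",
]))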

Node.js Streaming

Basic Streaming

import OpenAI from 'openai';

const client = new OpenAI({
  apiKey: 'sk-savegate-xxxxxxxxxxxxx',
  baseURL: 'https://api.savegate.ai/v1'
});

async function streamResponse(message, model = 'gpt-4') {
  const stream = await client.chat.completions.create({
    model: model,
    messages: [{ role: 'user', content: message }],
    stream: true
  });

  process.stdout.write('Response: ');

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    process.stdout.write(content);
  }

  console.log();
}

// Usage
streamResponse('Tell me a story about a robot');

Streaming with Event Emitter

import { EventEmitter } from 'events';

async function streamWithEvents(message, model = 'gpt-4') {
  const emitter = new EventEmitter();

  const stream = await client.chat.completions.create({
    model: model,
    messages: [{ role: 'user', content: message }],
    stream: true
  });

  let fullResponse = '';

  // Consume the stream in the background; surface failures as an
  // 'error' event instead of an unhandled promise rejection
  (async () => {
    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content || '';
      if (content) {
        fullResponse += content;
        emitter.emit('chunk', content);
      }
    }
    emitter.emit('done', fullResponse);
  })().catch((err) => emitter.emit('error', err));

  return emitter;
}

// Usage
const stream = await streamWithEvents('Write a poem');

stream.on('chunk', (chunk) => {
  process.stdout.write(chunk);
});

stream.on('done', (fullResponse) => {
  console.log('\n\nFull response received!');
});

stream.on('error', (err) => {
  console.error('Stream error:', err);
});

Web Streaming with Server-Sent Events

FastAPI Backend

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
from pydantic import BaseModel
import json

app = FastAPI()
client = OpenAI(
    api_key="sk-savegate-xxxxxxxxxxxxx",
    base_url="https://api.savegate.ai/v1"
)

class ChatRequest(BaseModel):
    message: str

def stream_generator(message: str):
    # A plain (sync) generator: FastAPI iterates it in a thread pool,
    # so the blocking OpenAI client does not stall the event loop
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": message}],
        stream=True
    )

    for chunk in stream:
        if chunk.choices[0].delta.content:
            data = json.dumps({"content": chunk.choices[0].delta.content})
            yield f"data: {data}\n\n"

    yield "data: [DONE]\n\n"  # Same terminator the frontend checks for

@app.post("/stream")
async def stream_chat(request: ChatRequest):
    # Read the message from the JSON body ({"message": ...}), matching
    # what the frontend below sends, rather than a query parameter
    return StreamingResponse(
        stream_generator(request.message),
        media_type="text/event-stream"
    )
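
If you prefer to keep the whole request on the event loop, the same endpoint can be written against AsyncOpenAI. A sketch under that assumption; the /stream-async route name and the reuse of ChatRequest above are just for illustration:

from openai import AsyncOpenAI

async_client = AsyncOpenAI(
    api_key="sk-savegate-xxxxxxxxxxxxx",
    base_url="https://api.savegate.ai/v1"
)

async def async_stream_generator(message: str):
    stream = await async_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": message}],
        stream=True
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            data = json.dumps({"content": chunk.choices[0].delta.content})
            yield f"data: {data}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/stream-async")
async def stream_chat_async(request: ChatRequest):
    return StreamingResponse(
        async_stream_generator(request.message),
        media_type="text/event-stream"
    )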

Express.js Backend

import express from 'express';
import OpenAI from 'openai';

const app = express();
const client = new OpenAI({
  apiKey: 'sk-savegate-xxxxxxxxxxxxx',
  baseURL: 'https://api.savegate.ai/v1'
});

app.use(express.json());

app.post('/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  const stream = await client.chat.completions.create({
    model: 'gpt-4',
    messages: [{ role: 'user', content: req.body.message }],
    stream: true
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    if (content) {
      res.write(`data: ${JSON.stringify({ content })}\n\n`);
    }
  }

  res.write('data: [DONE]\n\n');
  res.end();
});

app.listen(3000, () => console.log('Server running on port 3000'));
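
To test either backend without a browser, you can consume the SSE endpoint from Python. A sketch using the third-party httpx library; the URL assumes the Express server above is listening on port 3000:

import json
import httpx

def consume_sse(message, url="http://localhost:3000/stream"):
    with httpx.stream("POST", url, json={"message": message}, timeout=None) as response:
        # iter_lines() buffers across network reads, so events split
        # between chunks are still parsed line by line
        for line in response.iter_lines():
            if line.startswith("data: "):
                data = line[len("data: "):]
                if data == "[DONE]":
                    break
                print(json.loads(data)["content"], end="", flush=True)
    print()

# Usage
consume_sse("Tell me a story about a robot")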

Frontend (HTML + JavaScript)

<!DOCTYPE html>
<html>
<head>
    <title>Streaming Chat</title>
</head>
<body>
    <div id="chat-container">
        <div id="messages"></div>
        <input type="text" id="user-input" placeholder="Type a message...">
        <button onclick="sendMessage()">Send</button>
    </div>

    <script>
        async function sendMessage() {
            const input = document.getElementById('user-input');
            const message = input.value;
            input.value = '';

            // Add user message
            addMessage('User', message);

            // Create message div for assistant
            const assistantDiv = addMessage('Assistant', '');

            // Start streaming
            const response = await fetch('/stream', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ message })
            });

            const reader = response.body.getReader();
            const decoder = new TextDecoder();

            // Note: this simple parser assumes each network read ends on a
            // line boundary; a production client should buffer partial lines
            while (true) {
                const { value, done } = await reader.read();
                if (done) break;

                const chunk = decoder.decode(value, { stream: true });
                const lines = chunk.split('\n');

                for (const line of lines) {
                    if (line.startsWith('data: ')) {
                        const data = line.slice(6);
                        if (data === '[DONE]') return; // Server is done

                        try {
                            const parsed = JSON.parse(data);
                            assistantDiv.textContent += parsed.content;
                        } catch (e) {
                            console.error('Parse error:', e);
                        }
                    }
                }
            }
        }

        function addMessage(sender, content) {
            const messagesDiv = document.getElementById('messages');
            const messageDiv = document.createElement('div');
            // Build with textContent instead of innerHTML so user input
            // cannot inject markup (XSS)
            const label = document.createElement('strong');
            label.textContent = `${sender}: `;
            const span = document.createElement('span');
            span.textContent = content;
            messageDiv.append(label, span);
            messagesDiv.appendChild(messageDiv);
            return span;
        }
    </script>
</body>
</html>

Error Handling in Streams

def safe_stream(message, model="gpt-4"):
    full_response = ""
    try:
        stream = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": message}],
            stream=True
        )

        for chunk in stream:
            try:
                if chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    print(content, end="", flush=True)
                    full_response += content
            except (AttributeError, IndexError):
                # Skip malformed chunks
                continue

    except Exception as e:
        print(f"\nError during streaming: {e}")
        return None

    print()  # New line at end
    return full_response
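
A stream can also fail midway through a response. A common pattern is to retry from scratch with exponential backoff, discarding any partial output. A minimal sketch built on safe_stream above; the retry count and backoff values are arbitrary:

import time

def stream_with_retry(message, model="gpt-4", retries=3, backoff=2.0):
    for attempt in range(retries):
        result = safe_stream(message, model=model)
        if result is not None:
            return result
        if attempt < retries - 1:
            wait = backoff * (2 ** attempt)  # 2s, 4s, 8s...
            print(f"Retrying in {wait:.0f}s...")
            time.sleep(wait)
    raise RuntimeError(f"Streaming failed after {retries} attempts")

# Usage
text = stream_with_retry("Explain quantum computing")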

Next Steps

Learn how to use function calling for tool integration