旌磅箱 发表于 2025-6-4 20:29:42

通过 API 将Deepseek响应流式内容输出到前端

要实现通过 API 将流式内容输出到前端,可以采用以下技术方案(以 Python 后端 + 前端 JavaScript 为例):
方案一:使用 Server-Sent Events (SSE)

这是浏览器原生支持的流式传输方案,推荐首选
# Flask 示例
from flask import Response, stream_with_context

@app.route('/stream')
def stream_data():
    def generate():
      response = client.chat.completions.create(
            model="deepseek-chat",
            messages=messages,
            stream=True
      )
      
      for chunk in response:
            if chunk.choices:
                content = chunk.choices.delta.content or ""
                # SSE 格式要求 data: 前缀和双换行符
                yield f"data: {json.dumps({'content': content})}\n\n"
   
    return Response(stream_with_context(generate()), mimetype='text/event-stream')// 前端 JavaScript
const eventSource = new EventSource('/stream');

eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    document.getElementById('output').innerHTML += data.content;
};

eventSource.onerror = (err) => {
    console.error('EventSource failed:', err);
    eventSource.close();
};方案二:使用流式 HTTP 响应(NDJSON)

更通用的流式传输方案,适合非浏览器客户端
# FastAPI 示例
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
import json

@app.get("/stream")
async def stream_data():
    async def generate():
      response = client.chat.completions.create(
            model="deepseek-chat",
            messages=messages,
            stream=True
      )
      
      async for chunk in response:
            if chunk.choices:
                content = chunk.choices.delta.content or ""
                yield json.dumps({"content": content}) + "\n"# NDJSON 格式
   
    return StreamingResponse(generate(), media_type='application/x-ndjson')// 前端 JavaScript 使用 Fetch API
async function streamData() {
    const response = await fetch('/stream');
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
   
    while(true) {
      const { done, value } = await reader.read();
      if(done) break;
      
      const chunk = decoder.decode(value);
      const data = JSON.parse(chunk);
      document.getElementById('output').innerHTML += data.content;
    }
}关键配置说明


[*]响应头设置:
# Flask
headers = {
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
}

# FastAPI 自动处理
[*]数据格式选择:

[*]SSE (text/event-stream):浏览器原生支持,自动重连
[*]NDJSON (application/x-ndjson):更通用的流式 JSON 格式
[*]纯文本流:简单但结构化能力弱

[*]前端处理建议:
// 更健壮的读取方式(处理分块不完整情况)
let buffer = '';

async function processChunk(chunk) {
    buffer += chunk;
    while(buffer.includes('\n')) {
      const lineEnd = buffer.indexOf('\n');
      const line = buffer.slice(0, lineEnd);
      buffer = buffer.slice(lineEnd + 1);
      
      try {
            const data = JSON.parse(line);
            // 处理数据...
      } catch(e) {
            console.error('解析错误:', e);
      }
    }
}
完整工作流程示例(FastAPI + React)

后端:
# main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/chat")
async def chat_stream(prompt: str):
    async def generate():
      response = client.chat.completions.create(
            model="deepseek-chat",
            messages=[{"role": "user", "content": prompt}],
            stream=True
      )
      
      async for chunk in response:
            if content := chunk.choices.delta.content:
                yield json.dumps({"content": content})
   
    return StreamingResponse(generate(), media_type="application/x-ndjson")前端 React 组件:
// ChatComponent.jsx
import { useState } from 'react';

export default function ChatComponent() {
    const = useState('');
   
    const startStream = async () => {
      const response = await fetch('http://api/chat?prompt=你好');
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';
      
      while(true) {
            const { done, value } = await reader.read();
            if(done) break;
            
            buffer += decoder.decode(value);
            while(buffer.includes('}')) {
                const endIndex = buffer.indexOf('}') + 1;
                const chunk = buffer.slice(0, endIndex);
                buffer = buffer.slice(endIndex);
               
                try {
                  const data = JSON.parse(chunk);
                  setOutput(prev => prev + data.content);
                } catch(e) {
                  console.error('解析错误:', e);
                }
            }
      }
    };
   
    return (
      
            <button onClick={startStream}>开始对话</button>
            {output}
      
    );
}注意事项


[*]连接管理:

[*]设置合理的超时时间(通常 30-60 秒)
[*]处理客户端提前断开连接的情况
# FastAPI 示例
try:
    async for chunk in response:
      # ...处理数据
      if await request.is_disconnected():
            break
finally:
    await client.close()# 清理资源
[*]性能优化:

[*]使用异步框架(FastAPI 性能优于 Flask)
[*]启用响应压缩
app = FastAPI()
@app.middleware("http")
async def add_compression(request, call_next):
    response = await call_next(request)
    response.headers["Content-Encoding"] = "gzip"
    return response
[*]安全考虑:

[*]限制最大并发连接数
[*]实施速率限制
from fastapi import Request
from fastapi.middleware import Middleware
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter

@app.get("/chat")
@limiter.limit("10/minute")
async def chat_stream(request: Request):
    # ...
[*]错误处理增强:
async def generate():
    try:
      response = client.chat.completions.create(...)
      async for chunk in response:
            # 处理数据...
    except Exception as e:
      yield json.dumps({"error": str(e)})
    finally:
      await client.close()# 确保释放资源
这些方案可根据具体需求组合使用,建议优先选择 SSE 方案(浏览器兼容性好),需要支持更复杂场景时可考虑 WebSocket,但后者实现成本较高。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 通过 API 将Deepseek响应流式内容输出到前端