跳到主要内容

SSE 流式推送

2025年02月27日
柏拉文
越努力,越幸运

一、认识


SSEServer-Sent Events)是一种基于 HTTP 持久连接的单向数据推送协议,服务端通过设置特定响应头(如 Content-Type: text/event-stream、Cache-Control: no-cache、Connection: keep-alive),并关闭框架(例如 Koa)默认的自动响应处理,来确保连接持续开放。服务端按 SSE 规范生成消息(每条消息以 data: 开头,以两个换行符结束),并在客户端断开时清理定时器等资源以避免内存泄露。客户端接收方式则主要有三种: 利用 Fetch 结合 ReadableStream 进行 POST 请求与流数据读取,采用 EventSource 处理 GET 请求并自动解析事件数据,或通过 XMLHttpRequestonprogress 事件手动管理数据缓冲。

SSEServer-Sent Events 是一种单向通信协议,服务器可以通过 HTTP 持久连接主动向客户端推送数据。相比 WebSocketSSE 实现简单,适用于只需要单向消息推送的场景。

SSE 服务端工作流: 1. 在服务端,我们需要设置 HTTP 响应头, 告诉客户端这是一个事件流, 设置响应头: Content-Type: text/event-streamCache-Control: no-cache 禁用缓存、Connection: keep-alive 保持长连接; 2.关闭自动响应处理, 比如 Koa 需要设置 ctx.respond = false, 否则, Koa 可能在你的写操作完成之前结束响应,导致连接中断,从而触发 error 事件; 3. 生成消息之后, 通过 ctx.res.write(data) 发送数据,并注意数据格式必须符合 SSE 规范(每条消息以 data: 开头,以两个换行符 \n\n 结束); 4. 保持连接并在客户端断开时清理定时器等资源。当客户端关闭连接时,通过 ctx.req.on('close', ...) 清理定时器,防止内存泄露。

SSE 客户端工作流:

  • 基于 Fetch 接收流式数据: 1. 请求初始化, 使用 fetch 发起 POST 请求,传入 JSON 格式的 body,并设置 Content-Type 请求头, 通过 AbortController 来获取 signal, 方便后续取消请求; 2. 流式接收处理, 在获取响应后,判断 response.ok,否则抛出错误。从 response.body 获取一个 ReadableStreamreader,通过 reader.read() 进入一个异步循环。每次读取到数据块后,使用 TextDecoder 将二进制数据解码为字符串,然后直接处理(如检查是否包含 data: 前缀,或是否为 [DONE] 标记)。3. 请求结束处理, 当 reader.read() 返回 donetrue 或遇到 [DONE] 标记时,结束循环并调用 close()。4. 提供 close 方法, 调用 controller.abort 中断请求, 关闭连接。基于 Fetch 接收流式数据 可以发送 Post 请求, 处理流数据相比于 XMLRequestHttp 比较方便。

  • 基于 EventSource 接收流式数据: 一个 EventSource 实例会对 HTTP 服务器开启一个持久化的连接, 以 text/event-stream 格式发送事件,此连接会一直保持开启直到通过调用 EventSource.close() 关闭。与 WebSocket 不同的是, 服务器发送事件是单向的。数据消息只能从服务端到发送到客户端(如用户的浏览器)。工作流为: 1. 请求初始化, 直接创建 EventSource 对象,传入目标 URL(注意 SSE 默认使用 GET 请求,且请求参数通过 URL 查询字符串传递)。2. 流式接收处理, onopen 事件连接建立成功时,更新状态为 connected, onmessage, 浏览器自动解析服务端以 SSE 格式发送的数据,调用该回调, 每收到一条消息,就检查 event.data, 如果为 [DONE] 则关闭连接, 否则, 直接调用 onMessage。4. 提送 close 方法, 调用 eventSource.close 关闭链接, 终端请求。基于 EventSource 接收流式数据 只支持 Get 请求, 发送的数据有长度限制。

  • 基于 XMLRequestHttp 接收流式数据: 1. 请求初始化, 创建 XMLHttpRequest 对象,调用 open("POST", url, true) 初始化异步 POST 请求,并设置请求头(Content-Type: application/json)(请求头随意)。2. 通过 send() 发送请求。3. 流式接收处理, 每当收到部分响应时,会触发 onprogress 事件, 此时, 根据 responseText 的长度与之前记录的 lastIndex,提取出新增的数据, 将新增数据累加到一个内部 buffer 中,然后以换行符分割,处理分割出来的完整数据块, 对每个数据块做 trim 处理,并检查是否以 data: 开头,如果是则去掉 data: 前缀后传给 onMessage 回调, 对于 [DONE] 标记,则结束接收并调用 close() 终止请求。4. 请求结束处理, 在 onreadystatechange 回调中, 当 readyStateDONE 时(即请求完成),如果状态码不为 200,则调用 onError 通知错误,否则处理 buffer 中残留的部分数据。5. 提供 close 方法, 调用 xhr.abort 终端请求, 关闭长连接。基于 XMLRequestHttp 接收流式数据 流数据处理需要手动管理 buffer 与分块处理,代码较繁琐, 错误处理与状态判断较为分散。

二、服务端


const Koa = require("koa");
const cors = require("@koa/cors");
const Router = require("koa-router");
const bodyParser = require("koa-bodyparser");

const app = new Koa();
const router = new Router();

app.use(cors({ origin: "*" }));
app.use(bodyParser());

const chatHandler = async (ctx) => {
ctx.respond = false;
ctx.req.setTimeout(0);

ctx.status = 200;
ctx.set({
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});

ctx.res.write("\n");

const message = ctx.method === "POST" ? ctx.request.body.msg : ctx.query.msg;
console.log("接收到的消息:", message);

const answer = "市场容量大,竞争空间充足:全球新能源汽车市场";
const tokens = answer.split("");
let index = 0;

const intervalId = setInterval(() => {
if (index < tokens.length) {
const token = tokens[index];
ctx.res.write(`data: ${token}\n\n`);
index++;
} else {
clearInterval(intervalId);
ctx.res.write(`data: [DONE]\n\n`);
ctx.res.end();
}
}, 200);

ctx.req.on("close", () => {
clearInterval(intervalId);
});
};

router.get("/chat", chatHandler);
router.post("/chat", chatHandler);

app.use(router.routes()).use(router.allowedMethods());

app.listen(3000, () => {
console.log("SSE 服务端已启动,访问 http://localhost:3000/chat");
});

三、客户端


3.1 client.js

class XHRClient {
constructor() {
this.xhr = null;
this.buffer = "";
this.lastIndex = 0;
this.isAborted = false;
this.contentType = "application/json";
}

open(url, params, onMessage, onError) {
this.close();
this.buffer = "";
this.lastIndex = 0;
this.isAborted = false;

this.xhr = new XMLHttpRequest();
this.xhr.open("POST", url, true);
this.xhr.setRequestHeader("Content-Type", "application/json");

this.xhr.onreadystatechange = () => {
if (this.xhr.readyState === XMLHttpRequest.DONE) {
if (!this.isAborted && this.xhr.status !== 200) {
onError(new Error(`请求失败:${this.xhr.statusText}`));
} else {
if (this.buffer) {
const remainingParts = this.buffer.split("\n");
remainingParts.forEach((chunk) => {
const dataContent = chunk.trim().startsWith("data:")
? chunk.slice(5).trim()
: chunk.trim();
if (dataContent) {
onMessage(dataContent);
}
});
}
}
}
};

this.xhr.onprogress = () => {
const newData = this.xhr.responseText.substring(this.lastIndex);
this.lastIndex = this.xhr.responseText.length;
this.buffer += newData;
const parts = this.buffer.split("\n");
this.buffer = parts.pop();
parts.forEach((chunk) => {
const dataContent = chunk.trim().startsWith("data:")
? chunk.slice(5).trim()
: chunk.trim();
if (dataContent) {
if (dataContent === "[DONE]") {
this.close();
return;
}
onMessage(dataContent);
}
});
};

this.xhr.onerror = (err) => {
if (!this.isAborted) {
onError(err);
}
this.close();
};

this.xhr.send(JSON.stringify(params));
}

close() {
if (this.xhr) {
this.isAborted = true;
this.xhr.abort();
this.xhr = null;
}
}
}

class FetchClient {
constructor() {
this.controller = null;
this.status = "closed";
}

async open(url, params, onMessage, onError) {
this.close();
this.controller = new AbortController();
this.status = "connecting";

const options = {
method: "POST",
body: JSON.stringify(params),
signal: this.controller.signal,
headers: { "Content-Type": "application/json" },
};

try {
const response = await fetch(url, options);
if (!response.ok) {
throw new Error(`请求失败:${response.status} ${response.statusText}`);
}
this.status = "connected";
const reader = response.body.getReader();
const decoder = new TextDecoder("utf-8");

while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const dataContent = chunk.trim().startsWith("data:")
? chunk.slice(5).trim()
: chunk.trim();

if (dataContent === "[DONE]") {
this.close();
return;
}
onMessage(dataContent);
}
} catch (err) {
this.status = "closed";
onError(err);
}
}

close() {
if (this.controller) {
this.controller.abort();
this.controller = null;
this.status = "closed";
}
}
}

class SSEClient {
constructor() {
this.eventSource = null;
this.status = "closed";
}

open(url, onMessage, onError) {
this.close();

this.eventSource = new EventSource(url);
this.status = "connecting";

this.eventSource.onopen = () => {
this.status = "connected";
};

this.eventSource.onmessage = (event) => {
if (event.data === "[DONE]") {
this.close();
return;
}
onMessage(event.data);
};

this.eventSource.onerror = (err) => {
this.status = "closed";
onError(err);
this.close();
};
}

close() {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
this.status = "closed";
}
}
}

class StreamClient {
constructor(type = "fetch") {
this.type = type;
this.url = "http://localhost:3000/chat";
if (type === "fetch") {
this.client = new FetchClient();
} else if (type === "xhr") {
this.client = new XHRClient();
} else if (type === "sse") {
this.client = new SSEClient();
} else {
throw new Error("未知的 stream 类型: " + type);
}
}

open(url, params, onMessage, onError) {
if (this.type === "fetch" || this.type === "xhr") {
this.client.open(url, params, onMessage, onError);
} else {
this.client.open(url, onMessage, onError);
}
}

close() {
this.client.close();
}

send(message, successCallback, failCallback) {
const url = this.type === "sse" ? `${this.url}?msg=${message}` : this.url;

this.open(
url,
{ msg: message },
(data) => {
successCallback?.(data);
},
(err) => {
this.close();
failCallback?.(err);
}
);
}
}

document.getElementById("sendBtn").addEventListener("click", () => {
const inputEl = document.getElementById("userInput");
const message = inputEl.value.trim();
if (!message) {
alert("请输入消息");
return;
}

const responseDiv = document.getElementById("response");
responseDiv.textContent = "";
const streamClient = new StreamClient("fetch");

streamClient.send(message, (data) => {
console.log("接收到数据:", data);
responseDiv.textContent += data;
});
});

3.2 client.html

<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>ChatGPT SSE 示例</title>
</head>
<body>
<h1>ChatGPT SSE 示例</h1>
<div>
<input type="text" id="userInput" placeholder="请输入消息" />
<button id="sendBtn">发送</button>
</div>
<div id="response" style="margin-top:20px; border: 1px solid #ccc; padding: 10px;"></div>
<script src="./client.js"></script>
</body>
</html>