背景
近期部分应用需要融合AI,对接了大模型,并按SSE进行流式返回。为了减少其他平台调用复杂度,需要在现有内部云平台进行对接,现在有云平台微服务较多,但是JDK版本为8。又没办法直接全量升级,只能是在内部基础平台进行对接大模型接口,云平台进行透传。然后就遇到了,透传部分内容丢失,导致页面呈现效果有问题日。尝试了大半天,有了解决方案,故此写此博文做个记录分享。
什么是SSE
服务器向浏览器推送信息,除了 WebSocket,还有一种方法:Server-Sent Events(以下简称 SSE)。
SSE(Server-Sent Events)是一种用于实现服务器主动向客户端推送数据的技术,也被称为“事件流”(Event Stream)。它基于 HTTP 协议,利用了其长连接特性,在客户端与服务器之间建立一条持久化连接,并通过这条连接实现服务器向客户端的实时数据推送。
SSE基本原理
客户端向服务器发送一个请求,带有指定的Header,表示可以接收事件流类型,并禁用任何的事件缓存。
服务器返回一个响应,带有指定的Header,表示事件的媒体类型和编码,以及使用分块传输编码(chunked)来流式传输动态生成的内容。
事件之间由两个换行符分隔。服务器可以发送事件数据、事件类型、事件ID和重试时间等字段。
客户端使用EventSource接口来创建一个对象,打开连接,并订阅onopen、onmessage和onerror等事件处理程序来处理连接状态和接收消息。
客户端可以使用GET查询参数来传递数据给服务器,也可以使用close方法来关闭连接。
SSE适用场景
实时数据推送
金融行情:股票、加密货币价格的实时变动。
系统监控:服务器CPU、内存、网络流量的实时图表更新。
物联网数据:传感器读数(如温度、湿度)的实时展示。
实时通知与消息流
新闻推送:突发新闻或头条的实时提示。
社交媒体动态:关注用户新发布内容或点赞评论的提示。
后台任务进度:文件处理、数据导出等长任务的完成百分比推送。
协同编辑与状态同步
在线文档:提示其他协作者正在编辑或已更新了某部分内容。
在线状态:显示好友或团队成员的“在线/离线”状态变化。
实时体育比分或赛事更新
比赛得分、关键事件(如进球)的即时推送。
SSE特点
数据更新频繁:服务器需要不断地将最新的数据推送给客户端,保持数据的实时性和准确性。
低延迟:服务器需要尽快地将数据推送给客户端,避免数据的延迟和过期。
单向通信:服务器只需要向客户端推送数据,而不需要接收客户端的数据。
接下来,上代码、上问题、上解决方案
GO GO GO
壹
初版代码 调用初见端倪
//基础平台调用某大模型并以(SSE)流式返回 充当服务端
Flux<String> fluxContent = llmClient.prompt()
.user(prompt)
.advisors(a -> a.param(CHAT_MEMORY_CONVERSATION_ID_KEY, topicId))
.stream()
.content();
//云平台透传基础平台大模型(SSE)流式的接口
Flux<String> flux = webClient.get()
.uri(API_URL)
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class);基础平台验证

云平台透传验证

可以看到,云平台透传后,换行丢失,导致内容均呈现在一行内,导致可读性降低,影响用户体验。可能是SSE针对于事件会多个合并。
各种尝试
尝试一:把云平台Flux<String> 修改为Mono<String> 事件正确,透传不会丢失任何内容,但是Mono需要等待全部返回完成,再进行事件响应客户端。不可行。影响体验,等待时间过长
尝试二:把云平台Flux<String> 修改为 Flux<ServerSentEvent<String>> 接收为
.bodyToFlux(DataBuffer.class).map(buffer->{String s = StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString(); DataBufferUtils.release(buffer); return s; });不可行。事件返回多了一层data:

❤
解决方案 内容转KV返回
//基础平台代码改动
Flux<ServerSentEvent<Map<String, String>>> fluxContent = llmClient.prompt()
.user(prompt)
.advisors(a -> a.param(CHAT_MEMORY_CONVERSATION_ID_KEY, topicId))
.stream()
.content()
.map(content ->
{
ServerSentEvent<Map<String, String>> build = ServerSentEvent.<Map<String, String>>builder()
.data(Map.of("content", content))
.build();
return build;
}
);
return fluxContent;// 云平台透传代码改动
Flux<Map<String, Object>> flux = webClient.get()
.uri(API_URL)
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(new ParameterizedTypeReference<Map<String, Object>>() {
});完美解决
本文只是针对自身问题得到完美解决,可做参考,但并不是通用全量遇到问题的解决方案。
使用SpringAI,如果使用LangChain4J就不太清楚了。
END