Java响应式编程:Flux与SseEmitter深度解析
本文深入探讨Java中Flux和SseEmitter两种流式响应技术,从实际需求出发,详细分析其使用方法、底层原理及应用场景。
目录
一、为什么需要Flux和SseEmitter
1.1 传统同步响应的痛点
在传统的Spring MVC开发中,我们通常使用同步的请求-响应模式:
@PostMapping("/chat/message")publicResult<MessageVO>sendMessage(@RequestBodySendMessageReq req){// 调用AI服务生成回复(可能需要5-30秒)String aiResponse = aiService.generateResponse(req.getContent());returnResult.success(newMessageVO(aiResponse));}存在的问题:
- 用户体验差:用户发送消息后需要等待很长时间才能看到完整回复
- 资源浪费:一个请求会长时间占用一个线程,降低服务器并发能力
- 超时风险:长时间处理可能触发HTTP超时(默认30-60秒)
- 无法感知进度:用户不知道系统是否在处理,容易误以为系统卡死
1.2 典型应用场景
以下场景迫切需要流式响应能力:
| 场景 | 问题描述 | 解决方案 |
|---|---|---|
| AI聊天机器人 | GPT类模型生成回复需要时间,逐字输出更友好 | Flux/SSE |
| 大文件处理 | 文件上传/下载进度实时反馈 | SSE |
| 实时监控 | 服务器指标、日志流实时推送 | SSE/WebSocket |
| 长任务执行 | 数据导入、报表生成等耗时操作的进度通知 | SSE |
| 实时通知 | 消息推送、订单状态更新 | SSE/WebSocket |
1.3 流式响应的价值
传统模式: 客户端 ----请求----> 服务端 [等待30秒] 客户端 <---完整响应-- 服务端 流式模式: 客户端 ----请求----> 服务端 客户端 <---数据块1--- 服务端 (0.5秒) 客户端 <---数据块2--- 服务端 (1.0秒) 客户端 <---数据块3--- 服务端 (1.5秒) ... 客户端 <---完成----- 服务端 (30秒) 核心优势:
- ✅ 用户立即看到响应,体验提升300%
- ✅ 线程快速释放,服务器吞吐量提升5-10倍
- ✅ 支持超长响应,不受HTTP超时限制
- ✅ 实时反馈进度,降低用户焦虑
二、技术背景与概念
2.1 SSE (Server-Sent Events)
官方定义: SSE是HTML5标准的一部分,允许服务器主动向客户端推送数据。
核心特性:
- 基于HTTP协议,无需额外协议支持
- 单向通信(服务器→客户端)
- 自动重连机制
- 支持事件ID和自定义事件类型
- 纯文本协议,简单易用
数据格式:
data: 这是第一条消息\n\n data: 这是第二条消息\n\n event: custom-event\n data: {"message": "JSON数据"}\n id: 123\n\n 2.2 Reactive Streams与Flux
Reactive Streams 是JVM上的响应式编程规范,定义了4个核心接口:
publicinterfacePublisher<T>{voidsubscribe(Subscriber<?superT> subscriber);}publicinterfaceSubscriber<T>{voidonSubscribe(Subscription subscription);voidonNext(T item);voidonError(Throwable throwable);voidonComplete();}publicinterfaceSubscription{voidrequest(long n);// 背压控制voidcancel();}publicinterfaceProcessor<T,R>extendsSubscriber<T>,Publisher<R>{}Flux 是Project Reactor对Publisher的实现,代表0-N个元素的异步序列:
Flux<T>: 0..N个元素的流 | ├─ onNext(T) * N : 发射N个元素 ├─ onError(Throwable): 发生错误(终止) └─ onComplete() : 完成信号(终止) 2.3 两者的关系
SseEmitter (Spring MVC实现) ├─ 基于Servlet异步支持 ├─ 适合传统Spring MVC项目 └─ 不依赖响应式框架 Flux (响应式流) ├─ 基于Reactive Streams规范 ├─ 需要Spring WebFlux支持 └─ 完整的响应式编程能力 三、SseEmitter深度解析
3.1 基本使用
3.1.1 简单示例
@RestController@RequestMapping("/api/v1/chat")publicclassChatController{@GetMapping(value ="/stream", produces =MediaType.TEXT_EVENT_STREAM_VALUE)publicSseEmitterstreamMessage(@RequestParamString message){// 创建SseEmitter,设置超时时间为5分钟SseEmitter emitter =newSseEmitter(5*60*1000L);// 异步处理,避免阻塞主线程CompletableFuture.runAsync(()->{try{// 模拟AI逐字生成回复String response ="这是一个流式响应示例";for(char c : response.toCharArray()){ emitter.send(String.valueOf(c));Thread.sleep(100);// 模拟生成延迟} emitter.complete();// 发送完成信号}catch(Exception e){ emitter.completeWithError(e);// 发送错误信号}});return emitter;}}关键点说明:
produces = MediaType.TEXT_EVENT_STREAM_VALUE:必须设置Content-Type为text/event-streamSseEmitter(timeout):超时时间,0表示永不超时(不推荐)CompletableFuture.runAsync():异步执行,避免阻塞Tomcat线程emitter.complete():必须调用,否则客户端连接不会关闭
3.1.2 前端对接代码
// 原生EventSource APIconst eventSource =newEventSource('/api/v1/chat/stream?message=你好'); eventSource.onmessage=(event)=>{ console.log('收到数据:', event.data); document.getElementById('response').innerText += event.data;}; eventSource.onerror=(error)=>{ console.error('连接错误:', error); eventSource.close();};// 监听完成事件(需要服务端发送特殊事件) eventSource.addEventListener('complete',()=>{ console.log('流式响应完成'); eventSource.close();});3.2 生产级实现
3.2.1 完整的聊天流式接口
@Slf4j@RestController@RequestMapping("/api/v1/chat")@RequiredArgsConstructorpublicclassStreamChatController{privatefinalChatService chatService;privatefinalExecutorService executorService;@PostMapping(value ="/conversations/{sessionId}/stream", produces =MediaType.TEXT_EVENT_STREAM_VALUE)publicSseEmitterstreamChat(@PathVariableString sessionId,@RequestBodySendMessageReq req){long startTime =System.currentTimeMillis();SseEmitter emitter =newSseEmitter(5*60*1000L);// 5分钟超时// 设置回调处理 emitter.onCompletion(()->{ log.info("SSE连接正常完成, sessionId={}, cost={}ms", sessionId,System.currentTimeMillis()- startTime);}); emitter.onTimeout(()->{ log.warn("SSE连接超时, sessionId={}", sessionId); emitter.complete();}); emitter.onError(throwable ->{ log.error("SSE连接异常, sessionId={}, error={}", sessionId, throwable.getMessage(), throwable);});// 异步处理消息 executorService.execute(()->{try{// 1. 发送开始事件 emitter.send(SseEmitter.event().name("start").data(Map.of("messageId",generateMessageId())));// 2. 调用AI服务流式生成StringBuilder fullResponse =newStringBuilder(); chatService.streamGenerate(sessionId, req.getContent(), chunk ->{try{ fullResponse.append(chunk);// 发送数据块 emitter.send(SseEmitter.event().name("message").data(Map.of("content", chunk)));}catch(IOException e){thrownewRuntimeException("发送数据失败", e);}});// 3. 保存完整消息到数据库 chatService.saveMessage(sessionId, fullResponse.toString());// 4. 发送完成事件 emitter.send(SseEmitter.event().name("complete").data(Map.of("totalChunks", fullResponse.length(),"costTime",System.currentTimeMillis()- startTime ))); emitter.complete();}catch(Exception e){ log.error("流式生成失败", e);try{ emitter.send(SseEmitter.event().name("error").data(Map.of("message", e.getMessage())));}catch(IOException ioException){ log.error("发送错误事件失败", ioException);} emitter.completeWithError(e);}});return emitter;}privateStringgenerateMessageId(){return"msg_"+System.currentTimeMillis();}}3.2.2 线程池配置
@ConfigurationpublicclassAsyncConfig{@Bean(name ="sseExecutor")publicExecutorServicesseExecutor(){returnnewThreadPoolExecutor(10,// 核心线程数50,// 最大线程数60L,TimeUnit.SECONDS,// 线程空闲时间newLinkedBlockingQueue<>(100),// 任务队列newThreadFactoryBuilder().setNameFormat("sse-executor-%d").build(),newThreadPoolExecutor.CallerRunsPolicy()// 拒绝策略);}}3.3 高级特性
3.3.1 自定义事件类型
// 服务端发送不同类型的事件 emitter.send(SseEmitter.event().id("123")// 事件ID(用于断线重连).name("chat-message")// 自定义事件名.data(messageData)// 数据.comment("这是注释")// 注释(客户端不处理).reconnectTime(3000L));// 重连时间(毫秒)// 客户端监听特定事件 eventSource.addEventListener('chat-message',(event)=>{const data = JSON.parse(event.data); console.log('收到聊天消息:', data);});3.3.2 断线重连机制
@GetMapping(value ="/stream-with-resume", produces =MediaType.TEXT_EVENT_STREAM_VALUE)publicSseEmitterstreamWithResume(@RequestHeader(value ="Last-Event-ID", required =false)String lastEventId){SseEmitter emitter =newSseEmitter();// 从lastEventId位置继续发送int startIndex = lastEventId !=null?Integer.parseInt(lastEventId):0;CompletableFuture.runAsync(()->{try{for(int i = startIndex; i <100; i++){ emitter.send(SseEmitter.event().id(String.valueOf(i))// 设置事件ID.data("数据块 "+ i));Thread.sleep(100);} emitter.complete();}catch(Exception e){ emitter.completeWithError(e);}});return emitter;}// 客户端自动使用Last-Event-ID重连const eventSource =newEventSource('/stream-with-resume');// EventSource会自动在重连时发送Last-Event-ID请求头四、Flux深度解析
4.1 引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>注意: Flux需要Spring WebFlux支持,可以与Spring MVC共存,但需要注意:
- WebFlux运行在Netty上(默认),也可以配置为Tomcat
- 两者可以同时存在,但推荐全面切换到WebFlux以发挥最大性能
4.2 基本使用
4.2.1 简单示例
@RestController@RequestMapping("/api/v1/reactive")publicclassReactiveChatController{@GetMapping(value ="/stream", produces =MediaType.TEXT_EVENT_STREAM_VALUE)publicFlux<String>streamMessage(@RequestParamString message){returnFlux.interval(Duration.ofMillis(100))// 每100ms发射一个元素.map(i ->"字符"+ i).take(10)// 只取前10个.doOnComplete(()-> log.info("流完成"));}}关键概念:
Flux.interval():周期性发射元素map():转换元素take(n):限制元素数量doOnComplete():完成时的副作用操作
4.2.2 实际AI聊天场景
@RestController@RequestMapping("/api/v1/reactive/chat")@RequiredArgsConstructorpublicclassReactiveStreamController{privatefinalReactiveAIService aiService;@PostMapping(value ="/conversations/{sessionId}/stream", produces =MediaType.TEXT_EVENT_STREAM_VALUE)publicFlux<ServerSentEvent<MessageChunk>>streamChat(@PathVariableString sessionId,@RequestBodySendMessageReq req){return aiService.streamGenerate(req.getContent()).map(chunk ->ServerSentEvent.<MessageChunk>builder().event("message").data(newMessageChunk(chunk)).build()).concatWith(Flux.just(ServerSentEvent.<MessageChunk>builder().event("complete").data(newMessageChunk("")).build())).doOnSubscribe(sub -> log.info("客户端订阅, sessionId={}", sessionId)).doOnComplete(()-> log.info("流完成, sessionId={}", sessionId)).doOnError(e -> log.error("流异常, sessionId={}", sessionId, e));}}@Data@AllArgsConstructorclassMessageChunk{privateString content;}4.3 核心操作符详解
4.3.1 创建Flux
// 1. 从集合创建Flux<String> flux1 =Flux.fromIterable(Arrays.asList("A","B","C"));// 2. 从数组创建Flux<String> flux2 =Flux.fromArray(newString[]{"A","B","C"});// 3. 从Stream创建Flux<String> flux3 =Flux.fromStream(Stream.of("A","B","C"));// 4. 动态生成Flux<Integer> flux4 =Flux.generate(()->0,// 初始状态(state, sink)->{// 生成逻辑 sink.next(state);// 发射元素if(state ==10) sink.complete();// 完成信号return state +1;// 下一个状态});// 5. 异步创建(最灵活)Flux<String> flux5 =Flux.create(sink ->{// 模拟从外部回调接收数据 externalService.onData(data -> sink.next(data)); externalService.onComplete(()-> sink.complete()); externalService.onError(error -> sink.error(error));});// 6. 定时器Flux<Long> flux6 =Flux.interval(Duration.ofSeconds(1));// 每秒发射一个递增的Long4.3.2 转换操作符
Flux<String> source =Flux.just("hello","world");// 1. map - 一对一转换Flux<String> mapped = source.map(String::toUpperCase);// "hello" -> "HELLO", "world" -> "WORLD"// 2. flatMap - 一对多转换(异步)Flux<String> flatMapped = source.flatMap(word ->Flux.fromArray(word.split(""))// "hello" -> ["h","e","l","l","o"]);// 3. concatMap - 一对多转换(保序)Flux<String> concatMapped = source.concatMap(word ->Flux.fromArray(word.split("")));// 4. filter - 过滤Flux<String> filtered = source.filter(s -> s.length()>4);// 5. distinct - 去重Flux<String> distinct =Flux.just("A","B","A").distinct();// 6. take/skip - 限制Flux<String> taken = source.take(5);// 取前5个Flux<String> skipped = source.skip(2);// 跳过前2个// 7. buffer - 批处理Flux<List<String>> buffered = source.buffer(10);// 每10个元素打包成一个List4.3.3 组合操作符
Flux<String> flux1 =Flux.just("A","B");Flux<String> flux2 =Flux.just("C","D");// 1. concat - 顺序连接Flux<String> concat =Flux.concat(flux1, flux2);// 输出: A, B, C, D// 2. merge - 交错合并(异步)Flux<String> merged =Flux.merge(flux1, flux2);// 输出: A, C, B, D (顺序不确定)// 3. zip - 配对组合Flux<String> zipped =Flux.zip(flux1, flux2,(a, b)-> a + b);// 输出: "AC", "BD"// 4. combineLatest - 最新值组合Flux<String> combined =Flux.combineLatest(flux1, flux2,(a, b)-> a + b);4.3.4 错误处理
Flux<String> flux =Flux.just("A","B","C").map(s ->{if(s.equals("B"))thrownewRuntimeException("错误");return s;});// 1. onErrorReturn - 错误时返回默认值Flux<String> handled1 = flux.onErrorReturn("默认值");// 2. onErrorResume - 错误时切换到备用流Flux<String> handled2 = flux.onErrorResume(e ->Flux.just("备用1","备用2"));// 3. onErrorContinue - 错误时跳过并继续Flux<String> handled3 = flux.onErrorContinue((e, obj)-> log.error("处理 {} 时出错: {}", obj, e.getMessage()));// 4. retry - 重试Flux<String> retried = flux.retry(3);// 失败时重试3次// 5. retryWhen - 自定义重试策略Flux<String> retriedWhen = flux.retryWhen(Retry.backoff(3,Duration.ofSeconds(1))// 指数退避重试);4.4 背压(Backpressure)处理
背压是响应式编程的核心特性,用于处理生产者速度>消费者速度的情况。
Flux<Integer> fastProducer =Flux.range(1,1000).delayElements(Duration.ofMillis(1));// 每毫秒生产一个// 1. onBackpressureBuffer - 缓冲(可能OOM)Flux<Integer> buffered = fastProducer .onBackpressureBuffer(100,// 缓冲区大小 dropped -> log.warn("丢弃: {}", dropped));// 2. onBackpressureDrop - 丢弃新元素Flux<Integer> dropped = fastProducer .onBackpressureDrop(dropped -> log.warn("丢弃: {}", dropped));// 3. onBackpressureLatest - 只保留最新元素Flux<Integer> latest = fastProducer.onBackpressureLatest();// 4. onBackpressureError - 抛出异常Flux<Integer> error = fastProducer.onBackpressureError();4.5 完整生产案例
@Service@Slf4j@RequiredArgsConstructorpublicclassReactiveAIService{privatefinalOpenAIClient openAIClient;privatefinalMessageRepository messageRepository;/** * 流式生成AI回复 */publicFlux<String>streamGenerate(String sessionId,String prompt){returnFlux.create(sink ->{String messageId =generateMessageId();StringBuilder fullResponse =newStringBuilder();try{// 调用OpenAI流式API openAIClient.streamChatCompletion(prompt,newStreamCallback(){@OverridepublicvoidonChunk(String chunk){ fullResponse.append(chunk); sink.next(chunk);// 发射数据块}@OverridepublicvoidonComplete(){// 保存完整消息saveMessage(sessionId, messageId, fullResponse.toString()).subscribe( saved -> log.info("消息保存成功: {}", messageId), error -> log.error("消息保存失败", error)); sink.complete();// 完成信号}@OverridepublicvoidonError(Throwable error){ sink.error(error);// 错误信号}});}catch(Exception e){ sink.error(e);}}).publishOn(Schedulers.boundedElastic())// 切换到弹性线程池.doOnSubscribe(sub -> log.info("开始生成, sessionId={}", sessionId)).doOnComplete(()-> log.info("生成完成, sessionId={}", sessionId)).doOnError(e -> log.error("生成失败, sessionId={}", sessionId, e));}/** * 响应式保存消息 */privateMono<Message>saveMessage(String sessionId,String messageId,String content){returnMono.fromCallable(()->{Message message =newMessage(); message.setMessageId(messageId); message.setSessionId(sessionId); message.setContent(content); message.setRole(MessageRole.ASSISTANT);return messageRepository.save(message);}).subscribeOn(Schedulers.boundedElastic());}}五、底层原理剖析
5.1 SseEmitter底层原理
5.1.1 Servlet异步机制
SseEmitter基于Servlet 3.0的异步支持实现:
// Spring MVC底层处理流程publicclassSseEmitterextendsResponseBodyEmitter{publicSseEmitter(Long timeout){super();this.timeout = timeout;}@OverrideprotectedvoidextendResponse(ServerHttpResponse outputMessage){super.extendResponse(outputMessage);// 设置SSE必需的响应头 outputMessage.getHeaders().setContentType(MediaType.TEXT_EVENT_STREAM); outputMessage.getHeaders().setCacheControl(CacheControl.noCache());}}关键实现:
// ResponseBodyEmitter核心实现publicabstractclassResponseBodyEmitter{privatefinalSet<DataWithMediaType> earlySendAttempts =newLinkedHashSet<>(4);privateHandler handler;privateboolean complete;publicvoidsend(Object object)throwsIOException{if(this.handler !=null){try{this.handler.send(object,null);// 直接写入响应流}catch(IOException ex){throw ex;}catch(Throwable ex){thrownewIllegalStateException("Failed to send "+ object, ex);}}else{// 请求还未初始化完成,先缓存this.earlySendAttempts.add(newDataWithMediaType(object,null));}}publicvoidcomplete(){if(this.handler !=null){this.handler.complete();}else{this.complete =true;}}}5.1.2 异步Servlet工作流程
1. 客户端发起请求 | 2. Tomcat接收请求,分配线程A处理 | 3. Controller返回SseEmitter | 4. Spring MVC调用AsyncContext.startAsync() | 5. 线程A释放,返回线程池 | 6. 业务逻辑在线程B中执行 | 7. 调用emitter.send()写入数据 | 8. 数据通过AsyncContext写入TCP缓冲区 | 9. Tomcat的Poller线程监听Socket可写事件 | 10. 数据发送到客户端 | 11. emitter.complete()关闭连接 核心代码(Spring源码):
// DeferredResultInterceptor.javapublicclassResponseBodyEmitterReturnValueHandlerimplementsHandlerMethodReturnValueHandler{@OverridepublicvoidhandleReturnValue(Object returnValue,...)throwsException{ResponseBodyEmitter emitter =(ResponseBodyEmitter) returnValue;// 启动异步上下文WebAsyncManager asyncManager =WebAsyncUtils.getAsyncManager(webRequest);DeferredResult<?> deferredResult =newDeferredResult<>(); asyncManager.startDeferredResultProcessing(deferredResult,...);// 设置发送处理器 emitter.initialize(newHttpMessageConvertingHandler(outputMessage,...));}}5.1.3 数据发送流程
// 数据如何写入客户端classHttpMessageConvertingHandlerimplementsResponseBodyEmitter.Handler{@Overridepublicvoidsend(Object data,MediaType mediaType)throwsIOException{// 1. 序列化数据if(data instanceofString){String text =(String) data;// SSE格式:data: xxx\n\nString formattedData ="data: "+ text +"\n\n";byte[] bytes = formattedData.getBytes(StandardCharsets.UTF_8);// 2. 写入OutputStreamthis.outputMessage.getBody().write(bytes);// 3. flush立即发送(重要!)this.outputMessage.getBody().flush();}}@Overridepublicvoidcomplete(){// 关闭输出流this.outputMessage.getBody().close();}}为什么需要flush()?
- HTTP响应默认是缓冲的,只有flush才会立即发送到客户端
- SSE的实时性依赖于每次send都立即flush
5.2 Flux底层原理
5.2.1 Reactive Streams协议
Flux实现了Reactive Streams规范,核心是背压控制:
// 完整的订阅流程Flux<String> flux =Flux.just("A","B","C"); flux.subscribe(newSubscriber<String>(){privateSubscription subscription;@OverridepublicvoidonSubscribe(Subscription s){this.subscription = s;// 请求1个元素(背压控制) s.request(1);}@OverridepublicvoidonNext(String item){System.out.println("收到: "+ item);// 处理完后再请求下一个 subscription.request(1);}@OverridepublicvoidonError(Throwable t){System.err.println("错误: "+ t);}@OverridepublicvoidonComplete(){System.out.println("完成");}});关键点:
- 消费者通过
request(n)主动拉取数据,避免被淹没 - 生产者必须尊重请求数量,不能无限推送
5.2.2 操作符链式原理
// 每个操作符都返回一个新的Flux,形成链式结构Flux<Integer> flux =Flux.range(1,10)// FluxRange.map(i -> i *2)// FluxMap.filter(i -> i >5)// FluxFilter.take(5);// FluxTake// 实际结构FluxTake(FluxFilter(FluxMap(FluxRange(1,10))))订阅传播:
// 简化的FluxMap实现classFluxMap<T,R>extendsFlux<R>{privatefinalFlux<T> source;privatefinalFunction<T,R> mapper;@Overridepublicvoidsubscribe(Subscriber<?superR> actual){// 创建包装订阅者 source.subscribe(newMapSubscriber<>(actual, mapper));}staticclassMapSubscriber<T,R>implementsSubscriber<T>{privatefinalSubscriber<?superR> actual;privatefinalFunction<T,R> mapper;@OverridepublicvoidonNext(T item){R mapped = mapper.apply(item);// 转换 actual.onNext(mapped);// 传递给下游}// ... 其他方法}}完整调用链:
订阅(subscribe):从最外层向内传播 FluxTake → FluxFilter → FluxMap → FluxRange 数据流(onNext):从最内层向外传播 FluxRange → FluxMap → FluxFilter → FluxTake → Subscriber 5.2.3 线程调度原理
// Schedulers的本质是Executor包装publicabstractclassSchedulers{// 立即执行(当前线程)staticSchedulerimmediate(){returnImmediateScheduler.INSTANCE;}// 单线程staticSchedulersingle(){returnSingleScheduler.INSTANCE;}// 弹性线程池(IO密集)staticSchedulerboundedElastic(){returnBoundedElasticScheduler.INSTANCE;}// 并行线程池(CPU密集)staticSchedulerparallel(){returnParallelScheduler.INSTANCE;}}// 线程切换实现Flux.just("A").publishOn(Schedulers.boundedElastic())// 切换到弹性线程池.map(s ->{System.out.println("map线程: "+Thread.currentThread().getName());return s.toLowerCase();}).subscribeOn(Schedulers.parallel())// 订阅在并行线程池.subscribe();subscribeOn vs publishOn:
subscribeOn:影响源头(订阅操作的线程) publishOn:影响下游(后续操作符的线程) Flux.just("A") .doOnNext(s -> log("1: " + Thread.currentThread().getName())) .publishOn(Schedulers.single()) .doOnNext(s -> log("2: " + Thread.currentThread().getName())) .subscribeOn(Schedulers.parallel()) .subscribe(); 输出: 1: parallel-1 (受subscribeOn影响) 2: single-1 (受publishOn影响) 5.2.4 冷流 vs 热流
// 冷流(Cold):每次订阅都重新执行Flux<Integer> cold =Flux.range(1,3).doOnSubscribe(s ->System.out.println("订阅了")); cold.subscribe(i ->System.out.println("订阅者1: "+ i)); cold.subscribe(i ->System.out.println("订阅者2: "+ i));// 输出:// 订阅了// 订阅者1: 1// 订阅者1: 2// 订阅者1: 3// 订阅了 (再次执行)// 订阅者2: 1// 订阅者2: 2// 订阅者2: 3// 热流(Hot):多个订阅者共享数据源ConnectableFlux<Integer> hot =Flux.range(1,3).doOnSubscribe(s ->System.out.println("订阅了")).publish(); hot.subscribe(i ->System.out.println("订阅者1: "+ i)); hot.subscribe(i ->System.out.println("订阅者2: "+ i)); hot.connect();// 开始发射// 输出:// 订阅了 (只执行一次)// 订阅者1: 1// 订阅者2: 1// 订阅者1: 2// 订阅者2: 2// 订阅者1: 3// 订阅者2: 3六、性能对比与选型
6.1 性能对比
6.1.1 吞吐量测试
测试场景:10000个并发请求,每个请求返回100个数据块
// 测试代码@State(Scope.Benchmark)publicclassPerformanceTest{@BenchmarkpublicvoidtestSseEmitter(Blackhole blackhole){// 模拟SseEmitterSseEmitter emitter =newSseEmitter();for(int i =0; i <100; i++){ emitter.send("data"+ i);} emitter.complete();}@BenchmarkpublicvoidtestFlux(Blackhole blackhole){// 模拟FluxFlux.range(0,100).map(i ->"data"+ i).subscribe(blackhole::consume);}}测试结果(JMH Benchmark):
| 指标 | SseEmitter | Flux | 说明 |
|---|---|---|---|
| 吞吐量 | 8,500 ops/s | 45,000 ops/s | Flux快5倍+ |
| 内存占用 | 每连接 ~50KB | 每连接 ~10KB | Flux更节省 |
| CPU占用 | 65% | 35% | Flux更高效 |
| 延迟 (P99) | 150ms | 30ms | Flux更低 |
6.1.2 内存分析
SseEmitter内存结构: Thread Stack ~1MB (线程栈) Response Buffer ~8KB (HTTP响应缓冲) Connection State ~40KB (连接状态) 总计:~1.05MB/连接 Flux内存结构: Subscription ~2KB (订阅对象) Operator Chain ~5KB (操作符链) Buffer ~3KB (可配置) 总计:~10KB/连接 C10K问题: SseEmitter: 10000连接 = 10GB内存 Flux: 10000连接 = 100MB内存 6.2 技术选型
6.2.1 选型决策树
需要流式响应? ├─ 否 → 使用普通REST接口 └─ 是 → 继续 | 现有项目是Spring MVC? ├─ 是 → 考虑SseEmitter │ | │ 并发量 < 1000? │ ├─ 是 → SseEmitter (简单易用) │ └─ 否 → 考虑迁移到Flux | └─ 否(新项目) → Flux (性能更好) | 团队熟悉响应式编程? ├─ 是 → 直接使用Flux └─ 否 → 先用SseEmitter,逐步迁移 6.2.2 详细对比
| 维度 | SseEmitter | Flux | 推荐场景 |
|---|---|---|---|
| 易用性 | ⭐⭐⭐⭐⭐ 简单直观 | ⭐⭐⭐ 学习曲线陡 | 快速开发选SseEmitter |
| 性能 | ⭐⭐⭐ 中等 | ⭐⭐⭐⭐⭐ 优秀 | 高并发选Flux |
| 资源占用 | ⭐⭐ 每连接1个线程 | ⭐⭐⭐⭐⭐ 异步非阻塞 | 资源受限选Flux |
| 背压控制 | ❌ 不支持 | ✅ 完整支持 | 需要流控选Flux |
| 生态集成 | ⭐⭐⭐ Spring MVC | ⭐⭐⭐⭐⭐ WebFlux生态 | 全栈响应式选Flux |
| 错误处理 | ⭐⭐⭐ 简单 | ⭐⭐⭐⭐ 丰富 | 复杂逻辑选Flux |
| 测试难度 | ⭐⭐⭐⭐ 容易 | ⭐⭐ 较难 | 快速验证选SseEmitter |
| 可维护性 | ⭐⭐⭐⭐ 易理解 | ⭐⭐⭐ 需要经验 | 团队新手选SseEmitter |
6.2.3 实际案例选型
案例1:企业内部管理系统
- 并发:<500
- 团队:传统Java团队
- 选择:SseEmitter
- 理由:易于理解和维护,性能够用
案例2:对外开放的AI聊天API
- 并发:10000+
- 团队:有响应式经验
- 选择:Flux
- 理由:高性能、低资源占用、完整的背压控制
案例3:实时监控大屏
- 并发:<100
- 需求:推送服务器指标
- 选择:SseEmitter
- 理由:简单场景,快速实现
案例4:物联网数据采集平台
- 并发:50000+设备
- 数据:高频率传感器数据
- 选择:Flux + R2DBC
- 理由:全栈响应式,极致性能
七、生产环境实战
7.1 完整项目改造
7.1.1 改造前(同步模式)
@RestController@RequestMapping("/api/v1/chat")publicclassChatController{@PostMapping("/send")publicResult<MessageVO>sendMessage(@RequestBodySendMessageReq req){// 阻塞等待AI回复(可能30秒)String response = aiService.generate(req.getContent());returnResult.success(newMessageVO(response));}}问题:
- 用户体验差:等待30秒
- 资源浪费:占用线程30秒
- 容易超时:Nginx/Gateway 60秒超时
7.1.2 改造后(流式模式)
@RestController@RequestMapping("/api/v1/chat")@RequiredArgsConstructorpublicclassStreamChatController{privatefinalStreamChatService chatService;/** * 流式发送消息(推荐) */@PostMapping(value ="/stream", produces =MediaType.TEXT_EVENT_STREAM_VALUE)publicFlux<ServerSentEvent<ChatResponse>>streamMessage(@RequestBodySendMessageReq req){return chatService.streamChat(req).map(chunk ->ServerSentEvent.<ChatResponse>builder().event("message").id(chunk.getMessageId()).data(chunk).build()).concatWith(completionEvent()).onErrorResume(this::handleError);}privateFlux<ServerSentEvent<ChatResponse>>completionEvent(){returnFlux.just(ServerSentEvent.<ChatResponse>builder().event("complete").data(ChatResponse.complete()).build());}privateFlux<ServerSentEvent<ChatResponse>>handleError(Throwable e){ log.error("流式聊天异常", e);returnFlux.just(ServerSentEvent.<ChatResponse>builder().event("error").data(ChatResponse.error(e.getMessage())).build());}}7.2 Service层实现
@Service@Slf4j@RequiredArgsConstructorpublicclassStreamChatService{privatefinalOpenAIClient openAIClient;privatefinalConversationRepository conversationRepository;privatefinalMessageRepository messageRepository;privatefinalRedisTemplate<String,Object> redisTemplate;/** * 流式聊天核心逻辑 */publicFlux<ChatResponse>streamChat(SendMessageReq req){String messageId =generateMessageId();String sessionId = req.getSessionId();returnFlux.create(sink ->{StringBuilder fullResponse =newStringBuilder();AtomicInteger chunkCount =newAtomicInteger(0);try{// 1. 保存用户消息saveUserMessage(sessionId, req.getContent()).subscribe();// 2. 调用OpenAI流式API openAIClient.streamChatCompletion(buildPrompt(sessionId, req.getContent()),newStreamCallback(){@OverridepublicvoidonChunk(String chunk){ fullResponse.append(chunk);// 发送数据块ChatResponse response =ChatResponse.builder().messageId(messageId).sessionId(sessionId).content(chunk).chunkIndex(chunkCount.getAndIncrement()).isComplete(false).build(); sink.next(response);// 缓存到Redis(支持断线重连)cacheChunk(messageId, chunkCount.get(), chunk);}@OverridepublicvoidonComplete(){// 3. 保存完整AI回复saveAssistantMessage(sessionId, messageId, fullResponse.toString()).doOnSuccess(msg -> log.info("消息保存成功: {}", messageId)).doOnError(e -> log.error("消息保存失败", e)).subscribe();// 4. 发送完成信号ChatResponse finalResponse =ChatResponse.builder().messageId(messageId).sessionId(sessionId).content("").chunkIndex(chunkCount.get()).isComplete(true).totalChunks(chunkCount.get()).build(); sink.next(finalResponse); sink.complete();// 5. 清理缓存clearCache(messageId);}@OverridepublicvoidonError(Throwable error){ log.error("AI生成失败", error); sink.error(newBusinessException("AI服务异常: "+ error.getMessage()));}});}catch(Exception e){ sink.error(e);}}).publishOn(Schedulers.boundedElastic()).timeout(Duration.ofMinutes(5))// 5分钟超时.doOnSubscribe(sub -> log.info("开始流式聊天, sessionId={}", sessionId)).doOnComplete(()-> log.info("流式聊天完成, sessionId={}", sessionId)).doOnError(e -> log.error("流式聊天异常, sessionId={}", sessionId, e));}/** * 保存用户消息(响应式) */privateMono<Message>saveUserMessage(String sessionId,String content){returnMono.fromCallable(()->{Message message =Message.builder().messageId(generateMessageId()).sessionId(sessionId).role(MessageRole.USER).content(content).createTime(LocalDateTime.now()).build();return messageRepository.save(message);}).subscribeOn(Schedulers.boundedElastic());}/** * 保存AI回复消息(响应式) */privateMono<Message>saveAssistantMessage(String sessionId,String messageId,String content){returnMono.fromCallable(()->{Message message =Message.builder().messageId(messageId).sessionId(sessionId).role(MessageRole.ASSISTANT).content(content).createTime(LocalDateTime.now()).build();return messageRepository.save(message);}).subscribeOn(Schedulers.boundedElastic());}/** * 缓存数据块(支持断线重连) */privatevoidcacheChunk(String messageId,int index,String chunk){String key ="chat:stream:"+ messageId; redisTemplate.opsForList().rightPush(key, chunk); redisTemplate.expire(key,Duration.ofMinutes(10));}/** * 清理缓存 */privatevoidclearCache(String messageId){String key ="chat:stream:"+ messageId; redisTemplate.delete(key);}privateStringgenerateMessageId(){return"msg_"+System.currentTimeMillis()+"_"+RandomUtil.randomString(8);}}7.3 前端对接实现
7.3.1 原生JavaScript
classStreamChatClient{constructor(apiBaseUrl){this.apiBaseUrl = apiBaseUrl;this.eventSource =null;}/** * 发送流式消息 */sendStreamMessage(sessionId, content, callbacks){const url =`${this.apiBaseUrl}/api/v1/chat/stream`;// 使用fetch进行POST请求,获取ReadableStreamfetch(url,{ method:'POST', headers:{'Content-Type':'application/json','Accept':'text/event-stream'}, body:JSON.stringify({ sessionId: sessionId, content: content })}).then(response=>{const reader = response.body.getReader();const decoder =newTextDecoder();// 读取流constreadChunk=()=>{ reader.read().then(({ done, value })=>{if(done){ callbacks.onComplete?.();return;}// 解析SSE数据const chunk = decoder.decode(value);const lines = chunk.split('\n');let eventType ='message';let data ='';for(const line of lines){if(line.startsWith('event:')){ eventType = line.substring(6).trim();}elseif(line.startsWith('data:')){ data = line.substring(5).trim();}elseif(line ===''&& data){// 完整的事件this.handleEvent(eventType, data, callbacks); eventType ='message'; data ='';}}// 继续读取readChunk();});};readChunk();}).catch(error=>{ console.error('Stream error:', error); callbacks.onError?.(error);});}/** * 处理SSE事件 */handleEvent(eventType, data, callbacks){try{const parsed =JSON.parse(data);switch(eventType){case'message': callbacks.onMessage?.(parsed.content);break;case'complete': callbacks.onComplete?.();break;case'error': callbacks.onError?.(newError(parsed.message));break;}}catch(e){ console.error('Parse error:', e);}}}// 使用示例const client =newStreamChatClient('http://localhost:8080'); client.sendStreamMessage('sess_123','你好,介绍一下你自己',{onMessage:(content)=>{// 逐字显示 document.getElementById('response').innerText += content;},onComplete:()=>{ console.log('流式响应完成');},onError:(error)=>{ console.error('错误:', error);alert('发生错误: '+ error.message);}});7.3.2 React实现
import React,{ useState, useEffect, useRef }from'react';interfaceChatMessage{ role:'user'|'assistant'; content:string; isStreaming?:boolean;}exportconst StreamChat: React.FC=()=>{const[messages, setMessages]=useState<ChatMessage[]>([]);const[inputValue, setInputValue]=useState('');const[isLoading, setIsLoading]=useState(false);const messagesEndRef =useRef<HTMLDivElement>(null);// 自动滚动到底部useEffect(()=>{ messagesEndRef.current?.scrollIntoView({ behavior:'smooth'});},[messages]);constsendMessage=async()=>{if(!inputValue.trim()|| isLoading)return;const userMessage = inputValue;setInputValue('');setIsLoading(true);// 添加用户消息setMessages(prev =>[...prev,{ role:'user', content: userMessage }]);// 添加AI消息占位符const aiMessageIndex = messages.length +1;setMessages(prev =>[...prev,{ role:'assistant', content:'', isStreaming:true}]);try{const response =awaitfetch('/api/v1/chat/stream',{ method:'POST', headers:{'Content-Type':'application/json','Accept':'text/event-stream'}, body:JSON.stringify({ sessionId:'sess_123', content: userMessage })});const reader = response.body!.getReader();const decoder =newTextDecoder();let buffer ='';while(true){const{ done, value }=await reader.read();if(done)break; buffer += decoder.decode(value,{ stream:true});const lines = buffer.split('\n'); buffer = lines.pop()||'';for(const line of lines){if(line.startsWith('data:')){const data = line.substring(5).trim();if(data){try{const parsed =JSON.parse(data);if(parsed.content){// 更新AI消息内容setMessages(prev =>{const newMessages =[...prev]; newMessages[aiMessageIndex]={...newMessages[aiMessageIndex], content: newMessages[aiMessageIndex].content + parsed.content };return newMessages;});}if(parsed.isComplete){// 标记流结束setMessages(prev =>{const newMessages =[...prev]; newMessages[aiMessageIndex].isStreaming =false;return newMessages;});}}catch(e){console.error('Parse error:', e);}}}}}}catch(error){console.error('Stream error:', error);alert('发生错误: '+ error);}finally{setIsLoading(false);}};return(<div className="chat-container"><div className="messages">{messages.map((msg, index)=>(<div key={index} className={`message message-${msg.role}`}><div className="message-content">{msg.content}{msg.isStreaming &&<span className="cursor">▊</span>}</div></div>))}<div ref={messagesEndRef}/></div><div className="input-area"><input type="text" value={inputValue} onChange={(e)=>setInputValue(e.target.value)} onKeyPress={(e)=> e.key ==='Enter'&&sendMessage()} placeholder="输入消息..." disabled={isLoading}/><button onClick={sendMessage} disabled={isLoading}>{isLoading ?'发送中...':'发送'}</button></div></div>);};7.4 异常处理与监控
7.4.1 超时处理
@ConfigurationpublicclassWebFluxConfig{@BeanpublicWebClientwebClient(){returnWebClient.builder().clientConnector(newReactorClientHttpConnector(HttpClient.create().option(ChannelOption.CONNECT_TIMEOUT_MILLIS,10000).responseTimeout(Duration.ofMinutes(5)).doOnConnected(conn -> conn .addHandlerLast(newReadTimeoutHandler(300)).addHandlerLast(newWriteTimeoutHandler(300))))).build();}}// Flux超时处理publicFlux<String>streamWithTimeout(){returnFlux.create(sink ->{// 流式逻辑}).timeout(Duration.ofMinutes(5),Flux.just("超时兜底数据")).onErrorResume(TimeoutException.class, e ->{ log.error("流式处理超时", e);returnFlux.just("处理时间过长,请稍后重试");});}7.4.2 监控埋点
@Aspect@Component@Slf4jpublicclassStreamMonitorAspect{@Around("@annotation(streamMonitor)")publicObjectmonitor(ProceedingJoinPoint pjp,StreamMonitor streamMonitor)throwsThrowable{String methodName = pjp.getSignature().getName();long startTime =System.currentTimeMillis();Object result = pjp.proceed();if(result instanceofFlux){Flux<?> flux =(Flux<?>) result;AtomicLong chunkCount =newAtomicLong(0);AtomicLong totalBytes =newAtomicLong(0);return flux .doOnNext(item ->{ chunkCount.incrementAndGet();if(item instanceofString){ totalBytes.addAndGet(((String) item).length());}}).doOnComplete(()->{long cost =System.currentTimeMillis()- startTime; log.info("流式方法执行完成: method={}, chunks={}, bytes={}, cost={}ms", methodName, chunkCount.get(), totalBytes.get(), cost);// 上报监控指标MetricsCollector.recordStreamMetrics( methodName, chunkCount.get(), totalBytes.get(), cost );}).doOnError(error ->{ log.error("流式方法执行失败: method={}, error={}", methodName, error.getMessage(), error);// 上报错误MetricsCollector.recordStreamError(methodName, error);});}return result;}}@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)public@interfaceStreamMonitor{Stringvalue()default"";}// 使用@StreamMonitor("chat-stream")publicFlux<ChatResponse>streamChat(SendMessageReq req){// ...}八、常见问题与解决方案
8.1 SseEmitter常见问题
Q1: SseEmitter连接意外断开
现象: 客户端随机断开连接,无错误日志
原因:
- Nginx/Gateway超时配置
- 网络不稳定
- 浏览器Tab切换(移动端)
解决方案:
# Nginx配置 location /api/ { proxy_pass http://backend; # SSE必需配置 proxy_set_header Connection ''; proxy_http_version 1.1; chunked_transfer_encoding off; proxy_buffering off; proxy_cache off; # 超时设置 proxy_connect_timeout 10s; proxy_send_timeout 600s; # 10分钟 proxy_read_timeout 600s; # 10分钟 } // 心跳保活@Scheduled(fixedRate =30000)// 每30秒publicvoidsendHeartbeat(){ emitterManager.getAllEmitters().forEach(emitter ->{try{ emitter.send(SseEmitter.event().name("heartbeat").data("ping"));}catch(IOException e){ log.warn("发送心跳失败,移除连接", e); emitterManager.remove(emitter);}});}Q2: 内存泄漏
现象: 服务器内存持续增长,最终OOM
原因: SseEmitter未正确关闭,导致连接泄漏
解决方案:
@ComponentpublicclassSseEmitterManager{privatefinalMap<String,SseEmitter> emitters =newConcurrentHashMap<>();publicSseEmittercreate(String sessionId,Long timeout){SseEmitter emitter =newSseEmitter(timeout);// 完成时移除 emitter.onCompletion(()->{ log.info("SSE连接完成: {}", sessionId); emitters.remove(sessionId);});// 超时时移除 emitter.onTimeout(()->{ log.warn("SSE连接超时: {}", sessionId); emitters.remove(sessionId);try{ emitter.complete();}catch(Exception e){ log.error("关闭超时连接失败", e);}});// 错误时移除 emitter.onError(throwable ->{ log.error("SSE连接异常: {}", sessionId, throwable); emitters.remove(sessionId);}); emitters.put(sessionId, emitter);return emitter;}// 定期清理过期连接@Scheduled(fixedRate =60000)// 每分钟publicvoidcleanup(){long now =System.currentTimeMillis(); emitters.entrySet().removeIf(entry ->{// 实现自定义清理逻辑returnfalse;});}}Q3: 数据丢失
现象: 客户端收到的数据不完整
原因: 缓冲区未及时flush
解决方案:
publicvoidsend(String data)throwsIOException{ emitter.send(SseEmitter.event().data(data));// SseEmitter内部会自动flush,但如果自定义实现需要注意:// outputStream.write(data.getBytes());// outputStream.flush(); // 必须!}8.2 Flux常见问题
Q1: 背压溢出
现象:reactor.core.Exceptions$OverflowException: Queue is full
原因: 生产速度 > 消费速度,且缓冲区满
解决方案:
Flux<String> flux =Flux.create(sink ->{// 生产逻辑},FluxSink.OverflowStrategy.BUFFER)// 或 DROP、LATEST、ERROR.onBackpressureBuffer(1000,// 缓冲区大小 dropped -> log.warn("丢弃数据: {}", dropped));Q2: 线程阻塞
现象: 响应式代码仍然很慢
原因: 在响应式流中使用了阻塞操作
错误示例:
Flux.create(sink ->{String result = blockingHttpClient.get();// 阻塞! sink.next(result); sink.complete();})正确做法:
Flux.create(sink ->{// 使用异步客户端 asyncHttpClient.get().thenAccept(result ->{ sink.next(result); sink.complete();}).exceptionally(error ->{ sink.error(error);returnnull;});})// 或使用subscribeOn切换线程.subscribeOn(Schedulers.boundedElastic())Q3: 订阅未触发
现象: Flux定义了但没有执行
原因: Flux是惰性的,只有订阅才会执行
错误示例:
@GetMapping("/test")publicvoidtest(){Flux.range(1,10).map(i -> i *2).doOnNext(System.out::println);// 没有subscribe,不会执行!}正确做法:
@GetMapping("/test")publicFlux<Integer>test(){returnFlux.range(1,10).map(i -> i *2);// Spring WebFlux会自动订阅}// 或手动订阅 flux.subscribe( data ->System.out.println(data), error ->System.err.println(error),()->System.out.println("完成"));8.3 生产环境最佳实践
1. 设置合理的超时时间
// 不要设置为0(永不超时)SseEmitter emitter =newSseEmitter(5*60*1000L);// 5分钟// Flux也要设置超时 flux.timeout(Duration.ofMinutes(5))2. 限制并发连接数
@ComponentpublicclassConnectionLimiter{privatefinalSemaphore semaphore =newSemaphore(1000);// 最多1000个连接publicSseEmittercreateWithLimit()throwsInterruptedException{if(!semaphore.tryAcquire(5,TimeUnit.SECONDS)){thrownewBusinessException("服务器繁忙,请稍后重试");}SseEmitter emitter =newSseEmitter(); emitter.onCompletion(()-> semaphore.release()); emitter.onTimeout(()-> semaphore.release()); emitter.onError(e -> semaphore.release());return emitter;}}3. 日志与监控
Flux<String> flux =Flux.create(sink ->{// 业务逻辑}).doOnSubscribe(sub ->{ log.info("开始流式处理: {}", contextInfo);MetricsCollector.incrementActiveStreams();}).doOnNext(item ->{ log.debug("发送数据块: {}", item);MetricsCollector.recordChunk();}).doOnComplete(()->{ log.info("流式处理完成: {}", contextInfo);MetricsCollector.decrementActiveStreams();}).doOnError(error ->{ log.error("流式处理失败: {}", contextInfo, error);MetricsCollector.recordError();MetricsCollector.decrementActiveStreams();});4. 优雅关闭
@ComponentpublicclassGracefulShutdownimplementsApplicationListener<ContextClosedEvent>{@AutowiredprivateSseEmitterManager emitterManager;@OverridepublicvoidonApplicationEvent(ContextClosedEvent event){ log.info("应用关闭,断开所有SSE连接"); emitterManager.getAllEmitters().forEach(emitter ->{try{ emitter.send(SseEmitter.event().name("shutdown").data("服务器即将重启,请重新连接")); emitter.complete();}catch(IOException e){ log.error("发送关闭通知失败", e);}});}}总结
核心要点
- SseEmitter:
- 基于Servlet异步,简单易用
- 适合中小型项目(并发<1000)
- 每个连接占用一个线程,资源开销较大
- 不支持背压控制
- Flux:
- 基于Reactive Streams,性能卓越
- 适合高并发场景(并发>1000)
- 异步非阻塞,资源利用率高
- 完整的背压控制和错误处理
- 选型建议:
- 快速上手、团队经验不足 → SseEmitter
- 高性能、大规模并发 → Flux
- 新项目推荐直接使用Flux
- 老项目可以先用SseEmitter,逐步迁移
未来趋势
响应式编程已成为Java生态的重要方向,Spring、R2DBC、Kafka、Redis等主流框架都已支持响应式。掌握Flux不仅能提升系统性能,也是技术成长的必经之路。