别再只用replaceAll了手把手教你用Reactor的scan操作符处理AI流式响应的跨分块标签在构建现代AI对话系统时开发者常常会遇到一个棘手的问题如何处理流式响应中被拆分的结构化标签。想象这样一个场景——当AI返回包含think标签的响应时这类标签通常用于隐藏AI的推理过程如果标签内容恰好被分割到不同的数据块中简单的字符串替换就会彻底失效。这正是本文要解决的核心痛点。1. 为什么传统字符串处理会失败当AI服务以流式方式返回响应时数据会被拆分成多个小块chunk逐步传输。这种机制虽然能显著提升用户体验却给标签处理带来了独特挑战。假设我们收到以下两个连续的数据块第一个chunk: 人类可以think这是 第二个chunk: 一个测试/think理解吗使用replaceAll(think.*/think, )这样的正则表达式会完全失效因为跨块匹配问题正则表达式无法跨多个chunk进行匹配状态丢失简单的字符串处理无法记住前一个chunk是否开启了标签内容截断风险直接替换可能导致有效内容被意外删除更糟糕的是这类问题往往在测试中难以发现因为标签拆分取决于网络条件和响应内容长度属于典型的边界条件问题。2. Reactor的scan操作符与状态机模式2.1 scan操作符的核心优势在响应式编程中scan操作符提供了一种优雅的解决方案。与map不同scan会维护一个累积状态对每个输入元素应用转换函数后既产生输出又更新状态。这种特性使其特别适合处理需要记忆的场景。FluxString processed rawFlux.scan(initialState, (state, chunk) - { // 处理当前chunk并返回新状态 return processChunk(state, chunk); });对比常见操作符操作符状态维护适用场景map无简单1:1转换filter无数据筛选scan有需要累积状态的转换2.2 状态机设计模式状态机是解决这类问题的经典模式。对于标签处理我们可以定义两种基本状态NORMAL正常处理文本内容INSIDE_TAG处于标签内部如think和/think之间状态转换规则如下NORMAL - 遇到think - INSIDE_TAG INSIDE_TAG - 遇到/think - NORMAL提示状态机应该设计为不可变对象确保线程安全3. 完整实现方案3.1 定义状态对象首先创建一个专门处理标签状态的对象public class TagProcessorState { private final boolean insideTag; private final StringBuilder buffer; // 初始化方法等... public TagProcessorState process(String chunk) { StringBuilder output new StringBuilder(); String remaining chunk; while (!remaining.isEmpty()) { if (!insideTag) { int tagStart remaining.indexOf(think); if (tagStart -1) { output.append(remaining); break; } output.append(remaining.substring(0, tagStart)); remaining remaining.substring(tagStart think.length()); insideTag true; } else { int tagEnd remaining.indexOf(/think); if (tagEnd -1) break; remaining remaining.substring(tagEnd /think.length()); insideTag false; } } return new TagProcessorState(insideTag, buffer.append(output)); } }3.2 构建处理管道将状态机与scan操作符结合FluxResponseDTO processedStream aiService.streamRequest(prompt) .scan(new TagProcessorState(false, new StringBuilder()), (state, response) - state.process(response.getText())) .filter(state - !state.getContent().isEmpty()) .map(state - { ResponseDTO dto new ResponseDTO(); dto.setText(state.getContent()); return dto; });关键点说明初始状态创建未进入标签的状态实例状态传递每个chunk处理后会返回新状态内容过滤过滤掉空内容避免无效传输结果转换将处理后的状态转换为响应DTO4. 高级应用与优化4.1 处理多种标签类型当系统需要处理多种标签如think、reasoning等时可以扩展状态机enum TagType { THINK, REASONING, NONE } class MultiTagState { private TagType currentTag; private MapTagType, StringBuilder buffers; // 处理方法需要根据不同类型标签进行调整 }4.2 性能优化技巧缓冲区复用避免频繁创建StringBuilder索引优化使用KMP算法加速标签查找并行处理对于CPU密集型操作可使用parallel().parallel() .runOn(Schedulers.parallel()) .scan(...) .sequential()4.3 错误处理策略为增强鲁棒性应该添加标签嵌套检测防止thinkthink这样的错误未闭合标签超时长时间未闭合的标签应自动关闭异常恢复在错误状态后能恢复正常处理5. 替代方案对比虽然scan状态机组合功能强大但也要根据场景选择合适方案方案优点缺点适用场景正则替换实现简单无法处理跨块标签标签保证完整的简单场景完整缓存逻辑简单内存占用高小数据量处理scan状态机精准控制实现复杂需要精确处理的专业场景服务端预处理客户端简单依赖服务端支持有服务端控制权的场景在实际项目中我通常会先评估标签出现的频率和拆分概率。对于低频出现的标签有时简单的服务端预处理反而是更经济的方案。但对于核心业务场景scan状态机的可靠性往往值得额外的实现成本。