上一篇我们聊了这套架构的设计思路今天直接上干货从核心模块的代码实现、关键配置到部署踩坑带你一步步把这套工业级问答系统跑起来。一、项目架构回顾先快速回顾下整体流程帮你快速定位代码模块MySQL 问答层Redis 缓存 BM25 检索处理高频问题增强 RAG 问答层多策略检索 Reranker 重排序 LLM 生成离线知识库层文档解析 → 双层分块 → BGE-M3 向量化 → Milvus 存储下面我们按模块拆解核心代码和配置。二、离线知识库模块数据处理与向量入库这部分是 RAG 效果的基石核心是文档解析、双层分块、向量化存储。1. 依赖安装bash运行pip install langchain langchain-community langchain-text-splitters pymilvus sentence-transformers bge-m32. 文档解析与双层分块父文档 子文档python运行from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, TextLoader from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain.embeddings import HuggingFaceBgeEmbeddings from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility import os # 1. 文档加载器映射 LOADER_MAP { .pdf: PyPDFLoader, .docx: Docx2txtLoader, .txt: TextLoader, .md: TextLoader } def load_document(file_path: str): 加载不同格式的文档 ext os.path.splitext(file_path)[1].lower() loader LOADER_MAP.get(ext) if not loader: raise ValueError(f不支持的文件格式: {ext}) return loader(file_path).load() # 2. 双层分块配置 parent_splitter RecursiveCharacterTextSplitter( chunk_size1000, # 父文档块大小保证上下文完整 chunk_overlap200, separators[\n\n, \n, 。, , , ] ) child_splitter RecursiveCharacterTextSplitter( chunk_size200, # 子文档块大小用于向量检索 chunk_overlap50, separators[\n\n, \n, 。, , , ] ) def split_document(documents): 父文档子文档双层分块 parent_chunks parent_splitter.split_documents(documents) all_child_chunks [] for parent in parent_chunks: # 子块继承父文档的元数据 child_chunks child_splitter.split_documents([parent]) for child in child_chunks: child.metadata[parent_chunk_id] hash(parent.page_content) child.metadata[parent_content] parent.page_content all_child_chunks.extend(child_chunks) return all_child_chunks # 3. BGE-M3向量化 embedding_model HuggingFaceBgeEmbeddings( model_nameBAAI/bge-m3, model_kwargs{device: cuda}, # 有GPU就用cuda没有就用cpu encode_kwargs{normalize_embeddings: True} ) def embed_chunks(chunks): 将子文档块向量化 texts [chunk.page_content for chunk in chunks] embeddings embedding_model.embed_documents(texts) return embeddings, texts, [chunk.metadata for chunk in chunks]3. Milvus 向量库初始化与入库python运行# Milvus连接配置 connections.connect( aliasdefault, hostlocalhost, port19530 ) # 集合schema定义 fields [ FieldSchema(nameid, dtypeDataType.INT64, is_primaryTrue, auto_idTrue), FieldSchema(namevector, dtypeDataType.FLOAT_VECTOR, dim1024), # BGE-M3向量维度 FieldSchema(nametext, dtypeDataType.VARCHAR, max_length65535), FieldSchema(nameparent_chunk_id, dtypeDataType.INT64), FieldSchema(nameparent_content, dtypeDataType.VARCHAR, max_length65535) ] schema CollectionSchema(fields, descriptionenterprise_rag_knowledge_base) collection_name enterprise_rag if not utility.has_collection(collection_name): collection Collection(collection_name, schema) # 创建向量索引 index_params { metric_type: IP, index_type: HNSW, params: {M: 8, efConstruction: 64} } collection.create_index(vector, index_params) else: collection Collection(collection_name) collection.load() def insert_to_milvus(embeddings, texts, metadatas): 向量数据写入Milvus data [ embeddings, texts, [meta[parent_chunk_id] for meta in metadatas], [meta[parent_content] for meta in metadatas] ] collection.insert(data) collection.flush()三、MySQL 问答层高频问题缓存与检索核心是 Redis 缓存 BM25 文本匹配实现高频问题的毫秒级响应。1. 数据库与缓存初始化python运行import redis import pymysql from rank_bm25 import BM25Okapi import json # Redis连接缓存高频问答 redis_client redis.Redis(hostlocalhost, port6379, db0) # MySQL连接存储高频问答对 mysql_conn pymysql.connect( hostlocalhost, userroot, passwordyour_password, databaseqa_system, charsetutf8mb4 ) # 初始化BM25检索器预加载高频问题 def load_high_freq_qa(): 从MySQL加载高频问答对 cursor mysql_conn.cursor(pymysql.cursors.DictCursor) cursor.execute(SELECT question, answer FROM high_freq_qa) qa_list cursor.fetchall() questions [qa[question] for qa in qa_list] answers [qa[answer] for qa in qa_list] return questions, answers questions, answers load_high_freq_qa() tokenized_questions [q.split() for q in questions] bm25 BM25Okapi(tokenized_questions) def get_mysql_answer(query: str, threshold: float 0.85): BM25检索高频问答超过阈值直接返回 tokenized_query query.split() scores bm25.get_scores(tokenized_query) max_score max(scores) if scores.size 0 else 0 if max_score threshold: best_idx scores.argmax() return answers[best_idx], True return None, False def check_redis_cache(query: str): 检查Redis缓存是否有该问题的答案 cache_key fqa:cache:{query} cached_answer redis_client.get(cache_key) if cached_answer: return json.loads(cached_answer), True return None, False def update_redis_cache(query: str, answer: str, expire_time: int 3600): 更新Redis缓存 cache_key fqa:cache:{query} redis_client.setex(cache_key, expire_time, json.dumps(answer))四、增强 RAG 问答层多策略检索 重排序 LLM 生成这是系统的核心包含意图识别、多策略检索、Reranker 重排序和大模型生成。1. 意图识别区分通用问题 / 专业问题python运行from langchain_openai import ChatOpenAI from langchain.prompts import PromptTemplate # 初始化大模型可替换为本地部署的Qwen/Llama llm ChatOpenAI( base_urlhttps://api.openai.com/v1, api_keyyour_api_key, modelgpt-3.5-turbo, temperature0 ) intent_prompt PromptTemplate( input_variables[query], template判断用户的问题是否属于需要依赖企业知识库的专业问题 仅返回是或否。 专业问题的例子公司请假流程是什么AI课程大纲有哪些 非专业问题的例子11等于几今天天气怎么样 用户问题{query} ) def is_general_question(query: str): 判断是否为通用问题是则直接用LLM回答 prompt intent_prompt.format(queryquery) response llm.invoke(prompt).content.strip() return response 否2. 多策略检索实现python运行from langchain_core.documents import Document import numpy as np # 策略1直接检索 def direct_retrieval(query: str, top_k: int 5): 直接向量化检索 query_vector embedding_model.embed_query(query) search_params {metric_type: IP, params: {ef: 10}} results collection.search( data[query_vector], anns_fieldvector, paramsearch_params, limittop_k, output_fields[text, parent_content, parent_chunk_id] ) return [ Document( page_contentres.entity.get(text), metadata{ parent_content: res.entity.get(parent_content), score: res.score } ) for res in results[0] ] # 策略2假设问题检索HyDE hyde_prompt PromptTemplate( input_variables[query], template根据用户问题生成一个合理的假设答案 只输出答案内容不要额外解释。 用户问题{query} ) def hyde_retrieval(query: str, top_k: int 5): HyDE假设答案检索 hypothetical_answer llm.invoke(hyde_prompt.format(queryquery)).content return direct_retrieval(hypothetical_answer, top_k) # 策略3子问题分解检索 subquery_prompt PromptTemplate( input_variables[query], template将用户的复杂问题拆分成2-3个独立的子问题 每个子问题占一行只输出子问题不要额外内容。 用户问题{query} ) def subquery_retrieval(query: str, top_k_per_subquery: int 3): 子问题分解检索合并结果 subqueries llm.invoke(subquery_prompt.format(queryquery)).content.strip().split(\n) all_docs [] for subq in subqueries: docs direct_retrieval(subq, top_k_per_subquery) all_docs.extend(docs) # 去重 unique_docs [] seen_ids set() for doc in all_docs: pid doc.metadata[parent_chunk_id] if pid not in seen_ids: seen_ids.add(pid) unique_docs.append(doc) return unique_docs[:top_k_per_subquery*3] # 策略选择器可根据问题类型动态选择这里简化为自动混合 def strategy_selector(query: str): 根据问题选择检索策略 # 简化逻辑直接检索HyDE子问题检索结果合并 direct_docs direct_retrieval(query) hyde_docs hyde_retrieval(query) subquery_docs subquery_retrieval(query) all_docs direct_docs hyde_docs subquery_docs # 去重 unique_docs [] seen_ids set() for doc in all_docs: pid doc.metadata[parent_chunk_id] if pid not in seen_ids: seen_ids.add(pid) unique_docs.append(doc) return unique_docs3. Reranker 重排序与上下文构建python运行from sentence_transformers import CrossEncoder # 初始化Reranker模型 reranker CrossEncoder(BAAI/bge-reranker-base) def rerank_docs(query: str, docs, top_k: int 5): 用Reranker对检索结果重排序 # 构造query-doc对 pairs [[query, doc.page_content] for doc in docs] scores reranker.predict(pairs) # 按分数降序排序 sorted_indices np.argsort(scores)[::-1] sorted_docs [docs[i] for i in sorted_indices[:top_k]] return sorted_docs def build_context(docs): 从重排序后的文档中构建上下文优先使用父文档内容 # 去重父文档 parent_contents list({doc.metadata[parent_content] for doc in docs}) context \n\n.join([f参考文档片段{i1}{content} for i, content in enumerate(parent_contents)]) return context4. LLM 生成基于上下文的回答python运行qa_prompt PromptTemplate( input_variables[query, context], template你是企业智能问答助手只能根据给定的参考文档片段回答用户问题。 如果参考文档中没有相关信息就说抱歉我没有找到相关信息不要编造内容。 参考文档片段 {context} 用户问题{query} 回答 ) def generate_answer(query: str, context: str): 基于上下文生成回答 prompt qa_prompt.format(queryquery, contextcontext) response llm.invoke(prompt).content.strip() return response五、主流程整合用户问答入口python运行def qa_system(query: str): 完整问答流程MySQL缓存→高频问答→RAG→LLM生成 # 1. 检查Redis缓存 cached_answer, hit_cache check_redis_cache(query) if hit_cache: return {answer: cached_answer, source: redis_cache} # 2. 检查MySQL高频问答 mysql_answer, hit_mysql get_mysql_answer(query) if hit_mysql: update_redis_cache(query, mysql_answer) return {answer: mysql_answer, source: mysql_high_freq} # 3. 意图识别判断是否为通用问题 if is_general_question(query): general_answer llm.invoke(query).content return {answer: general_answer, source: general_llm} # 4. 多策略RAG检索重排序 docs strategy_selector(query) reranked_docs rerank_docs(query, docs) context build_context(reranked_docs) # 5. 生成最终回答 final_answer generate_answer(query, context) # 6. 更新Redis缓存可选 update_redis_cache(query, final_answer) return {answer: final_answer, source: enhanced_rag, context: context}六、部署与优化踩坑指南1. 性能优化Milvus 索引优化使用 HNSW 索引替代暴力搜索向量库 1000 万级数据也能做到毫秒级召回。缓存预热系统启动时将高频问答对提前加载到 Redis 和 BM25 索引中避免首次请求冷启动。批量处理离线知识库的文档入库用批量插入比单条插入快 10 倍以上。2. 效果优化分块调优父文档块大小建议 800-1500 字子文档块 200-300 字根据文档类型调整技术文档可适当调大。Reranker 必加没有 Reranker 的 RAG 召回噪声会非常多重排序后相关性提升 30% 以上。多策略检索组合HyDE 子问题分解能覆盖 80% 的长尾问题比单一向量检索召回率高很多。3. 生产环境部署用 FastAPI 封装接口将 qa_system 函数封装为 API供前端调用。异步处理离线知识库的文档入库用 Celery 异步任务避免阻塞主流程。监控告警对 Milvus、Redis、MySQL 的连接状态和性能指标做监控设置告警阈值。七、总结与扩展方向这套架构的核心是分层处理 多策略增强把企业问答系统的速度、成本和准确性做到了平衡。你可以根据业务场景继续扩展增加对话历史上下文实现多轮问答。接入企业权限系统实现不同用户的知识库隔离。增加问答日志分析自动更新高频问答库。