Manshu RAG 知识库与对话后端#

#RAG; #FastAPI


项目仓库:Manshu RAG 知识库与对话后端


下文流程图概括了「文档上传与解析」与「用户提问与流式回答」两条主链路。

graph TD
    A[用户上传文档] --> B[分片上传MinIO]
    B --> C[记录分片进度Redis]
    C --> D{所有分片完成?}
    D -->|否| B
    D -->|是| E[合并文件]
    E --> F[写入MySQL元数据]
    F --> G[投递Kafka消息]
    G --> H[Kafka消费者]
    H --> I[Tika解析文档]
    I --> J[文本分块]
    J --> K[调用OpenAI Embedding]
    K --> L[写入Elasticsearch]
    L --> M[写入DocumentVector表]
    
    N[用户提问] --> O[问题向量化]
    O --> P[Elasticsearch混合检索]
    P --> Q[获取Top-K结果]
    Q --> R[拼装Prompt]
    R --> S[OpenAI流式生成]
    S --> T[WebSocket推送]

项目概述#

Manshu 后端承担「上传文档 → 解析入库 → 语义/关键词检索 → 基于检索结果与大模型对话」的完整链路,在本项目中负责 AI 与后端开发。

技术栈:Python + FastAPI 作为主框架,MySQL 存储业务与元数据,Redis 负责会话、限流与验证码,MinIO 存储文件与原始文档,Elasticsearch 承担向量与全文混合检索,Kafka 将解析与向量化拆分为异步任务,对话与向量化均使用 OpenAI(Embedding 默认 text-embedding-3-small,Chat 默认 gpt-3.5-turbo,均可通过配置切换)。部署采用 Docker + Uvicorn。API 统一前缀为 /api/v1,聊天采用 WebSocket,路径为 /api/v1/chat,JWT 通过 query 参数 token 传递。

类别 技术选型 用途
后端框架 Python + FastAPI REST API 与 WebSocket 服务
数据库 MySQL 业务数据与元数据存储
缓存 Redis 会话管理、限流、验证码
对象存储 MinIO 文件与原始文档存储
搜索引擎 Elasticsearch 向量与全文混合检索
消息队列 Kafka 文档解析与向量化异步任务
大模型 OpenAI Embedding(text-embedding-3-small)与 Chat(gpt-3.5-turbo)
文档解析 Apache Tika 多格式文档解析
部署 Docker + Uvicorn 容器化部署与 ASGI 服务器

项目背景#

随着内部文档体量增长,单纯关键词搜索已难以满足「按语义检索 + 基于检索结果由大模型生成回答」的需求;同时文档格式多样(PDF、Word、Excel、PPT、HTML、Markdown 等),需要统一的解析与入库流程。本项目将上传、解析、检索、对话串联为一条可配置、可观测的流水线,并实现基于权限与组织标签的多租户隔离;健康检查与结构化日志便于运维排查。

健康检查提供 /health(快速存活探测)与 /health/detailed(深度探测),后者会校验 MySQL、Redis、Elasticsearch、MinIO、Kafka 的连接及连接池状态。应用启动时按依赖顺序连接上述组件;若 Kafka 消费者启动失败,仅记录警告日志,应用仍可正常对外服务,仅文档解析与向量化能力暂时不可用。


系统架构与选型#

采用前后端分离架构:前端通过 HTTP 与 WebSocket 访问后端,后端提供 REST API 与 WebSocket 聊天接口。请求先经 JWT 校验与 Redis 限流,再进入业务逻辑;上传文件落盘 MinIO,元数据写入 MySQL,并向 Kafka 的 document_parse topic 投递解析任务。消费者侧使用 Apache Tika 解析文档、按块切分(块长 500、重叠 50)、调用 OpenAI Embedding,将向量与元数据分别写入 Elasticsearch 与 MySQL。用户提问时,先将问题向量化,在 Elasticsearch 中并行执行向量检索(dense_vector,1536 维,余弦相似度)与 IK 全文检索(建索引用 ik_max_word,查询用 ik_smart),两路结果经分数归一化后按配置权重(默认向量 0.7、全文 0.3)融合,将 Top-K 文档块拼入 Prompt,再调用 OpenAI 流式生成,经同一 WebSocket 推送给前端。

选型考量:Elasticsearch 在同一索引内同时支持向量检索与全文检索,dense_vector 配合 IK 分词即可实现可调权重的混合检索。解析与向量化耗时较长,通过 Kafka 异步处理既可避免阻塞上传接口,又便于后续扩展多消费者与重试策略。Tika 覆盖 PDF、Office、HTML、Markdown 等常见格式,解析失败时可对纯文本尝试 UTF-8 解码降级。FastAPI 对异步与 WebSocket 支持完善,并自动生成 OpenAPI 文档,便于与前端联调与接口契约管理。

关键技术选型理由#

  • Elasticsearch

    在同一索引内同时支持语义检索(向量)与关键词检索(全文),dense_vector 配合 IK 分词即可完成混合检索并调节权重,便于在「语义相似」与「关键词命中」之间做平衡。

  • Kafka

    解析与向量化耗时较长,若同步执行易导致上传接口超时。通过 Kafka 异步处理既可快速响应上传请求,又便于后续扩展多消费者、重试与死信处理。

  • Apache Tika

    可覆盖 PDF、Office、HTML、Markdown 等常见格式,解析失败时对纯文本尝试 UTF-8 解码降级,降低单文件解析失败对整体流程的影响。


功能模块#

健康检查机制#

系统提供两级健康检查,便于接入负载均衡与监控告警:

  • 基础检查/health — 快速存活探测,用于负载均衡健康检查
  • 详细检查/health/detailed — 深度探测,校验 MySQL、Redis、Elasticsearch、MinIO、Kafka 的连接及连接池状态

应用启动时会按依赖顺序连接上述组件;若 Kafka 消费者启动失败,仅记录警告日志,应用仍可正常对外提供除文档解析外的其他能力。

认证鉴权#

提供注册、登录、图形验证码与邮件验证码,以及 JWT 签发与校验。验证码与邮件验证码的过期时间、长度均可在配置中设置。基于 Redis 的接口级限流:图形验证码、邮件验证码、注册接口各有独立的限流窗口与次数(如每 IP 每分钟验证码请求次数),用于防止暴力尝试与恶意调用。登录成功后返回 JWT,后续请求通过 Authorization 头或 query 中的 token 携带。

核心功能

  • 图形验证码生成与校验(Redis 存储,可配置过期时间)
  • 邮件验证码发送(SMTP 配置,可配置长度与过期时间)
  • JWT Token 签发与校验(支持 Authorization 头或 query 参数)
  • 基于 Redis 的接口级限流(图形验证码、邮件验证码、注册接口独立限流窗口)

限流策略

  • 每 IP 每分钟验证码请求次数限制
  • 每 IP 每分钟邮件验证码请求次数限制
  • 每 IP 每分钟注册请求次数限制

文件与文档管理#

上传支持分片:各分片先上传至 MinIO 临时目录,使用 Redis 记录分片进度;全部完成后调用合并接口,生成正式文件并写入 MySQL(文件元数据、用户、组织标签、是否公开),再向 Kafka topic document_parse 投递一条解析任务。消费者从 MinIO 拉取文件、经 Tika 解析、分块、逐块调用 Embedding,将向量写入 Elasticsearch、元数据写入 DocumentVector 表。支持格式依赖 Tika,常见包括 PDF、Word、Excel、PPT、HTML、Markdown、TXT、RTF 等;解析失败时记录日志,必要时仅落库元数据且不重复消费同一条消息,避免单条失败导致队列阻塞。文档列表、删除等接口与权限、组织标签联动,仅可操作当前用户有权限的数据。

  1. 分片上传:各分片先上传至 MinIO 临时目录,使用 Redis 记录分片进度
  2. 文件合并:全部完成后调用合并接口,生成正式文件并写入 MySQL
  3. 任务投递:向 Kafka topic document_parse 投递解析任务
  4. 异步处理:消费者从 MinIO 拉取文件、Tika 解析、分块、逐块调用 Embedding、写入 Elasticsearch

支持格式:PDF、Word、Excel、PPT、HTML、Markdown、TXT、RTF 等(依赖 Tika)

容错机制:解析失败时记录日志,必要时仅落库元数据且不重复消费同一条消息,避免阻塞队列。

混合检索#

查询文本先经同一 Embedding 模型向量化,在 Elasticsearch 中并行执行向量检索与 IK 全文检索,两路分数归一化后按配置权重加权合并,并按 user_id、org_tag、is_public 做数据隔离,仅返回当前用户有权限的文档块。索引字段包括 file_md5、chunk_id、text_content、vector、user_id、org_tag、is_public、file_name、model_version,便于权限控制与结果溯源。

检索流程

  1. 问题向量化(使用 text-embedding-3-small,1536 维)
  2. 并行执行两路查询:
    • 向量检索:dense_vector 字段,余弦相似度
    • 全文检索:IK 分词(建索引用 ik_max_word,查询用 ik_smart)
  3. 分数归一化后按配置权重加权合并(默认向量 0.7、全文 0.3)
  4. 按 user_id、org_tag、is_public 做数据隔离

索引字段:file_md5、chunk_id、text_content、vector、user_id、org_tag、is_public、file_name、model_version

流式聊天#

WebSocket 路径为 /api/v1/chat,连接时在 query 中传递 token(JWT),可选传递 conversation_id(不传则创建新会话)。流程为:接收用户消息 → 混合检索获取上下文 → 拼装 Prompt → 调用 OpenAI 流式生成 → 经同一 WebSocket 推送响应。会话历史持久化至 MySQL,单会话最多保留最近 N 条消息(CONVERSATION_MAX_MESSAGES,默认 20),会话具备过期时间(CONVERSATION_TTL_DAYS,默认 7 天)。支持停止生成:通过 Redis 存储带 TTL 的停止令牌,前端发送 stop 指令时携带该 token。WebSocket 侧限制单用户最大连接数(默认 10)、单实例最大连接数(默认 1000),并设置空闲超时(默认 3600 秒)与定期心跳清理(默认每 300 秒清理不活跃连接),避免连接长期占用不释放。

对话流程

接收用户消息 → 混合检索获取上下文 → 拼装 Prompt → 
调用 OpenAI 流式生成 → 经同一 WebSocket 推送响应

会话管理

  • 会话历史持久化至 MySQL
  • 单会话最多保留最近 N 条消息(CONVERSATION_MAX_MESSAGES,默认 20)
  • 会话具备过期时间(CONVERSATION_TTL_DAYS,默认 7 天)
  • 支持停止生成:通过 Redis 存储带 TTL 的停止令牌

连接管理

  • 单用户最大连接数:10
  • 单实例最大连接数:1000
  • 空闲超时:3600 秒
  • 定期心跳清理:每 300 秒清理不活跃连接

管理#

独立的管理员路由(admin_router),与普通用户权限分离,负责用户管理、系统配置等,可按需扩展审计、操作日志等功能。


技术难点与实现#

异步文档处理#

难点:若在上传接口内同步执行解析与向量化,大文件易导致超时并阻塞接口,影响其他请求。

方案:上传仅负责写入 MinIO、落库元数据、投递 Kafka 消息;消费者内使用 Tika 解析、分块(块长 500、重叠 50)、调用 Embedding、写入 Elasticsearch。解析失败时记录日志并降级(Tika 失败则尝试 UTF-8 解码,仍失败则仅存元数据且不重复消费该消息),避免同一消息反复重试导致队列阻塞。上传接口可快速返回,处理在后台异步完成。

混合检索#

难点:仅使用向量检索对专有名词、编号等精确匹配不敏感;仅使用全文检索则语义泛化不足,难以兼顾「语义相似」与「关键词命中」。

方案:在 Elasticsearch 中为同一批文档块同时建立向量字段(1536 维、余弦相似度)与 IK 分词的文本字段,检索时并行执行两路查询,对分数归一化后按配置权重(SEARCH_VECTOR_WEIGHT / SEARCH_TEXT_WEIGHT)融合,便于后续调参。实际使用中,专有名词与语义类问题均可获得更相关结果,RAG 回答与知识库内容一致性更高。

在 Elasticsearch 中为同一批文档块同时建立:

  • 向量字段:1536 维、余弦相似度
  • 文本字段:IK 分词(ik_max_word / ik_smart)

检索时并行执行两路查询,对分数归一化后按配置权重(SEARCH_VECTOR_WEIGHT / SEARCH_TEXT_WEIGHT)融合。


总结#

从文档接入、解析、向量化到混合检索与流式对话,整条 RAG 链路已打通;上传与解析异步解耦、多租户权限与组织标签隔离已按需求实现。后续可扩展方向包括:多模型接入(支持其他 Embedding 与 Chat 模型)、更细粒度权限与审计、以及检索排序与重排策略的迭代优化。