Manshu RAG 知识库与对话后端#
#RAG; #FastAPI
项目仓库:Manshu RAG 知识库与对话后端
下文流程图概括了「文档上传与解析」与「用户提问与流式回答」两条主链路。
graph TD
A[用户上传文档] --> B[分片上传MinIO]
B --> C[记录分片进度Redis]
C --> D{所有分片完成?}
D -->|否| B
D -->|是| E[合并文件]
E --> F[写入MySQL元数据]
F --> G[投递Kafka消息]
G --> H[Kafka消费者]
H --> I[Tika解析文档]
I --> J[文本分块]
J --> K[调用OpenAI Embedding]
K --> L[写入Elasticsearch]
L --> M[写入DocumentVector表]
N[用户提问] --> O[问题向量化]
O --> P[Elasticsearch混合检索]
P --> Q[获取Top-K结果]
Q --> R[拼装Prompt]
R --> S[OpenAI流式生成]
S --> T[WebSocket推送]
项目概述#
Manshu 后端承担「上传文档 → 解析入库 → 语义/关键词检索 → 基于检索结果与大模型对话」的完整链路,在本项目中负责 AI 与后端开发。
技术栈:Python + FastAPI 作为主框架,MySQL 存储业务与元数据,Redis 负责会话、限流与验证码,MinIO 存储文件与原始文档,Elasticsearch 承担向量与全文混合检索,Kafka 将解析与向量化拆分为异步任务,对话与向量化均使用 OpenAI(Embedding 默认 text-embedding-3-small,Chat 默认 gpt-3.5-turbo,均可通过配置切换)。部署采用 Docker + Uvicorn。API 统一前缀为 /api/v1,聊天采用 WebSocket,路径为 /api/v1/chat,JWT 通过 query 参数 token 传递。
| 类别 | 技术选型 | 用途 |
|---|---|---|
| 后端框架 | Python + FastAPI | REST API 与 WebSocket 服务 |
| 数据库 | MySQL | 业务数据与元数据存储 |
| 缓存 | Redis | 会话管理、限流、验证码 |
| 对象存储 | MinIO | 文件与原始文档存储 |
| 搜索引擎 | Elasticsearch | 向量与全文混合检索 |
| 消息队列 | Kafka | 文档解析与向量化异步任务 |
| 大模型 | OpenAI | Embedding(text-embedding-3-small)与 Chat(gpt-3.5-turbo) |
| 文档解析 | Apache Tika | 多格式文档解析 |
| 部署 | Docker + Uvicorn | 容器化部署与 ASGI 服务器 |
项目背景#
随着内部文档体量增长,单纯关键词搜索已难以满足「按语义检索 + 基于检索结果由大模型生成回答」的需求;同时文档格式多样(PDF、Word、Excel、PPT、HTML、Markdown 等),需要统一的解析与入库流程。本项目将上传、解析、检索、对话串联为一条可配置、可观测的流水线,并实现基于权限与组织标签的多租户隔离;健康检查与结构化日志便于运维排查。
健康检查提供 /health(快速存活探测)与 /health/detailed(深度探测),后者会校验 MySQL、Redis、Elasticsearch、MinIO、Kafka 的连接及连接池状态。应用启动时按依赖顺序连接上述组件;若 Kafka 消费者启动失败,仅记录警告日志,应用仍可正常对外服务,仅文档解析与向量化能力暂时不可用。
系统架构与选型#
采用前后端分离架构:前端通过 HTTP 与 WebSocket 访问后端,后端提供 REST API 与 WebSocket 聊天接口。请求先经 JWT 校验与 Redis 限流,再进入业务逻辑;上传文件落盘 MinIO,元数据写入 MySQL,并向 Kafka 的 document_parse topic 投递解析任务。消费者侧使用 Apache Tika 解析文档、按块切分(块长 500、重叠 50)、调用 OpenAI Embedding,将向量与元数据分别写入 Elasticsearch 与 MySQL。用户提问时,先将问题向量化,在 Elasticsearch 中并行执行向量检索(dense_vector,1536 维,余弦相似度)与 IK 全文检索(建索引用 ik_max_word,查询用 ik_smart),两路结果经分数归一化后按配置权重(默认向量 0.7、全文 0.3)融合,将 Top-K 文档块拼入 Prompt,再调用 OpenAI 流式生成,经同一 WebSocket 推送给前端。
选型考量:Elasticsearch 在同一索引内同时支持向量检索与全文检索,dense_vector 配合 IK 分词即可实现可调权重的混合检索。解析与向量化耗时较长,通过 Kafka 异步处理既可避免阻塞上传接口,又便于后续扩展多消费者与重试策略。Tika 覆盖 PDF、Office、HTML、Markdown 等常见格式,解析失败时可对纯文本尝试 UTF-8 解码降级。FastAPI 对异步与 WebSocket 支持完善,并自动生成 OpenAPI 文档,便于与前端联调与接口契约管理。
关键技术选型理由#
-
Elasticsearch
在同一索引内同时支持语义检索(向量)与关键词检索(全文),
dense_vector配合 IK 分词即可完成混合检索并调节权重,便于在「语义相似」与「关键词命中」之间做平衡。 -
Kafka
解析与向量化耗时较长,若同步执行易导致上传接口超时。通过 Kafka 异步处理既可快速响应上传请求,又便于后续扩展多消费者、重试与死信处理。
-
Apache Tika
可覆盖 PDF、Office、HTML、Markdown 等常见格式,解析失败时对纯文本尝试 UTF-8 解码降级,降低单文件解析失败对整体流程的影响。
功能模块#
健康检查机制#
系统提供两级健康检查,便于接入负载均衡与监控告警:
- 基础检查:
/health— 快速存活探测,用于负载均衡健康检查 - 详细检查:
/health/detailed— 深度探测,校验 MySQL、Redis、Elasticsearch、MinIO、Kafka 的连接及连接池状态
应用启动时会按依赖顺序连接上述组件;若 Kafka 消费者启动失败,仅记录警告日志,应用仍可正常对外提供除文档解析外的其他能力。
认证鉴权#
提供注册、登录、图形验证码与邮件验证码,以及 JWT 签发与校验。验证码与邮件验证码的过期时间、长度均可在配置中设置。基于 Redis 的接口级限流:图形验证码、邮件验证码、注册接口各有独立的限流窗口与次数(如每 IP 每分钟验证码请求次数),用于防止暴力尝试与恶意调用。登录成功后返回 JWT,后续请求通过 Authorization 头或 query 中的 token 携带。
核心功能:
- 图形验证码生成与校验(Redis 存储,可配置过期时间)
- 邮件验证码发送(SMTP 配置,可配置长度与过期时间)
- JWT Token 签发与校验(支持 Authorization 头或 query 参数)
- 基于 Redis 的接口级限流(图形验证码、邮件验证码、注册接口独立限流窗口)
限流策略:
- 每 IP 每分钟验证码请求次数限制
- 每 IP 每分钟邮件验证码请求次数限制
- 每 IP 每分钟注册请求次数限制
文件与文档管理#
上传支持分片:各分片先上传至 MinIO 临时目录,使用 Redis 记录分片进度;全部完成后调用合并接口,生成正式文件并写入 MySQL(文件元数据、用户、组织标签、是否公开),再向 Kafka topic document_parse 投递一条解析任务。消费者从 MinIO 拉取文件、经 Tika 解析、分块、逐块调用 Embedding,将向量写入 Elasticsearch、元数据写入 DocumentVector 表。支持格式依赖 Tika,常见包括 PDF、Word、Excel、PPT、HTML、Markdown、TXT、RTF 等;解析失败时记录日志,必要时仅落库元数据且不重复消费同一条消息,避免单条失败导致队列阻塞。文档列表、删除等接口与权限、组织标签联动,仅可操作当前用户有权限的数据。
- 分片上传:各分片先上传至 MinIO 临时目录,使用 Redis 记录分片进度
- 文件合并:全部完成后调用合并接口,生成正式文件并写入 MySQL
- 任务投递:向 Kafka topic
document_parse投递解析任务 - 异步处理:消费者从 MinIO 拉取文件、Tika 解析、分块、逐块调用 Embedding、写入 Elasticsearch
支持格式:PDF、Word、Excel、PPT、HTML、Markdown、TXT、RTF 等(依赖 Tika)
容错机制:解析失败时记录日志,必要时仅落库元数据且不重复消费同一条消息,避免阻塞队列。
混合检索#
查询文本先经同一 Embedding 模型向量化,在 Elasticsearch 中并行执行向量检索与 IK 全文检索,两路分数归一化后按配置权重加权合并,并按 user_id、org_tag、is_public 做数据隔离,仅返回当前用户有权限的文档块。索引字段包括 file_md5、chunk_id、text_content、vector、user_id、org_tag、is_public、file_name、model_version,便于权限控制与结果溯源。
检索流程:
- 问题向量化(使用 text-embedding-3-small,1536 维)
- 并行执行两路查询:
- 向量检索:dense_vector 字段,余弦相似度
- 全文检索:IK 分词(建索引用 ik_max_word,查询用 ik_smart)
- 分数归一化后按配置权重加权合并(默认向量 0.7、全文 0.3)
- 按 user_id、org_tag、is_public 做数据隔离
索引字段:file_md5、chunk_id、text_content、vector、user_id、org_tag、is_public、file_name、model_version
流式聊天#
WebSocket 路径为 /api/v1/chat,连接时在 query 中传递 token(JWT),可选传递 conversation_id(不传则创建新会话)。流程为:接收用户消息 → 混合检索获取上下文 → 拼装 Prompt → 调用 OpenAI 流式生成 → 经同一 WebSocket 推送响应。会话历史持久化至 MySQL,单会话最多保留最近 N 条消息(CONVERSATION_MAX_MESSAGES,默认 20),会话具备过期时间(CONVERSATION_TTL_DAYS,默认 7 天)。支持停止生成:通过 Redis 存储带 TTL 的停止令牌,前端发送 stop 指令时携带该 token。WebSocket 侧限制单用户最大连接数(默认 10)、单实例最大连接数(默认 1000),并设置空闲超时(默认 3600 秒)与定期心跳清理(默认每 300 秒清理不活跃连接),避免连接长期占用不释放。
对话流程:
接收用户消息 → 混合检索获取上下文 → 拼装 Prompt →
调用 OpenAI 流式生成 → 经同一 WebSocket 推送响应
会话管理:
- 会话历史持久化至 MySQL
- 单会话最多保留最近 N 条消息(CONVERSATION_MAX_MESSAGES,默认 20)
- 会话具备过期时间(CONVERSATION_TTL_DAYS,默认 7 天)
- 支持停止生成:通过 Redis 存储带 TTL 的停止令牌
连接管理:
- 单用户最大连接数:10
- 单实例最大连接数:1000
- 空闲超时:3600 秒
- 定期心跳清理:每 300 秒清理不活跃连接
管理#
独立的管理员路由(admin_router),与普通用户权限分离,负责用户管理、系统配置等,可按需扩展审计、操作日志等功能。
技术难点与实现#
异步文档处理#
难点:若在上传接口内同步执行解析与向量化,大文件易导致超时并阻塞接口,影响其他请求。
方案:上传仅负责写入 MinIO、落库元数据、投递 Kafka 消息;消费者内使用 Tika 解析、分块(块长 500、重叠 50)、调用 Embedding、写入 Elasticsearch。解析失败时记录日志并降级(Tika 失败则尝试 UTF-8 解码,仍失败则仅存元数据且不重复消费该消息),避免同一消息反复重试导致队列阻塞。上传接口可快速返回,处理在后台异步完成。
混合检索#
难点:仅使用向量检索对专有名词、编号等精确匹配不敏感;仅使用全文检索则语义泛化不足,难以兼顾「语义相似」与「关键词命中」。
方案:在 Elasticsearch 中为同一批文档块同时建立向量字段(1536 维、余弦相似度)与 IK 分词的文本字段,检索时并行执行两路查询,对分数归一化后按配置权重(SEARCH_VECTOR_WEIGHT / SEARCH_TEXT_WEIGHT)融合,便于后续调参。实际使用中,专有名词与语义类问题均可获得更相关结果,RAG 回答与知识库内容一致性更高。
在 Elasticsearch 中为同一批文档块同时建立:
- 向量字段:1536 维、余弦相似度
- 文本字段:IK 分词(ik_max_word / ik_smart)
检索时并行执行两路查询,对分数归一化后按配置权重(SEARCH_VECTOR_WEIGHT / SEARCH_TEXT_WEIGHT)融合。
总结#
从文档接入、解析、向量化到混合检索与流式对话,整条 RAG 链路已打通;上传与解析异步解耦、多租户权限与组织标签隔离已按需求实现。后续可扩展方向包括:多模型接入(支持其他 Embedding 与 Chat 模型)、更细粒度权限与审计、以及检索排序与重排策略的迭代优化。