GraphRAG 是 RagFlow 的知识图谱增强检索模块,通过构建实体-关系图谱来增强传统 RAG 的检索能力。该模块位于 graphrag/ 目录下。
特性 | 传统 RAG | GraphRAG |
|---|---|---|
数据结构 | 文本块 | 文本块 + 知识图谱 |
检索方式 | 向量相似度 | 向量 + 图遍历 |
关系理解 | 隐式(上下文) | 显式(实体关系) |
多跳推理 | 困难 | 支持 |
全局理解 | 弱 | 强(社区摘要) |
graphrag/
├── __init__.py
├── utils.py # 工具函数
├── search.py # 图谱检索
├── entity_resolution.py # 实体消解
├── entity_resolution_prompt.py # 消解提示词
├── query_analyze_prompt.py # 查询分析提示词
├── general/ # 通用图谱提取
│ ├── index.py # 索引构建
│ ├── graph_extractor.py # 图谱提取器
│ ├── graph_prompt.py # 提取提示词
│ ├── community_reports_extractor.py # 社区报告
│ ├── leiden.py # Leiden 社区发现
│ └── mind_map_extractor.py # 思维导图
└── light/ # 轻量级图谱
├── graph_extractor.py
└── graph_prompt.py
实体 (Entity)
├── 名称 (name)
├── 类型 (type): Person, Organization, Location, Event...
├── 描述 (description)
└── 属性 (attributes)
关系 (Relation)
├── 源实体 (source)
├── 目标实体 (target)
├── 关系类型 (type)
├── 描述 (description)
└── 权重 (weight)
社区 (Community)
├── 实体集合
├── 层级 (level)
└── 摘要报告 (report)
文档 → 分块 → 实体提取 → 关系提取 → 实体消解 → 社区发现 → 社区报告
↓ ↓ ↓ ↓ ↓
Entity Relation Merged Community Summary
Nodes Edges Graph Clusters Reports
# graphrag/general/index.py
async def run_graphrag(
row: dict,
language,
with_resolution: bool,
with_community: bool,
chat_model,
embedding_model,
callback,
):
# 1. 获取文档分块
# 2. 生成子图(实体和关系提取)
# 3. 合并到全局图
# 4. 实体消解(可选)
# 5. 社区发现和报告(可选)
# graphrag/general/graph_extractor.py
class GraphExtractor(Extractor):
"""图谱提取器"""
def __init__(
self,
llm_invoker,
language: str = "English",
entity_types: list[str] = None,
max_gleanings: int = None,
):
# 初始化提示词和参数
pass
asyncdef _process_single_content(self, chunk_key_dp, chunk_seq, num_chunks, out_results):
"""处理单个文本块"""
# 构建提示词
# 调用 LLM 提取
# 多轮提取(Gleaning)
# 解析结果
pass
# graphrag/general/graph_prompt.py
GRAPH_EXTRACTION_PROMPT = """
-Goal-
Given a text document, identify all entities and relationships.
-Steps-
1. Identify entities: entity_name, entity_type, entity_description
2. Identify relationships: source_entity, target_entity, relationship_description, relationship_strength
3. Return output in {language}
4. When finished, output {completion_delimiter}
Entity_types: {entity_types}
Text: {input_text}
Output:
"""
实体消解(Entity Resolution)用于合并指代同一实体的不同表述:
"苹果公司" ←→ "Apple Inc." ←→ "苹果"
↓ ↓ ↓
合并为同一实体节点
# graphrag/entity_resolution.py
class EntityResolution:
"""实体消解器"""
def __init__(self, llm, resolution_prompt=ENTITY_RESOLUTION_PROMPT):
pass
asyncdef resolve(self, entities: list[dict], graph: nx.Graph):
"""执行实体消解"""
# 1. 按类型分组
# 2. 对每个类型组进行消解
# 3. 执行合并
return graph
def _find_candidates(self, entities: list) -> list:
"""找出可能需要合并的实体对"""
pass
asyncdef _should_merge(self, ent1: dict, ent2: dict) -> bool:
"""使用 LLM 判断两个实体是否应该合并"""
pass
# graphrag/general/leiden.py
import networkx as nx
from graspologic.partition import hierarchical_leiden
def detect_communities(graph: nx.Graph, max_levels: int = 3) -> dict:
"""使用 Leiden 算法进行层级社区发现"""
# 转换为邻接矩阵
# 执行层级 Leiden
# 组织结果
return result
# graphrag/general/community_reports_extractor.py
class CommunityReportsExtractor:
"""社区报告生成器"""
def __init__(self, llm, max_tokens: int = 4096):
pass
async def extract_reports(
self,
graph: nx.Graph,
communities: dict,
level: int = 0
) -> list[dict]:
"""为每个社区生成摘要报告"""
# 收集社区信息
# 构建上下文
# 生成报告
return reports
# graphrag/search.py
class KGSearch(Dealer):
"""知识图谱检索器"""
def query_rewrite(self, llm, question, idxnms, kb_ids):
"""查询重写:提取关键词和实体类型"""
# 构建提示词并调用 LLM
# 解析返回的关键词和实体
return type_keywords, entities_from_query
def retrieval(
self,
question: str,
tenant_ids: str | list[str],
kb_ids: list[str],
emb_mdl,
llm,
max_token: int = 8196,
ent_topn: int = 6,
rel_topn: int = 6,
comm_topn: int = 1,
ent_sim_threshold: float = 0.3,
):
"""图谱检索主方法"""
# 1. 查询重写
# 2. 检索相关实体
# 3. 检索相关关系
# 4. 检索社区报告
# 5. 多跳扩展
return entities, relations, communities, token_count
class KGSearch:
def extend_by_n_hop(
self,
seed_entities: dict,
graph: nx.Graph,
n_hop: int = 2
) -> dict:
"""从种子实体出发,进行 N 跳扩展"""
# 从种子实体开始多跳遍历
# 收集邻居节点
return extended
轻量级图谱提取,适合快速构建:
# graphrag/light/graph_extractor.py
class GraphExtractor(Extractor):
"""轻量级图谱提取器"""
def __init__(self, llm_invoker, language="English", entity_types=None):
# 使用简化的提示词,不进行多轮提取
pass
特点:
完整的图谱提取,质量更高:
# graphrag/general/graph_extractor.py
class GraphExtractor(Extractor):
"""通用图谱提取器"""
def __init__(self, llm_invoker, language="English", entity_types=None, max_gleanings=1):
# 使用详细的提示词,支持多轮提取
pass
特点:
# graphrag/utils.py
def set_graph(tenant_id: str, kb_id: str, graph: nx.Graph):
"""保存图谱到存储"""
pass
def get_graph(tenant_id: str, kb_id: str) -> nx.Graph:
"""从存储加载图谱"""
pass
def graph_merge(base_graph: nx.Graph, sub_graph: nx.Graph) -> nx.Graph:
"""合并两个图谱"""
# 合并节点属性和边权重
return merged
# 实体和关系也会被索引到向量数据库
def index_entities(entities: list[dict], embedding_model, kb_id: str):
"""索引实体到向量数据库"""
# 计算描述的向量并插入文档存储
pass
# 通用实体类型
GENERAL_ENTITY_TYPES = [
"Person", # 人物
"Organization", # 组织
"Location", # 地点
"Event", # 事件
"Product", # 产品
"Technology", # 技术
"Concept", # 概念
]
# 领域特定类型(示例:医疗)
MEDICAL_ENTITY_TYPES = [
"Disease", # 疾病
"Symptom", # 症状
"Drug", # 药物
"Treatment", # 治疗方法
"BodyPart", # 身体部位
]
场景 | 模式 | 实体消解 | 社区发现 | 说明 |
|---|---|---|---|---|
快速原型 | Light | 否 | 否 | 快速验证 |
生产环境 | General | 是 | 是 | 完整功能 |
大规模文档 | Light | 是 | 否 | 平衡速度和质量 |
知识密集型 | General | 是 | 是 | 最佳效果 |
# 1. 并发提取
async def extract_parallel(chunks, extractor, max_concurrent=10):
"""并发处理多个文本块"""
pass
# 2. 批量索引
def batch_index_entities(entities, batch_size=100):
"""批量索引实体"""
pass
# 3. 缓存 LLM 结果
def get_llm_cache(model_name, system, history, gen_conf):
"""缓存 LLM 调用结果"""
pass
本章介绍了 RagFlow 的 GraphRAG 模块:
GraphRAG 通过显式建模实体关系,增强了 RAG 系统的推理能力,特别适合需要多跳推理的复杂问答场景。