示例“由”与“由_研发”重复考虑到LLM查重每一次都要调用新的LLM采用向量库查重prompt优化新增规则4. 同一语义只能用一个词不能出现训练于和在...中训练这种重复第一阶段纯向量去重失败最直觉的方案是用 embedding 模型计算相似度超过阈值就跳过scores cosine_similarity([new_vec], exist_vecs)[0] if scores.max() 0.9: continue # 跳过重复问题mxbai-embed-large对中文短句效果差由vs由_研发这种短词相似度计算不稳定阈值难以调整0.9太严0.7又会误杀第二阶段动态Schema约束核心思路不在存入时去重而是在提取时就输出标准化的词。每次提取时把已有图谱的实体名和关系词表传给LLM强制复用图谱越用越标准从源头消灭歧义。第三阶段向量召回 LLM标准化兜底仅靠Prompt约束还不够需要一道兜底机制。但直接把整个图谱塞给LLM判断图谱大了会超出context window。方案向量先缩小范围LLM只看候选项def normalize_by_llm(new_data): 向量召回候选后LLM做最终标准化节点关系一起处理 # 为每个新节点找候选 node_candidates {} for node in new_data.get(nodes, []): similar get_top_k_similar(node, _node_cache, top_k5) if similar: node_candidates[node] [name for name, _ in similar] # 为每个新关系找候选 rel_candidates {} for edge in new_data.get(edges, []): if isinstance(edge, list) and len(edge) 3: rel edge[1] similar get_top_k_similar(rel, _rel_cache, top_k5) if similar: rel_candidates[rel] [name for name, _ in similar] # 没有候选说明是全新图谱直接返回 if not node_candidates and not rel_candidates: return new_data prompt f 你是知识图谱专家对新数据进行标准化处理。 新数据{json.dumps(new_data, ensure_asciiFalse)} 节点候选映射如果新节点与候选是同一实体用候选名替换 {json.dumps(node_candidates, ensure_asciiFalse)} 关系候选映射如果新关系与候选语义相同用候选名替换 {json.dumps(rel_candidates, ensure_asciiFalse)} 规则 1. 只有确定是同一实体/语义才替换不确定保留原名 2. 关系名不超过4个字 3. 只输出JSON无任何解释 格式 {{ nodes: [实体A, 实体B], edges: [[实体A, 关系, 实体B]] }} response llm.invoke(prompt) try: match re.search(r\{.*\}, response, re.DOTALL) if not match: return new_data return json.loads(match.group()) except: return new_datadef get_top_k_similar(query, candidates_cache, top_k5): if not candidates_cache: return [] query_vec embed_model.embed_query(query) names list(candidates_cache.keys()) vecs list(candidates_cache.values()) scores cosine_similarity([query_vec], vecs)[0] top_indices np.argsort(scores)[::-1][:top_k] # 只返回相似度0.5的过滤掉不相关的候选 return [(names[i], scores[i]) for i in top_indices if scores[i] 0.5]第四阶段内存缓存加速每次请求都从Neo4j加载所有节点计算向量太慢用内存缓存解决# 内存缓存 _node_cache {} # {name: vector} _rel_cache {} # {rel: vector} def load_cache_from_neo4j(session): 首次运行时从neo4j加载已有数据到缓存 if not _node_cache: existing_nodes [r[name] for r in session.run(MATCH (e:Entity) RETURN e.name AS name)] if existing_nodes: print(f[缓存] 加载 {len(existing_nodes)} 个节点向量...) vecs embed_model.embed_documents(existing_nodes) _node_cache.update(dict(zip(existing_nodes, vecs))) if not _rel_cache: existing_rels list(set( r[rel] for r in session.run(MATCH ()-[r]-() RETURN type(r) AS rel) )) if existing_rels: print(f[缓存] 加载 {len(existing_rels)} 个关系向量...) vecs embed_model.embed_documents(existing_rels) _rel_cache.update(dict(zip(existing_rels, vecs)))新节点/关系存入后立即更新缓存保持同步。