From 0bb7596d9a0826f55b4f35247a88766447db010b Mon Sep 17 00:00:00 2001
From: KingSkyLi <15566300566@163.com>
Date: Sat, 14 Sep 2024 16:24:45 +0800
Subject: [PATCH 1/6] Graph RAG: add document and chunk vertex types

---
 dbgpt/storage/graph_store/base.py          |  10 +-
 dbgpt/storage/graph_store/tugraph_store.py | 221 +++++++++++++++---
 .../knowledge_graph/community_summary.py   | 106 ++++++++-
 .../knowledge_graph/knowledge_graph.py     |   4 +
 4 files changed, 299 insertions(+), 42 deletions(-)

diff --git a/dbgpt/storage/graph_store/base.py b/dbgpt/storage/graph_store/base.py
index 24a4b467b..8a3a0493f 100644
--- a/dbgpt/storage/graph_store/base.py
+++ b/dbgpt/storage/graph_store/base.py
@@ -5,7 +5,7 @@
 from dbgpt._private.pydantic import BaseModel, ConfigDict, Field
 from dbgpt.core import Embeddings
-from dbgpt.storage.graph_store.graph import Direction, Graph
+from dbgpt.storage.graph_store.graph import Direction, Graph, Vertex
 
 logger = logging.getLogger(__name__)
@@ -44,6 +44,10 @@ def get_vertex_type(self) -> str:
     def get_edge_type(self) -> str:
         """Get the edge type."""
 
+    @abstractmethod
+    def get_document_vertex(self, doc_name: str) -> Vertex:
+        """Get the document vertex by document name."""
+
     @abstractmethod
     def insert_triplet(self, sub: str, rel: str, obj: str):
         """Add triplet."""
@@ -60,6 +64,10 @@ def get_triplets(self, sub: str) -> List[Tuple[str, str]]:
     def delete_triplet(self, sub: str, rel: str, obj: str):
         """Delete triplet."""
 
+    @abstractmethod
+    def delete_document(self, doc_name: str):
+        """Delete document."""
+
     @abstractmethod
     def truncate(self):
         """Truncate Graph."""
diff --git a/dbgpt/storage/graph_store/tugraph_store.py b/dbgpt/storage/graph_store/tugraph_store.py
index 3fdd2df8b..469531945 100644
--- a/dbgpt/storage/graph_store/tugraph_store.py
+++ b/dbgpt/storage/graph_store/tugraph_store.py
@@ -36,11 +36,23 @@ class TuGraphStoreConfig(GraphStoreConfig):
     )
     vertex_type: str = Field(
         default="entity",
-        description="The type of vertex, `entity` by default.",
+        description="The type of entity vertex, `entity` by default.",
+    )
+    document_type: str = Field(
+        default="document",
+        description="The type of document vertex, `document` by default.",
+    )
+    chunk_type: str = Field(
+        default="chunk",
+        description="The type of chunk vertex, `chunk` by default.",
     )
     edge_type: str = Field(
         default="relation",
-        description="The type of edge, `relation` by default.",
+        description="The type of relation edge, `relation` by default.",
+    )
+    include_type: str = Field(
+        default="include",
+        description="The type of include edge, `include` by default.",
     )
     plugin_names: List[str] = Field(
         default=["leiden"],
@@ -73,6 +85,9 @@ def __init__(self, config: TuGraphStoreConfig) -> None:
         self._graph_name = config.name
         self._vertex_type = os.getenv("TUGRAPH_VERTEX_TYPE", config.vertex_type)
         self._edge_type = os.getenv("TUGRAPH_EDGE_TYPE", config.edge_type)
+        self._document_type = os.getenv("TUGRAPH_DOCUMENT_TYPE", config.document_type)
+        self._chunk_type = os.getenv("TUGRAPH_DOCUMENT_TYPE", config.chunk_type)
+        self._include_type = os.getenv("TUGRAPH_INCLUDE_TYPE", config.include_type)
 
         self.conn = TuGraphConnector.from_uri_db(
             host=self._host,
@@ -91,6 +106,23 @@ def get_vertex_type(self) -> str:
     def get_edge_type(self) -> str:
         """Get the edge type."""
         return self._edge_type
+
+    def get_document_vertex(self, doc_name: str) -> Vertex:
+        gql = f'''MATCH (n) WHERE n.id = '{doc_name}' RETURN n'''
+        graph = self.query(gql)
+        vertex = graph.get_vertex(doc_name)
+        return vertex
+
+    def delete_document(self, doc_name: str):
+        # del_relation_gql = f'''MATCH
(n:document)-[r:include]-(m:chunk) WHERE r._doc_name = {doc_name} DELETE r''' + # del_chunk_gql = f'''MATCH (n:document)-[r:include]-(m:chunk) WHERE n.id = {doc_name} DELETE m''' + # del_doc_gql = f'''MATCH (n:document) WHERE n.id = {doc_name} DELETE n''' + # del_alone_nodes_gql = f'''MATCH (n:document) WHERE n.id = {doc_name} DELETE n''' + # slef.conn.run(del_relation_gql) + # slef.conn.run(del_chunk_gql) + # slef.conn.run(del_doc_gql) + # slef.conn.run(del_alone_nodes_gql) + pass def _create_graph(self, graph_name: str): self.conn.create_graph(graph_name=graph_name) @@ -102,8 +134,15 @@ def _check_label(self, elem_type: str): result = self.conn.get_table_names() if elem_type == "vertex": return self._vertex_type in result["vertex_tables"] + if elem_type == "chunk": + return self._chunk_type in result["vertex_tables"] + if elem_type == "document": + return self._document_type in result["vertex_tables"] + if elem_type == "edge": return self._edge_type in result["edge_tables"] + if elem_type == "include": + return self._include_type in result["edge_tables"] def _add_vertex_index(self, field_name): gql = f"CALL db.addIndex('{self._vertex_type}', '{field_name}', false)" @@ -161,7 +200,8 @@ def _create_schema(self): f"CALL db.createLabel(" f"'vertex', '{self._vertex_type}', " f"'id', ['id',string,false]," - f"['name',string,false])" + f"['name',string,false])", + ) self.conn.run(create_vertex_gql) @@ -182,6 +222,39 @@ def _create_schema(self): ["description",STRING,true])""" self.conn.run(create_edge_gql) + if not self._check_label("document"): + if self._summary_enabled: + create_document_gql = ( + f"CALL db.createLabel(" + f"'vertex', '{self._document_type}', " + f"'id', ['id',string,false]," + f"['_community_id',string,true]," + f"['name',string,false])" + ) + self.conn.run(create_document_gql) + + if not self._check_label("chunk"): + if self._summary_enabled: + create_document_gql = ( + f"CALL db.createLabel(" + f"'vertex', '{self._chunk_type}', " + f"'id', ['id',string,false]," + f"['_community_id',string,true]," + f"['name',string,false])" + ) + self.conn.run(create_document_gql) + + if not self._check_label("include"): + if self._summary_enabled: + create_include_gql = f"""CALL db.createLabel( + 'edge', '{self._include_type}', + '[["{self._document_type}","{self._chunk_type}"],["{self._chunk_type}","{self._chunk_type}"],["{self._chunk_type}","{self._vertex_type}"]]', + ["id",STRING,false], + ["name",STRING,false], + ["description",STRING,true])""" + self.conn.run(create_include_gql) + + def _format_query_data(self, data, white_prop_list: List[str]): nodes_list = [] rels_list: List[Any] = [] @@ -313,10 +386,14 @@ def escape_quotes(value: str) -> str: nodes: Iterator[Vertex] = graph.vertices() edges: Iterator[Edge] = graph.edges() - node_list = [] - edge_list = [] - - def parser(node_list): + entity_list = [] + chunk_list = [] + document_list = [] + relation_list = [] + document_include_chunk_list = [] + chunk_include_chunk_list = [] + chunk_include_entity_list = [] + def parser(entity_list): formatted_nodes = [ "{" + ", ".join( @@ -324,42 +401,120 @@ def parser(node_list): for k, v in node.items() ) + "}" - for node in node_list + for node in entity_list ] return f"""{', '.join(formatted_nodes)}""" for node in nodes: - node_list.append( - { - "id": escape_quotes(node.vid), - "name": escape_quotes(node.name), - "description": escape_quotes(node.get_prop("description")) or "", - "_document_id": "0", - "_chunk_id": "0", - "_community_id": "0", - } - ) - node_query = ( - f"""CALL 
db.upsertVertex("{self._vertex_type}", [{parser(node_list)}])""" + if node.get_prop('vertex_type') == 'chunk': + chunk_list.append( + { + "id": escape_quotes(node.vid), + "name": escape_quotes(node.name) + } + ) + elif node.get_prop('vertex_type') == 'document': + document_list.append( + { + "id": escape_quotes(node.vid), + "name": escape_quotes(node.name) + } + ) + else: + entity_list.append( + { + "id": escape_quotes(node.vid), + "name": escape_quotes(node.name), + "description": escape_quotes(node.get_prop("description")) or "", + "_document_id": "0", + "_chunk_id": "0", + "_community_id": "0", + } + ) + + vertex_query = ( + f"""CALL db.upsertVertex("{self._vertex_type}", [{parser(entity_list)}])""" + ) + chunk_query = ( + f"""CALL db.upsertVertex("{self._chunk_type}", [{parser(chunk_list)}])""" + ) + document_query = ( + f"""CALL db.upsertVertex("{self._document_type}", [{parser(document_list)}])""" ) for edge in edges: - edge_list.append( - { - "sid": escape_quotes(edge.sid), - "tid": escape_quotes(edge.tid), - "id": escape_quotes(edge.name), - "name": escape_quotes(edge.name), - "description": escape_quotes(edge.get_prop("description")), - } + if edge.get_prop('edge_type') == 'document_include_chunk': + document_include_chunk_list.append( + { + "sid": escape_quotes(edge.sid), + "tid": escape_quotes(edge.tid), + "id": escape_quotes(edge.name), + "name": escape_quotes(edge.name), + "description": escape_quotes(edge.get_prop("description")) or "", + } + ) + elif edge.get_prop('edge_type') == 'chunk_include_chunk': + chunk_include_chunk_list.append( + { + "sid": escape_quotes(edge.sid), + "tid": escape_quotes(edge.tid), + "id": escape_quotes(edge.name), + "name": escape_quotes(edge.name), + "description": escape_quotes(edge.get_prop("description")) or "", + } + ) + elif edge.get_prop('edge_type') == 'chunk_include_entity': + chunk_include_entity_list.append( + { + "sid": escape_quotes(edge.sid), + "tid": escape_quotes(edge.tid), + "id": escape_quotes(edge.name), + "name": escape_quotes(edge.name), + "description": escape_quotes(edge.get_prop("description")) or "", + } + ) + else: + relation_list.append( + { + "sid": escape_quotes(edge.sid), + "tid": escape_quotes(edge.tid), + "id": escape_quotes(edge.name), + "name": escape_quotes(edge.name), + "description": escape_quotes(edge.get_prop("description")) or "", + } ) - edge_query = f"""CALL db.upsertEdge( - "{self._edge_type}", + relation_query = ( + f"""CALL db.upsertEdge("{self._edge_type}", {{type:"{self._vertex_type}", key:"sid"}}, {{type:"{self._vertex_type}", key:"tid"}}, - [{parser(edge_list)}])""" - self.conn.run(query=node_query) - self.conn.run(query=edge_query) + [{parser(relation_list)}])""" + ) + document_include_chunk_query = ( + f"""CALL db.upsertEdge("{self._include_type}", + {{type:"{self._document_type}", key:"sid"}}, + {{type:"{self._chunk_type}", key:"tid"}}, + [{parser(document_include_chunk_list)}])""" + ) + chunk_include_chunk_query = ( + f"""CALL db.upsertEdge("{self._include_type}", + {{type:"{self._chunk_type}", key:"sid"}}, + {{type:"{self._chunk_type}", key:"tid"}}, + [{parser(chunk_include_chunk_list)}])""" + ) + chunk_include_entity_query = ( + f"""CALL db.upsertEdge("{self._include_type}", + {{type:"{self._chunk_type}", key:"sid"}}, + {{type:"{self._vertex_type}", key:"tid"}}, + [{parser(chunk_include_entity_list)}])""" + ) + self.conn.run(query=vertex_query) + self.conn.run(query=chunk_query) + self.conn.run(query=document_query) + + self.conn.run(query=relation_query) + 
self.conn.run(query=document_include_chunk_query)
+        self.conn.run(query=chunk_include_chunk_query)
+        self.conn.run(query=chunk_include_entity_query)
 
     def truncate(self):
         """Truncate Graph."""
diff --git a/dbgpt/storage/knowledge_graph/community_summary.py b/dbgpt/storage/knowledge_graph/community_summary.py
index a5bf272ac..5f06cdda3 100644
--- a/dbgpt/storage/knowledge_graph/community_summary.py
+++ b/dbgpt/storage/knowledge_graph/community_summary.py
@@ -2,14 +2,17 @@
 
+import asyncio
 import logging
 import os
+import uuid
 from typing import List, Optional
 
 from dbgpt._private.pydantic import ConfigDict, Field
 from dbgpt.core import Chunk
 from dbgpt.rag.transformer.community_summarizer import CommunitySummarizer
 from dbgpt.rag.transformer.graph_extractor import GraphExtractor
+from dbgpt.storage.graph_store.graph import Edge, Graph, MemoryGraph, Vertex
 from dbgpt.storage.knowledge_graph.community.community_store import CommunityStore
 from dbgpt.storage.knowledge_graph.community.factory import CommunityStoreAdapterFactory
 from dbgpt.storage.knowledge_graph.knowledge_graph import (
     BuiltinKnowledgeGraph,
     BuiltinKnowledgeGraphConfig,
@@ -132,20 +134,107 @@ def community_store_configure(name: str, cfg: VectorStoreConfig):
     def get_config(self) -> BuiltinKnowledgeGraphConfig:
         """Get the knowledge graph config."""
         return self._config
+
+    def _parse_chunks(self, chunks: List[Chunk]):
+        data = []
+        for chunk_index, chunk in enumerate(chunks):
+            parent = None
+            directory_keys = list(chunk.metadata.keys())[:-1]
+            parent_level = directory_keys[-2] if len(directory_keys) > 1 else None
+            self_level = directory_keys[-1] if directory_keys else 'Header0'
+
+            obj = {
+                'id': chunk.chunk_id,
+                'title': chunk.metadata.get(self_level, 'none_header_chunk'),
+                'directory_keys': directory_keys,
+                'level': self_level,
+                'content': chunk.content,
+                'parent_id': None,
+                'parent_title': None,
+                'type': 'chunk'
+            }
+
+            if parent_level:
+                for parent_direct in directory_keys[:-1][::-1]:
+                    parent_name = chunk.metadata.get(parent_direct, None)
+                    for n in range(chunk_index - 1, -1, -1):
+                        metadata = chunks[n].metadata
+                        keys = list(metadata.keys())[:-1]
+                        if metadata and parent_direct == keys[-1] and parent_name == metadata.get(parent_direct):
+                            parent = chunks[n]
+                            obj['parent_id'] = parent.chunk_id
+                            obj['parent_title'] = parent_name
+                            break
+                        if chunk_index - n > len(directory_keys):
+                            break
+                    if obj['parent_id']:
+                        break
+
+            if not obj['parent_id']:
+                obj['parent_id'] = 'document'
+
+            data.append(obj)
+
+        return data
 
     async def aload_document(self, chunks: List[Chunk]) -> List[str]:
         """Extract and persist graph."""
-        # todo add doc node
-        for chunk in chunks:
-            # todo add chunk node
-            # todo add relation doc-chunk
+
+        # check document
+        doc_name = chunks[0].metadata['source'] or 'Text_Node'
+        hash_id = str(uuid.uuid4())
+        # if chunks[0].metadata['source']:
+        #     source_name = os.path.splitext(chunks[0].metadata['source'])[0]
+        #     hash_id = chunks[0].metadata['hash_id']
+        #     vertex = self._graph_store.get_document_vertex(doc_name)
+        #     if vertex and hash_id and hash_id != vertex.get_prop('hash_id'):
+        #         self.delete_by_ids(doc_name)
 
-            # extract graphs and save
-            graphs = await self._graph_extractor.extract(chunk.content)
+        data_list = self._parse_chunks(chunks)
+        total_graph = MemoryGraph()
+
+        for data in data_list:
+            chunk_src = Vertex(f"""{data['parent_id']}""",name=data["parent_title"],vertex_type=data["type"])
+            chunk_dst = Vertex(f"""{data["id"]}""",name=data["title"],vertex_type=data["type"])
+            chunk_include_chunk = Edge(chunk_src.vid,chunk_dst.vid,name="include",edge_type="chunk_include_chunk")
+            if data['parent_id'] == 'document':
+                chunk_src = Vertex(f"""{hash_id}""",name=doc_name,vertex_type='document')
+                chunk_include_chunk = Edge(chunk_src.vid,chunk_dst.vid,name="include",edge_type="document_include_chunk")
+            total_graph.upsert_vertex(chunk_src)
+            total_graph.upsert_vertex(chunk_dst)
+            total_graph.append_edge(chunk_include_chunk)
+            graphs = await self._graph_extractor.extract(data["content"])
             for graph in graphs:
-                self._graph_store.insert_graph(graph)
+                for vertex in graph.vertices():
+                    print(vertex.get_prop('vertex_type'))
+                    total_graph.upsert_vertex(vertex)
+                    chunk_include_entity = Edge(chunk_dst.vid,vertex.vid,name="include",edge_type="chunk_include_entity")
+                    total_graph.append_edge(chunk_include_entity)
+                for edge in graph.edges():
+                    total_graph.append_edge(edge)
+
+        self._graph_store.insert_graph(total_graph)
+
+        # for chunk in chunks:
+        #     # extract graphs and save
+        #     graphs = await self._graph_extractor.extract(chunk.content)
+        #     for graph in graphs:
+        #         self._graph_store.insert_graph(graph)
+
+        # todo self._graph_store.insert_text_link()
+        # 1. self._graph_store.insert_chunk(chunk nodes, chunk2chunk edeges)
+        # 2. self._graph_store.insert_doc(doc nodes,doc2chunk edges)
+
+        # tasks = [self._graph_extractor.extract(chunk.content) for chunk in chunks]
+        # results = await asyncio.gather(*tasks)
+        # for result in results:
+        #     self._graph_store.insert_graph(result[0])
 
         # build communities and save
+
         await self._community_store.build_communities()
 
         return [chunk.chunk_id for chunk in chunks]
@@ -200,6 +289,7 @@ def delete_vector_name(self, index_name: str):
 
         self._graph_extractor.drop()
 
+
 HYBRID_SEARCH_PT_CN = (
     "## 角色\n"
     "你非常擅长结合提示词模板提供的[上下文]信息与[知识图谱]信息，"
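For reference, a minimal standalone sketch of the heading-hierarchy convention that `_parse_chunks` relies on. It assumes (as the code above does) that each chunk's metadata dict keeps insertion order, that the last key is `source`, and that the keys before it (`Header1`, `Header2`, ...) encode the markdown heading path; the `FakeChunk` type and sample values are hypothetical stand-ins for `dbgpt.core.Chunk`:

```python
from dataclasses import dataclass, field
from typing import Dict

@dataclass
class FakeChunk:  # hypothetical stand-in for dbgpt.core.Chunk
    chunk_id: str
    content: str
    metadata: Dict[str, str] = field(default_factory=dict)

chunks = [
    FakeChunk("c1", "intro text", {"Header1": "Overview", "source": "doc.md"}),
    FakeChunk("c2", "detail text", {"Header1": "Overview", "Header2": "Details", "source": "doc.md"}),
]

for chunk in chunks:
    # Drop the trailing "source" key; what remains is the heading path.
    directory_keys = list(chunk.metadata.keys())[:-1]
    self_level = directory_keys[-1] if directory_keys else "Header0"
    print(chunk.chunk_id, self_level, chunk.metadata.get(self_level))
# c1 Header1 Overview
# c2 Header2 Details
```

Under this convention, `c2`'s parent resolves to `c1` (the nearest earlier chunk whose deepest heading is `Header1: Overview`), and top-level chunks get the sentinel parent id `'document'`.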
diff --git a/dbgpt/storage/knowledge_graph/knowledge_graph.py b/dbgpt/storage/knowledge_graph/knowledge_graph.py
index 066e2667d..99b99c4b0 100644
--- a/dbgpt/storage/knowledge_graph/knowledge_graph.py
+++ b/dbgpt/storage/knowledge_graph/knowledge_graph.py
@@ -172,3 +172,7 @@ def delete_vector_name(self, index_name: str):
         logger.info("Drop triplet extractor")
         self._triplet_extractor.drop()
+
+    # def delete_by_ids(self, ids: str) -> List[str]:
+    #     self._graph_store.delete_document(doc_name = ids)
+    #     return

From 961f7ab85f81ac921454dcb9e9c9c2f540795245 Mon Sep 17 00:00:00 2001
From: KingSkyLi <15566300566@163.com>
Date: Fri, 20 Sep 2024 15:54:54 +0800
Subject: [PATCH 2/6] Fix insert_graph

---
 dbgpt/rag/transformer/graph_extractor.py   |   4 +-
 dbgpt/storage/graph_store/graph.py         |  20 +-
 dbgpt/storage/graph_store/tugraph_store.py | 218 +++++++----------
 .../knowledge_graph/community_summary.py   |  21 +-
 4 files changed, 104 insertions(+), 159 deletions(-)

diff --git a/dbgpt/rag/transformer/graph_extractor.py b/dbgpt/rag/transformer/graph_extractor.py
index 18e867683..092a1d193 100644
--- a/dbgpt/rag/transformer/graph_extractor.py
+++ b/dbgpt/rag/transformer/graph_extractor.py
@@ -65,7 +65,7 @@ def _parse_response(self, text: str, limit: Optional[int] = None) -> List[Graph]
             match = re.match(r"\((.*?)#(.*?)\)", line)
             if match:
                 name, summary = [part.strip() for part in match.groups()]
-                graph.upsert_vertex(Vertex(name, description=summary))
+                graph.upsert_vertex(Vertex(name, description=summary, vertex_type='entity'))
         elif current_section == "Relationships":
             match = re.match(r"\((.*?)#(.*?)#(.*?)#(.*?)\)", line)
             if match:
                 source, target, name, summary = [
                     part.strip() for part in match.groups()
                 ]
                 edge_count += 1
graph.append_edge(
-                    Edge(source, target, name, description=summary)
+                    Edge(source, target, name, description=summary, edge_type='relation')
                 )
 
                 if limit and edge_count >= limit:
diff --git a/dbgpt/storage/graph_store/graph.py b/dbgpt/storage/graph_store/graph.py
index 555bcc14d..da4e5c232 100644
--- a/dbgpt/storage/graph_store/graph.py
+++ b/dbgpt/storage/graph_store/graph.py
@@ -188,11 +188,11 @@ def get_neighbor_edges(
         """Get neighbor edges."""
 
     @abstractmethod
-    def vertices(self) -> Iterator[Vertex]:
+    def vertices(self, vertex_type: Optional[str] = None) -> Iterator[Vertex]:
         """Get vertex iterator."""
 
     @abstractmethod
-    def edges(self) -> Iterator[Edge]:
+    def edges(self, edge_type: Optional[str] = None) -> Iterator[Edge]:
         """Get edge iterator."""
 
     @abstractmethod
@@ -335,13 +335,21 @@ def unique_elements(elements):
 
         return itertools.islice(es, limit) if limit else es
 
-    def vertices(self) -> Iterator[Vertex]:
+    def vertices(self, vertex_type: Optional[str] = None) -> Iterator[Vertex]:
         """Return vertices."""
-        return iter(self._vs.values())
+        return (
+            item for item in self._vs.values()
+            if vertex_type is None or item.get_prop('vertex_type') == vertex_type
+        )
 
-    def edges(self) -> Iterator[Edge]:
+    def edges(self, edge_type: Optional[str] = None) -> Iterator[Edge]:
         """Return edges."""
-        return iter(e for nbs in self._oes.values() for es in nbs.values() for e in es)
+        return (
+            e for nbs in self._oes.values()
+            for es in nbs.values()
+            for e in es
+            if edge_type is None or e.get_prop('edge_type') == edge_type
+        )
 
     def del_vertices(self, *vids: str):
         """Delete specified vertices."""
diff --git a/dbgpt/storage/graph_store/tugraph_store.py b/dbgpt/storage/graph_store/tugraph_store.py
index 469531945..a09240742 100644
--- a/dbgpt/storage/graph_store/tugraph_store.py
+++ b/dbgpt/storage/graph_store/tugraph_store.py
@@ -86,7 +86,7 @@ def __init__(self, config: TuGraphStoreConfig) -> None:
         self._vertex_type = os.getenv("TUGRAPH_VERTEX_TYPE", config.vertex_type)
         self._edge_type = os.getenv("TUGRAPH_EDGE_TYPE", config.edge_type)
         self._document_type = os.getenv("TUGRAPH_DOCUMENT_TYPE", config.document_type)
-        self._chunk_type = os.getenv("TUGRAPH_DOCUMENT_TYPE", config.chunk_type)
+        self._chunk_type = os.getenv("TUGRAPH_CHUNK_TYPE", config.chunk_type)
         self._include_type = os.getenv("TUGRAPH_INCLUDE_TYPE", config.include_type)
 
         self.conn = TuGraphConnector.from_uri_db(
             host=self._host,
@@ -114,14 +114,6 @@ def get_document_vertex(self, doc_name: str) -> Vertex:
         vertex = graph.get_vertex(doc_name)
         return vertex
 
     def delete_document(self, doc_name: str):
-        # del_relation_gql = f'''MATCH (n:document)-[r:include]-(m:chunk) WHERE r._doc_name = {doc_name} DELETE r'''
-        # del_chunk_gql = f'''MATCH (n:document)-[r:include]-(m:chunk) WHERE n.id = {doc_name} DELETE m'''
-        # del_doc_gql = f'''MATCH (n:document) WHERE n.id = {doc_name} DELETE n'''
-        # del_alone_nodes_gql = f'''MATCH (n:document) WHERE n.id = {doc_name} DELETE n'''
-        # slef.conn.run(del_relation_gql)
-        # slef.conn.run(del_chunk_gql)
-        # slef.conn.run(del_doc_gql)
-        # slef.conn.run(del_alone_nodes_gql)
         pass
 
     def _create_graph(self, graph_name: str):
@@ -179,6 +173,7 @@ def _upload_plugin(self):
         self.conn.run(gql)
 
     def _create_schema(self):
+        # This part of the code needs optimization.
if not self._check_label("vertex"): if self._summary_enabled: create_vertex_gql = ( @@ -201,7 +194,6 @@ def _create_schema(self): f"'vertex', '{self._vertex_type}', " f"'id', ['id',string,false]," f"['name',string,false])", - ) self.conn.run(create_vertex_gql) @@ -239,6 +231,7 @@ def _create_schema(self): f"CALL db.createLabel(" f"'vertex', '{self._chunk_type}', " f"'id', ['id',string,false]," + f"['content',string,true]," f"['_community_id',string,true]," f"['name',string,false])" ) @@ -337,6 +330,23 @@ def process_other(value): ] return {"nodes": nodes, "edges": rels} + def _escape_quotes(self, value: str) -> str: + """Escape single and double quotes in a string for queries.""" + if value is not None: + return value.replace("'", "").replace('"', "") + + def _parser(self, entity_list): + formatted_nodes = [ + "{" + + ", ".join( + f'{k}: "{v}"' if isinstance(v, str) else f"{k}: {v}" + for k, v in node.items() + ) + + "}" + for node in entity_list + ] + return f"""{', '.join(formatted_nodes)}""" + def get_config(self): """Get the graph store config.""" return self._config @@ -376,145 +386,79 @@ def escape_quotes(value: str) -> str: self.conn.run(query=node_query) self.conn.run(query=edge_query) - def insert_graph(self, graph: Graph) -> None: - """Add graph.""" - - def escape_quotes(value: str) -> str: - """Escape single and double quotes in a string for queries.""" - if value is not None: - return value.replace("'", "").replace('"', "") - - nodes: Iterator[Vertex] = graph.vertices() - edges: Iterator[Edge] = graph.edges() - entity_list = [] - chunk_list = [] - document_list = [] - relation_list = [] - document_include_chunk_list = [] - chunk_include_chunk_list = [] - chunk_include_entity_list = [] - def parser(entity_list): - formatted_nodes = [ - "{" - + ", ".join( - f'{k}: "{v}"' if isinstance(v, str) else f"{k}: {v}" - for k, v in node.items() - ) - + "}" - for node in entity_list - ] - return f"""{', '.join(formatted_nodes)}""" - - for node in nodes: - if node.get_prop('vertex_type') == 'chunk': - chunk_list.append( - { - "id": escape_quotes(node.vid), - "name": escape_quotes(node.name) - } - ) - elif node.get_prop('vertex_type') == 'document': - document_list.append( - { - "id": escape_quotes(node.vid), - "name": escape_quotes(node.name) - } - ) - else: - entity_list.append( - { - "id": escape_quotes(node.vid), - "name": escape_quotes(node.name), - "description": escape_quotes(node.get_prop("description")) or "", + def _upsert_entities(self, entities): + entity_list = [{ + "id": self._escape_quotes(entity.vid), + "name": self._escape_quotes(entity.name), + "description": self._escape_quotes(entity.get_prop("description")) or "", "_document_id": "0", "_chunk_id": "0", "_community_id": "0", - } - ) - - vertex_query = ( - f"""CALL db.upsertVertex("{self._vertex_type}", [{parser(entity_list)}])""" + } for entity in entities] + entity_query = ( + f"""CALL db.upsertVertex("{self._vertex_type}", [{self._parser(entity_list)}])""" ) + self.conn.run(query=entity_query) + + def _upsert_chunks(self, chunks): + chunk_list = [{ "id": self._escape_quotes(chunk.vid),"name": self._escape_quotes(chunk.name),"content": self._escape_quotes(chunk.get_prop('content'))} for chunk in chunks] chunk_query = ( - f"""CALL db.upsertVertex("{self._chunk_type}", [{parser(chunk_list)}])""" + f"""CALL db.upsertVertex("{self._chunk_type}", [{self._parser(chunk_list)}])""" ) + self.conn.run(query=chunk_query) + + def _upsert_documents(self, documents): + document_list = [{ "id": self._escape_quotes(document.vid),"name": 
self._escape_quotes(document.name), "content": self._escape_quotes(document.get_prop('content')) or ''} for document in documents]
         document_query = (
-            f"""CALL db.upsertVertex("{self._document_type}", [{parser(document_list)}])"""
+            f"""CALL db.upsertVertex("{self._document_type}", [{self._parser(document_list)}])"""
         )
+        self.conn.run(query=document_query)
 
+    def _upsert_edge(self, edges, edge_type, src_type, dst_type):
+        edge_list = [{
+            "sid": self._escape_quotes(edge.sid),
+            "tid": self._escape_quotes(edge.tid),
+            "id": self._escape_quotes(edge.name),
+            "name": self._escape_quotes(edge.name),
+            "description": self._escape_quotes(edge.get_prop("description")) or "",
+        } for edge in edges]
+        relation_query = (
+            f"""CALL db.upsertEdge("{edge_type}",
+            {{type:"{src_type}", key:"sid"}},
+            {{type:"{dst_type}", key:"tid"}},
+            [{self._parser(edge_list)}])"""
+        )
+        self.conn.run(query=relation_query)
-        document_include_chunk_query = (
-            f"""CALL db.upsertEdge("{self._include_type}",
-            {{type:"{self._document_type}", key:"sid"}},
-            {{type:"{self._chunk_type}", key:"tid"}},
-            [{parser(document_include_chunk_list)}])"""
-        )
-        chunk_include_chunk_query = (
-            f"""CALL db.upsertEdge("{self._include_type}",
-            {{type:"{self._chunk_type}", key:"sid"}},
-            {{type:"{self._chunk_type}", key:"tid"}},
-            [{parser(chunk_include_chunk_list)}])"""
-        )
-        chunk_include_entity_query = (
-            f"""CALL db.upsertEdge("{self._include_type}",
-            {{type:"{self._chunk_type}", key:"sid"}},
-            {{type:"{self._vertex_type}", key:"tid"}},
-            [{parser(chunk_include_entity_list)}])"""
-        )
-        self.conn.run(query=vertex_query)
-        self.conn.run(query=chunk_query)
-        self.conn.run(query=document_query)
-        self.conn.run(query=relation_query)
-        self.conn.run(query=document_include_chunk_query)
-        self.conn.run(query=chunk_include_chunk_query)
-        self.conn.run(query=chunk_include_entity_query)
 
+    def _upsert_chunk_include_chunk(self, edges):
+        pass
+
+    def _upsert_chunk_include_entity(self, edges):
+        pass
+
+    def _upsert_relation(self, edges):
+        pass
+
     def insert_graph(self, graph: Graph) -> None:
         """Add graph."""
+        # This part of the code needs optimization.
+
+        documents: Iterator[Vertex] = graph.vertices('document')
+        doc_include_chunk: Iterator[Edge] = graph.edges('document_include_chunk')
+        chunks: Iterator[Vertex] = graph.vertices('chunk')
+        chunk_include_chunk: Iterator[Edge] = graph.edges('chunk_include_chunk')
+        chunk_include_entity: Iterator[Edge] = graph.edges('chunk_include_entity')
+        entities: Iterator[Vertex] = graph.vertices('entity')
+        relaiton: Iterator[Edge] = graph.edges('relaiton')
+        self._upsert_entities(entities)
+        self._upsert_chunks(chunks)
+        self._upsert_documents(documents)
+        self._upsert_edge(doc_include_chunk, self._include_type, self._document_type, self._chunk_type)
+        self._upsert_edge(chunk_include_chunk, self._include_type, self._chunk_type, self._chunk_type)
+        self._upsert_edge(chunk_include_entity, self._include_type, self._chunk_type, self._vertex_type)
+        self._upsert_edge(relaiton, self._edge_type, self._vertex_type, self._vertex_type)
 
     def truncate(self):
         """Truncate Graph."""
diff --git a/dbgpt/storage/knowledge_graph/community_summary.py b/dbgpt/storage/knowledge_graph/community_summary.py
index 5f06cdda3..62f2d85d6 100644
--- a/dbgpt/storage/knowledge_graph/community_summary.py
+++ b/dbgpt/storage/knowledge_graph/community_summary.py
@@ -187,22 +187,23 @@ async def aload_document(self, chunks: List[Chunk]) -> List[str]:
         # check document
         doc_name = chunks[0].metadata['source'] or 'Text_Node'
         hash_id = str(uuid.uuid4())
+        # need hash_id of doc
         # if chunks[0].metadata['source']:
         #     source_name = os.path.splitext(chunks[0].metadata['source'])[0]
         #     hash_id = chunks[0].metadata['hash_id']
         #     vertex = self._graph_store.get_document_vertex(doc_name)
-        #     if vertex and hash_id and hash_id != vertex.get_prop('hash_id'):
+        #     if hash_id and vertex and hash_id != vertex.get_prop('hash_id'):
         #         self.delete_by_ids(doc_name)
 
         data_list = self._parse_chunks(chunks)
         total_graph = MemoryGraph()
 
         for data in data_list:
-            chunk_src = Vertex(f"""{data['parent_id']}""",name=data["parent_title"],vertex_type=data["type"])
-            chunk_dst = Vertex(f"""{data["id"]}""",name=data["title"],vertex_type=data["type"])
+            chunk_src = Vertex(f"""{data['parent_id']}""",name=data["parent_title"],vertex_type=data["type"],content=data["content"])
+            chunk_dst = Vertex(f"""{data["id"]}""",name=data["title"],vertex_type=data["type"],content=data["content"])
             chunk_include_chunk = Edge(chunk_src.vid,chunk_dst.vid,name="include",edge_type="chunk_include_chunk")
             if data['parent_id'] == 'document':
-                chunk_src = Vertex(f"""{hash_id}""",name=doc_name,vertex_type='document')
+                chunk_src = Vertex(f"""{hash_id}""",name=doc_name,vertex_type='document',content=data["content"])
                 chunk_include_chunk = Edge(chunk_src.vid,chunk_dst.vid,name="include",edge_type="document_include_chunk")
             total_graph.upsert_vertex(chunk_src)
             total_graph.upsert_vertex(chunk_dst)
             total_graph.append_edge(chunk_include_chunk)
@@ -219,20 +220,12 @@ async def aload_document(self, chunks: List[Chunk]) -> List[str]:
 
         self._graph_store.insert_graph(total_graph)
 
-        # for chunk in chunks:
-        #     # extract graphs and save
-        #     graphs = await self._graph_extractor.extract(chunk.content)
-        #     for graph in graphs:
-        #         self._graph_store.insert_graph(graph)
-
-        # todo self._graph_store.insert_text_link()
-        # 1. self._graph_store.insert_chunk(chunk nodes, chunk2chunk edeges)
-        # 2. self._graph_store.insert_doc(doc nodes,doc2chunk edges)
-
+        # use asyncio.gather
         # tasks = [self._graph_extractor.extract(chunk.content) for chunk in chunks]
         # results = await asyncio.gather(*tasks)
         # for result in results:
        #     self._graph_store.insert_graph(result[0])
+
         # build communities and save
 
         await self._community_store.build_communities()
From bc5ffda9e9de2e2ad1e1d5bfdce6b1373d9e6381 Mon Sep 17 00:00:00 2001
From: KingSkyLi <15566300566@163.com>
Date: Wed, 25 Sep 2024 11:25:34 +0800
Subject: [PATCH 3/6] Get source file name

---
 .../knowledge_graph/community_summary.py | 33 ++++++++-----------
 1 file changed, 14 insertions(+), 19 deletions(-)

diff --git a/dbgpt/storage/knowledge_graph/community_summary.py b/dbgpt/storage/knowledge_graph/community_summary.py
index 62f2d85d6..b1b7ccbc0 100644
--- a/dbgpt/storage/knowledge_graph/community_summary.py
+++ b/dbgpt/storage/knowledge_graph/community_summary.py
@@ -185,16 +185,10 @@ async def aload_document(self, chunks: List[Chunk]) -> List[str]:
         """Extract and persist graph."""
 
         # check document
-        doc_name = chunks[0].metadata['source'] or 'Text_Node'
+        file_path = chunks[0].metadata['source'] or 'Text_Node'
+        doc_name = os.path.basename(file_path)
         hash_id = str(uuid.uuid4())
-        # need hash_id of doc
-        # if chunks[0].metadata['source']:
-        #     source_name = os.path.splitext(chunks[0].metadata['source'])[0]
-        #     hash_id = chunks[0].metadata['hash_id']
-        #     vertex = self._graph_store.get_document_vertex(doc_name)
-        #     if hash_id and vertex and hash_id != vertex.get_prop('hash_id'):
-        #         self.delete_by_ids(doc_name)
-
+
         data_list = self._parse_chunks(chunks)
         total_graph = MemoryGraph()
 
@@ -202,17 +196,17 @@ async def aload_document(self, chunks: List[Chunk]) -> List[str]:
             total_graph.upsert_vertex(chunk_src)
             total_graph.upsert_vertex(chunk_dst)
             total_graph.append_edge(chunk_include_chunk)
-            graphs = await self._graph_extractor.extract(data["content"])
-            for graph in graphs:
-                for vertex in graph.vertices():
-                    print(vertex.get_prop('vertex_type'))
-                    total_graph.upsert_vertex(vertex)
-                    chunk_include_entity = Edge(chunk_dst.vid,vertex.vid,name="include",edge_type="chunk_include_entity")
-                    total_graph.append_edge(chunk_include_entity)
-                for edge in graph.edges():
-                    total_graph.append_edge(edge)
-
+            if os.getenv('ONLY_EXTRACT_DOCUMENT_STRUCTURE', 'false').lower() != 'true':
+                graphs = await self._graph_extractor.extract(data["content"])
+                for graph in graphs:
+                    for vertex in graph.vertices():
+                        print(vertex.get_prop('vertex_type'))
+                        total_graph.upsert_vertex(vertex)
+                        chunk_include_entity = Edge(chunk_dst.vid,vertex.vid,name="include",edge_type="chunk_include_entity")
+                        total_graph.append_edge(chunk_include_entity)
+                    for edge in graph.edges():
+                        total_graph.append_edge(edge)
+
         self._graph_store.insert_graph(total_graph)
 
         # use asyncio.gather
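One detail worth calling out in the guard above: `os.getenv` returns `None` when the variable is unset, so the diff as originally written would raise `AttributeError` on `.lower()` for every deployment that never sets `ONLY_EXTRACT_DOCUMENT_STRUCTURE`. A defensive helper capturing the corrected pattern (the helper name is illustrative, not part of the patch):

```python
import os

def only_extract_structure() -> bool:
    # A default value is required before calling .lower(), because
    # os.getenv returns None for unset variables.
    return os.getenv("ONLY_EXTRACT_DOCUMENT_STRUCTURE", "false").lower() == "true"
```

When the flag is true, only the document/chunk skeleton is written and LLM entity extraction is skipped entirely, which makes structure-only ingestion much cheaper.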
From d1717357c043bc87a4553a97d29671505f74f92b Mon Sep 17 00:00:00 2001
From: KingSkyLi <15566300566@163.com>
Date: Fri, 27 Sep 2024 12:20:14 +0800
Subject: [PATCH 4/6] Add document deletion

---
 dbgpt/storage/graph_store/tugraph_store.py       | 17 +++++++++++++----
 dbgpt/storage/knowledge_graph/base.py            |  4 +++-
 .../knowledge_graph/community_summary.py         |  4 +++-
 .../storage/knowledge_graph/knowledge_graph.py   |  6 +++---
 4 files changed, 22 insertions(+), 9 deletions(-)

diff --git a/dbgpt/storage/graph_store/tugraph_store.py b/dbgpt/storage/graph_store/tugraph_store.py
index a09240742..e815b55dd 100644
--- a/dbgpt/storage/graph_store/tugraph_store.py
+++ b/dbgpt/storage/graph_store/tugraph_store.py
@@ -113,8 +113,15 @@ def get_document_vertex(self, doc_name: str) -> Vertex:
         vertex = graph.get_vertex(doc_name)
         return vertex
 
-    def delete_document(self, doc_name: str):
-        pass
+    def delete_document(self, chunk_ids: str):
+        chunkids_list = [chunk_id.strip() for chunk_id in chunk_ids.split(',')]
+        del_chunk_gql = f"MATCH (m:{self._document_type})-[r]->(n:{self._chunk_type}) WHERE n.id IN {chunkids_list} DELETE n"
+        del_relation_gql = f"MATCH (m:{self._vertex_type})-[r:{self._edge_type}]-(n:{self._vertex_type}) WHERE r._chunk_id IN {chunkids_list} DELETE r"
+        del_orphan_gql = "MATCH (n) WHERE NOT EXISTS((n)-[]-()) DELETE n"
+        self.conn.run(del_chunk_gql)
+        self.conn.run(del_relation_gql)
+        self.conn.run(del_orphan_gql)
 
     def _create_graph(self, graph_name: str):
         self.conn.create_graph(graph_name=graph_name)
@@ -211,6 +218,7 @@ def _create_schema(self):
                 "{self._vertex_type}"]]',
                 ["id",STRING,false],
                 ["name",STRING,false],
+                ["_chunk_id",STRING,true],
                 ["description",STRING,true])"""
             self.conn.run(create_edge_gql)
 
@@ -421,6 +429,7 @@ def _upsert_edge(self, edges, edge_type, src_type, dst_type):
             "id": self._escape_quotes(edge.name),
             "name": self._escape_quotes(edge.name),
             "description": self._escape_quotes(edge.get_prop("description")) or "",
+            "_chunk_id": self._escape_quotes(edge.get_prop("_chunk_id")) or "",
         } for edge in edges]
         relation_query = (
             f"""CALL db.upsertEdge("{edge_type}",
@@ -450,14 +459,14 @@ def insert_graph(self, graph: Graph) -> None:
         chunk_include_chunk: Iterator[Edge] = graph.edges('chunk_include_chunk')
         chunk_include_entity: Iterator[Edge] = graph.edges('chunk_include_entity')
         entities: Iterator[Vertex] = graph.vertices('entity')
-        relaiton: Iterator[Edge] = graph.edges('relaiton')
+        relation: Iterator[Edge] = graph.edges('relation')
         self._upsert_entities(entities)
         self._upsert_chunks(chunks)
         self._upsert_documents(documents)
         self._upsert_edge(doc_include_chunk, self._include_type, self._document_type, self._chunk_type)
         self._upsert_edge(chunk_include_chunk, self._include_type, self._chunk_type, self._chunk_type)
         self._upsert_edge(chunk_include_entity, self._include_type, self._chunk_type, self._vertex_type)
-        self._upsert_edge(relaiton, self._edge_type, self._vertex_type, self._vertex_type)
+        self._upsert_edge(relation, self._edge_type, self._vertex_type, self._vertex_type)
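For reference, the three statements built by `delete_document` expand as follows. This is a standalone sketch using the default type names from `TuGraphStoreConfig`; note that the Python list repr is what lands in the `IN` clause, which is why the ids must not contain quotes:

```python
chunk_ids = "uuid-1, uuid-2"  # illustrative ids, comma-separated as the API expects
chunkids_list = [cid.strip() for cid in chunk_ids.split(",")]
document_type, chunk_type = "document", "chunk"
vertex_type, edge_type = "entity", "relation"

del_chunk_gql = (
    f"MATCH (m:{document_type})-[r]->(n:{chunk_type}) "
    f"WHERE n.id IN {chunkids_list} DELETE n"
)
del_relation_gql = (
    f"MATCH (m:{vertex_type})-[r:{edge_type}]-(n:{vertex_type}) "
    f"WHERE r._chunk_id IN {chunkids_list} DELETE r"
)
print(del_chunk_gql)
# MATCH (m:document)-[r]->(n:chunk) WHERE n.id IN ['uuid-1', 'uuid-2'] DELETE n
```

Deletion order matters: chunks first, then the entity relations stamped with `_chunk_id` (the property this commit adds to the edge schema), and finally a sweep that removes any vertex left without edges.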
diff --git a/dbgpt/storage/knowledge_graph/base.py b/dbgpt/storage/knowledge_graph/base.py
index e47094bba..1447c22e6 100644
--- a/dbgpt/storage/knowledge_graph/base.py
+++ b/dbgpt/storage/knowledge_graph/base.py
@@ -29,4 +29,6 @@ def query_graph(self, limit: Optional[int] = None) -> Graph:
 
     def delete_by_ids(self, ids: str) -> List[str]:
         """Delete document by ids."""
-        raise Exception("Delete document not supported by knowledge graph")
+        # raise Exception("Delete document not supported by knowledge graph")
+
+
diff --git a/dbgpt/storage/knowledge_graph/community_summary.py b/dbgpt/storage/knowledge_graph/community_summary.py
index b1b7ccbc0..6024a7661 100644
--- a/dbgpt/storage/knowledge_graph/community_summary.py
+++ b/dbgpt/storage/knowledge_graph/community_summary.py
@@ -206,11 +206,11 @@ async def aload_document(self, chunks: List[Chunk]) -> List[str]:
                 graphs = await self._graph_extractor.extract(data["content"])
                 for graph in graphs:
                     for vertex in graph.vertices():
-                        print(vertex.get_prop('vertex_type'))
                         total_graph.upsert_vertex(vertex)
                         chunk_include_entity = Edge(chunk_dst.vid,vertex.vid,name="include",edge_type="chunk_include_entity")
                         total_graph.append_edge(chunk_include_entity)
                     for edge in graph.edges():
+                        edge.set_prop('_chunk_id',chunk_dst.vid)
                         total_graph.append_edge(edge)
 
         self._graph_store.insert_graph(total_graph)
@@ -276,6 +276,8 @@ def delete_vector_name(self, index_name: str):
         logger.info("Drop triplet extractor")
 
         self._graph_extractor.drop()
+
+
 
 
 HYBRID_SEARCH_PT_CN = (
diff --git a/dbgpt/storage/knowledge_graph/knowledge_graph.py b/dbgpt/storage/knowledge_graph/knowledge_graph.py
index 99b99c4b0..280d8c7f2 100644
--- a/dbgpt/storage/knowledge_graph/knowledge_graph.py
+++ b/dbgpt/storage/knowledge_graph/knowledge_graph.py
@@ -173,6 +173,6 @@ def delete_vector_name(self, index_name: str):
         logger.info("Drop triplet extractor")
         self._triplet_extractor.drop()
 
-    # def delete_by_ids(self, ids: str) -> List[str]:
-    #     self._graph_store.delete_document(doc_name = ids)
-    #     return
+    def delete_by_ids(self, ids: str) -> List[str]:
+        self._graph_store.delete_document(chunk_ids=ids)
+        return []
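With `delete_by_ids` now wired through to the store, document removal can be driven from the knowledge-graph layer. A hypothetical usage sketch; the helper name and the `kg` object are assumptions (any configured `BuiltinKnowledgeGraph` backed by the `TuGraphStore` above would do):

```python
from typing import List

def delete_document_chunks(kg, chunk_ids: List[str]) -> None:
    """Remove a document's chunks from the graph (hypothetical helper).

    `kg` is assumed to expose delete_by_ids as in the diff above, which
    accepts a single comma-separated string of chunk ids.
    """
    kg.delete_by_ids(", ".join(chunk_ids))
```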
"STRING", + "optional": False, + }, { + "name": "_community_id", + "type": "STRING", + "optional": True, + "index": True + }, { + "name": "content", + "type": "STRING", + "optional": True, + "index": True + } + ] + self._create_schema(label_name=self._chunk_type,label_type='VERTEX',data=json.dumps({"label":self._chunk_type,"type":"VERTEX", "primary":"id","properties":chunk_proerties})) + vertex_proerties = [{ + "name": "id", + "type": "STRING", + "optional": False + }, { + "name": "name", + "type": "STRING", + "optional": False, + }, { + "name": "_community_id", + "type": "STRING", + "optional": True, + "index": True + }, { + "name": "description", + "type": "STRING", + "optional": True, + "index": True + } + ] + self._create_schema(label_name=self._vertex_type,label_type='VERTEX',data=json.dumps({"label":self._vertex_type,"type":"VERTEX", "primary":"id","properties":vertex_proerties})) + edge_proerties = [{ + "name": "id", + "type": "STRING", + "optional": False + }, { + "name": "name", + "type": "STRING", + "optional": False, + }, { + "name": "_chunk_id", + "type": "STRING", + "optional": True, + "index": True + }, { + "name": "description", + "type": "STRING", + "optional": True, + "index": True + } + ] + self._create_schema(label_name=self._edge_type,label_type='EDGE',data=json.dumps({"label":self._edge_type,"type":"EDGE", "constraints":[[self._vertex_type,self._vertex_type]],"properties":edge_proerties})) + include_proerties = [{ + "name": "id", + "type": "STRING", + "optional": False + }, { + "name": "name", + "type": "STRING", + "optional": False, + }, { + "name": "description", + "type": "STRING", + "optional": True, + "index": True + } + ] + self._create_schema(label_name=self._include_type,label_type='EDGE',data=json.dumps({"label":self._include_type,"type":"EDGE", "constraints":[[self._document_type,self._chunk_type],[self._chunk_type,self._vertex_type]],"properties":include_proerties})) if self._summary_enabled: self._upload_plugin() def _check_label(self, elem_type: str): result = self.conn.get_table_names() - if elem_type == "vertex": + if elem_type == "entity": return self._vertex_type in result["vertex_tables"] if elem_type == "chunk": return self._chunk_type in result["vertex_tables"] if elem_type == "document": return self._document_type in result["vertex_tables"] - - if elem_type == "edge": + if elem_type == "relation": return self._edge_type in result["edge_tables"] if elem_type == "include": return self._include_type in result["edge_tables"] @@ -179,82 +272,13 @@ def _upload_plugin(self): ) self.conn.run(gql) - def _create_schema(self): - # This part of the code needs optimization. 
- if not self._check_label("vertex"): - if self._summary_enabled: - create_vertex_gql = ( - f"CALL db.createLabel(" - f"'vertex', '{self._vertex_type}', " - f"'id', ['id',string,false]," - f"['name',string,false]," - f"['_document_id',string,true]," - f"['_chunk_id',string,true]," - f"['_community_id',string,true]," - f"['description',string,true])" - ) - self.conn.run(create_vertex_gql) - self._add_vertex_index("_community_id") + def _create_schema(self,label_name:str, label_type:str, data:Any): + if not self._check_label(label_name): + if label_type == 'VERTEX': + gql = f'''CALL db.createVertexLabelByJson('{data}')''' else: - create_vertex_gql = ( - f"CALL db.createLabel(" - f"'vertex', '{self._vertex_type}', " - f"'id', ['id',string,false]," - f"['name',string,false])", - ) - self.conn.run(create_vertex_gql) - - if not self._check_label("edge"): - create_edge_gql = f"""CALL db.createLabel( - 'edge', '{self._edge_type}', - '[["{self._vertex_type}", - "{self._vertex_type}"]]', - ["id",STRING,false], - ["name",STRING,false])""" - if self._summary_enabled: - create_edge_gql = f"""CALL db.createLabel( - 'edge', '{self._edge_type}', - '[["{self._vertex_type}", - "{self._vertex_type}"]]', - ["id",STRING,false], - ["name",STRING,false], - ["_chunk_id",STRING,true], - ["description",STRING,true])""" - self.conn.run(create_edge_gql) - - if not self._check_label("document"): - if self._summary_enabled: - create_document_gql = ( - f"CALL db.createLabel(" - f"'vertex', '{self._document_type}', " - f"'id', ['id',string,false]," - f"['_community_id',string,true]," - f"['name',string,false])" - ) - self.conn.run(create_document_gql) - - if not self._check_label("chunk"): - if self._summary_enabled: - create_document_gql = ( - f"CALL db.createLabel(" - f"'vertex', '{self._chunk_type}', " - f"'id', ['id',string,false]," - f"['content',string,true]," - f"['_community_id',string,true]," - f"['name',string,false])" - ) - self.conn.run(create_document_gql) - - if not self._check_label("include"): - if self._summary_enabled: - create_include_gql = f"""CALL db.createLabel( - 'edge', '{self._include_type}', - '[["{self._document_type}","{self._chunk_type}"],["{self._chunk_type}","{self._chunk_type}"],["{self._chunk_type}","{self._vertex_type}"]]', - ["id",STRING,false], - ["name",STRING,false], - ["description",STRING,true])""" - self.conn.run(create_include_gql) - + gql = f'''CALL db.createEdgeLabelByJson('{data}')''' + self.conn.run(gql) def _format_query_data(self, data, white_prop_list: List[str]): nodes_list = [] @@ -273,14 +297,16 @@ def get_filtered_properties(properties, white_list): def process_node(node: graph.Node): node_id = node._properties.get("id") node_name = node._properties.get("name") + node_type = next(iter(node._labels)) node_properties = get_filtered_properties(node._properties, _white_list) nodes_list.append( - {"id": node_id, "name": node_name, "properties": node_properties} + {"id": node_id,"type":node_type, "name": node_name, "properties": node_properties} ) def process_relationship(rel: graph.Relationship): name = rel._properties.get("name", "") rel_nodes = rel.nodes + rel_type = rel.type src_id = rel_nodes[0]._properties.get("id") dst_id = rel_nodes[1]._properties.get("id") for node in rel_nodes: @@ -297,6 +323,7 @@ def process_relationship(rel: graph.Relationship): "src_id": src_id, "dst_id": dst_id, "name": name, + "type":rel_type, "properties": edge_properties, } ) @@ -313,6 +340,7 @@ def process_other(value): { "id": "json_node", "name": "json_node", + "type": "json_node", 
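The hand-built `CALL db.createLabel(...)` strings are replaced here by JSON payloads handed to TuGraph's `db.createVertexLabelByJson` / `db.createEdgeLabelByJson` procedures. A minimal sketch of the payload construction, using the chunk label exactly as defined above (requires only the standard library):

```python
import json

chunk_properties = [
    {"name": "id", "type": "STRING", "optional": False},
    {"name": "name", "type": "STRING", "optional": False},
    {"name": "_community_id", "type": "STRING", "optional": True, "index": True},
    {"name": "content", "type": "STRING", "optional": True, "index": True},
]
payload = json.dumps(
    {"label": "chunk", "type": "VERTEX", "primary": "id", "properties": chunk_properties}
)
gql = f"CALL db.createVertexLabelByJson('{payload}')"
print(gql)  # the single statement sent over the connector
```

One payload per label keeps the schema declarative and makes adding a property a one-line change, rather than editing positional argument lists. Note this code path assumes `json` is imported in `tugraph_store.py`.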
"properties": {"description": value}, } ) @@ -329,11 +357,11 @@ def process_other(value): else: process_other(value) nodes = [ - Vertex(node["id"], node["name"], **node["properties"]) + Vertex(node["id"], node["name"], **{"type": node["type"], **node["properties"]}) for node in nodes_list ] rels = [ - Edge(edge["src_id"], edge["dst_id"], edge["name"], **edge["properties"]) + Edge(edge["src_id"], edge["dst_id"],edge["name"], **{"type": edge["type"], **edge["properties"]}) for edge in rels_list ] return {"nodes": nodes, "edges": rels} @@ -543,6 +571,32 @@ def explore( f"WHERE n.id IN {subs} RETURN p {limit_string}" ) return self.query(query) + + def explore_text_link( + self, + subs:List[str], + depth: Optional[int] = None, + limit: Optional[int] = None, + ) -> Graph: + """Explore the graph text link.""" + if not subs: + return MemoryGraph() + depth_string = f"1..{depth}" + if depth is None: + depth_string = ".." + limit_string = f"LIMIT {limit}" + if limit is None: + limit_string = "" + graph = MemoryGraph() + for sub in subs: + query = f"MATCH p=(n:{self._document_type})-[r:{self._include_type}*{depth_string}]-(m:{self._chunk_type})WHERE m.content CONTAINS '{sub}' RETURN p {limit_string}" + result = self.query(query) + for vertex in result.vertices(): + graph.upsert_vertex(vertex) + for edge in result.edges(): + graph.append_edge(edge) + + return graph def query(self, query: str, **args) -> MemoryGraph: """Execute a query on graph.""" diff --git a/dbgpt/storage/knowledge_graph/community_summary.py b/dbgpt/storage/knowledge_graph/community_summary.py index 6024a7661..f9462e082 100644 --- a/dbgpt/storage/knowledge_graph/community_summary.py +++ b/dbgpt/storage/knowledge_graph/community_summary.py @@ -246,13 +246,14 @@ async def asimilar_search_with_scores( # local search: extract keywords and explore subgraph keywords = await self._keyword_extractor.extract(text) subgraph = self._graph_store.explore(keywords, limit=topk).format() + document_subgraph = self._graph_store.explore_text_link(keywords, limit=topk).format() logger.info(f"Search subgraph from {len(keywords)} keywords") if not summaries and not subgraph: return [] # merge search results into context - content = HYBRID_SEARCH_PT_CN.format(context=context, graph=subgraph) + content = HYBRID_SEARCH_PT_CN.format(context=context, graph=subgraph, document_subgraph = document_subgraph) return [Chunk(content=content)] def truncate(self) -> List[str]: diff --git a/dbgpt/storage/knowledge_graph/knowledge_graph.py b/dbgpt/storage/knowledge_graph/knowledge_graph.py index 280d8c7f2..d22ebbb07 100644 --- a/dbgpt/storage/knowledge_graph/knowledge_graph.py +++ b/dbgpt/storage/knowledge_graph/knowledge_graph.py @@ -117,6 +117,7 @@ async def asimilar_search_with_scores( # extract keywords and explore graph store keywords = await self._keyword_extractor.extract(text) subgraph = self._graph_store.explore(keywords, limit=topk).format() + logger.info(f"Search subgraph from {len(keywords)} keywords") if not subgraph: From e3f9c09b79fb9bfbde5d908864c26ae909aff4a2 Mon Sep 17 00:00:00 2001 From: KingSkyLi <15566300566@163.com> Date: Fri, 27 Sep 2024 17:56:31 +0800 Subject: [PATCH 6/6] add next edge; --- dbgpt/storage/graph_store/tugraph_store.py | 32 ++++++++++++++++--- .../knowledge_graph/community_summary.py | 16 +++++----- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/dbgpt/storage/graph_store/tugraph_store.py b/dbgpt/storage/graph_store/tugraph_store.py index 2209f1729..32a856014 100644 --- a/dbgpt/storage/graph_store/tugraph_store.py 
From e3f9c09b79fb9bfbde5d908864c26ae909aff4a2 Mon Sep 17 00:00:00 2001
From: KingSkyLi <15566300566@163.com>
Date: Fri, 27 Sep 2024 17:56:31 +0800
Subject: [PATCH 6/6] Add next edge

---
 dbgpt/storage/graph_store/tugraph_store.py | 32 ++++++++++++++++---
 .../knowledge_graph/community_summary.py   | 16 +++++-----
 2 files changed, 36 insertions(+), 12 deletions(-)

diff --git a/dbgpt/storage/graph_store/tugraph_store.py b/dbgpt/storage/graph_store/tugraph_store.py
index 2209f1729..32a856014 100644
--- a/dbgpt/storage/graph_store/tugraph_store.py
+++ b/dbgpt/storage/graph_store/tugraph_store.py
@@ -54,6 +54,10 @@ class TuGraphStoreConfig(GraphStoreConfig):
         default="include",
         description="The type of include edge, `include` by default.",
     )
+    next_type: str = Field(
+        default="next",
+        description="The type of next edge, `next` by default.",
+    )
     plugin_names: List[str] = Field(
         default=["leiden"],
         description=(
@@ -88,6 +92,7 @@ def __init__(self, config: TuGraphStoreConfig) -> None:
         self._document_type = os.getenv("TUGRAPH_DOCUMENT_TYPE", config.document_type)
         self._chunk_type = os.getenv("TUGRAPH_CHUNK_TYPE", config.chunk_type)
         self._include_type = os.getenv("TUGRAPH_INCLUDE_TYPE", config.include_type)
+        self._next_type = os.getenv("TUGRAPH_NEXT_TYPE", config.next_type)
 
         self.conn = TuGraphConnector.from_uri_db(
@@ -215,11 +220,25 @@ def _create_graph(self, graph_name: str):
         }, {
             "name": "description",
             "type": "STRING",
-            "optional": True,
-            "index": True
+            "optional": True
         }
         ]
-        self._create_schema(label_name=self._include_type,label_type='EDGE',data=json.dumps({"label":self._include_type,"type":"EDGE", "constraints":[[self._document_type,self._chunk_type],[self._chunk_type,self._vertex_type]],"properties":include_properties}))
+        self._create_schema(label_name=self._include_type,label_type='EDGE',data=json.dumps({"label":self._include_type,"type":"EDGE", "constraints":[[self._document_type,self._chunk_type],[self._chunk_type,self._chunk_type],[self._chunk_type,self._vertex_type]],"properties":include_properties}))
+        next_properties = [{
+            "name": "id",
+            "type": "STRING",
+            "optional": False
+        }, {
+            "name": "name",
+            "type": "STRING",
+            "optional": False,
+        }, {
+            "name": "description",
+            "type": "STRING",
+            "optional": True
+        }
+        ]
+        self._create_schema(label_name=self._next_type,label_type='EDGE',data=json.dumps({"label":self._next_type,"type":"EDGE", "constraints":[[self._chunk_type,self._chunk_type]],"properties":next_properties}))
         if self._summary_enabled:
             self._upload_plugin()
 
@@ -235,6 +254,8 @@ def _check_label(self, elem_type: str):
             return self._edge_type in result["edge_tables"]
         if elem_type == "include":
             return self._include_type in result["edge_tables"]
+        if elem_type == "next":
+            return self._next_type in result["edge_tables"]
 
     def _add_vertex_index(self, field_name):
         gql = f"CALL db.addIndex('{self._vertex_type}', '{field_name}', false)"
@@ -485,8 +506,9 @@ def insert_graph(self, graph: Graph) -> None:
         doc_include_chunk: Iterator[Edge] = graph.edges('document_include_chunk')
         chunks: Iterator[Vertex] = graph.vertices('chunk')
         chunk_include_chunk: Iterator[Edge] = graph.edges('chunk_include_chunk')
-        chunk_include_entity: Iterator[Edge] = graph.edges('chunk_include_entity')
+        chunk_next_chunk: Iterator[Edge] = graph.edges('chunk_next_chunk')
         entities: Iterator[Vertex] = graph.vertices('entity')
+        chunk_include_entity: Iterator[Edge] = graph.edges('chunk_include_entity')
         relation: Iterator[Edge] = graph.edges('relation')
         self._upsert_entities(entities)
         self._upsert_chunks(chunks)
         self._upsert_documents(documents)
         self._upsert_edge(doc_include_chunk, self._include_type, self._document_type, self._chunk_type)
         self._upsert_edge(chunk_include_chunk, self._include_type, self._chunk_type, self._chunk_type)
         self._upsert_edge(chunk_include_entity, self._include_type, self._chunk_type, self._vertex_type)
+        self._upsert_edge(chunk_next_chunk, self._next_type, self._chunk_type, self._chunk_type)
         self._upsert_edge(relation, self._edge_type, self._vertex_type, self._vertex_type)
 
     def truncate(self):
         """Truncate Graph."""
         gql = "MATCH (n) DELETE n"
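The new `next` edge records reading order between sibling chunks, alongside the containment (`include`) hierarchy. A small sketch of the wiring that `aload_document` performs below, assuming the `MemoryGraph` import path from this patch series:

```python
from dbgpt.storage.graph_store.graph import Edge, MemoryGraph, Vertex

# chunk i-1 -> chunk i, in document order.
g = MemoryGraph()
ids = ["chunk-0", "chunk-1", "chunk-2"]
for cid in ids:
    g.upsert_vertex(Vertex(cid, name=cid, vertex_type="chunk"))
for prev, cur in zip(ids, ids[1:]):
    g.append_edge(Edge(prev, cur, "next", edge_type="chunk_next_chunk"))

# n chunks yield n-1 next edges.
assert len(list(g.edges("chunk_next_chunk"))) == 2
```

Keeping `next` as its own label (rather than overloading `include`) lets queries traverse either the hierarchy or the linear reading order independently.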
diff --git a/dbgpt/storage/knowledge_graph/community_summary.py b/dbgpt/storage/knowledge_graph/community_summary.py
index f9462e082..258144abf 100644
--- a/dbgpt/storage/knowledge_graph/community_summary.py
+++ b/dbgpt/storage/knowledge_graph/community_summary.py
@@ -151,7 +151,8 @@ def _parse_chunks(self, chunks: List[Chunk]):
             'content': chunk.content,
             'parent_id': None,
             'parent_title': None,
-            'type': 'chunk'
+            'type': 'chunk',
+            'chunk_index': chunk_index
         }
 
         if parent_level:
@@ -172,15 +173,9 @@ def _parse_chunks(self, chunks: List[Chunk]):
 
             if not obj['parent_id']:
                 obj['parent_id'] = 'document'
-
             data.append(obj)
-
         return data
-
-
-
-
     async def aload_document(self, chunks: List[Chunk]) -> List[str]:
         """Extract and persist graph."""
 
@@ -187,16 +187,21 @@ async def aload_document(self, chunks: List[Chunk]) -> List[str]:
         data_list = self._parse_chunks(chunks)
         total_graph = MemoryGraph()
 
-        for data in data_list:
+        for index, data in enumerate(data_list):
             chunk_src = Vertex(f"""{data['parent_id']}""",name=data["parent_title"],vertex_type=data["type"],content=data["content"])
             chunk_dst = Vertex(f"""{data["id"]}""",name=data["title"],vertex_type=data["type"],content=data["content"])
             chunk_include_chunk = Edge(chunk_src.vid,chunk_dst.vid,name="include",edge_type="chunk_include_chunk")
+            chunk_next_chunk = None
+            if index >= 1:
+                chunk_next_chunk = Edge(data_list[index - 1]["id"],data_list[index]["id"],name="next",edge_type="chunk_next_chunk")
             if data['parent_id'] == 'document':
                 chunk_src = Vertex(f"""{hash_id}""",name=doc_name,vertex_type='document',content=data["content"])
                 chunk_include_chunk = Edge(chunk_src.vid,chunk_dst.vid,name="include",edge_type="document_include_chunk")
             total_graph.upsert_vertex(chunk_src)
             total_graph.upsert_vertex(chunk_dst)
             total_graph.append_edge(chunk_include_chunk)
+            if chunk_next_chunk:
+                total_graph.append_edge(chunk_next_chunk)
             if os.getenv('ONLY_EXTRACT_DOCUMENT_STRUCTURE', 'false').lower() != 'true':
                 graphs = await self._graph_extractor.extract(data["content"])
                 for graph in graphs: